Worker Classes

This reference covers specialized worker classes in the Pythia framework.

HTTP Workers

HTTPWorker

::: pythia.brokers.http.HTTPWorker

Base class for HTTP-based workers.

from pythia.brokers.http import HTTPWorker
import aiohttp

class APIWorker(HTTPWorker):
    def __init__(self, base_url: str, **kwargs):
        super().__init__(**kwargs)
        self.base_url = base_url
        self.session = None

    async def startup(self):
        await super().startup()
        self.session = aiohttp.ClientSession()

    async def shutdown(self):
        if self.session:
            await self.session.close()
        await super().shutdown()
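
With the session created in startup(), process() can reuse it for requests. A minimal sketch; the /status path is a placeholder:

    async def process(self, message):
        # Reuse the shared session rather than opening one per request;
        # the endpoint path is illustrative.
        async with self.session.get(f"{self.base_url}/status") as resp:
            resp.raise_for_status()
            return await resp.json()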

Configuration

from pythia.config.http import HTTPConfig

http_config = HTTPConfig(
    timeout=30.0,
    max_retries=3,
    retry_delay=1.0,
    max_connections=100,
    enable_circuit_breaker=True,
    circuit_breaker_threshold=5,
    circuit_breaker_timeout=60.0,
)

PollerWorker

::: pythia.brokers.http.PollerWorker

Worker that polls HTTP endpoints for data.

from pythia.brokers.http import PollerWorker

class APIPollerWorker(PollerWorker):
    def __init__(self, api_url: str, poll_interval: float = 30.0):
        super().__init__(
            url=api_url,
            poll_interval=poll_interval,
            headers={"Accept": "application/json"}
        )

    async def process(self, message):
        # Process data from API
        data = message.body
        processed = await self.transform_data(data)
        return processed

    async def transform_data(self, data):
        # Transform API response
        return {"processed": True, "count": len(data)}

Configuration Options

  • url: str - API endpoint to poll
  • poll_interval: float - Seconds between polls
  • headers: Dict[str, str] - HTTP headers
  • params: Dict[str, Any] - Query parameters
  • timeout: float - Request timeout
  • method: str - HTTP method (default: GET)
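
Taken together, a poller can also be configured directly at construction time; the endpoint and values below are illustrative:

poller = PollerWorker(
    url="https://api.example.com/items",
    poll_interval=15.0,
    headers={"Accept": "application/json"},
    params={"status": "active"},
    timeout=10.0,
    method="GET",
)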

WebhookSenderWorker

::: pythia.brokers.http.WebhookSenderWorker

Worker that sends webhooks to external services.

from pythia.brokers.http import WebhookSenderWorker
from pythia.brokers.kafka import KafkaConsumer

class OrderWebhookWorker(WebhookSenderWorker):
    source = KafkaConsumer(topic="orders")

    def __init__(self):
        super().__init__(
            webhook_urls=[
                "https://api.partner1.com/webhooks",
                "https://api.partner2.com/webhooks"
            ],
            headers={"Content-Type": "application/json"},
            timeout=10.0
        )

    async def process(self, message):
        order = message.body
        webhook_payload = {
            "event": "order.created",
            "data": order,
            "timestamp": message.timestamp.isoformat()
        }

        # Send to all webhooks
        await self.broadcast_webhook(webhook_payload)
        return {"webhook_sent": True}

Methods

async def send_webhook(self, data: Any, url: Optional[str] = None) -> Dict:
    """Send webhook to specific URL"""
    pass

async def broadcast_webhook(self, data: Any) -> List[Dict]:
    """Send webhook to all configured URLs"""
    pass

async def send_webhook_from_message(self, message: Message) -> Dict:
    """Send webhook from message data"""
    pass
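
For example, a process() implementation can deliver to one endpoint or fan out to all configured URLs; the payload shape is illustrative:

async def process(self, message):
    payload = {"event": "order.updated", "data": message.body}

    # Deliver to a single endpoint...
    await self.send_webhook(payload, url="https://api.partner1.com/webhooks")

    # ...or fan out to every configured URL.
    results = await self.broadcast_webhook(payload)
    return {"deliveries": len(results)}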

Database Workers

DatabaseWorker

::: pythia.brokers.database.DatabaseWorker

Base class for database-based workers.

from pythia.brokers.database import DatabaseWorker
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

class DBWorker(DatabaseWorker):
    def __init__(self, db_url: str):
        super().__init__()
        self.engine = create_async_engine(db_url)

    async def startup(self):
        await super().startup()
        # Verify connectivity before processing begins
        async with self.engine.connect() as conn:
            await conn.execute(text("SELECT 1"))

    async def shutdown(self):
        await self.engine.dispose()
        await super().shutdown()
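
With the engine in place, a process() implementation can run queries directly; the table name below is a placeholder:

    async def process(self, message):
        # Illustrative read through the async engine; adjust the query
        # to your schema.
        async with self.engine.connect() as conn:
            result = await conn.execute(text("SELECT count(*) FROM events"))
            return {"event_count": result.scalar_one()}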

CDCWorker (Change Data Capture)

::: pythia.brokers.database.CDCWorker

Worker for monitoring database changes.

from pythia.brokers.database import CDCWorker
from pythia.brokers.kafka import KafkaProducer

class UserCDCWorker(CDCWorker):
    sink = KafkaProducer(topic="user-changes")

    def __init__(self, db_url: str):
        super().__init__(
            db_url=db_url,
            tables=["users", "user_profiles"],
            poll_interval=5.0
        )

    async def process(self, message):
        change = message.body

        # Transform database change to event
        event = {
            "table": change["table"],
            "operation": change["operation"],  # INSERT, UPDATE, DELETE
            "old_values": change.get("old_values"),
            "new_values": change.get("new_values"),
            "timestamp": change["timestamp"]
        }

        await self.send_to_sink(event)
        return event

Configuration

from pythia.config.database import DatabaseConfig

db_config = DatabaseConfig(
    url="postgresql+asyncpg://user:pass@localhost/db",
    tables=["orders", "users"],
    poll_interval=10.0,
    batch_size=100,
    enable_snapshots=True
)

SyncWorker

::: pythia.brokers.database.SyncWorker

Worker for synchronizing data between databases.

from pythia.brokers.database import SyncWorker

class DatabaseSyncWorker(SyncWorker):
    def __init__(self, source_url: str, target_url: str):
        super().__init__(
            source_db_url=source_url,
            target_db_url=target_url,
            sync_tables=["products", "inventory"],
            sync_interval=3600.0  # 1 hour
        )

    async def process(self, message):
        sync_data = message.body

        # Apply changes to target database
        await self.apply_changes(sync_data)

        return {"synced_records": len(sync_data["changes"])}

    async def apply_changes(self, sync_data):
        # Custom sync logic; apply_single_change is implemented by the subclass
        for change in sync_data["changes"]:
            await self.apply_single_change(change)

Background Job Workers

JobWorker

::: pythia.jobs.worker.JobWorker

Worker for processing background jobs.

from pythia.jobs import JobWorker, Job
from pythia.brokers.redis import RedisListConsumer

class BackgroundJobWorker(JobWorker):
    source = RedisListConsumer(queue="background_jobs")

    async def process(self, message):
        job_data = message.body
        job = Job.from_dict(job_data)

        # Execute the job
        result = await self.execute_job(job)

        # Update job status
        await self.update_job_status(job.id, "completed", result)

        return result

    async def execute_job(self, job: Job):
        if job.type == "send_email":
            return await self.send_email(job.payload)
        elif job.type == "process_image":
            return await self.process_image(job.payload)
        else:
            raise ValueError(f"Unknown job type: {job.type}")
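
On the producer side, the queue entry only needs the shape Job.from_dict expects. A sketch, assuming the fields the worker reads above (id, type, payload); verify the exact schema against your Job implementation:

import json
import redis.asyncio as redis

async def enqueue_email_job():
    # Assumed payload shape, mirroring the attributes used by the
    # worker (job.id, job.type, job.payload).
    job_data = {
        "id": "job-123",
        "type": "send_email",
        "payload": {"to": "user@example.com", "subject": "Welcome"},
    }
    client = redis.Redis()
    await client.rpush("background_jobs", json.dumps(job_data))
    await client.aclose()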

ScheduledJobWorker

::: pythia.jobs.scheduler.ScheduledJobWorker

Worker for scheduled/cron jobs.

from pythia.jobs import ScheduledJobWorker, CronSchedule

class CronWorker(ScheduledJobWorker):
    def __init__(self):
        super().__init__()

        # Add scheduled jobs
        self.add_job(
            func=self.daily_cleanup,
            schedule=CronSchedule(hour=2, minute=0),  # 2 AM daily
            job_id="daily_cleanup"
        )

        self.add_job(
            func=self.send_reports,
            schedule=CronSchedule(day_of_week=1, hour=9),  # Monday 9 AM
            job_id="weekly_reports"
        )

    async def daily_cleanup(self):
        """Daily cleanup task"""
        await self.cleanup_old_files()
        await self.cleanup_temp_data()
        return {"cleaned": True}

    async def send_reports(self):
        """Weekly reports task"""
        report = await self.generate_weekly_report()
        await self.send_email_report(report)
        return {"report_sent": True}

Cloud Workers

AWS Workers

from pythia import Worker
from pythia.brokers.cloud.aws import SQSConsumer, SNSProducer

class AWSWorker(Worker):
    source = SQSConsumer(
        queue_url="https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
    )
    sink = SNSProducer(
        topic_arn="arn:aws:sns:us-east-1:123456789012:my-topic"
    )

    async def process(self, message):
        # Process AWS SQS message
        data = message.body
        processed = await self.handle_aws_message(data)

        # Send result to SNS
        await self.send_to_sink(processed)
        return processed

GCP Workers

from pythia import Worker
from pythia.brokers.cloud.gcp import PubSubSubscriber, PubSubPublisher

class GCPWorker(Worker):
    source = PubSubSubscriber(
        subscription_name="projects/my-project/subscriptions/my-subscription"
    )
    sink = PubSubPublisher(
        topic_name="projects/my-project/topics/processed-events"
    )

    async def process(self, message):
        # Process GCP Pub/Sub message
        event_data = message.body
        result = await self.process_event(event_data)

        await self.send_to_sink(result)
        return result

Azure Workers

from pythia import Worker
from pythia.brokers.cloud.azure import ServiceBusConsumer, ServiceBusProducer

class AzureWorker(Worker):
    source = ServiceBusConsumer(queue_name="input-queue")
    sink = ServiceBusProducer(queue_name="output-queue")

    async def process(self, message):
        # Process Azure Service Bus message
        data = message.body
        processed = await self.transform_data(data)

        await self.send_to_sink(processed)
        return processed

Specialized Patterns

Pipeline Worker

import asyncio
from typing import Any, Callable, List

from pythia import Worker

class PipelineWorker(Worker):
    """Worker that processes messages through multiple stages"""

    def __init__(self, stages: List[Callable], **kwargs):
        super().__init__(**kwargs)
        self.stages = stages

    async def process(self, message):
        data = message.body

        # Process through each stage
        for i, stage in enumerate(self.stages):
            try:
                data = await self.run_stage(stage, data, i)
            except Exception as e:
                self.logger.error(f"Stage {i} failed: {e}")
                raise

        return data

    async def run_stage(self, stage: Callable, data: Any, stage_num: int):
        self.logger.info(f"Running stage {stage_num}")

        if asyncio.iscoroutinefunction(stage):
            return await stage(data)
        else:
            return stage(data)

# Usage
def validate_data(data):
    # Validation logic
    return data

async def enrich_data(data):
    # Enrichment logic
    return data

def transform_data(data):
    # Transformation logic
    return data

pipeline = PipelineWorker(stages=[validate_data, enrich_data, transform_data])

Multi-Source Worker

from pythia import Worker
from pythia.brokers.kafka import KafkaConsumer
from pythia.brokers.redis import RedisStreamsConsumer

class MultiSourceWorker(Worker):
    """Worker that consumes from multiple sources"""

    sources = [
        KafkaConsumer(topic="orders"),
        RedisStreamsConsumer(stream="events"),
    ]

    async def process(self, message):
        # Handle message based on source
        source_type = message.source_info.get("broker_type")

        if source_type == "kafka":
            return await self.handle_kafka_message(message)
        elif source_type == "redis":
            return await self.handle_redis_message(message)
        else:
            return await self.handle_generic_message(message)

Circuit Breaker Worker

from pythia import Worker
from pythia.utils.circuit_breaker import CircuitBreaker

class CircuitBreakerWorker(Worker):
    """Worker with circuit breaker pattern"""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=60,
            expected_exception=Exception
        )

    async def process(self, message):
        async with self.circuit_breaker:
            return await self.risky_operation(message)

    async def risky_operation(self, message):
        # Operation that might fail
        result = await self.external_api_call(message.body)
        return result
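
For reference, this is the behavior those parameters control. A minimal sketch of the pattern itself, not pythia's implementation: after failure_threshold consecutive failures the breaker opens and rejects calls until recovery_timeout seconds have elapsed, then allows one trial call through.

import time

class MiniCircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = None

    async def __aenter__(self):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("Circuit open; call rejected")
            # Half-open: let one trial call through
            self.opened_at = None
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.failures = 0  # success closes the circuit
        else:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
        return False  # propagate the original exception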