Core Classes
This reference covers the core classes of the Pythia framework.
Worker
::: pythia.core.worker.Worker The main base class for creating message processing workers.
Constructor
from pythia import Worker
from pythia.config import WorkerConfig, MetricsConfig
worker = Worker(
config=WorkerConfig(worker_name="my-worker"),
metrics_config=MetricsConfig(enabled=True)
)
Class Attributes
- source: Optional[MessageBroker] - Single message source
- sink: Optional[MessageProducer] - Single message sink
- sources: Optional[List[MessageBroker]] - Multiple message sources
- sinks: Optional[List[MessageProducer]] - Multiple message sinks
Key Methods
Abstract Methods (Must Implement)
Lifecycle Methods
async def startup(self) -> None:
"""Called when worker starts"""
pass
async def shutdown(self) -> None:
"""Called when worker shuts down"""
pass
async def health_check(self) -> bool:
"""Check if worker is healthy"""
return True
Message Handling
async def send_to_sink(self, data: Any, sink_index: int = 0) -> None:
"""Send data to specific sink"""
pass
async def broadcast(self, data: Any) -> None:
"""Send data to all sinks"""
pass
Runtime Methods
def run(self) -> None:
"""Run worker synchronously"""
pass
async def run_async(self) -> None:
"""Run worker asynchronously"""
pass
def stop(self) -> None:
"""Stop the worker"""
pass
def get_stats(self) -> Dict[str, Any]:
"""Get worker statistics"""
pass
Example Usage
from pythia import Worker
from pythia.brokers.kafka import KafkaConsumer, KafkaProducer
class OrderProcessor(Worker):
source = KafkaConsumer(topic="orders")
sink = KafkaProducer(topic="processed-orders")
async def process(self, message):
order_data = message.body
processed_order = await self.process_order(order_data)
await self.send_to_sink(processed_order)
return processed_order
async def process_order(self, order):
# Business logic here
return {"id": order["id"], "status": "processed"}
# Run the worker
if __name__ == "__main__":
worker = OrderProcessor()
worker.run()
BatchWorker
::: pythia.core.worker.BatchWorker Worker for processing messages in batches.
from pythia import BatchWorker
from pythia.brokers.kafka import KafkaConsumer
class BatchOrderProcessor(BatchWorker):
source = KafkaConsumer(topic="orders")
batch_size = 100
batch_timeout = 5.0 # seconds
async def process_batch(self, messages: List[Message]) -> List[Any]:
"""Process multiple messages at once"""
orders = [msg.body for msg in messages]
processed = await self.bulk_process_orders(orders)
return processed
SimpleWorker
::: pythia.core.worker.SimpleWorker Simplified worker created from functions.
from pythia import simple_worker
from pythia.brokers.kafka import KafkaConsumer
@simple_worker(source=KafkaConsumer(topic="events"))
async def event_handler(message):
"""Simple function-based worker"""
print(f"Received event: {message.body}")
return {"processed": True}
# Run it
if __name__ == "__main__":
event_handler.run()
Message
::: pythia.core.message.Message Universal message abstraction for all brokers.
Constructor
from pythia.core.message import Message
from datetime import datetime
message = Message(
body={"user_id": 123, "action": "login"},
headers={"source": "auth-service"},
message_id="msg-123",
timestamp=datetime.utcnow(),
source_info={"topic": "user-events", "partition": 0}
)
Attributes
- body: Any - Message payload (dict, string, bytes, etc.)
- headers: Dict[str, Any] - Message headers/metadata
- message_id: str - Unique message identifier
- timestamp: datetime - Message timestamp
- source_info: Dict[str, Any] - Broker-specific information
- retry_count: int - Number of processing retries
- max_retries: int - Maximum allowed retries
Factory Methods
# Create from Kafka message
kafka_message = Message.from_kafka(kafka_record)
# Create from RabbitMQ message
rabbitmq_message = Message.from_rabbitmq(rabbitmq_message)
# Create from Redis message
redis_message = Message.from_redis(redis_data, stream_name)
Methods
def to_dict(self) -> Dict[str, Any]:
"""Convert message to dictionary"""
pass
def should_retry(self) -> bool:
"""Check if message should be retried"""
pass
def increment_retry(self) -> None:
"""Increment retry counter"""
pass
Example Usage
from pythia.core.message import Message
# Create a message
message = Message(
body={"order_id": 12345, "amount": 99.99},
headers={"priority": "high", "source": "web"},
message_id="order-12345"
)
# Check if retryable
if message.should_retry():
message.increment_retry()
await retry_processing(message)
# Convert to dict for serialization
message_dict = message.to_dict()
LifecycleManager
::: pythia.core.lifecycle.LifecycleManager Manages worker startup, shutdown, and signal handling.
Constructor
from pythia.core.lifecycle import LifecycleManager
lifecycle = LifecycleManager(
startup_timeout=30.0,
shutdown_timeout=30.0,
graceful_shutdown=True
)
Methods
async def startup(self, components: List[Any]) -> None:
"""Start all components"""
pass
async def shutdown(self, components: List[Any]) -> None:
"""Shutdown all components"""
pass
def setup_signal_handlers(self, stop_callback: Callable) -> None:
"""Setup SIGINT/SIGTERM handlers"""
pass
Example Usage
from pythia.core.lifecycle import LifecycleManager
class MyWorker(Worker):
def __init__(self):
super().__init__()
self.lifecycle = LifecycleManager()
self.lifecycle.setup_signal_handlers(self.stop)
async def startup(self):
await self.lifecycle.startup([self.source, self.sink])
async def shutdown(self):
await self.lifecycle.shutdown([self.source, self.sink])
WorkerRunner
::: pythia.core.lifecycle.WorkerRunner Utility for running workers with proper lifecycle management.
from pythia.core.lifecycle import WorkerRunner
# Run worker with proper lifecycle
runner = WorkerRunner(worker_instance)
await runner.run()
# Or run synchronously
runner.run_sync()
Configuration Classes
WorkerConfig
from pythia.config import WorkerConfig
config = WorkerConfig(
worker_name="order-processor",
worker_id="worker-1",
batch_size=100,
batch_timeout=5.0,
max_retries=3,
retry_delay=1.0,
log_level="INFO",
enable_metrics=True,
health_check_interval=30.0,
)
MetricsConfig
from pythia.monitoring import MetricsConfig
metrics_config = MetricsConfig(
enabled=True,
port=8000,
path="/metrics",
push_gateway_url="http://prometheus-pushgateway:9091",
job_name="pythia-worker"
)
Error Handling
PythiaError
from pythia.exceptions import PythiaError, WorkerError, BrokerError
try:
await worker.process(message)
except WorkerError as e:
logger.error(f"Worker error: {e}")
except BrokerError as e:
logger.error(f"Broker error: {e}")
except PythiaError as e:
logger.error(f"Framework error: {e}")
Retry Decorators
from pythia.utils.retry import retry_on_failure
class MyWorker(Worker):
@retry_on_failure(max_retries=3, delay=1.0)
async def process(self, message):
# This method will auto-retry on failure
result = await self.risky_operation(message.body)
return result
Testing Utilities
WorkerTestCase
from pythia.testing import WorkerTestCase
class TestMyWorker(WorkerTestCase):
def setUp(self):
self.worker = MyWorker()
async def test_message_processing(self):
message = self.create_message({"test": "data"})
result = await self.worker.process(message)
self.assertEqual(result["status"], "processed")
def test_worker_health(self):
self.assertTrue(self.worker.health_check())