Configuration

Complete guide to configuring Pythia workers for different environments and use cases.

Overview

Pythia uses Pydantic for configuration management, providing type validation, environment variable support, and clear error messages. Configuration can be set via code, environment variables, or configuration files.

Configuration Hierarchy

from pythia.config import WorkerConfig

# 1. Code-based configuration (highest priority)
config = WorkerConfig(
    worker_name="my-worker",
    max_concurrent=10
)

# 2. Environment variables (medium priority)
# PYTHIA_WORKER_NAME=my-worker
# PYTHIA_MAX_CONCURRENT=10

# 3. Configuration files (lowest priority)
# config.yaml, config.json, or .env files
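
To see the precedence in action, the sketch below sets the same field at two levels; the values are illustrative, and it assumes WorkerConfig reads the PYTHIA_-prefixed variables described later in this guide:

import os

from pythia.config import WorkerConfig

# Environment provides one value...
os.environ["PYTHIA_MAX_CONCURRENT"] = "10"

# ...but an explicit keyword argument takes precedence
config = WorkerConfig(max_concurrent=25)
print(config.max_concurrent)  # 25: code-based configuration wins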

Core Worker Configuration

Basic Configuration

from pythia.config import WorkerConfig

config = WorkerConfig(
    # Worker identification
    worker_name="email-processor",          # Worker name for logs/metrics
    worker_id="email-processor-001",        # Unique worker ID (auto-generated if not provided)

    # Processing settings
    max_retries=3,                          # Maximum retry attempts for failed messages
    retry_delay=1.0,                        # Delay between retries in seconds
    batch_size=10,                          # Batch size for processing
    max_concurrent=20,                      # Maximum concurrent workers

    # Broker configuration
    broker_type="kafka",                    # Message broker type (kafka/redis/rabbitmq)
    multi_broker=False,                     # Enable multi-broker support

    # Logging configuration
    log_level="INFO",                       # Log level (DEBUG, INFO, WARNING, ERROR)
    log_format="json",                      # Log format (json/text)
    log_file="/var/log/pythia/worker.log", # Log file path (optional)

    # Health check configuration
    health_check_interval=30,               # Health check interval in seconds
    health_check_timeout=10                 # Health check timeout in seconds
)
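
The config object is passed to the worker's constructor, following the pattern used throughout this guide. A minimal sketch (EmailWorker and its handler body are illustrative, and the Worker import path is an assumption; check your version):

from pythia import Worker  # import path may differ in your version
from pythia.config import WorkerConfig

class EmailWorker(Worker):
    def __init__(self, config: WorkerConfig):
        super().__init__(config)

    async def process_message(self, message):
        # Illustrative handler body
        ...

config = WorkerConfig(worker_name="email-processor", max_retries=3)
worker = EmailWorker(config)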

Environment Variable Configuration

# Core settings
export PYTHIA_WORKER_NAME="production-worker"
export PYTHIA_WORKER_ID="prod-worker-001"
export PYTHIA_BROKER_TYPE="kafka"
export PYTHIA_MAX_CONCURRENT=50

# Retry configuration
export PYTHIA_MAX_RETRIES=5
export PYTHIA_RETRY_DELAY=2.0

# Logging
export PYTHIA_LOG_LEVEL="WARNING"
export PYTHIA_LOG_FORMAT="json"
export PYTHIA_LOG_FILE="/var/log/pythia/production.log"

# Health checks
export PYTHIA_HEALTH_CHECK_INTERVAL=60
export PYTHIA_HEALTH_CHECK_TIMEOUT=15

from pythia.config import WorkerConfig

# Automatically loads from environment variables
config = WorkerConfig()
print(f"Worker: {config.worker_name}")  # Output: production-worker

Broker-Specific Configuration

Redis Configuration

from pythia.config.redis import RedisConfig

# Basic Redis setup
redis_config = RedisConfig(
    host="localhost",
    port=6379,
    db=0,
    password=None,

    # Queue configuration
    queue="task-queue",                     # List-based queue name
    batch_size=50,                         # Messages per batch
    block_timeout_ms=1000,                 # Polling timeout

    # Connection pooling
    connection_pool_size=20,               # Pool size
    socket_keepalive=True,                 # Keep connections alive
    socket_timeout=30,                     # Socket timeout
    retry_on_timeout=True,                 # Retry on timeout

    # Health monitoring
    health_check_interval=30               # Health check frequency
)

# Stream configuration
redis_stream_config = RedisConfig(
    host="redis-cluster.internal",
    port=6379,

    # Stream settings
    stream="events-stream",                # Stream name
    consumer_group="workers",              # Consumer group
    batch_size=100,                       # Larger batches for streams
    max_stream_length=50000,              # Limit stream size

    # Consumer settings
    block_timeout_ms=5000,                # 5 second block timeout
    consumer_name="worker-001"            # Consumer identifier
)

# Environment variables for Redis
# REDIS_HOST=redis.example.com
# REDIS_PORT=6380
# REDIS_PASSWORD=secure_password
# REDIS_DB=1
# REDIS_QUEUE=production-queue
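
If RedisConfig, like WorkerConfig, reads its REDIS_-prefixed variables automatically (an assumption; verify against your version), a bare constructor call picks them up:

from pythia.config.redis import RedisConfig

# Assumes the REDIS_* variables above are loaded the same way
# WorkerConfig loads PYTHIA_* variables
redis_config = RedisConfig()
print(redis_config.host)   # redis.example.com
print(redis_config.queue)  # production-queue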

Kafka Configuration

from pythia.config.kafka import KafkaConfig

# Basic Kafka setup
kafka_config = KafkaConfig(
    bootstrap_servers="kafka1:9092,kafka2:9092,kafka3:9092",

    # Consumer settings
    group_id="email-processors",
    topics=["emails", "notifications"],
    auto_offset_reset="earliest",          # Start from beginning
    enable_auto_commit=False,              # Manual commits for reliability

    # Performance tuning
    max_poll_records=1000,                 # Messages per poll
    fetch_min_bytes=50000,                 # Wait for more data
    fetch_max_wait_ms=500,                 # But don't wait too long

    # Session management
    session_timeout_ms=30000,              # 30 second session timeout
    heartbeat_interval_ms=3000,            # Heartbeat every 3 seconds
    max_poll_interval_ms=600000,           # 10 minutes max processing time

    # Producer settings (for output)
    acks="all",                           # Wait for all replicas
    retries=5,                            # Retry failed sends
    batch_size=32768,                     # 32KB batches
    linger_ms=10                          # Wait 10ms for batching
)

# Security configuration
kafka_secure_config = KafkaConfig(
    bootstrap_servers="secure-kafka:9093",

    # SASL authentication
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_username="worker-user",
    sasl_password="secure-password",

    # SSL settings
    ssl_ca_location="/etc/ssl/ca.crt",
    ssl_certificate_location="/etc/ssl/client.crt",
    ssl_key_location="/etc/ssl/client.key",
    ssl_key_password="key-password"
)

# Environment variables for Kafka
# KAFKA_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092
# KAFKA_GROUP_ID=production-group
# KAFKA_TOPICS=orders,payments,notifications
# KAFKA_SECURITY_PROTOCOL=SASL_SSL
# KAFKA_SASL_USERNAME=prod-user
# KAFKA_SASL_PASSWORD=secure-password
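
Environment values can also be wired in explicitly, which helps when a variable needs parsing. This sketch splits the comma-separated KAFKA_TOPICS value into the topics list:

import os

from pythia.config.kafka import KafkaConfig

kafka_config = KafkaConfig(
    bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
    group_id=os.environ.get("KAFKA_GROUP_ID", "production-group"),
    # KAFKA_TOPICS=orders,payments,notifications
    topics=os.environ["KAFKA_TOPICS"].split(","),
)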

RabbitMQ Configuration

from pythia.config.rabbitmq import RabbitMQConfig

# Basic RabbitMQ setup
rabbitmq_config = RabbitMQConfig(
    url="amqp://user:password@rabbitmq.internal:5672/production",

    # Queue configuration
    queue="notifications-queue",           # Queue name
    exchange="notifications-exchange",     # Exchange name
    routing_key="notification.*",         # Routing pattern

    # Durability settings
    durable=True,                         # Survive broker restart
    auto_ack=False,                       # Manual acknowledgment

    # Performance tuning
    prefetch_count=100,                   # Messages to prefetch

    # Connection settings
    heartbeat=600,                        # 10 minute heartbeat
    connection_attempts=5,                # Retry connection attempts
    retry_delay=2.0                       # Delay between attempts
)

# Advanced routing configuration
rabbitmq_routing_config = RabbitMQConfig(
    url="amqp://rabbitmq-cluster:5672/",

    # Topic exchange for complex routing
    queue="user-events-queue",
    exchange="events-topic-exchange",
    routing_key="user.*.created",         # Pattern matching

    # Exchange configuration
    exchange_type="topic",                # topic, direct, fanout, headers
    exchange_durable=True,

    # Queue arguments for advanced features
    queue_arguments={
        "x-dead-letter-exchange": "dlq-exchange",
        "x-dead-letter-routing-key": "failed",
        "x-message-ttl": 300000,          # 5 minute TTL
        "x-max-length": 10000             # Max queue length
    }
)
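
The x-dead-letter-* arguments only name the target; the dead-letter queue itself must also exist. A companion config for it could look like the sketch below (names match the arguments above; whether Pythia declares the queue for you depends on your setup):

dlq_config = RabbitMQConfig(
    url="amqp://rabbitmq-cluster:5672/",
    queue="failed-events-queue",          # receives dead-lettered messages
    exchange="dlq-exchange",              # matches x-dead-letter-exchange above
    routing_key="failed",                 # matches x-dead-letter-routing-key above
    durable=True
)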

# Environment variables for RabbitMQ
# RABBITMQ_URL=amqp://user:pass@rabbitmq:5672/vhost
# RABBITMQ_QUEUE=production-queue
# RABBITMQ_EXCHANGE=production-exchange
# RABBITMQ_ROUTING_KEY=task.process
# RABBITMQ_PREFETCH_COUNT=50

Advanced Configuration

Logging Configuration

from pythia.config import LogConfig

log_config = LogConfig(
    level="INFO",                         # Log level
    format="json",                        # Output format (json/text)
    file="/var/log/pythia/worker.log",    # Log file path
    rotation="100 MB",                    # Rotate when file reaches size
    retention="30 days",                  # Keep logs for 30 days

    # Structured logging
    add_timestamp=True,                   # Include timestamp
    add_worker_id=True,                   # Include worker ID
    add_correlation_id=True               # Include correlation ID for tracing
)

# Advanced logging setup
import sys

class CustomWorker(Worker):
    def __init__(self, config: WorkerConfig):
        super().__init__(config)

        # Custom log configuration
        self.logger.configure(
            handlers=[
                {
                    "sink": sys.stdout,
                    "format": "{time} | {level} | {name} | {message}",
                    "level": "INFO"
                },
                {
                    "sink": "/var/log/pythia/errors.log",
                    "format": "{time} | {level} | {name} | {message}",
                    "level": "ERROR",
                    "rotation": "10 MB"
                }
            ]
        )

Metrics Configuration

from pythia.config import MetricsConfig

metrics_config = MetricsConfig(
    enabled=True,                         # Enable metrics collection
    port=8080,                           # Metrics server port
    path="/metrics",                     # Metrics endpoint path

    # Prometheus configuration
    prometheus_enabled=True,
    prometheus_prefix="pythia_worker",    # Metric name prefix

    # Custom metrics
    custom_metrics={
        "message_processing_duration": {
            "type": "histogram",
            "description": "Time spent processing messages",
            "buckets": [0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
        },
        "queue_depth": {
            "type": "gauge",
            "description": "Current queue depth"
        },
        "error_rate": {
            "type": "counter",
            "description": "Number of processing errors"
        }
    }
)

# Use in worker
class MetricsAwareWorker(Worker):
    def __init__(self, config: WorkerConfig):
        super().__init__(config, metrics_config=metrics_config)

    async def process_message(self, message):
        # Record processing time
        with self.metrics.timer("message_processing_duration"):
            result = await self._process_message(message)

        # Update counters
        self.metrics.counter("messages_processed_total").inc()

        return result

Security Configuration

from pythia.config import SecurityConfig

security_config = SecurityConfig(
    # SSL/TLS settings
    ssl_enabled=True,
    ssl_cert_file="/etc/ssl/certs/worker.crt",
    ssl_key_file="/etc/ssl/private/worker.key",
    ssl_ca_file="/etc/ssl/certs/ca.crt",

    # Authentication
    auth_enabled=True,
    auth_method="oauth2",                 # oauth2, basic, api_key

    # Field encryption
    encryption_enabled=True,
    encryption_key="base64-encoded-key"   # AES encryption key
)

# Secure worker example
class SecureWorker(Worker):
    def __init__(self, config: WorkerConfig):
        super().__init__(config)
        self.security_config = security_config

    async def process_message(self, message):
        # Decrypt sensitive fields if needed
        if self.security_config.encryption_enabled:
            message = await self._decrypt_message(message)

        result = await self._process_secure_message(message)

        # Encrypt result if needed
        if self.security_config.encryption_enabled:
            result = await self._encrypt_result(result)

        return result

Resilience Configuration

from pythia.config import ResilienceConfig

resilience_config = ResilienceConfig(
    # Retry settings
    max_retries=5,                        # Maximum retry attempts
    retry_delay=1.0,                      # Initial delay
    retry_backoff=2.0,                    # Exponential backoff multiplier
    retry_max_delay=60.0,                 # Maximum delay between retries

    # Circuit breaker
    circuit_breaker_enabled=True,
    circuit_breaker_threshold=10,         # Failures to trigger breaker
    circuit_breaker_timeout=60,           # Breaker reset timeout

    # Timeouts
    processing_timeout=300,               # Per-message timeout (5 minutes)
    connection_timeout=30,                # Connection timeout

    # Rate limiting
    rate_limit_enabled=True,
    rate_limit_requests=1000,             # Requests per minute
    rate_limit_window=60                  # Time window in seconds
)

# Use resilience config
config = WorkerConfig(
    worker_name="resilient-worker",
    resilience=resilience_config
)
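
With these settings, the wait before each retry grows geometrically from retry_delay by retry_backoff and is capped at retry_max_delay. A quick sketch of the schedule the parameters above describe (the exact formula Pythia applies may differ):

# Delay before retry n: retry_delay * retry_backoff**n, capped at retry_max_delay
for attempt in range(5):
    delay = min(1.0 * (2.0 ** attempt), 60.0)
    print(f"retry {attempt + 1}: wait {delay:.1f}s")
# retry 1: 1.0s, retry 2: 2.0s, retry 3: 4.0s, retry 4: 8.0s, retry 5: 16.0s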

Environment-Specific Configuration

Development Configuration

# config/development.py
from pythia.config import WorkerConfig
from pythia.config.redis import RedisConfig

def get_development_config() -> WorkerConfig:
    return WorkerConfig(
        worker_name="dev-worker",
        broker_type="redis",
        log_level="DEBUG",
        log_format="text",                # Readable format for dev
        max_concurrent=5,                 # Lower concurrency
        health_check_interval=10,         # Frequent health checks

        # Development-friendly settings
        max_retries=1,                    # Fail fast for debugging
        retry_delay=0.5                   # Quick retries
    )

def get_redis_config() -> RedisConfig:
    return RedisConfig(
        host="localhost",
        port=6379,
        db=1,                            # Use different DB for dev
        queue="dev-queue",
        batch_size=5                     # Small batches for testing
    )

Production Configuration

# config/production.py
import os

from pythia.config import WorkerConfig
from pythia.config.kafka import KafkaConfig

def get_production_config() -> WorkerConfig:
    return WorkerConfig(
        worker_name="prod-worker",
        broker_type="kafka",
        log_level="INFO",
        log_format="json",               # Structured logging
        log_file="/var/log/pythia/production.log",

        # Production settings
        max_concurrent=50,               # High concurrency
        max_retries=5,                   # More retry attempts
        retry_delay=2.0,

        # Monitoring
        health_check_interval=60,

        # Security
        ssl_enabled=True
    )

def get_kafka_config() -> KafkaConfig:
    return KafkaConfig(
        bootstrap_servers=os.getenv("KAFKA_BROKERS"),
        group_id="production-workers",
        topics=["orders", "payments", "notifications"],

        # Production optimization
        max_poll_records=2000,
        fetch_min_bytes=100000,
        session_timeout_ms=45000,

        # Security
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        sasl_username=os.getenv("KAFKA_USERNAME"),
        sasl_password=os.getenv("KAFKA_PASSWORD")
    )

Testing Configuration

# config/testing.py
from pythia.config import WorkerConfig

def get_testing_config() -> WorkerConfig:
    return WorkerConfig(
        worker_name="test-worker",
        broker_type="memory",            # In-memory broker for tests
        log_level="ERROR",               # Minimal logging
        max_concurrent=1,                # Single-threaded for tests
        max_retries=0,                   # No retries in tests
        health_check_interval=1          # Fast health checks
    )
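
A pytest fixture makes the testing config easy to share across tests (a small sketch; the assertions mirror the values above):

import pytest

from config.testing import get_testing_config

@pytest.fixture
def worker_config():
    return get_testing_config()

def test_uses_memory_broker(worker_config):
    assert worker_config.broker_type == "memory"
    assert worker_config.max_retries == 0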

Configuration Files

YAML Configuration

# config.yaml
pythia:
  worker:
    worker_name: "yaml-worker"
    max_concurrent: 20
    log_level: "INFO"

  redis:
    host: "redis.example.com"
    port: 6379
    queue: "production-queue"
    batch_size: 50

  metrics:
    enabled: true
    port: 8080
    prometheus_enabled: true

import yaml
from pythia.config import WorkerConfig

def load_config_from_yaml(file_path: str) -> WorkerConfig:
    with open(file_path, 'r') as file:
        config_data = yaml.safe_load(file)

    return WorkerConfig(**config_data['pythia']['worker'])

JSON Configuration

{
  "pythia": {
    "worker": {
      "name": "json-worker",
      "max_concurrent": 30,
      "log_level": "INFO",
      "broker_type": "kafka"
    },
    "kafka": {
      "bootstrap_servers": "kafka1:9092,kafka2:9092",
      "group_id": "json-workers",
      "topics": ["events"]
    }
  }
}
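
Loading it mirrors the YAML example; the loader below is a sketch:

import json

from pythia.config import WorkerConfig

def load_config_from_json(file_path: str) -> WorkerConfig:
    with open(file_path) as file:
        config_data = json.load(file)

    return WorkerConfig(**config_data["pythia"]["worker"])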

Environment File (.env)

# .env
PYTHIA_WORKER_NAME=env-worker
PYTHIA_BROKER_TYPE=rabbitmq
PYTHIA_MAX_CONCURRENT=25
PYTHIA_LOG_LEVEL=WARNING

RABBITMQ_URL=amqp://user:pass@rabbitmq:5672/prod
RABBITMQ_QUEUE=production-tasks
RABBITMQ_PREFETCH_COUNT=100
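
One common way to pick up a .env file is python-dotenv; Pydantic settings models can often also read an env_file directly, so check how your WorkerConfig is declared:

from dotenv import load_dotenv

from pythia.config import WorkerConfig

load_dotenv()              # reads .env into the process environment
config = WorkerConfig()    # then picks up the PYTHIA_* variables as usual
print(config.worker_name)  # env-worker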

Configuration Validation

from pydantic import ValidationError
from pythia.config import WorkerConfig

def validate_config():
    """Example of configuration validation"""
    try:
        config = WorkerConfig(
            worker_name="test",
            max_concurrent=-1,           # Invalid value
            log_level="INVALID"          # Invalid level
        )
    except ValidationError as e:
        print("Configuration errors:")
        for error in e.errors():
            print(f"  {error['loc'][0]}: {error['msg']}")

# Output:
# Configuration errors:
#   max_concurrent: ensure this value is greater than 0
#   log_level: value is not a valid enumeration member

Dynamic Configuration Updates

import time

import yaml

from pythia.config import WorkerConfig

class ConfigurableWorker(Worker):
    def __init__(self, config: WorkerConfig):
        super().__init__(config)
        self.config_file_path = "config.yaml"
        self.last_config_check = time.time()
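
    def _load_config_from_file(self) -> WorkerConfig:
        """Reload config from the YAML file.

        Minimal sketch assuming the config.yaml layout shown earlier.
        """
        with open(self.config_file_path) as file:
            data = yaml.safe_load(file)
        return WorkerConfig(**data["pythia"]["worker"])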

    async def _check_config_updates(self):
        """Check for configuration updates"""
        if time.time() - self.last_config_check > 60:  # Check every minute
            try:
                new_config = self._load_config_from_file()
                if new_config != self.config:
                    self.logger.info("Configuration updated, applying changes...")
                    await self._apply_config_changes(new_config)
                    self.config = new_config

            except Exception as e:
                self.logger.error(f"Failed to reload configuration: {e}")

            self.last_config_check = time.time()

    async def _apply_config_changes(self, new_config: WorkerConfig):
        """Apply configuration changes without restart"""
        # Update logging level
        if new_config.log_level != self.config.log_level:
            self.logger.configure(level=new_config.log_level)

        # Update concurrency
        if new_config.max_concurrent != self.config.max_concurrent:
            await self._adjust_concurrency(new_config.max_concurrent)

Best Practices

1. Environment Separation

import os

def get_config() -> WorkerConfig:
    """Get configuration based on environment"""
    env = os.getenv("ENVIRONMENT", "development")

    if env == "production":
        return get_production_config()
    elif env == "staging":
        return get_staging_config()  # defined analogously to the configs above
    else:
        return get_development_config()

2. Secret Management

import os
from typing import Optional

class SecretManager:
    """Manage secrets from various sources"""

    @staticmethod
    def get_secret(key: str) -> Optional[str]:
        """Get secret from environment or secret store"""
        # Try environment first
        value = os.getenv(key)
        if value:
            return value

        # Try AWS Secrets Manager, HashiCorp Vault, etc.
        return SecretManager._get_from_secret_store(key)

    @staticmethod
    def _get_from_secret_store(key: str) -> Optional[str]:
        # Implementation depends on your secret store
        pass

# Usage
kafka_config = KafkaConfig(
    bootstrap_servers=os.getenv("KAFKA_BROKERS"),
    sasl_username=SecretManager.get_secret("KAFKA_USERNAME"),
    sasl_password=SecretManager.get_secret("KAFKA_PASSWORD")
)

3. Configuration Testing

import pytest
from pydantic import ValidationError

from pythia.config import WorkerConfig

class TestConfiguration:
    def test_default_config(self):
        """Test default configuration values"""
        config = WorkerConfig()
        assert config.worker_name == "pythia-worker"
        assert config.max_retries == 3
        assert config.log_level == "INFO"

    def test_environment_override(self, monkeypatch):
        """Test environment variable overrides"""
        monkeypatch.setenv("PYTHIA_WORKER_NAME", "test-worker")
        monkeypatch.setenv("PYTHIA_MAX_RETRIES", "5")

        config = WorkerConfig()
        assert config.worker_name == "test-worker"
        assert config.max_retries == 5

    def test_invalid_config(self):
        """Test configuration validation"""
        with pytest.raises(ValidationError):
            WorkerConfig(max_concurrent=-1)

Next Steps