HTTP Workers
Pythia's HTTP Workers handle HTTP-based tasks such as API polling and webhook delivery, with built-in resilience patterns (retries and circuit breakers).
🔍 HTTP Worker Types
1. PollerWorker - API Polling
Worker that polls an external HTTP API at a fixed interval and passes each response to `process_message`.
2. WebhookSenderWorker - Webhook Sending
Worker that sends HTTP webhooks with automatic retry and broadcast support.
3. HTTPWorker - Base Class
Base class that provides common HTTP functionality for all workers.
🚀 PollerWorker
Basic Use Case
from pythia.brokers.http import PollerWorker
from pythia.config.http import PollerConfig
class PaymentStatusPoller(PollerWorker):
    """Poll the payments API for pending payments and react to status changes."""

    def __init__(self):
        super().__init__(
            url="https://api.payments.com/status",
            interval=30,  # seconds between polls
            method="GET",
            headers={"Authorization": "Bearer your-token"},
            params={"status": "pending"},
        )

    async def process_message(self, message):
        """Handle one polling response holding either a single payment or a list of them.

        Returns a summary dict with the number of payments handled.
        """
        body = message.body
        # Normalize to a list so single objects and lists share one code path.
        payments = body if isinstance(body, list) else [body]
        for payment in payments:
            await self.process_payment_update(payment)
        return {"processed": True, "count": len(payments)}

    async def process_payment_update(self, payment_data):
        """Dispatch a single payment record according to its ``status`` field.

        ``handle_payment_completed`` / ``handle_payment_failed`` are application
        handlers that are not shown in this example.
        """
        payment_id = payment_data.get("id")
        status = payment_data.get("status")
        if status == "failed":
            await self.handle_payment_failed(payment_id, payment_data.get("error"))
        elif status == "completed":
            await self.handle_payment_completed(payment_id)
        # Any other status is only logged.
        self.logger.info(f"Processed payment {payment_id} with status {status}")
# Start the poller only when this file is executed directly (not on import).
if __name__ == "__main__":
    PaymentStatusPoller().run_sync()
Advanced Configuration
from pythia.config.http import PollerConfig
class AdvancedAPIPoller(PollerWorker):
    """Poll an events endpoint with tuned connection, timeout and TLS settings."""

    def __init__(self):
        # Define the endpoint settings once so the PollerConfig and the worker
        # arguments below cannot drift apart.
        base_url = "https://api.example.com"
        url = f"{base_url}/events"
        interval = 60
        method = "POST"

        # Advanced poller configuration
        poller_config = PollerConfig(
            base_url=base_url,
            url=url,
            interval=interval,
            method=method,
            connect_timeout=10.0,
            read_timeout=30.0,
            max_connections=50,
            verify_ssl=True,
            follow_redirects=True,
            user_agent="MyApp/1.0",
        )
        super().__init__(
            url=url,
            interval=interval,
            method=method,
            headers={
                "Authorization": "Bearer your-token",
                "Content-Type": "application/json",
            },
            poller_config=poller_config,
        )

    async def process_message(self, message):
        """Log response metadata (status code, timestamp, size) and acknowledge it."""
        response_data = message.body
        status_code = message.headers.get("status_code")
        timestamp = message.timestamp
        self.logger.info(
            f"Received response with status {status_code} at {timestamp}",
            response_size=len(str(response_data)),
        )
        return {"status": "processed", "response_code": status_code}
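Custom Data Extraction
Pass a `data_extractor` callable to split a single polling response into several messages: each item in the list it returns is delivered to `process_message` as its own message.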
def extract_events(response_data):
    """Split a polling response into a list of individual events.

    - ``{"events": [...]}`` -> the list under ``"events"`` (a single non-list
      value is wrapped; ``None`` yields no events).
    - A list response is returned as-is (one message per item).
    - ``None`` yields no events.
    - Any other value is treated as a single event.
    """
    # Check the type explicitly: `"events" in x` is a substring test on str
    # and a membership test on list, which would misroute those responses.
    if isinstance(response_data, dict) and "events" in response_data:
        events = response_data["events"]
        if events is None:
            return []
        return events if isinstance(events, list) else [events]
    if isinstance(response_data, list):
        return response_data
    if response_data is None:
        return []
    return [response_data]
class EventPoller(PollerWorker):
    """Poll an event feed; ``extract_events`` turns each response into single-event messages."""

    def __init__(self):
        super().__init__(
            url="https://api.events.com/feed",
            interval=120,
            data_extractor=extract_events,  # Custom data extraction
        )

    async def process_message(self, message):
        """Route one extracted event to its handler based on its ``type``.

        ``handle_user_signup`` / ``handle_purchase`` are application handlers not
        shown in this example. Unrecognized (or non-dict) events are logged and
        reported with ``processed: False``.
        """
        event = message.body
        event_type = event.get("type") if isinstance(event, dict) else None

        if event_type == "user_signup":
            await self.handle_user_signup(event)
        elif event_type == "purchase":
            await self.handle_purchase(event)
        else:
            # Don't claim success for events nothing handled.
            self.logger.warning(f"Unhandled event type: {event_type}")
            return {"event_type": event_type, "processed": False}

        return {"event_type": event_type, "processed": True}
📤 WebhookSenderWorker
Basic Use Case
from pythia.brokers.http import WebhookSenderWorker
from pythia.config.http import WebhookConfig
class OrderNotificationSender(WebhookSenderWorker):
    """Send order lifecycle webhooks to a single webhook host."""

    def __init__(self):
        # The same host is used for the worker and its webhook config.
        webhook_host = "https://webhooks.example.com"
        config = WebhookConfig(
            base_url=webhook_host,
            timeout=30,
            max_retries=3,
            retry_delay=2.0,
        )
        super().__init__(base_url=webhook_host, webhook_config=config)

    async def process(self):
        """Main processing loop - override based on your needs"""
        # Placeholder: consume events from another source here and call
        # send_order_created() for each order.
        pass

    async def send_order_created(self, order_data):
        """Post an ``order_created`` event for ``order_data`` and log the outcome.

        Returns the result of ``send_webhook``.
        """
        order_id = order_data["id"]
        payload = {
            "event": "order_created",
            "order_id": order_id,
            "customer_id": order_data["customer_id"],
            "amount": order_data["total"],
            "timestamp": order_data["created_at"],
        }
        success = await self.send_webhook(
            endpoint="/orders/created",
            data=payload,
            headers={"X-Event-Source": "order-service"},
        )
        if not success:
            self.logger.error(f"Failed to send order created webhook for order {order_id}")
        else:
            self.logger.info(f"Order created webhook sent for order {order_id}")
        return success
Worker that Consumes Messages and Sends Webhooks
from pythia.core.message import Message
from pythia.brokers.redis import RedisListConsumer
class OrderWebhookRelay(WebhookSenderWorker):
    """Relay order events from a Redis list to partner webhook endpoints."""

    # Webhook endpoint for each supported ``event_type`` of an order event.
    _ENDPOINTS_BY_EVENT = {
        "order_created": "/webhooks/orders/created",
        "order_updated": "/webhooks/orders/updated",
        "order_cancelled": "/webhooks/orders/cancelled",
    }

    def __init__(self):
        super().__init__(base_url="https://partner-webhooks.com")
        # Upstream source the relay reads order events from.
        self.source = RedisListConsumer(queue="order_events")

    async def process(self):
        """Forward every message consumed from the Redis list as a webhook."""
        async for incoming in self.source.consume():
            await self.process_message(incoming)

    async def process_message(self, message: Message):
        """Pick the endpoint for this order event and forward the message to it."""
        event_type = message.body.get("event_type")
        # Only string event types can match a table entry; checking first keeps
        # unhashable values (e.g. lists) on the "unknown" path.
        endpoint = (
            self._ENDPOINTS_BY_EVENT.get(event_type)
            if isinstance(event_type, str)
            else None
        )
        if endpoint is None:
            self.logger.warning(f"Unknown event type: {event_type}")
            return {"error": "unknown_event_type"}

        success = await self.send_webhook_from_message(
            message=message,
            endpoint=endpoint,
            headers={"X-Source": "order-service", "X-Event-Type": event_type},
        )
        return {"webhook_sent": success, "event_type": event_type}
Broadcast to Multiple Endpoints
class SystemAlertBroadcaster(WebhookSenderWorker):
    """Fan a system alert out to several alert endpoints at once."""

    def __init__(self):
        super().__init__(base_url="https://alerts.example.com")

    async def broadcast_system_alert(self, alert_data, urgent=False):
        """Broadcast ``alert_data`` to every endpoint for its urgency level.

        Args:
            alert_data: dict with ``type``, ``message`` and ``timestamp`` keys and
                an optional ``source``.
            urgent: when True, target the wider set of channels and use fail-fast.

        Returns:
            The mapping returned by ``broadcast_webhook`` (per-endpoint outcome).
        """
        if urgent:
            endpoints = [
                "/critical/alerts",
                "/slack/alerts",
                "/email/alerts",
                "/sms/alerts",
            ]
            # Stop on first failure for urgent alerts.
            # NOTE: this trades coverage for speed - one broken channel can
            # prevent the remaining channels from being notified.
            fail_fast = True
        else:
            endpoints = [
                "/general/alerts",
                "/slack/alerts",
            ]
            fail_fast = False  # Continue even if some fail

        results = await self.broadcast_webhook(
            endpoints=endpoints,
            data={
                "alert_type": alert_data["type"],
                "severity": "urgent" if urgent else "normal",
                "message": alert_data["message"],
                "timestamp": alert_data["timestamp"],
                "source": alert_data.get("source", "system"),
            },
            headers={"X-Alert-Priority": "high" if urgent else "normal"},
            fail_fast=fail_fast,
        )

        successful = sum(1 for ok in results.values() if ok)
        # Measure against every targeted endpoint: with fail_fast, `results`
        # may not contain entries for endpoints that were never attempted.
        total = len(endpoints)
        self.logger.info(
            f"Alert broadcast completed: {successful}/{total} successful",
            results=results,
            attempted=len(results),
            alert_type=alert_data["type"],
        )
        return results
🔧 Configuration
Environment Variables
# HTTP Client Configuration
HTTP_BASE_URL=https://api.example.com
HTTP_TIMEOUT=30
HTTP_MAX_RETRIES=3
HTTP_RETRY_DELAY=1.0
HTTP_RETRY_BACKOFF=2.0
# Authentication
HTTP_AUTH_TYPE=bearer
HTTP_BEARER_TOKEN=your-bearer-token
HTTP_API_KEY=your-api-key
HTTP_API_KEY_HEADER=X-API-Key
# SSL/TLS
HTTP_SSL_VERIFY=true
HTTP_SSL_CERT_FILE=/path/to/cert.pem
HTTP_SSL_KEY_FILE=/path/to/key.pem
# Connection Settings
HTTP_CONNECTION_POOL_SIZE=100
HTTP_MAX_KEEPALIVE_CONNECTIONS=20
HTTP_KEEPALIVE_EXPIRY=300
# Poller Specific
POLLER_URL=https://api.example.com/events
POLLER_INTERVAL=60
POLLER_METHOD=GET
POLLER_CONNECT_TIMEOUT=10.0
POLLER_READ_TIMEOUT=30.0
Programmatic Configuration
from pythia.http import HTTPClientConfig
from pythia.config.http import PollerConfig, WebhookConfig
# Base HTTP client configuration: connection limit, timeouts, TLS and default headers.
http_config = HTTPClientConfig(
    default_headers={"User-Agent": "MyApp/1.0"},
    verify_ssl=True,
    connect_timeout=5.0,
    read_timeout=30.0,
    max_connections=100,
)

# Poller configuration: what to poll, how often, and with which connection limits.
poller_config = PollerConfig(
    method="GET",
    interval=60,
    base_url="https://api.example.com",
    url="https://api.example.com/events",
    max_connections=50,
    connect_timeout=10.0,
)

# Webhook configuration: target host, retry behavior and named endpoint paths.
webhook_config = WebhookConfig(
    base_url="https://webhooks.example.com",
    endpoints={
        "orders": "/orders/webhook",
        "users": "/users/webhook",
        "alerts": "/system/alerts",
    },
    timeout=30,
    max_retries=5,
    retry_delay=2.0,
    retry_backoff=2.0,
)
🛡️ Resilience Patterns
Circuit Breaker
from pythia.http import CircuitBreakerConfig
class ResilientAPIPoller(PollerWorker):
    """Poll an unreliable API through an HTTP client guarded by a circuit breaker."""

    def __init__(self):
        # Imported here so this example is self-contained; HTTPClientConfig is
        # exported by pythia.http alongside CircuitBreakerConfig.
        from pythia.http import HTTPClientConfig

        # Configure circuit breaker
        circuit_config = CircuitBreakerConfig(
            failure_threshold=5,  # Open after 5 failures
            timeout_seconds=60,  # Stay open for 60 seconds
            expected_exception=Exception,
        )
        super().__init__(
            url="https://unreliable-api.com/data",
            interval=30,
            http_config=HTTPClientConfig(circuit_breaker_config=circuit_config),
        )

    async def process_message(self, message):
        """Process a polled response; processing errors are logged and reported, not raised."""
        try:
            return await self.handle_api_response(message.body)
        except Exception as e:
            # This handler swallows the error, so nothing upstream sees it.
            # The circuit breaker above is configured on the HTTP client, so it
            # presumably tracks request failures, not errors raised here — confirm.
            self.logger.error(f"API processing failed: {e}", exc_info=True)
            return {"error": str(e), "processed": False}
Retry Policies
from pythia.http import RetryConfig, RetryStrategy
class RetryableWebhookSender(WebhookSenderWorker):
    """Send critical webhooks through an HTTP client with an exponential retry policy."""

    def __init__(self):
        # Imported here so this example is self-contained.
        from pythia.http import HTTPClientConfig

        # Configure custom retry policy
        retry_config = RetryConfig(
            strategy=RetryStrategy.EXPONENTIAL,
            max_attempts=5,
            initial_delay=1.0,
            max_delay=60.0,
            exponential_base=2.0,
            jitter=True,  # Add randomness to prevent thundering herd
        )
        super().__init__(
            base_url="https://webhooks.example.com",
            http_config=HTTPClientConfig(retry_config=retry_config),
        )

    async def send_critical_webhook(self, data):
        """Send ``data`` to ``/critical/alerts`` with a critical-priority header.

        This method has no retry loop of its own; retries are expected to come
        from the ``retry_config`` given to the HTTP client in ``__init__``.

        Returns the result of ``send_webhook`` (falsy on failure).
        """
        success = await self.send_webhook(
            endpoint="/critical/alerts",
            data=data,
            headers={"X-Priority": "critical"},
        )
        if not success:
            # Presumably reached only once the configured retries are exhausted.
            self.logger.critical(
                "Critical webhook failed after all retries",
                data=data,
            )
        return success
📊 Monitoring and Logging
Structured Logging
class MonitoredPoller(PollerWorker):
    """Poll a metrics endpoint and emit structured logs with running counters."""

    def __init__(self):
        super().__init__(
            url="https://api.example.com/metrics",
            interval=60,
        )
        # Running counters attached to every log line.
        # NOTE(review): nothing in this class increments "requests_sent"; it
        # stays 0 unless the request path (not shown here) updates it.
        self.metrics = {
            "requests_sent": 0,
            "responses_received": 0,
            "errors_count": 0,
        }

    async def process_message(self, message):
        """Log a polled response with context, then delegate to ``handle_response``.

        Any exception from ``handle_response`` is counted, logged with its
        traceback, and re-raised.
        """
        self.metrics["responses_received"] += 1

        # Log with context. Pass a snapshot of the counters so the record keeps
        # the values current at this point even if it is rendered later.
        self.logger.info(
            "Processing API response",
            url=self.url,
            status_code=message.headers.get("status_code"),
            response_size=len(str(message.body)),
            metrics=dict(self.metrics),
        )

        try:
            return await self.handle_response(message.body)
        except Exception as e:
            self.metrics["errors_count"] += 1
            self.logger.error(
                "Error processing API response",
                error=str(e),
                metrics=dict(self.metrics),
                exc_info=True,
            )
            raise
Health Checks
class HealthCheckPoller(PollerWorker):
    """Poll a health endpoint, track consecutive failures and alert once they pile up."""

    # Number of consecutive failed checks that triggers an alert.
    alert_threshold = 3

    def __init__(self):
        super().__init__(
            url="https://api.example.com/health",
            interval=30,
        )
        self.last_successful_poll = None  # aware UTC datetime of last 2xx check
        self.consecutive_failures = 0

    async def process_message(self, message):
        """Record the outcome of one health check and alert on repeated failures.

        Returns a dict with ``healthy``, ``status_code`` and ``consecutive_failures``.
        """
        from datetime import datetime, timezone

        status_code = self._status_code(message)
        healthy = 200 <= status_code < 300

        if healthy:
            self.last_successful_poll = datetime.now(timezone.utc)
            self.consecutive_failures = 0
            self.logger.info(
                "API health check successful",
                status_code=status_code,
                response_time_ms=message.headers.get("response_time", 0),
            )
        else:
            self.consecutive_failures += 1
            self.logger.warning(
                "API health check failed",
                status_code=status_code,
                consecutive_failures=self.consecutive_failures,
            )
            # Alert once when the threshold is crossed rather than on every
            # poll for as long as the API stays down.
            if self.consecutive_failures == self.alert_threshold:
                await self.send_health_alert()

        return {
            "healthy": healthy,
            "status_code": status_code,
            "consecutive_failures": self.consecutive_failures,
        }

    @staticmethod
    def _status_code(message):
        """Return the response status as an int, or 0 when it is missing or malformed."""
        # Header values may arrive as strings (or None); coerce before comparing.
        raw = message.headers.get("status_code", 0)
        try:
            return int(raw)
        except (TypeError, ValueError):
            return 0

    async def send_health_alert(self):
        """Send alert when API is unhealthy"""
        self.logger.critical(
            f"API health check failed {self.consecutive_failures} times consecutively",
            url=self.url,
        )
🎯 Common Use Cases
1. Payment API Integration
class PaymentWebhookProcessor(WebhookSenderWorker):
    """Forward completed/failed payment events to their webhook endpoints."""

    async def process_payment_event(self, payment_data):
        """Send ``payment_data`` to the endpoint matching its ``status``.

        Returns the result of ``send_webhook``, or ``None`` when the status
        (including a missing one) has no endpoint.
        """
        status = payment_data.get("status")
        if status == "completed":
            endpoint = "/payments/completed"
        elif status == "failed":
            endpoint = "/payments/failed"
        else:
            return None
        # Keyword arguments match how send_webhook is called elsewhere in these docs.
        return await self.send_webhook(endpoint=endpoint, data=payment_data)
2. External API Monitoring
class APIMonitor(PollerWorker):
    """Poll a status page and notify operations when the service is not operational."""

    def __init__(self):
        super().__init__(url="https://status.example.com/api", interval=60)

    async def process_message(self, message):
        """Alert the ops team unless the polled status is ``"operational"``.

        ``alert_ops_team`` is an application hook not shown in this example.
        A non-dict body is treated as non-operational instead of raising.
        """
        body = message.body
        status = body.get("status") if isinstance(body, dict) else None
        operational = status == "operational"
        if not operational:
            await self.alert_ops_team(body)
        return {"status": status, "operational": operational}
3. Data Synchronization
class DataSyncPoller(PollerWorker):
    """Mirror each polled payload into the local database."""

    async def process_message(self, message):
        # sync_data_to_database is application code that is not shown here.
        payload = message.body
        await self.sync_data_to_database(payload)
HTTP Workers provide a robust, scalable way to work with HTTP APIs in Pythia. Built-in resilience patterns (circuit breakers, retry policies) and flexible configuration let them fit a wide range of integrations.