Skip to content

Cloud Message Queues

Pythia's Cloud Workers provide complete support for the main cloud messaging platforms: AWS, Google Cloud, and Azure. Implemented following the anti-over-engineering principle, they offer consistent APIs and simple configuration.

🌟 Key Features

  • 📦 Modular Installation: Install only the providers you need
  • 🔄 Unified API: Same interface for all cloud providers
  • ⚡ Simple Configuration: Standard environment variables
  • 🛡️ Error Handling: Platform-specific error management
  • 🔧 Lazy Loading: No unnecessary dependencies

🚀 Installation

# AWS only
uv add pythia[aws]

# Google Cloud only
uv add pythia[gcp]

# Azure only
uv add pythia[azure]

# All cloud providers
uv add pythia[cloud]

☁️ Amazon Web Services (AWS)

Complete support for Amazon SQS (Simple Queue Service) and Amazon SNS (Simple Notification Service).

📥 SQS Consumer

Basic Example

from pythia.brokers.cloud.aws import SQSConsumer
from pythia.config.cloud import AWSConfig

class OrderProcessor(SQSConsumer):
    def __init__(self):
        aws_config = AWSConfig(
            region="us-east-1",
            queue_url="https://sqs.us-east-1.amazonaws.com/123456789/orders"
        )
        super().__init__(
            queue_url="https://sqs.us-east-1.amazonaws.com/123456789/orders",
            aws_config=aws_config
        )

    async def process_message(self, message):
        """Process SQS message"""
        order_data = message.body
        order_id = order_data.get('order_id')

        # Your business logic
        print(f"Processing order: {order_id}")
        await self.process_order(order_data)

        return {"processed": True, "order_id": order_id}

    async def process_order(self, order_data):
        # Simulate processing
        await asyncio.sleep(0.1)
        print(f"Order {order_data['order_id']} completed")

# Execute
if __name__ == "__main__":
    OrderProcessor().run_sync()

Advanced Configuration

from pythia.config.cloud import AWSConfig

class AdvancedSQSProcessor(SQSConsumer):
    def __init__(self):
        aws_config = AWSConfig(
            region="us-west-2",
            access_key_id="AKIA...",  # Or use environment variables
            secret_access_key="...",
            endpoint_url="http://localhost:4566",  # For LocalStack
            queue_url="https://sqs.us-west-2.amazonaws.com/123/priority-orders",
            visibility_timeout=60,  # 1 minute
            wait_time_seconds=20,   # Long polling
            max_messages=5          # Batch size
        )

        super().__init__(
            queue_url=aws_config.queue_url,
            aws_config=aws_config
        )

    async def process_message(self, message):
        # Access to SQS metadata
        message_id = message.headers.get('MessageId')
        receipt_handle = message.headers.get('ReceiptHandle')
        delivery_count = message.headers.get('MD5OfBody')

        # Custom attributes
        priority = message.headers.get('attr_priority', 'normal')
        source = message.headers.get('attr_source', 'unknown')

        self.logger.info(
            f"Processing SQS message {message_id}",
            priority=priority,
            source=source
        )

        return {
            "message_id": message_id,
            "priority": priority,
            "processed": True
        }

📤 SNS Producer

Basic Example

from pythia.brokers.cloud.aws import SNSProducer

class NotificationSender(SNSProducer):
    def __init__(self):
        super().__init__(
            topic_arn="arn:aws:sns:us-east-1:123456789:user-notifications"
        )

    async def send_welcome_notification(self, user_data):
        """Send welcome notification"""
        message_data = {
            "event": "user_welcome",
            "user_id": user_data["id"],
            "email": user_data["email"],
            "timestamp": datetime.now().isoformat()
        }

        message_id = await self.publish_message(
            message=message_data,
            subject="Welcome to our platform",
            message_attributes={
                "event_type": "welcome",
                "priority": "high",
                "user_segment": user_data.get("segment", "standard")
            }
        )

        if message_id:
            self.logger.info(f"Welcome notification sent: {message_id}")
        return message_id

    async def send_from_pythia_message(self, pythia_message):
        """Send from Pythia message"""
        return await self.publish_from_pythia_message(
            message=pythia_message,
            subject="System Alert"
        )

AWS Environment Variables

# AWS Credentials
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=AKIA...
AWS_SECRET_ACCESS_KEY=...
AWS_ENDPOINT_URL=http://localhost:4566  # For LocalStack

# SQS
SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123/queue-name

# SNS
SNS_TOPIC_ARN=arn:aws:sns:us-east-1:123:topic-name

🏗️ Google Cloud Platform (GCP)

Complete support for Google Cloud Pub/Sub with subscriber and publisher.

📥 Pub/Sub Subscriber

Basic Example

from pythia.brokers.cloud.gcp import PubSubSubscriber
from pythia.config.cloud import GCPConfig

class EventProcessor(PubSubSubscriber):
    def __init__(self):
        gcp_config = GCPConfig(
            project_id="my-gcp-project",
            subscription_name="event-processor-sub"
        )
        super().__init__(
            subscription_path="projects/my-gcp-project/subscriptions/event-processor-sub",
            gcp_config=gcp_config
        )

    async def process_message(self, message):
        """Process Pub/Sub message"""
        event_data = message.body
        event_type = event_data.get('type', 'unknown')

        # Message metadata
        message_id = message.headers.get('message_id')
        publish_time = message.headers.get('publish_time')
        ordering_key = message.headers.get('ordering_key')

        self.logger.info(
            f"Processing Pub/Sub event: {event_type}",
            message_id=message_id,
            ordering_key=ordering_key
        )

        # Routing by event type
        if event_type == 'user_signup':
            await self.handle_user_signup(event_data)
        elif event_type == 'order_placed':
            await self.handle_order_placed(event_data)
        else:
            self.logger.warning(f"Unknown event type: {event_type}")

        return {
            "event_type": event_type,
            "processed": True,
            "message_id": message_id
        }

    async def handle_user_signup(self, event_data):
        # Specific logic for signup
        user_id = event_data.get('user_id')
        print(f"New user signup: {user_id}")

    async def handle_order_placed(self, event_data):
        # Specific logic for orders
        order_id = event_data.get('order_id')
        print(f"New order placed: {order_id}")

Automatic Project ID Extraction

# The project_id is automatically extracted from the path
subscriber = PubSubSubscriber(
    subscription_path="projects/my-project/subscriptions/my-sub"
    # project_id is automatically detected as "my-project"
)

📤 Pub/Sub Publisher

Basic Example

from pythia.brokers.cloud.gcp import PubSubPublisher

class EventPublisher(PubSubPublisher):
    def __init__(self):
        super().__init__(
            topic_path="projects/my-gcp-project/topics/user-events"
        )

    async def publish_user_event(self, user_id, event_type, event_data):
        """Publish user event with ordering"""
        message_data = {
            "user_id": user_id,
            "event_type": event_type,
            "data": event_data,
            "timestamp": datetime.now().isoformat()
        }

        message_id = await self.publish_message(
            message=message_data,
            attributes={
                "event_type": event_type,
                "user_segment": event_data.get("segment", "standard"),
                "source": "user-service",
                "version": "1.0"
            },
            ordering_key=f"user-{user_id}"  # Guarantees order per user
        )

        if message_id:
            self.logger.info(f"User event published: {message_id}")
        return message_id

    async def broadcast_system_alert(self, alert_data):
        """Broadcast without ordering key"""
        return await self.publish_message(
            message={
                "alert_type": alert_data["type"],
                "severity": alert_data["severity"],
                "message": alert_data["message"],
                "timestamp": datetime.now().isoformat()
            },
            attributes={
                "alert_type": alert_data["type"],
                "severity": alert_data["severity"]
            }
            # No ordering_key for broadcast
        )

GCP Environment Variables

# Authentication
GCP_PROJECT_ID=my-gcp-project
GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json

# Pub/Sub
PUBSUB_SUBSCRIPTION=projects/my-project/subscriptions/my-subscription
PUBSUB_TOPIC=projects/my-project/topics/my-topic

🔷 Microsoft Azure

Complete support for Azure Service Bus and Azure Storage Queues.

📥 Service Bus Consumer

Basic Example

from pythia.brokers.cloud.azure import ServiceBusConsumer
from pythia.config.cloud import AzureConfig

class TaskProcessor(ServiceBusConsumer):
    def __init__(self):
        azure_config = AzureConfig(
            service_bus_connection_string="Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...",
            service_bus_queue_name="task-queue"
        )
        super().__init__(
            queue_name="task-queue",
            azure_config=azure_config
        )

    async def process_message(self, message):
        """Process Service Bus message"""
        task_data = message.body

        # Message metadata
        message_id = message.headers.get('message_id')
        correlation_id = message.headers.get('correlation_id')
        content_type = message.headers.get('content_type')
        delivery_count = message.headers.get('delivery_count')

        # Custom properties
        priority = message.headers.get('prop_priority', 'normal')
        task_type = message.headers.get('prop_task_type', 'generic')

        self.logger.info(
            f"Processing Service Bus task: {task_type}",
            message_id=message_id,
            correlation_id=correlation_id,
            priority=priority,
            delivery_count=delivery_count
        )

        # Type-based processing
        if task_type == 'image_processing':
            await self.process_image_task(task_data)
        elif task_type == 'email_send':
            await self.process_email_task(task_data)
        else:
            await self.process_generic_task(task_data)

        return {
            "task_type": task_type,
            "processed": True,
            "correlation_id": correlation_id
        }

    async def process_image_task(self, task_data):
        # Specific logic for image processing
        image_id = task_data.get('image_id')
        print(f"Processing image: {image_id}")

    async def process_email_task(self, task_data):
        # Specific logic for email sending
        email_id = task_data.get('email_id')
        print(f"Sending email: {email_id}")

    async def process_generic_task(self, task_data):
        # Generic logic
        print(f"Processing generic task: {task_data}")

📤 Service Bus Producer

from pythia.brokers.cloud.azure import ServiceBusProducer

class TaskScheduler(ServiceBusProducer):
    def __init__(self):
        super().__init__(
            queue_name="task-queue",
            connection_string="Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=..."
        )

    async def schedule_image_processing(self, image_data):
        """Schedule image processing"""
        task_data = {
            "task_type": "image_processing",
            "image_id": image_data["id"],
            "image_url": image_data["url"],
            "processing_options": image_data.get("options", {})
        }

        success = await self.send_message(
            message=task_data,
            properties={
                "task_type": "image_processing",
                "priority": "high",
                "source": "upload-service"
            },
            correlation_id=f"img-{image_data['id']}",
            content_type="application/json"
        )

        if success:
            self.logger.info(f"Image processing task scheduled: {image_data['id']}")
        return success

📥 Storage Queue Consumer

Basic Example

from pythia.brokers.cloud.azure import StorageQueueConsumer

class SimpleTaskProcessor(StorageQueueConsumer):
    def __init__(self):
        super().__init__(
            queue_name="simple-tasks",
            connection_string="DefaultEndpointsProtocol=https;AccountName=mystorage;AccountKey=...;EndpointSuffix=core.windows.net"
        )

    async def process_message(self, message):
        """Process Storage Queue message"""
        task_data = message.body

        # Message metadata
        message_id = message.headers.get('message_id')
        dequeue_count = message.headers.get('dequeue_count')
        insertion_time = message.headers.get('insertion_time')

        self.logger.info(
            f"Processing Storage Queue task",
            message_id=message_id,
            dequeue_count=dequeue_count,
            insertion_time=insertion_time
        )

        # Process simple task
        task_type = task_data.get('type', 'unknown')
        if task_type == 'cleanup':
            await self.perform_cleanup(task_data)
        elif task_type == 'backup':
            await self.perform_backup(task_data)

        return {
            "task_type": task_type,
            "processed": True,
            "message_id": message_id
        }

    async def perform_cleanup(self, task_data):
        # Cleanup logic
        print(f"Performing cleanup: {task_data}")

    async def perform_backup(self, task_data):
        # Backup logic
        print(f"Performing backup: {task_data}")

📤 Storage Queue Producer

from pythia.brokers.cloud.azure import StorageQueueProducer

class MaintenanceScheduler(StorageQueueProducer):
    def __init__(self):
        super().__init__(
            queue_name="maintenance-tasks",
            connection_string="DefaultEndpointsProtocol=https;AccountName=mystorage;AccountKey=...;EndpointSuffix=core.windows.net"
        )

    async def schedule_cleanup_task(self, cleanup_data):
        """Schedule cleanup task"""
        task_data = {
            "type": "cleanup",
            "target": cleanup_data["target"],
            "options": cleanup_data.get("options", {}),
            "scheduled_time": datetime.now().isoformat()
        }

        message_id = await self.send_message(
            message=task_data,
            visibility_timeout=300,  # 5 minutes invisible initially
            time_to_live=86400      # TTL of 24 hours
        )

        if message_id:
            self.logger.info(f"Cleanup task scheduled: {message_id}")
        return message_id

    async def schedule_backup_task(self, backup_data):
        """Schedule backup task"""
        return await self.send_message(
            message={
                "type": "backup",
                "database": backup_data["database"],
                "backup_type": backup_data.get("type", "full")
            },
            time_to_live=172800  # TTL of 48 hours for backups
        )

Azure Environment Variables

# Service Bus
AZURE_SERVICE_BUS_CONNECTION_STRING=Endpoint=sb://my-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...
SERVICE_BUS_QUEUE_NAME=task-queue

# Storage Queue
AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=mystorage;AccountKey=...;EndpointSuffix=core.windows.net
STORAGE_QUEUE_NAME=simple-tasks

🔧 Advanced Configuration

Programmatic Configuration

from pythia.config.cloud import AWSConfig, GCPConfig, AzureConfig, CloudProviderConfig

# Unified configuration
cloud_config = CloudProviderConfig.from_env()

# Provider-specific configuration
aws_config = AWSConfig(
    region="us-east-1",
    queue_url="https://sqs.us-east-1.amazonaws.com/123/queue",
    visibility_timeout=60,
    max_messages=10
)

gcp_config = GCPConfig(
    project_id="my-project",
    subscription_name="my-subscription",
    max_messages=100,
    ack_deadline_seconds=600
)

azure_config = AzureConfig(
    service_bus_connection_string="Endpoint=sb://...",
    service_bus_queue_name="my-queue",
    max_messages=32,
    visibility_timeout=30
)

Multi-Cloud Worker Pattern

from pythia.brokers.cloud.aws import SQSConsumer
from pythia.brokers.cloud.gcp import PubSubSubscriber
from pythia.brokers.cloud.azure import ServiceBusConsumer

class MultiCloudProcessor:
    def __init__(self):
        # Configure multiple consumers
        self.aws_consumer = SQSConsumer(
            queue_url="https://sqs.us-east-1.amazonaws.com/123/aws-tasks"
        )
        self.gcp_subscriber = PubSubSubscriber(
            subscription_path="projects/my-project/subscriptions/gcp-tasks"
        )
        self.azure_consumer = ServiceBusConsumer(
            queue_name="azure-tasks",
            connection_string="Endpoint=sb://..."
        )

    async def run_all(self):
        """Run all consumers concurrently"""
        await asyncio.gather(
            self.aws_consumer.run(),
            self.gcp_subscriber.run(),
            self.azure_consumer.run()
        )

🛡️ Error Handling and Retries

AWS SQS - Visibility Timeout

class ResilientSQSProcessor(SQSConsumer):
    async def process_message(self, message):
        try:
            # Your logic here
            await self.complex_processing(message.body)
            return {"processed": True}
        except TemporaryError as e:
            # Don't return result -> message will be visible again
            self.logger.warning(f"Temporary error, will retry: {e}")
            return None  # This causes the message to be retried
        except PermanentError as e:
            # Return False to avoid infinite retries
            self.logger.error(f"Permanent error: {e}")
            await self.send_to_dlq(message)
            return {"processed": False, "error": str(e)}

GCP Pub/Sub - Acknowledgment Control

class ResilientPubSubProcessor(PubSubSubscriber):
    async def process_message(self, message):
        try:
            await self.process_event(message.body)
            return {"processed": True}  # Message will be acknowledged
        except Exception as e:
            self.logger.error(f"Processing failed: {e}")
            # Don't return anything -> message will NOT be acknowledged
            # Will be retried automatically

Azure Service Bus - Complete vs Abandon

class ResilientServiceBusProcessor(ServiceBusConsumer):
    async def process_message(self, message):
        try:
            result = await self.process_task(message.body)
            return result  # Complete message
        except RetriableError as e:
            # The worker automatically does abandon_message
            # when there's an exception
            raise  # This causes abandon and retry
        except NonRetriableError as e:
            # Return result to complete and avoid retries
            await self.handle_poison_message(message)
            return {"processed": False, "error": "poison_message"}

📊 Monitoring and Observability

Structured Logging

class MonitoredCloudProcessor(SQSConsumer):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = {
            "messages_processed": 0,
            "errors_count": 0,
            "processing_times": []
        }

    async def process_message(self, message):
        start_time = time.time()

        try:
            # Your processing
            result = await self.handle_message(message.body)

            # Success metrics
            processing_time = time.time() - start_time
            self.metrics["messages_processed"] += 1
            self.metrics["processing_times"].append(processing_time)

            self.logger.info(
                "Message processed successfully",
                message_id=message.headers.get('MessageId'),
                processing_time_ms=processing_time * 1000,
                queue_name=self.queue_name,
                metrics=self.metrics
            )

            return result

        except Exception as e:
            self.metrics["errors_count"] += 1
            self.logger.error(
                "Message processing failed",
                message_id=message.headers.get('MessageId'),
                error=str(e),
                error_type=type(e).__name__,
                queue_name=self.queue_name,
                metrics=self.metrics,
                exc_info=True
            )
            raise

🎯 Common Use Cases

1. Image Processing Pipeline

# AWS SQS to receive tasks
class ImageProcessor(SQSConsumer):
    async def process_message(self, message):
        image_data = message.body

        # Process image
        processed_image = await self.process_image(image_data)

        # Notify completion via SNS
        notification_sender = SNSProducer(topic_arn="arn:aws:sns:...")
        await notification_sender.publish_message({
            "event": "image_processed",
            "image_id": image_data["id"],
            "result_url": processed_image["url"]
        })

        return {"processed": True}

2. Multi-Channel Notification System

# GCP Pub/Sub for user events
class NotificationDispatcher(PubSubSubscriber):
    async def process_message(self, message):
        event = message.body
        user_id = event["user_id"]

        # Send to Azure Service Bus for processing
        service_bus_producer = ServiceBusProducer(queue_name="notifications")
        await service_bus_producer.send_message(
            message={
                "user_id": user_id,
                "notification_type": event["type"],
                "channels": ["email", "push", "sms"]
            },
            correlation_id=f"user-{user_id}"
        )

        return {"dispatched": True}

3. Backup and Maintenance System

# Azure Storage Queue for maintenance tasks
class MaintenanceWorker(StorageQueueConsumer):
    async def process_message(self, message):
        task = message.body

        if task["type"] == "backup":
            await self.perform_backup(task["database"])
        elif task["type"] == "cleanup":
            await self.cleanup_old_files(task["path"])
        elif task["type"] == "health_check":
            health_status = await self.check_system_health()

            # Report to AWS SNS if there are issues
            if not health_status["healthy"]:
                sns_producer = SNSProducer(topic_arn="arn:aws:sns:...")
                await sns_producer.publish_message(
                    message=health_status,
                    subject="System Health Alert"
                )

        return {"completed": True}

Pythia's Cloud Workers provide a unified and powerful experience for working with the main cloud messaging platforms, maintaining the simplicity and elegance that characterizes the framework.

Start building your cloud-native workers today! 🚀