Database Sync Worker Example
This example demonstrates how to build a database synchronization worker that replicates data from a PostgreSQL production database to a MySQL analytics database.
Scenario
We'll build a data replication system that:
- Syncs customer data from PostgreSQL (production) to MySQL (analytics)
- Performs incremental syncs based on timestamps
- Validates data integrity after sync
- Handles cross-database type conversions
Code
import asyncio
from typing import List, Dict, Any
from datetime import datetime, timedelta
from pythia.brokers.database import DatabaseSyncWorker
class DataReplicationWorker(DatabaseSyncWorker):
"""
Production to Analytics database sync worker
"""
def __init__(self):
sync_config = {
'batch_size': 1000,
'mode': 'incremental',
'conflict_resolution': 'source_wins',
'timestamp_column': 'updated_at',
'truncate_target': False
}
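        # NOTE: connection strings are hardcoded for illustration only; in production,
        # load credentials from environment variables or a secrets manager and use a
        # read-only account for the source database.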
super().__init__(
source_connection="postgresql://user:password@prod-db:5432/ecommerce",
target_connection="mysql://user:password@analytics-db:3306/analytics",
sync_config=sync_config
)
# Define tables to sync with their configurations
self.sync_tables = {
'customers': {
'primary_key': 'id',
'timestamp_column': 'updated_at',
'batch_size': 1000,
'validations': ['email_format', 'required_fields']
},
'orders': {
'primary_key': 'id',
'timestamp_column': 'updated_at',
'batch_size': 500,
'validations': ['amount_positive', 'customer_exists']
},
'products': {
'primary_key': 'id',
'timestamp_column': 'updated_at',
'batch_size': 2000,
'validations': ['price_positive']
},
'order_items': {
'primary_key': 'id',
'timestamp_column': 'created_at', # Different timestamp column
'batch_size': 2000,
'validations': ['quantity_positive']
}
}
async def sync_all_configured_tables(self) -> Dict[str, Any]:
"""Sync all configured tables with their specific settings"""
results = []
start_time = datetime.now()
        for table_name, config in self.sync_tables.items():
            # Remember worker-level settings so they can be restored even if this table fails
            original_timestamp_column = self.timestamp_column
            original_batch_size = self.batch_size
            try:
                self.logger.info(f"Starting sync for {table_name}")
                # Apply per-table overrides for timestamp column and batch size
                self.timestamp_column = config['timestamp_column']
                self.batch_size = config['batch_size']
                # Perform sync
                sync_result = await self.sync_table(table_name)
                # Validate sync
                validation_result = await self.validate_sync(table_name)
                # Run custom validations
                custom_validations = await self._run_custom_validations(
                    table_name, config['validations']
                )
                result = {
                    'table': table_name,
                    **sync_result,
                    'validation': validation_result,
                    'custom_validations': custom_validations,
                    'success': True
                }
                results.append(result)
                self.logger.info(f"✅ {table_name} sync completed: {sync_result['rows_synced']} rows")
            except Exception as e:
                self.logger.error(f"❌ Error syncing {table_name}: {e}")
                results.append({
                    'table': table_name,
                    'error': str(e),
                    'success': False
                })
            finally:
                # Restore worker-level settings regardless of success or failure
                self.timestamp_column = original_timestamp_column
                self.batch_size = original_batch_size
end_time = datetime.now()
total_duration = (end_time - start_time).total_seconds()
total_rows_synced = sum(r.get('rows_synced', 0) for r in results if 'rows_synced' in r)
summary = {
'total_tables': len(self.sync_tables),
'successful_syncs': len([r for r in results if r.get('success', False)]),
'failed_syncs': len([r for r in results if not r.get('success', False)]),
'total_rows_synced': total_rows_synced,
'total_duration_seconds': total_duration,
'start_time': start_time.isoformat(),
'end_time': end_time.isoformat(),
'results': results
}
return summary
async def _run_custom_validations(self, table_name: str, validations: List[str]) -> Dict[str, Any]:
"""Run custom data validations"""
validation_results = {}
for validation in validations:
try:
if validation == 'email_format':
result = await self._validate_email_format(table_name)
elif validation == 'required_fields':
result = await self._validate_required_fields(table_name)
elif validation == 'amount_positive':
result = await self._validate_positive_amounts(table_name)
elif validation == 'customer_exists':
result = await self._validate_customer_references(table_name)
elif validation == 'price_positive':
result = await self._validate_positive_prices(table_name)
elif validation == 'quantity_positive':
result = await self._validate_positive_quantities(table_name)
else:
result = {'status': 'skipped', 'reason': 'Unknown validation'}
validation_results[validation] = result
except Exception as e:
validation_results[validation] = {
'status': 'error',
'error': str(e)
}
return validation_results
async def _validate_email_format(self, table_name: str) -> Dict[str, Any]:
"""Validate email format in target database"""
if self.target_type == 'mysql':
async with self.target_conn.cursor() as cursor:
query = f"""
SELECT COUNT(*) as total,
                       SUM(CASE WHEN email REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+[.][A-Za-z]{{2,}}$' THEN 1 ELSE 0 END) as valid
FROM {table_name}
"""
await cursor.execute(query)
result = await cursor.fetchone()
return {
'status': 'completed',
'total_rows': result[0],
'valid_emails': result[1],
'invalid_emails': result[0] - result[1],
'valid_percentage': (result[1] / result[0] * 100) if result[0] > 0 else 0
}
return {'status': 'skipped', 'reason': 'Only supported for MySQL target'}
async def _validate_required_fields(self, table_name: str) -> Dict[str, Any]:
"""Validate required fields are not null"""
required_fields = {
'customers': ['email', 'first_name', 'last_name'],
'orders': ['customer_id', 'total_amount', 'status'],
'products': ['name', 'price'],
'order_items': ['order_id', 'product_id', 'quantity']
}
fields = required_fields.get(table_name, [])
if not fields:
return {'status': 'skipped', 'reason': 'No required fields defined'}
validation_results = {}
for field in fields:
if self.target_type == 'mysql':
async with self.target_conn.cursor() as cursor:
query = f"SELECT COUNT(*) FROM {table_name} WHERE {field} IS NULL"
await cursor.execute(query)
null_count = (await cursor.fetchone())[0]
validation_results[field] = {
'null_count': null_count,
'is_valid': null_count == 0
}
return {
'status': 'completed',
'field_validations': validation_results,
'all_valid': all(v['is_valid'] for v in validation_results.values())
}
async def _validate_positive_amounts(self, table_name: str) -> Dict[str, Any]:
"""Validate amounts are positive"""
if self.target_type == 'mysql':
async with self.target_conn.cursor() as cursor:
query = f"""
SELECT COUNT(*) as total,
SUM(CASE WHEN total_amount > 0 THEN 1 ELSE 0 END) as positive
FROM {table_name}
"""
await cursor.execute(query)
result = await cursor.fetchone()
return {
'status': 'completed',
'total_orders': result[0],
'positive_amounts': result[1],
'negative_or_zero': result[0] - result[1],
'valid_percentage': (result[1] / result[0] * 100) if result[0] > 0 else 0
}
return {'status': 'skipped', 'reason': 'Only supported for MySQL target'}
async def _validate_customer_references(self, table_name: str) -> Dict[str, Any]:
"""Validate customer references exist"""
if table_name == 'orders' and self.target_type == 'mysql':
async with self.target_conn.cursor() as cursor:
query = f"""
SELECT COUNT(*) as total_orders,
COUNT(c.id) as valid_customers
FROM {table_name} o
LEFT JOIN customers c ON o.customer_id = c.id
"""
await cursor.execute(query)
result = await cursor.fetchone()
return {
'status': 'completed',
'total_orders': result[0],
'valid_references': result[1],
'orphaned_orders': result[0] - result[1],
'valid_percentage': (result[1] / result[0] * 100) if result[0] > 0 else 0
}
return {'status': 'skipped', 'reason': 'Not applicable for this table'}
async def _validate_positive_prices(self, table_name: str) -> Dict[str, Any]:
"""Validate product prices are positive"""
if self.target_type == 'mysql':
async with self.target_conn.cursor() as cursor:
query = f"""
SELECT COUNT(*) as total,
SUM(CASE WHEN price > 0 THEN 1 ELSE 0 END) as positive
FROM {table_name}
"""
await cursor.execute(query)
result = await cursor.fetchone()
return {
'status': 'completed',
'total_products': result[0],
'positive_prices': result[1],
'invalid_prices': result[0] - result[1],
'valid_percentage': (result[1] / result[0] * 100) if result[0] > 0 else 0
}
return {'status': 'skipped'}
async def _validate_positive_quantities(self, table_name: str) -> Dict[str, Any]:
"""Validate quantities are positive"""
if self.target_type == 'mysql':
async with self.target_conn.cursor() as cursor:
query = f"""
SELECT COUNT(*) as total,
SUM(CASE WHEN quantity > 0 THEN 1 ELSE 0 END) as positive
FROM {table_name}
"""
await cursor.execute(query)
result = await cursor.fetchone()
return {
'status': 'completed',
'total_items': result[0],
'positive_quantities': result[1],
'invalid_quantities': result[0] - result[1],
'valid_percentage': (result[1] / result[0] * 100) if result[0] > 0 else 0
}
return {'status': 'skipped'}
async def generate_sync_report(self, sync_result: Dict[str, Any]) -> str:
"""Generate a detailed sync report"""
report_lines = [
"=" * 60,
"DATABASE SYNCHRONIZATION REPORT",
"=" * 60,
f"Sync completed at: {sync_result['end_time']}",
f"Total duration: {sync_result['total_duration_seconds']:.2f} seconds",
f"Total tables processed: {sync_result['total_tables']}",
f"Successful syncs: {sync_result['successful_syncs']}",
f"Failed syncs: {sync_result['failed_syncs']}",
f"Total rows synced: {sync_result['total_rows_synced']:,}",
"",
"TABLE DETAILS:",
"-" * 40
]
for result in sync_result['results']:
if result.get('success'):
table = result['table']
rows_synced = result.get('rows_synced', 0)
duration = result.get('duration_seconds', 0)
validation = result.get('validation', {})
report_lines.extend([
f"✅ {table}:",
f" Rows synced: {rows_synced:,}",
f" Duration: {duration:.2f}s",
f" Rate: {rows_synced/duration:.0f} rows/sec" if duration > 0 else " Rate: N/A",
f" Validation: {'✅ PASSED' if validation.get('is_valid', False) else '❌ FAILED'}",
""
])
else:
table = result['table']
error = result.get('error', 'Unknown error')
report_lines.extend([
f"❌ {table}:",
f" Error: {error}",
""
])
return "\n".join(report_lines)
async def scheduled_sync():
"""Scheduled sync job (run every hour)"""
worker = DataReplicationWorker()
try:
async with worker:
print("🚀 Starting scheduled database sync...")
# Run the sync
result = await worker.sync_all_configured_tables()
# Generate and print report
report = await worker.generate_sync_report(result)
print(report)
# Check if any syncs failed
if result['failed_syncs'] > 0:
print(f"⚠️ {result['failed_syncs']} table sync(s) failed!")
# In production, send alert email/notification
return result
except Exception as e:
print(f"💥 Sync job failed: {e}")
raise
async def full_resync():
"""Full database resync (run weekly or on-demand)"""
# Create worker with full sync mode
worker = DataReplicationWorker()
worker.sync_mode = 'full'
worker.sync_config['truncate_target'] = True # Clear target tables
try:
async with worker:
print("🔄 Starting FULL database resync...")
print("⚠️ Target tables will be truncated!")
# Confirm in production
confirmation = input("Continue? (y/N): ")
if confirmation.lower() != 'y':
print("Sync cancelled.")
return
result = await worker.sync_all_configured_tables()
# Generate report
report = await worker.generate_sync_report(result)
print(report)
return result
except Exception as e:
print(f"💥 Full resync failed: {e}")
raise
async def validate_existing_data():
"""Validate data integrity without syncing"""
worker = DataReplicationWorker()
try:
async with worker:
print("🔍 Validating existing data integrity...")
for table_name in worker.sync_tables.keys():
print(f"\nValidating {table_name}...")
# Basic validation
validation = await worker.validate_sync(table_name)
print(f" Row count validation: {'✅ PASS' if validation['is_valid'] else '❌ FAIL'}")
print(f" Source: {validation['source_count']:,} rows")
print(f" Target: {validation['target_count']:,} rows")
# Custom validations
validations = worker.sync_tables[table_name]['validations']
custom_results = await worker._run_custom_validations(table_name, validations)
for validation_name, result in custom_results.items():
if result['status'] == 'completed':
print(f" {validation_name}: ✅ COMPLETED")
else:
print(f" {validation_name}: ⚠️ {result.get('reason', 'SKIPPED')}")
except Exception as e:
print(f"💥 Validation failed: {e}")
raise
async def main():
"""Main CLI interface"""
import sys
if len(sys.argv) < 2:
print("Usage:")
print(" python sync_worker.py sync # Incremental sync")
print(" python sync_worker.py full # Full resync")
print(" python sync_worker.py validate # Validate data only")
return
command = sys.argv[1]
if command == "sync":
await scheduled_sync()
elif command == "full":
await full_resync()
elif command == "validate":
await validate_existing_data()
else:
print(f"Unknown command: {command}")
if __name__ == "__main__":
asyncio.run(main())
Database Setup
Source Database (PostgreSQL)
-- Create production database
CREATE DATABASE ecommerce;
-- Create tables
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
first_name VARCHAR(100) NOT NULL,
last_name VARCHAR(100) NOT NULL,
phone VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
price DECIMAL(10,2) NOT NULL,
category VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
customer_id INTEGER REFERENCES customers(id),
status VARCHAR(50) DEFAULT 'pending',
total_amount DECIMAL(10,2) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE order_items (
id SERIAL PRIMARY KEY,
order_id INTEGER REFERENCES orders(id),
product_id INTEGER REFERENCES products(id),
quantity INTEGER NOT NULL,
price DECIMAL(10,2) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create indexes on timestamp columns
CREATE INDEX idx_customers_updated_at ON customers(updated_at);
CREATE INDEX idx_products_updated_at ON products(updated_at);
CREATE INDEX idx_orders_updated_at ON orders(updated_at);
CREATE INDEX idx_order_items_created_at ON order_items(created_at);
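-- Note: unlike MySQL's ON UPDATE CURRENT_TIMESTAMP, PostgreSQL does not refresh
-- updated_at automatically on UPDATE; maintain it with a trigger or in application
-- code so incremental syncs pick up changed rows.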
Target Database (MySQL)
-- Create analytics database
CREATE DATABASE analytics;
-- Create corresponding tables
CREATE TABLE customers (
id INT PRIMARY KEY,
email VARCHAR(255) NOT NULL,
first_name VARCHAR(100) NOT NULL,
last_name VARCHAR(100) NOT NULL,
phone VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
CREATE TABLE products (
id INT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
price DECIMAL(10,2) NOT NULL,
category VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
CREATE TABLE orders (
id INT PRIMARY KEY,
customer_id INT,
status VARCHAR(50) DEFAULT 'pending',
total_amount DECIMAL(10,2) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_customer_id (customer_id)
);
CREATE TABLE order_items (
id INT PRIMARY KEY,
order_id INT,
product_id INT,
quantity INT NOT NULL,
price DECIMAL(10,2) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_order_id (order_id),
INDEX idx_product_id (product_id)
);
Running the Example
- Install dependencies (see the commands below).
- Configure connections: update the connection strings in the worker code.
- Run sync operations (see the commands below).
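The commands below are a minimal sketch. The exact package name and any extras for installing pythia, as well as the driver packages, are assumptions and may differ in your environment; the sync commands mirror the CLI defined in main() above.

# Install the worker framework and async database drivers (package names are assumptions)
pip install pythia asyncpg aiomysql

# Incremental sync (suitable as an hourly job)
python sync_worker.py sync

# Full resync (truncates target tables and asks for confirmation)
python sync_worker.py full

# Validate data integrity without syncing
python sync_worker.py validate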
Expected Output
Incremental Sync
🚀 Starting scheduled database sync...
============================================================
DATABASE SYNCHRONIZATION REPORT
============================================================
Sync completed at: 2024-01-15T10:30:00
Total duration: 12.45 seconds
Total tables processed: 4
Successful syncs: 4
Failed syncs: 0
Total rows synced: 15,234
TABLE DETAILS:
----------------------------------------
✅ customers:
Rows synced: 1,234
Duration: 2.1s
Rate: 587 rows/sec
Validation: ✅ PASSED
✅ orders:
Rows synced: 5,678
Duration: 4.2s
Rate: 1,351 rows/sec
Validation: ✅ PASSED
Validation Only
🔍 Validating existing data integrity...
Validating customers...
Row count validation: ✅ PASS
Source: 10,234 rows
Target: 10,234 rows
email_format: ✅ COMPLETED
required_fields: ✅ COMPLETED
Validating orders...
Row count validation: ✅ PASS
Source: 45,678 rows
Target: 45,678 rows
amount_positive: ✅ COMPLETED
customer_exists: ✅ COMPLETED
Key Features Demonstrated
- Cross-Database Sync: PostgreSQL to MySQL replication
- Incremental Sync: Only sync changed data using timestamps
- Custom Validations: Business logic validation after sync
- Batch Processing: Configurable batch sizes per table
- Error Handling: Robust error handling with detailed reporting
- Data Integrity: Validation of synced data
- Performance Monitoring: Sync rates and duration tracking
- Flexible Configuration: Per-table configuration options
Production Considerations
- Scheduling: Use cron or a task scheduler for regular syncs (see the crontab sketch below)
- Monitoring: Set up alerting for failed syncs
- Performance: Tune batch sizes based on data volume
- Security: Use read-only source connections
- Backup: Always backup target before full resync
- Network: Consider connection timeouts and retries
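As a sketch of the scheduling point above, the crontab entries below run the incremental sync hourly and a full resync weekly. The paths, interpreter, and log locations are illustrative assumptions; the weekly job pipes a confirmation because full_resync() prompts before truncating.

# Hourly incremental sync (paths and log locations are illustrative)
0 * * * * cd /opt/sync-worker && python sync_worker.py sync >> /var/log/sync_worker.log 2>&1

# Weekly full resync, Sunday at 02:00 (pipes "y" to the confirmation prompt)
0 2 * * 0 cd /opt/sync-worker && echo y | python sync_worker.py full >> /var/log/sync_worker.log 2>&1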