Event-Driven Architecture: From Event Emitters to Distributed Microservices
Event-Driven Architecture (EDA) is a powerful software architecture pattern that enables system components to communicate through the production and consumption of events. Instead of direct function calls or API requests, components emit events when something happens, and other components listen and react to these events accordingly.
This architecture promotes loose coupling, scalability, and maintainability, making it an excellent choice for modern applications ranging from simple local servers to complex distributed microservices.
Core Concepts
What is an Event?
An event is a message that describes something that has happened in the system. Events are typically named using past tense to indicate that an action has already occurred. Examples include:
user-registered
: When a user creates an accountorder-created
: When an order is placedpayment-completed
: When a payment transaction finishes
Event Emitter Pattern
The Event Emitter pattern allows objects to emit events and enables listeners to subscribe to notifications when these events occur. This creates a publish-subscribe mechanism within your application.
Advantages and Disadvantages
Advantages:
- Loose Coupling: Components don’t need to know about each other, only about events
- Scalability: Easy to add or remove event handlers without affecting other parts
- Asynchronous Processing: Non-blocking operations improve overall performance
- Maintainability: Easier to maintain and extend functionality
- Testability: Individual components can be tested in isolation
- Resilience: System continues to function even if some event handlers fail
Disadvantages:
- Complexity: Can be harder to debug and trace execution flow
- Event Ordering: Difficult to guarantee the order of event processing
- Error Handling: More complex error handling compared to synchronous calls
- Memory Leaks: Potential memory leaks if listeners are not properly removed
- Eventual Consistency: May lead to temporary inconsistencies in distributed systems
Node.js EventEmitter Example
const EventEmitter = require('events');
class OrderService extends EventEmitter {
createOrder(orderData) {
// Create order
const order = {
id: Date.now(),
...orderData,
status: 'pending',
createdAt: new Date().toISOString()
};
console.log('Order created:', order.id);
// Emit event
this.emit('order-created', order);
return order;
}
updateOrderStatus(orderId, status) {
console.log(`Order ${orderId} status updated to: ${status}`);
this.emit('order-status-updated', { orderId, status });
}
}
class EmailService {
constructor(orderService) {
// Listen to events
orderService.on('order-created', this.sendOrderConfirmation.bind(this));
orderService.on('order-status-updated', this.sendStatusUpdate.bind(this));
}
sendOrderConfirmation(order) {
console.log(`Sending confirmation email for order ${order.id}`);
// Email sending logic here
setTimeout(() => {
console.log(`Confirmation email sent for order ${order.id}`);
}, 100);
}
sendStatusUpdate({ orderId, status }) {
console.log(`Sending status update email for order ${orderId}: ${status}`);
}
}
class InventoryService {
constructor(orderService) {
orderService.on('order-created', this.updateInventory.bind(this));
}
updateInventory(order) {
console.log(`Updating inventory for order ${order.id}`);
// Inventory update logic
order.items.forEach(item => {
console.log(`Reducing stock for item: ${item}`);
});
}
}
class PaymentService {
constructor(orderService) {
orderService.on('order-created', this.processPayment.bind(this));
}
processPayment(order) {
console.log(`Processing payment for order ${order.id}`);
// Simulate payment processing
setTimeout(() => {
console.log(`Payment completed for order ${order.id}`);
// This would trigger another event
}, 200);
}
}
// Usage
const orderService = new OrderService();
const emailService = new EmailService(orderService);
const inventoryService = new InventoryService(orderService);
const paymentService = new PaymentService(orderService);
// Creating an order automatically triggers all subscribed services
orderService.createOrder({
userId: 123,
items: ['laptop', 'mouse'],
total: 1200
});
Python Implementation
Python can use libraries like PyPubSub
or implement custom event systems
import time
from typing import Dict, Any, List, Callable
from pubsub import pub
class OrderService:
def create_order(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
order = {
'id': int(time.time() * 1000),
**order_data,
'status': 'pending',
'created_at': time.time()
}
print(f"Order created: {order['id']}")
# Emit event
pub.sendMessage('order-created', order=order)
return order
def update_order_status(self, order_id: int, status: str):
print(f"Order {order_id} status updated to: {status}")
pub.sendMessage('order-status-updated', order_id=order_id, status=status)
class EmailService:
def __init__(self):
# Subscribe to events
pub.subscribe(self.send_order_confirmation, 'order-created')
pub.subscribe(self.send_status_update, 'order-status-updated')
def send_order_confirmation(self, order: Dict[str, Any]):
print(f"Sending confirmation email for order {order['id']}")
# Email sending logic
def send_status_update(self, order_id: int, status: str):
print(f"Sending status update email for order {order_id}: {status}")
class InventoryService:
def __init__(self):
pub.subscribe(self.update_inventory, 'order-created')
def update_inventory(self, order: Dict[str, Any]):
print(f"Updating inventory for order {order['id']}")
for item in order['items']:
print(f"Reducing stock for item: {item}")
# Custom Event System Implementation
class EventBus:
def __init__(self):
self.subscribers: Dict[str, List[Callable]] = {}
def subscribe(self, event_name: str, callback: Callable):
if event_name not in self.subscribers:
self.subscribers[event_name] = []
self.subscribers[event_name].append(callback)
def publish(self, event_name: str, data: Any):
if event_name in self.subscribers:
for callback in self.subscribers[event_name]:
try:
callback(data)
except Exception as e:
print(f"Error in event handler: {e}")
# Usage
order_service = OrderService()
email_service = EmailService()
inventory_service = InventoryService()
order_service.create_order({
'user_id': 123,
'items': ['laptop', 'mouse'],
'total': 1200
})
Go Implementation
Go can use channels or libraries like EventBus
package main
import (
"encoding/json"
"fmt"
"sync"
"time"
)
type Order struct {
ID int64 `json:"id"`
UserID int `json:"user_id"`
Items []string `json:"items"`
Total float64 `json:"total"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
}
type EventBus struct {
subscribers map[string][]chan []byte
mu sync.RWMutex
}
func NewEventBus() *EventBus {
return &EventBus{
subscribers: make(map[string][]chan []byte),
}
}
func (eb *EventBus) Subscribe(eventName string, ch chan []byte) {
eb.mu.Lock()
defer eb.mu.Unlock()
eb.subscribers[eventName] = append(eb.subscribers[eventName], ch)
}
func (eb *EventBus) Publish(eventName string, data interface{}) {
eb.mu.RLock()
defer eb.mu.RUnlock()
jsonData, err := json.Marshal(data)
if err != nil {
fmt.Printf("Error marshaling event data: %v\n", err)
return
}
for _, ch := range eb.subscribers[eventName] {
go func(ch chan []byte) {
select {
case ch <- jsonData:
case <-time.After(time.Second):
fmt.Println("Event delivery timeout")
}
}(ch)
}
}
type OrderService struct {
eventBus *EventBus
}
func NewOrderService(eventBus *EventBus) *OrderService {
return &OrderService{eventBus: eventBus}
}
func (os *OrderService) CreateOrder(orderData map[string]interface{}) *Order {
order := &Order{
ID: time.Now().UnixNano(),
UserID: orderData["user_id"].(int),
Items: orderData["items"].([]string),
Total: orderData["total"].(float64),
Status: "pending",
CreatedAt: time.Now(),
}
fmt.Printf("Order created: %d\n", order.ID)
// Emit event
os.eventBus.Publish("order-created", order)
return order
}
type EmailService struct {
eventBus *EventBus
}
func NewEmailService(eventBus *EventBus) *EmailService {
return &EmailService{eventBus: eventBus}
}
func (es *EmailService) Start() {
ch := make(chan []byte, 100)
es.eventBus.Subscribe("order-created", ch)
go func() {
for data := range ch {
var order Order
if err := json.Unmarshal(data, &order); err != nil {
fmt.Printf("Error unmarshaling order: %v\n", err)
continue
}
es.sendOrderConfirmation(&order)
}
}()
}
func (es *EmailService) sendOrderConfirmation(order *Order) {
fmt.Printf("Sending confirmation email for order %d\n", order.ID)
// Email sending logic
}
type InventoryService struct {
eventBus *EventBus
}
func NewInventoryService(eventBus *EventBus) *InventoryService {
return &InventoryService{eventBus: eventBus}
}
func (is *InventoryService) Start() {
ch := make(chan []byte, 100)
is.eventBus.Subscribe("order-created", ch)
go func() {
for data := range ch {
var order Order
if err := json.Unmarshal(data, &order); err != nil {
fmt.Printf("Error unmarshaling order: %v\n", err)
continue
}
is.updateInventory(&order)
}
}()
}
func (is *InventoryService) updateInventory(order *Order) {
fmt.Printf("Updating inventory for order %d\n", order.ID)
for _, item := range order.Items {
fmt.Printf("Reducing stock for item: %s\n", item)
}
}
func main() {
eventBus := NewEventBus()
orderService := NewOrderService(eventBus)
emailService := NewEmailService(eventBus)
inventoryService := NewInventoryService(eventBus)
// Start services
emailService.Start()
inventoryService.Start()
// Create order
orderService.CreateOrder(map[string]interface{}{
"user_id": 123,
"items": []string{"laptop", "mouse"},
"total": 1200.0,
})
// Wait for events to be processed
time.Sleep(2 * time.Second)
}
Scaling with Cloud Services
AWS SQS/SNS Integration
Amazon Simple Queue Service (SQS) and Simple Notification Service (SNS) provide managed messaging services
const AWS = require('aws-sdk');
// Configure AWS SDK
AWS.config.update({
region: 'us-east-1',
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY
});
class SNSEventPublisher {
constructor() {
this.sns = new AWS.SNS();
}
async publishEvent(topicArn, eventType, data) {
const message = {
eventType,
data,
timestamp: new Date().toISOString(),
correlationId: this.generateCorrelationId()
};
const params = {
TopicArn: topicArn,
Message: JSON.stringify(message),
MessageAttributes: {
eventType: {
DataType: 'String',
StringValue: eventType
},
source: {
DataType: 'String',
StringValue: 'order-service'
}
}
};
try {
const result = await this.sns.publish(params).promise();
console.log(`Event published: ${result.MessageId}`);
return result;
} catch (error) {
console.error('Error publishing event:', error);
throw error;
}
}
generateCorrelationId() {
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
}
class SQSEventConsumer {
constructor(queueUrl) {
this.sqs = new AWS.SQS();
this.queueUrl = queueUrl;
this.isPolling = false;
}
async startConsuming() {
this.isPolling = true;
console.log('Starting SQS consumer...');
while (this.isPolling) {
try {
const params = {
QueueUrl: this.queueUrl,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 20, // Long polling
VisibilityTimeoutSeconds: 30
};
const data = await this.sqs.receiveMessage(params).promise();
if (data.Messages) {
await Promise.all(
data.Messages.map(message => this.processMessage(message))
);
}
} catch (error) {
console.error('Error consuming messages:', error);
await this.sleep(5000); // Wait before retrying
}
}
}
async processMessage(message) {
try {
const event = JSON.parse(message.Body);
console.log('Processing event:', event);
// Process the event
await this.handleEvent(event);
// Delete message after successful processing
await this.sqs.deleteMessage({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle
}).promise();
console.log(`Message processed and deleted: ${message.MessageId}`);
} catch (error) {
console.error('Error processing message:', error);
// Message will be retried due to visibility timeout
}
}
async handleEvent(event) {
// Implement your event handling logic here
switch (event.eventType) {
case 'order-created':
await this.handleOrderCreated(event.data);
break;
case 'payment-completed':
await this.handlePaymentCompleted(event.data);
break;
default:
console.log(`Unknown event type: ${event.eventType}`);
}
}
async handleOrderCreated(orderData) {
console.log('Handling order created event:', orderData);
// Your business logic here
}
async handlePaymentCompleted(paymentData) {
console.log('Handling payment completed event:', paymentData);
// Your business logic here
}
stop() {
this.isPolling = false;
}
sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// Usage Example
const topicArn = 'arn:aws:sns:us-east-1:123456789012:order-events';
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/order-processing-queue';
const publisher = new SNSEventPublisher();
const consumer = new SQSEventConsumer(queueUrl);
// Publish an event
await publisher.publishEvent(topicArn, 'order-created', {
orderId: 12345,
userId: 67890,
total: 199.99,
items: ['laptop', 'mouse']
});
// Start consuming events
consumer.startConsuming();
AWS EventBridge
EventBridge provides a serverless event bus with advanced routing capabilities
const AWS = require('aws-sdk');
class EventBridgePublisher {
constructor() {
this.eventBridge = new AWS.EventBridge();
}
async publishEvent(source, detailType, detail, eventBusName = 'default') {
const params = {
Entries: [{
Source: source,
DetailType: detailType,
Detail: JSON.stringify(detail),
EventBusName: eventBusName,
Time: new Date()
}]
};
try {
const result = await this.eventBridge.putEvents(params).promise();
console.log('Event published to EventBridge:', result);
return result;
} catch (error) {
console.error('Error publishing to EventBridge:', error);
throw error;
}
}
async createRule(ruleName, eventPattern, targets) {
const ruleParams = {
Name: ruleName,
EventPattern: JSON.stringify(eventPattern),
State: 'ENABLED'
};
await this.eventBridge.putRule(ruleParams).promise();
const targetParams = {
Rule: ruleName,
Targets: targets
};
await this.eventBridge.putTargets(targetParams).promise();
console.log(`Rule ${ruleName} created successfully`);
}
}
// Usage
const publisher = new EventBridgePublisher();
// Publish events
await publisher.publishEvent(
'ecommerce.orders',
'Order Created',
{
orderId: 12345,
userId: 67890,
total: 199.99,
region: 'us-east-1'
}
);
await publisher.publishEvent(
'ecommerce.payments',
'Payment Completed',
{
paymentId: 'pay_12345',
orderId: 12345,
amount: 199.99,
method: 'credit_card'
}
);
Apache Kafka Integration
Kafka provides high-throughput, distributed event streaming
const kafka = require('kafkajs');
class KafkaEventProducer {
constructor(brokers, clientId = 'event-producer') {
this.kafka = kafka({
clientId,
brokers,
retry: {
initialRetryTime: 100,
retries: 8
}
});
this.producer = this.kafka.producer({
maxInFlightRequests: 1,
idempotent: true,
transactionTimeout: 30000
});
}
async connect() {
await this.producer.connect();
console.log('Kafka producer connected');
}
async disconnect() {
await this.producer.disconnect();
console.log('Kafka producer disconnected');
}
async publishEvent(topic, event) {
try {
const result = await this.producer.send({
topic,
messages: [{
partition: this.getPartition(event.id),
key: event.id.toString(),
value: JSON.stringify(event),
timestamp: Date.now(),
headers: {
eventType: event.type,
version: '1.0'
}
}]
});
console.log(`Event published to topic ${topic}:`, result);
return result;
} catch (error) {
console.error('Error publishing event:', error);
throw error;
}
}
getPartition(id) {
// Simple partitioning strategy
return id % 3;
}
}
class KafkaEventConsumer {
constructor(brokers, groupId, clientId = 'event-consumer') {
this.kafka = kafka({
clientId,
brokers,
retry: {
initialRetryTime: 100,
retries: 8
}
});
this.consumer = this.kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000
});
}
async connect() {
await this.consumer.connect();
console.log('Kafka consumer connected');
}
async disconnect() {
await this.consumer.disconnect();
console.log('Kafka consumer disconnected');
}
async subscribe(topics) {
await this.consumer.subscribe({ topics, fromBeginning: false });
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
const event = JSON.parse(message.value.toString());
const eventType = message.headers.eventType?.toString();
console.log(`Processing event from topic ${topic}, partition ${partition}:`, {
offset: message.offset,
eventType,
event
});
await this.processEvent(event, eventType);
} catch (error) {
console.error('Error processing message:', error);
// Implement dead letter queue logic here
}
}
});
}
async processEvent(event, eventType) {
// Implement event processing logic
switch (eventType) {
case 'order-created':
await this.handleOrderCreated(event);
break;
case 'payment-completed':
await this.handlePaymentCompleted(event);
break;
default:
console.log(`Unknown event type: ${eventType}`);
}
}
async handleOrderCreated(event) {
console.log('Handling order created event:', event);
// Business logic here
}
async handlePaymentCompleted(event) {
console.log('Handling payment completed event:', event);
// Business logic here
}
}
// Usage
const brokers = ['localhost:9092'];
const producer = new KafkaEventProducer(brokers);
const consumer = new KafkaEventConsumer(brokers, 'order-processing-group');
// Start producer
await producer.connect();
// Publish events
await producer.publishEvent('order-events', {
id: 12345,
type: 'order-created',
userId: 67890,
total: 199.99,
items: ['laptop', 'mouse']
});
// Start consumer
await consumer.connect();
await consumer.subscribe(['order-events', 'payment-events']);
Microservices Architecture with Events
In a microservices environment, events enable loose coupling between services
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Order Service │ │ Email Service │ │Inventory Service│
│ │ │ │ │ │
│ - Create Order │ │ - Send Emails │ │ - Update Stock │
│ - Update Status│ │ - Notifications │ │ - Check Avail. │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
│ │ │
└────────────────────────┼────────────────────────┘
│
┌─────────────────────────┐
│ Event Bus/Broker │
│ │
│ - Kafka/EventBridge/ │
│ RabbitMQ/Redis │
│ - Topic Routing │
│ - Message Persistence │
│ - Delivery Guarantees │
└─────────────────────────┘
│
┌────────────────────────┼────────────────────────┐
│ │ │
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│Payment Service │ │ Analytics Svc │ │Shipping Service │
│ │ │ │ │ │
│ - Process Pay │ │ - Track Events │ │ - Calculate │
│ - Handle Refunds│ │ - Generate │ │ - Schedule │
└─────────────────┘ └─────────────────┘ └─────────────────┘
Event Sourcing Pattern
Event sourcing stores all changes as a sequence of events
class EventStore {
constructor() {
this.events = [];
}
appendEvent(streamId, event) {
const eventWithMetadata = {
...event,
streamId,
eventId: this.generateEventId(),
timestamp: new Date().toISOString(),
version: this.getNextVersion(streamId)
};
this.events.push(eventWithMetadata);
return eventWithMetadata;
}
getEvents(streamId, fromVersion = 0) {
return this.events.filter(e =>
e.streamId === streamId && e.version >= fromVersion
);
}
getNextVersion(streamId) {
const streamEvents = this.events.filter(e => e.streamId === streamId);
return streamEvents.length + 1;
}
generateEventId() {
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
}
class OrderAggregate {
constructor(id) {
this.id = id;
this.version = 0;
this.status = 'pending';
this.items = [];
this.total = 0;
}
static fromEvents(events) {
const order = new OrderAggregate(events[0].streamId);
events.forEach(event => {
order.apply(event);
});
return order;
}
apply(event) {
switch (event.type) {
case 'OrderCreated':
this.status = 'created';
this.items = event.data.items;
this.total = event.data.total;
break;
case 'OrderPaid':
this.status = 'paid';
break;
case 'OrderShipped':
this.status = 'shipped';
break;
case 'OrderDelivered':
this.status = 'delivered';
break;
}
this.version = event.version;
}
}
// Usage
const eventStore = new EventStore();
const orderId = 'order-123';
// Create events
eventStore.appendEvent(orderId, {
type: 'OrderCreated',
data: {
items: ['laptop', 'mouse'],
total: 1200,
userId: 456
}
});
eventStore.appendEvent(orderId, {
type: 'OrderPaid',
data: {
paymentId: 'pay-789',
amount: 1200
}
});
// Rebuild aggregate from events
const events = eventStore.getEvents(orderId);
const order = OrderAggregate.fromEvents(events);
console.log('Order state:', {
id: order.id,
status: order.status,
total: order.total,
version: order.version
});
Best Practices
1. Event Design
- Use past tense: Events describe what happened (order-created, not create-order)
- Include context: Add necessary data to avoid additional lookups
- Versioning: Plan for event schema evolution
- Immutability: Events should never be modified after creation
2. Error Handling
class ResilientEventProcessor {
constructor(maxRetries = 3) {
this.maxRetries = maxRetries;
this.deadLetterQueue = [];
}
async processEvent(event) {
let attempts = 0;
while (attempts < this.maxRetries) {
try {
await this.handleEvent(event);
return;
} catch (error) {
attempts++;
console.log(`Attempt ${attempts} failed:`, error.message);
if (attempts < this.maxRetries) {
await this.exponentialBackoff(attempts);
} else {
this.deadLetterQueue.push({
event,
error: error.message,
timestamp: new Date().toISOString()
});
throw new Error(`Event processing failed after ${this.maxRetries} attempts`);
}
}
}
}
async exponentialBackoff(attempt) {
const delay = Math.pow(2, attempt) * 1000; // 2^attempt seconds
await new Promise(resolve => setTimeout(resolve, delay));
}
async handleEvent(event) {
// Your event processing logic
console.log('Processing event:', event);
}
}
3. Monitoring and Observability
class EventMetrics {
constructor() {
this.metrics = {
eventsPublished: 0,
eventsProcessed: 0,
eventsFailed: 0,
processingTime: []
};
}
recordEventPublished(eventType) {
this.metrics.eventsPublished++;
console.log(`Event published: ${eventType}`);
}
recordEventProcessed(eventType, processingTime) {
this.metrics.eventsProcessed++;
this.metrics.processingTime.push(processingTime);
console.log(`Event processed: ${eventType} in ${processingTime}ms`);
}
recordEventFailed(eventType, error) {
this.metrics.eventsFailed++;
console.log(`Event failed: ${eventType} - ${error}`);
}
getMetrics() {
const avgProcessingTime = this.metrics.processingTime.length > 0
? this.metrics.processingTime.reduce((a, b) => a + b, 0) / this.metrics.processingTime.length
: 0;
return {
...this.metrics,
averageProcessingTime: avgProcessingTime
};
}
}
4. Testing Event-Driven Systems
class EventTestHarness {
constructor() {
this.capturedEvents = [];
this.mockHandlers = new Map();
}
captureEvent(eventType, data) {
this.capturedEvents.push({ eventType, data, timestamp: Date.now() });
}
mockHandler(eventType, handler) {
this.mockHandlers.set(eventType, handler);
}
async processEvents() {
for (const event of this.capturedEvents) {
const handler = this.mockHandlers.get(event.eventType);
if (handler) {
await handler(event.data);
}
}
}
getEventsByType(eventType) {
return this.capturedEvents.filter(e => e.eventType === eventType);
}
reset() {
this.capturedEvents = [];
this.mockHandlers.clear();
}
}
// Usage in tests
const testHarness = new EventTestHarness();
// Setup mock handlers
testHarness.mockHandler('order-created', async (data) => {
console.log('Mock: Order created handler called with:', data);
});
// Capture events during testing
testHarness.captureEvent('order-created', { orderId: 123, total: 100 });
// Process captured events
await testHarness.processEvents();
// Verify events were captured
const orderEvents = testHarness.getEventsByType('order-created');
console.log('Captured order events:', orderEvents.length);
Cloud Provider Comparisons
AWS
- SNS/SQS: Simple, managed, pay-per-use
- EventBridge: Advanced routing, schema registry
- Kinesis: Real-time streaming, high throughput
Google Cloud
- Pub/Sub: Global messaging, automatic scaling
- Eventarc: Event-driven cloud functions
- Cloud Functions: Serverless event processing
Azure
- Service Bus: Enterprise messaging
- Event Grid: Event routing service
- Event Hubs: Big data streaming
Performance Considerations
Message Ordering
class OrderedEventProcessor {
constructor() {
this.partitionQueues = new Map();
}
async processEvent(event) {
const partitionKey = this.getPartitionKey(event);
if (!this.partitionQueues.has(partitionKey)) {
this.partitionQueues.set(partitionKey, []);
this.processPartition(partitionKey);
}
this.partitionQueues.get(partitionKey).push(event);
}
async processPartition(partitionKey) {
const queue = this.partitionQueues.get(partitionKey);
while (queue.length > 0) {
const event = queue.shift();
await this.handleEvent(event);
}
}
getPartitionKey(event) {
// Ensure events for same entity are processed in order
return event.entityId || event.userId || 'default';
}
async handleEvent(event) {
console.log('Processing event in order:', event);
}
}
Batching
class BatchEventProcessor {
constructor(batchSize = 100, flushInterval = 5000) {
this.batchSize = batchSize;
this.flushInterval = flushInterval;
this.batch = [];
this.startBatchTimer();
}
addEvent(event) {
this.batch.push(event);
if (this.batch.length >= this.batchSize) {
this.processBatch();
}
}
async processBatch() {
if (this.batch.length === 0) return;
const currentBatch = [...this.batch];
this.batch = [];
console.log(`Processing batch of ${currentBatch.length} events`);
try {
await this.handleBatch(currentBatch);
} catch (error) {
console.error('Batch processing failed:', error);
// Implement retry logic
}
}
async handleBatch(events) {
// Process events in batch for better performance
for (const event of events) {
await this.handleEvent(event);
}
}
async handleEvent(event) {
console.log('Processing event:', event.type);
}
startBatchTimer() {
setInterval(() => {
this.processBatch();
}, this.flushInterval);
}
}
Conclusion
Event-Driven Architecture provides a powerful foundation for building scalable, maintainable, and resilient systems. Starting with simple EventEmitter patterns for local applications, you can gradually scale to distributed systems using cloud-managed services like AWS SQS/SNS, EventBridge, or Apache Kafka.
The key to successful event-driven systems lies in understanding the trade-offs, implementing proper error handling, monitoring, and following best practices for event design. Whether you’re building a simple Node.js application or a complex microservices architecture, events can help you create systems that are both flexible and robust.
Remember that event-driven systems require careful consideration of consistency, ordering, and failure scenarios. But when implemented correctly, they provide the foundation for systems that can scale to handle millions of events while maintaining loose coupling and high availability.
As your applications grow, the investment in event-driven architecture will pay dividends in terms of maintainability, scalability, and the ability to evolve your system architecture over time.