Subscribing to Events
Modules receive events by registering handlers with
kernel.events().subscribe(). The SDK handles consumer group
management, tenant-scoped filtering, retry logic, dead-lettering, and
offset tracking. Module code receives strongly typed event objects; a
handler signals success by returning normally and failure by throwing.
Subscribe to a Single Event Type
```typescript
import { kernel } from '@platform/sdk-core';

kernel.events().subscribe('auth.user.created', async (event) => {
  await createCrmContact({
    userId: event.data.userId,
    email: event.data.email,
    tenantId: event.tenantId,
  });
  // Return nothing (success) — SDK commits offset
});
```
The handler is called once per event. If the handler throws, the SDK retries up to 3 times before moving the event to the dead-letter topic.
Full SDK signature
```typescript
kernel.events().subscribe(
  type: string,
  handler: (event: Event) => Promise<void>,
  options?: {
    consumerGroup?: string;   // default: '{module}.{type}-handler'
    fromBeginning?: boolean;  // replay all retained events on start
    maxRetries?: number;      // default: 3
  }
): Subscription
```
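As a sketch of how the documented defaults might be resolved (the helper name and its shape are illustrative assumptions, not real SDK exports):

```typescript
// Illustrative only: how the documented subscribe() defaults could be
// resolved. resolveOptions is a hypothetical helper, not part of the SDK.
interface SubscribeOptions {
  consumerGroup?: string;
  fromBeginning?: boolean;
  maxRetries?: number;
}

function resolveOptions(
  moduleId: string,
  eventType: string,
  options: SubscribeOptions = {}
): Required<SubscribeOptions> {
  return {
    // default: '{module}.{type}-handler'
    consumerGroup: options.consumerGroup ?? `${moduleId}.${eventType}-handler`,
    // by default, start from the committed offset rather than replaying
    fromBeginning: options.fromBeginning ?? false,
    // default: 3 retries before dead-lettering
    maxRetries: options.maxRetries ?? 3,
  };
}
```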
Subscribe to Multiple Event Types
```typescript
// Handler receives any of the listed event types
kernel.events().subscribe(
  ['crm.contact.created', 'crm.contact.updated'],
  async (event) => {
    await syncToSearchIndex(event.data);
  }
);
```
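With multiple event types, the handler typically branches on event.type. A minimal, self-contained sketch (the Event shape is assumed from the examples on this page, and the returned action names are placeholders for real side effects like syncToSearchIndex()):

```typescript
// Minimal event shape, assumed from the examples in this page.
interface Event {
  type: string;
  tenantId: string;
  data: Record<string, unknown>;
}

// Branch on event.type inside a multi-type handler. The returned strings
// stand in for real side effects.
function routeContactEvent(event: Event): 'index' | 'reindex' | 'ignore' {
  switch (event.type) {
    case 'crm.contact.created':
      return 'index';
    case 'crm.contact.updated':
      return 'reindex';
    default:
      return 'ignore';
  }
}
```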
Subscribe by Pattern (TypeScript SDK only)
Pattern subscriptions match event types using glob syntax. Useful for cross-cutting concerns like audit logging:
```typescript
kernel.events().subscribePattern('crm.*', async (event) => {
  await logToAudit({
    type: event.type,
    tenantId: event.tenantId,
    traceId: event.traceId,
    timestamp: event.timestamp,
  });
});
```
The pattern crm.* matches crm.contact.created, crm.deal.won, etc.
Kernel events (auth.*, money.*) are never matched by module patterns
unless the module has declared them in manifest.events.subscribes[].
Subscription RBAC
A module can only subscribe to events it has declared in
module.manifest.json under events.subscribes[]:
```json
{
  "events": {
    "subscribes": [
      "auth.user.created",
      "billing.subscription.changed"
    ]
  }
}
```
| Subscribe attempt | Result |
|---|---|
| auth.user.created (declared) | ✅ Handler registered |
| money.wallet.debited (not declared) | ❌ 403 Forbidden |
| crm.* pattern (all declared) | ✅ Pattern matched against subscribes[] |
| Module with no events.subscribes | ❌ Zero subscriptions |
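Matching a pattern against the declared subscribes[] list can be sketched with a small glob-to-regex helper. The glob semantics here are an assumption for illustration: * matches any run of characters, everything else is literal.

```typescript
// Hypothetical sketch of checking a subscription pattern against the
// manifest's subscribes[] list; not the platform's actual matcher.
function globToRegExp(pattern: string): RegExp {
  const escaped = pattern
    .replace(/[.+?^${}()|[\]\\]/g, '\\$&') // escape regex metacharacters
    .replace(/\*/g, '.*');                 // '*' becomes '.*'
  return new RegExp(`^${escaped}$`);
}

// True if at least one declared event type falls under the pattern.
function patternCoversDeclared(pattern: string, subscribes: string[]): boolean {
  const re = globToRegExp(pattern);
  return subscribes.some((type) => re.test(type));
}
```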
Tenant Isolation
kernel.events().subscribe() automatically filters events by the
tenantId of the authenticated module context. A subscriber running
for tenant A never receives events published by tenant B, even though
both share the same Kafka topic.
The filter is applied at the SDK layer, not at the Kafka level:
```
Kafka topic: platform.auth.events
  Message 1: { tenantId: 'tenant-A', type: 'auth.user.created', ... }
  Message 2: { tenantId: 'tenant-B', type: 'auth.user.created', ... }
  Message 3: { tenantId: 'tenant-A', type: 'auth.user.created', ... }

SDK (running as tenant-A):
  → delivers Message 1 ✅
  → skips Message 2 (tenant mismatch, offset committed silently)
  → delivers Message 3 ✅
```
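The filter described above amounts to a predicate on each consumed message. A sketch (the message shape and function name are illustrative, not SDK internals):

```typescript
// Sketch of the SDK-layer tenant filter; not actual SDK internals.
interface ConsumedMessage {
  tenantId: string;
  type: string;
}

function shouldDeliver(message: ConsumedMessage, contextTenantId: string): boolean {
  // A mismatch is not an error: the message is skipped and its offset is
  // still committed, so the consumer keeps advancing past other tenants'
  // events on the shared topic.
  return message.tenantId === contextTenantId;
}
```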
Consumer Groups
Each subscription creates a Kafka consumer group so that multiple replicas of a module service share the event load without double-processing:
```
Consumer group naming: {module}.{eventType}-handler

Example:
  Module 'crm' subscribes to 'auth.user.created'
  → Consumer group: crm.auth.user.created-handler
```
All CRM pods join the same consumer group.
Each event is processed by exactly one pod.
Custom consumer group names can be set via options.consumerGroup.
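The "exactly one pod" guarantee comes from Kafka assigning each partition to a single member of the consumer group. A deliberately simplified round-robin assignment, purely for illustration (real Kafka assignors such as range, round-robin, and sticky are more involved):

```typescript
// Illustrative partition assignment within one consumer group. The
// invariant shown: every partition maps to exactly one group member.
function assignPartitions(
  partitions: number[],
  members: string[]
): Map<string, number[]> {
  const assignment = new Map<string, number[]>(members.map((m) => [m, []]));
  partitions.forEach((partition, i) => {
    // Round-robin: partition i goes to member i mod (member count).
    assignment.get(members[i % members.length])!.push(partition);
  });
  return assignment;
}
```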
Retry and Dead Letter Queue
The SDK wraps every handler invocation in a retry loop:
```
Event delivered to handler
        │
        ▼
Handler throws → Retry 1 (immediate)
        │
        ▼
Handler throws → Retry 2 (5 s backoff)
        │
        ▼
Handler throws → Retry 3 (30 s backoff)
        │
        ▼
Still failing → Move to DLQ: platform.{domain}.dlq
                Offset committed → next event delivered
```
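The schedule above can be expressed as a small decision function. Names and shapes here are illustrative, not SDK internals; only the delays and the cutoff come from the diagram:

```typescript
// Delays before retries 1..3, matching the diagram: immediate, 5 s, 30 s.
const RETRY_DELAYS_MS = [0, 5_000, 30_000];

type RetryDecision =
  | { action: 'retry'; delayMs: number }
  | { action: 'dlq' };

// failedAttempts counts how many times the handler has thrown so far.
function nextStep(failedAttempts: number): RetryDecision {
  if (failedAttempts <= RETRY_DELAYS_MS.length) {
    return { action: 'retry', delayMs: RETRY_DELAYS_MS[failedAttempts - 1] };
  }
  // Retries exhausted: move to platform.{domain}.dlq, commit the offset.
  return { action: 'dlq' };
}
```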
Dead-lettered events are stored with status: "pending" and visible
in Admin → Events → DLQ. They can be retried manually:
```http
POST https://api.septemcore.com/v1/events/dlq/{eventId}/retry
Authorization: Bearer <access_token>
```
Or in bulk with throttle protection (50 events/sec default):
Or in bulk with throttle protection (50 events/sec default):

```http
POST https://api.septemcore.com/v1/events/dlq/replay-all
Authorization: Bearer <access_token>
Content-Type: application/json

{
  "filter": { "moduleId": "crm" }
}
```

Response:

```json
{
  "replayed": 47,
  "skipped_already_resolved": 5
}
```
Idempotency for DLQ replay: Financial event consumers use a
permanent processed_event_ids table in PostgreSQL (not Valkey TTL,
because DLQ events may be older than 24 hours) to prevent
double-processing on replay.
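The idempotency check can be sketched as follows, with an in-memory Set standing in for the PostgreSQL table (the helper name is an assumption; in production the check would be an INSERT ... ON CONFLICT DO NOTHING against processed_event_ids):

```typescript
// In-memory stand-in for the permanent processed_event_ids table.
const processedEventIds = new Set<string>();

// Runs the handler at most once per eventId. The id is recorded only
// after the handler succeeds, so a throwing handler can still be retried.
function handleOnce(eventId: string, handler: () => void): boolean {
  if (processedEventIds.has(eventId)) {
    return false; // replayed duplicate: skip without side effects
  }
  handler();
  processedEventIds.add(eventId);
  return true;
}
```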
Replay from Timestamp
Kafka retains events for 7 days (168 hours) by default
(KAFKA_RETENTION_HOURS=168). The platform.audit.events topic
retains for 30 days (KAFKA_AUDIT_RETENTION_HOURS=720).
To reprocess events from a specific point in time:
```typescript
// Replay all auth.user.created events from the last 6 hours
await kernel.events().replayFrom('auth.user.created', {
  from: new Date(Date.now() - 6 * 60 * 60 * 1000).toISOString(),
});
```
Replay beyond the retention window returns an error:
```json
{
  "type": "https://api.septemcore.com/problems/validation-error",
  "status": 400,
  "detail": "Requested replay start is beyond retention window (168h).",
  "code": "EVENTS_RETENTION_EXCEEDED"
}
```
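Client-side, the same bound can be checked before calling replayFrom. This guard is illustrative; only the 168-hour value mirrors the documented default:

```typescript
// Mirrors KAFKA_RETENTION_HOURS=168; an illustrative client-side guard,
// not something the SDK is documented to expose.
const RETENTION_HOURS = 168;

function withinRetention(fromIso: string, now: Date = new Date()): boolean {
  const ageMs = now.getTime() - new Date(fromIso).getTime();
  return ageMs <= RETENTION_HOURS * 60 * 60 * 1000;
}
```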
Consumer Lag Alerting
The SDK tracks consumer lag (how far behind the consumer is from the latest offset) and triggers alerts:
| Lag threshold | Alert level | Action |
|---|---|---|
| > 1 minute | info | Log only |
| > 5 minutes | warning | Notify — channel system.health |
| > 15 minutes | critical | Notify + PagerDuty |
| Spike + decreasing trend | Suppressed | Bulk-import detection — warnings suppressed for 30 min |
The lag threshold can also be configured as a number of events:
KAFKA_CONSUMER_LAG_ALERT_THRESHOLD=10000.
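The time-based thresholds in the table amount to a simple classifier. A sketch (the bulk-import suppression logic is deliberately omitted, and the function name is an assumption):

```typescript
// Maps consumer lag (seconds behind the latest offset) to the alert
// levels from the table above. Bulk-import suppression is omitted.
type AlertLevel = 'none' | 'info' | 'warning' | 'critical';

function lagAlertLevel(lagSeconds: number): AlertLevel {
  if (lagSeconds > 15 * 60) return 'critical'; // notify + PagerDuty
  if (lagSeconds > 5 * 60) return 'warning';   // notify system.health
  if (lagSeconds > 60) return 'info';          // log only
  return 'none';
}
```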
Offset Tracking
Current consumer position (offset and lag) is available via the SDK and REST:
```typescript
const position = await kernel.events().consumerPosition('auth.user.created');
// { topic: 'platform.auth.events', partition: 2, offset: 128490, lag: 12 }
```

```http
GET https://api.septemcore.com/v1/events/consumers/{groupId}/position
Authorization: Bearer <access_token>
```
Unsubscribe
```typescript
const sub = kernel.events().subscribe('crm.contact.created', handler);

// Later — stop receiving events (e.g. graceful shutdown)
await sub.unsubscribe();
```
Unsubscribing commits the current offset so processing resumes from the correct position on the next start.
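For graceful shutdown, a module might keep a registry of its live subscriptions and unsubscribe them all when a termination signal arrives. A sketch under those assumptions (the Subscription interface is taken from the subscribe() signature above; the registry and helper are hypothetical):

```typescript
// Shape taken from the subscribe() signature documented above.
interface Subscription {
  unsubscribe(): Promise<void>;
}

// Hypothetical module-level registry of live subscriptions.
const subscriptions: Subscription[] = [];

// Call this from your SIGTERM handler: unsubscribing commits current
// offsets, so the next start resumes from the correct position.
async function shutdownSubscriptions(): Promise<void> {
  await Promise.all(subscriptions.map((sub) => sub.unsubscribe()));
  subscriptions.length = 0; // clear the registry
}
```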
Error Reference
| Scenario | HTTP / Error code | Notes |
|---|---|---|
| Event type not in subscribes[] | 403 EVENT_SUBSCRIBE_FORBIDDEN | Manifest check at startup |
| Replay beyond retention | 400 EVENTS_RETENTION_EXCEEDED | 7-day window default |
| Handler fails 3 times | — dead-lettered | No HTTP error to publisher |
| DLQ retry not found | 404 not-found | Event already resolved or expired |
| Kafka unavailable | Auto-reconnect | Consumer resumes from last offset |