
Subscribing to Events

Modules receive events by registering handlers with kernel.events().subscribe(). The SDK handles consumer group management, tenant-scoped filtering, retry logic, dead-lettering, and offset tracking. Module code receives strongly-typed event objects and returns a result.


Subscribe to a Single Event Type

import { kernel } from '@platform/sdk-core';

kernel.events().subscribe('auth.user.created', async (event) => {
  await createCrmContact({
    userId: event.data.userId,
    email: event.data.email,
    tenantId: event.tenantId,
  });
  // Return nothing (success): the SDK commits the offset
});

The handler is called once per event. If the handler throws, the SDK retries up to 3 times before moving the event to the dead-letter topic.

Full SDK signature

kernel.events().subscribe(
  type: string | string[],   // a single event type or a list of types
  handler: (event: Event) => Promise<void>,
  options?: {
    consumerGroup?: string;  // default: '{module}.{type}-handler'
    fromBeginning?: boolean; // replay all retained events on start
    maxRetries?: number;     // default: 3
  }
): Subscription

Subscribe to Multiple Event Types

// Handler receives any of the listed event types
kernel.events().subscribe(
  ['crm.contact.created', 'crm.contact.updated'],
  async (event) => {
    await syncToSearchIndex(event.data);
  }
);
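
Because one handler receives several event types, it usually needs to branch on event.type. The following is a minimal sketch, assuming hypothetical payload shapes; the ContactEvent type and the indexOperation helper are illustrative only and are not part of the SDK.

```typescript
// Hypothetical payload shapes; the real event schemas may differ.
type ContactEvent =
  | { type: 'crm.contact.created'; tenantId: string; data: { contactId: string; email: string } }
  | { type: 'crm.contact.updated'; tenantId: string; data: { contactId: string; changed: string[] } };

// Returns a label for the search-index operation a shared handler would perform.
// The switch narrows event.data to the payload of the matching event type.
function indexOperation(event: ContactEvent): string {
  switch (event.type) {
    case 'crm.contact.created':
      return `insert:${event.data.contactId}`;
    case 'crm.contact.updated':
      return `update:${event.data.contactId}:${event.data.changed.join(',')}`;
  }
}
```

Modeling the events as a discriminated union lets the compiler flag any listed event type that the handler forgets to cover.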

Subscribe by Pattern (TypeScript SDK only)

Pattern subscriptions match event types using glob syntax. Useful for cross-cutting concerns like audit logging:

kernel.events().subscribePattern('crm.*', async (event) => {
  await logToAudit({
    type: event.type,
    tenantId: event.tenantId,
    traceId: event.traceId,
    timestamp: event.timestamp,
  });
});

The pattern crm.* matches crm.contact.created, crm.deal.won, etc. Kernel events (auth.*, money.*) are never matched by module patterns unless the module has declared them in manifest.events.subscribes[].
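
The matching rule can be sketched as follows. This is not the SDK's implementation: it assumes that * matches any run of characters including dots (since crm.* is said to match crm.contact.created) and that a kernel event counts as declared only when its exact type appears in subscribes[]. The names globToRegExp, matchesPattern, and patternDelivers are hypothetical.

```typescript
// Converts a glob such as 'crm.*' into an anchored RegExp.
// Assumption: '*' matches any run of characters, dots included.
function globToRegExp(pattern: string): RegExp {
  const source = pattern
    .split('*')
    .map((part) => part.replace(/[.+?^${}()|[\]\\]/g, '\\$&')) // escape regex metacharacters
    .join('.*');
  return new RegExp(`^${source}$`);
}

function matchesPattern(pattern: string, eventType: string): boolean {
  return globToRegExp(pattern).test(eventType);
}

// Kernel namespaces named in the text above.
const KERNEL_PREFIXES = ['auth.', 'money.'];

// A kernel event is delivered through a pattern only if the module declared it.
function patternDelivers(pattern: string, eventType: string, declared: string[]): boolean {
  if (!matchesPattern(pattern, eventType)) return false;
  const isKernelEvent = KERNEL_PREFIXES.some((prefix) => eventType.startsWith(prefix));
  return !isKernelEvent || declared.includes(eventType);
}
```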


Subscribe RBAC

A module can only subscribe to events it has declared in module.manifest.json under events.subscribes[]:

{
"events": {
"subscribes": [
"auth.user.created",
"billing.subscription.changed"
]
}
}

| Subscribe attempt | Result |
|---|---|
| auth.user.created (declared) | ✅ Handler registered |
| money.wallet.debited (not declared) | 403 Forbidden |
| crm.* pattern (all declared) | ✅ Pattern matched against subscribes[] |
| Module with no events.subscribes | ❌ Zero subscriptions |
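
The manifest check for a single event type amounts to a membership test. A minimal sketch, assuming the manifest has already been parsed into an object; ModuleManifest and checkSubscribe are hypothetical names, and pattern subscriptions are not modeled here.

```typescript
interface ModuleManifest {
  events?: { subscribes?: string[] };
}

type SubscribeResult = 'registered' | 'forbidden';

// A module with no events.subscribes section is treated as declaring nothing.
function checkSubscribe(manifest: ModuleManifest, eventType: string): SubscribeResult {
  const declared = manifest.events?.subscribes ?? [];
  return declared.includes(eventType) ? 'registered' : 'forbidden';
}

const crmManifest: ModuleManifest = {
  events: { subscribes: ['auth.user.created', 'billing.subscription.changed'] },
};
```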

Tenant Isolation

kernel.events().subscribe() automatically filters events by the tenantId of the authenticated module context. A subscriber running for tenant A never receives events published by tenant B, even though both share the same Kafka topic.

The filter is applied at the SDK layer, not at the Kafka level:

Kafka topic: platform.auth.events
Message 1: { tenantId: 'tenant-A', type: 'auth.user.created', ... }
Message 2: { tenantId: 'tenant-B', type: 'auth.user.created', ... }
Message 3: { tenantId: 'tenant-A', type: 'auth.user.created', ... }

SDK (running as tenant-A):
→ delivers Message 1 ✅
→ skips Message 2 (tenant mismatch, offset committed silently)
→ delivers Message 3 ✅
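
The filtering step above can be sketched as a pure function. This is an illustration of the idea rather than the SDK's code; RawMessage and filterForTenant are hypothetical names, and offsets stand in for full Kafka messages.

```typescript
interface RawMessage {
  offset: number;
  tenantId: string;
  type: string;
}

// Splits a batch into offsets delivered to the handler and offsets skipped
// because the message belongs to another tenant. Both kinds are committed.
function filterForTenant(messages: RawMessage[], tenantId: string) {
  const delivered: number[] = [];
  const skipped: number[] = [];
  for (const message of messages) {
    (message.tenantId === tenantId ? delivered : skipped).push(message.offset);
  }
  return { delivered, skipped };
}

const batch: RawMessage[] = [
  { offset: 1, tenantId: 'tenant-A', type: 'auth.user.created' },
  { offset: 2, tenantId: 'tenant-B', type: 'auth.user.created' },
  { offset: 3, tenantId: 'tenant-A', type: 'auth.user.created' },
];
```

Because the filter runs in the SDK, every consumer still reads every message on the topic; the isolation guarantee concerns what reaches handler code, not what crosses the network.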

Consumer Groups

Each subscription creates a Kafka consumer group so that multiple replicas of a module service share the event load without double-processing:

Consumer group naming: {module}.{eventType}-handler

Example:
Module 'crm' subscribes to 'auth.user.created'
→ Consumer group: crm.auth.user.created-handler

All CRM pods join the same consumer group.
Each event is processed by exactly one pod.

Custom consumer group names can be set via options.consumerGroup.
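
The naming rule is simple enough to express directly. A small sketch, assuming a custom name from options.consumerGroup replaces the default entirely; consumerGroupName is a hypothetical helper, not an SDK export.

```typescript
// Builds the default group name '{module}.{eventType}-handler',
// or returns the custom name when one is supplied.
function consumerGroupName(moduleName: string, eventType: string, customName?: string): string {
  return customName ?? `${moduleName}.${eventType}-handler`;
}
```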


Retry and Dead Letter Queue

The SDK wraps every handler invocation in a retry loop:

Event delivered to handler
        ↓
Handler throws → Retry 1 (immediate)
        ↓
Handler throws → Retry 2 (5 s backoff)
        ↓
Handler throws → Retry 3 (30 s backoff)
        ↓
Still failing → Move to DLQ: platform.{domain}.dlq
Offset committed → next event delivered
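
The retry flow can be approximated by the loop below. It is a sketch under stated assumptions, not the SDK's source: it assumes one initial delivery followed by up to three retries with the delays shown above, and deliverWithRetry, sendToDlq, and the injectable sleep parameter are hypothetical names introduced for illustration.

```typescript
type Handler<E> = (event: E) => Promise<void>;
type Sleep = (ms: number) => Promise<void>;

// Delay before retry 1, 2, and 3, taken from the flow above.
const BACKOFF_MS = [0, 5_000, 30_000];

const defaultSleep: Sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Resolves to 'processed' if any attempt succeeds, otherwise hands the event
// to the dead-letter callback and resolves to 'dead-lettered'.
async function deliverWithRetry<E>(
  event: E,
  handler: Handler<E>,
  sendToDlq: (event: E, error: unknown) => Promise<void>,
  sleep: Sleep = defaultSleep,
): Promise<'processed' | 'dead-lettered'> {
  let lastError: unknown;
  // Attempt 0 is the initial delivery; attempts 1..3 are the retries.
  for (let attempt = 0; attempt <= BACKOFF_MS.length; attempt++) {
    if (attempt > 0) await sleep(BACKOFF_MS[attempt - 1] ?? 0);
    try {
      await handler(event);
      return 'processed';
    } catch (error) {
      lastError = error;
    }
  }
  await sendToDlq(event, lastError);
  return 'dead-lettered';
}
```

Passing sleep as a parameter keeps the backoff testable without waiting 35 seconds. Since a handler may run up to four times for one event under this reading, its side effects should be safe to repeat.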

Dead-lettered events are stored with status: "pending" and visible in Admin → Events → DLQ. They can be retried manually:

POST https://api.septemcore.com/v1/events/dlq/{eventId}/retry
Authorization: Bearer <access_token>

Or in bulk with throttle protection (50 events/sec default):

POST https://api.septemcore.com/v1/events/dlq/replay-all
Authorization: Bearer <access_token>
Content-Type: application/json

{
"filter": { "moduleId": "crm" }
}

Response:

{
"replayed": 47,
"skipped_already_resolved": 5
}

Idempotency for DLQ replay: Financial event consumers use a permanent processed_event_ids table in PostgreSQL (not Valkey TTL, because DLQ events may be older than 24 hours) to prevent double-processing on replay.
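
The idempotency check can be illustrated with an in-memory stand-in for the processed_event_ids table. This is a sketch only: the ProcessedEventIds class, the applyDebit helper, and the id and amount fields on the event are assumptions made for the example.

```typescript
// In-memory stand-in for the permanent processed_event_ids table.
// With PostgreSQL the same check is commonly a single
// INSERT ... ON CONFLICT DO NOTHING, followed by inspecting the row count.
class ProcessedEventIds {
  private readonly seen = new Set<string>();

  // Returns true only the first time a given event ID is recorded.
  markIfNew(eventId: string): boolean {
    if (this.seen.has(eventId)) return false;
    this.seen.add(eventId);
    return true;
  }
}

// Applies a debit at most once per event ID, even when the event is replayed.
function applyDebit(
  store: ProcessedEventIds,
  balance: number,
  event: { id: string; amount: number },
): number {
  return store.markIfNew(event.id) ? balance - event.amount : balance;
}
```

In a real consumer, recording the event ID and applying the financial change would need to happen in the same database transaction; otherwise a crash between the two steps could lose or duplicate the effect.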


Replay from Timestamp

Kafka retains events for 7 days (168 hours) by default (KAFKA_RETENTION_HOURS=168). The platform.audit.events topic retains for 30 days (KAFKA_AUDIT_RETENTION_HOURS=720).

To reprocess events from a specific point in time:

// Replay all auth.user.created events from the last 6 hours
await kernel.events().replayFrom('auth.user.created', {
  from: new Date(Date.now() - 6 * 60 * 60 * 1000).toISOString(),
});

Replay beyond the retention window returns an error:

{
"type": "https://api.septemcore.com/problems/validation-error",
"status": 400,
"detail": "Requested replay start is beyond retention window (168h).",
"code": "EVENTS_RETENTION_EXCEEDED"
}
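
A caller can avoid this error by checking the start time against the retention window before requesting a replay. A minimal sketch, assuming the window is measured back from the current time; isWithinRetention is a hypothetical helper and the platform's own boundary handling may differ.

```typescript
const RETENTION_HOURS = 168; // default from KAFKA_RETENTION_HOURS

// True when the requested start is no older than the retention window.
function isWithinRetention(
  from: Date,
  now: Date,
  retentionHours: number = RETENTION_HOURS,
): boolean {
  const oldestAllowedMs = now.getTime() - retentionHours * 60 * 60 * 1000;
  return from.getTime() >= oldestAllowedMs;
}
```

For the platform.audit.events topic, the same check would be called with 720 as the third argument.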

Consumer Lag Alerting

The SDK tracks consumer lag (how far behind the consumer is from the latest offset) and triggers alerts:

| Lag threshold | Alert level | Action |
|---|---|---|
| > 1 minute | info | Log only |
| > 5 minutes | warning | Notify (channel system.health) |
| > 15 minutes | critical | Notify + PagerDuty |
| Spike + decreasing trend | Suppressed | Bulk-import detection: warnings suppressed for 30 min |

The lag threshold measured as a number of events is also configurable: KAFKA_CONSUMER_LAG_ALERT_THRESHOLD=10000.
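
The time-based thresholds map to alert levels as in the sketch below. It is illustrative only: lagAlertLevel is a hypothetical function, it treats each threshold as strictly greater-than, and it does not model the bulk-import suppression or the event-count threshold.

```typescript
type AlertLevel = 'none' | 'info' | 'warning' | 'critical';

const MINUTE_MS = 60_000;

// Classifies consumer lag (in milliseconds) using the time-based thresholds.
function lagAlertLevel(lagMs: number): AlertLevel {
  if (lagMs > 15 * MINUTE_MS) return 'critical';
  if (lagMs > 5 * MINUTE_MS) return 'warning';
  if (lagMs > 1 * MINUTE_MS) return 'info';
  return 'none';
}
```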


Offset Tracking

Current consumer position (offset and lag) is available via the SDK and REST:

const position = await kernel.events().consumerPosition('auth.user.created');
// { topic: 'platform.auth.events', partition: 2, offset: 128490, lag: 12 }

GET https://api.septemcore.com/v1/events/consumers/{groupId}/position
Authorization: Bearer <access_token>

Unsubscribe

const sub = kernel.events().subscribe('crm.contact.created', handler);

// Later — stop receiving events (e.g. graceful shutdown)
await sub.unsubscribe();

Unsubscribing commits the current offset so processing resumes from the correct position on the next start.


Error Reference

| Scenario | HTTP / Error code | Notes |
|---|---|---|
| Event type not in subscribes[] | 403 EVENT_SUBSCRIBE_FORBIDDEN | Manifest check at startup |
| Replay beyond retention | 400 EVENTS_RETENTION_EXCEEDED | 7-day window default |
| Handler fails 3 times | None (dead-lettered) | No HTTP error to publisher |
| DLQ retry not found | 404 not-found | Event already resolved or expired |
| Kafka unavailable | Auto-reconnect | Consumer resumes from last offset |