Events gRPC Service Reference

Package platform.events.v1 contains the EventService — the gRPC interface for the Event Bus primitive. It covers publishing, server-streaming subscription, topic management, consumer position monitoring, and DLQ operations.

Additionally, the package includes schemas/v1/ — typed Protobuf definitions for every domain event schema. These schemas are the single source of truth for event payloads; any breaking schema change requires a new event type (not a version bump on the same type).

See the gRPC Overview for buf configuration, Go package conventions, deadlines, mTLS, and error mapping.

Proto files:

  • proto/platform/events/v1/events.proto — service + Event + RPC messages
  • proto/platform/events/v1/schemas/v1/common.proto — EventEnvelope + DLQRecord
  • proto/platform/events/v1/schemas/v1/auth.proto — Auth domain events (9 types)
  • proto/platform/events/v1/schemas/v1/money.proto — Money domain events (8 types)
  • proto/platform/events/v1/schemas/v1/module.proto — Module domain events (6 types)

Go package: kernel.internal/platform-kernel/gen/go/platform/events/v1;eventsv1

Go schema package: kernel.internal/platform-kernel/gen/go/platform/events/v1/schemas/v1;eventschemas


EventService — All RPCs

service EventService {
  rpc Publish(PublishRequest) returns (PublishResponse);
  rpc BatchPublish(BatchPublishRequest) returns (BatchPublishResponse);
  rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse);
  rpc Unsubscribe(UnsubscribeRequest) returns (UnsubscribeResponse);
  rpc ListTopics(ListTopicsRequest) returns (ListTopicsResponse);
  rpc GetTopicSchema(GetTopicSchemaRequest) returns (GetTopicSchemaResponse);
  rpc ReplayFrom(ReplayFromRequest) returns (ReplayFromResponse);
  rpc GetConsumerPosition(GetConsumerPositionRequest) returns (GetConsumerPositionResponse);
  rpc DLQRetry(DLQRetryRequest) returns (DLQRetryResponse);
  rpc DLQReplayAll(DLQReplayAllRequest) returns (DLQReplayAllResponse);
}
| RPC | Type | Description |
|---|---|---|
| Publish | Unary | Publish a single event to a domain topic |
| BatchPublish | Unary | Publish multiple events in one call |
| Subscribe | Server-streaming | Stream events from a topic by consumer group |
| Unsubscribe | Unary | Remove a consumer group subscription |
| ListTopics | Unary | List all available event topics and partition counts |
| GetTopicSchema | Unary | Get the Protobuf schema definition for a topic |
| ReplayFrom | Unary | Seek a consumer group to a past timestamp |
| GetConsumerPosition | Unary | Get current offset, latest offset, and lag |
| DLQRetry | Unary | Retry a single failed DLQ event |
| DLQReplayAll | Unary | Replay all pending DLQ events (throttled: 50 events/sec) |

Core Message — Event

The Event message is the wire format for individual events published and received through the gRPC API. Domain schemas are packed into the data field as serialized Protobuf bytes.

message Event {
  string id = 1;                            // UUID v7 — idempotency key
  string type = 2;                          // domain.entity.action (e.g. "auth.user.created")
  string source = 3;                        // Module that published the event
  string tenant_id = 4;                     // Injected by Gateway; never trust client value
  bytes data = 5;                           // Serialized Protobuf payload
  string schema_version = 6;                // Semantic version (e.g. "1.0.0")
  google.protobuf.Timestamp timestamp = 7;  // When the event occurred
  string trace_id = 8;                      // OpenTelemetry trace ID
}
| Field | Description |
|---|---|
| id | UUID v7. Used as Kafka message key for idempotency dedup on replay. |
| type | Dot-notation: <domain>.<entity>.<action>. Must match a registered schema. |
| tenant_id | Set by Gateway from JWT tenantId claim. SDK-level consumers filter by this field. |
| data | Raw Protobuf bytes of the typed domain event (see schemas below). |
| schema_version | Semantic version, e.g. "1.0.0". Breaking changes require a new type, not a version bump. |
| trace_id | Propagated from the originating HTTP request for end-to-end trace correlation. |
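The dot-notation rule for the type field can be checked mechanically before publishing. A minimal sketch in Go, assuming the three-segment lowercase convention shown above (ValidEventType is a hypothetical helper, not part of the generated client):

```go
package main

import (
	"fmt"
	"regexp"
)

// eventTypeRe matches the <domain>.<entity>.<action> convention used by
// Event.type (e.g. "auth.user.created", "auth.api_key.created").
var eventTypeRe = regexp.MustCompile(`^[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*$`)

// ValidEventType reports whether t follows the dot-notation convention.
// Note: the server additionally checks that the type maps to a registered
// schema; this only validates the shape of the string.
func ValidEventType(t string) bool {
	return eventTypeRe.MatchString(t)
}

func main() {
	for _, t := range []string{"auth.user.created", "money.wallet.credited", "badtype"} {
		fmt.Printf("%-24s valid=%v\n", t, ValidEventType(t))
	}
}
```

A failing check here would surface server-side as INVALID_ARGUMENT on Publish.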

Core Message — EventEnvelope (common.proto)

EventEnvelope is the higher-level wrapper used internally by the Event Bus when publishing via the kernel.events() SDK. It carries additional metadata that is not present in the raw Event gRPC message.

message EventEnvelope {
  string event_id = 1;                      // UUID v7
  string event_type = 2;                    // domain.entity.action
  string source = 3;
  string tenant_id = 4;
  string schema_version = 5;
  google.protobuf.Timestamp timestamp = 6;
  string trace_id = 7;
  string correlation_id = 8;                // Links related events (saga pattern)
  google.protobuf.Any payload = 9;          // Typed domain event (Any.pack)
  string entity_id = 10;                    // Partition key (ordering guarantee per entity)
}

entity_id as partition key: Kafka partition assignment is deterministic on entity_id. All events for the same entity (e.g., same wallet_id) land on the same partition, guaranteeing processing order per entity within a consumer group.
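The deterministic key-to-partition mapping can be illustrated with a small sketch. Kafka's Java client actually uses murmur2 for its default partitioner; FNV-1a is used below only to show the principle that the same entity_id always yields the same partition (PartitionFor is a hypothetical helper):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// PartitionFor maps an entity_id to a partition deterministically, so
// every event for the same entity lands on the same partition and is
// processed in order within a consumer group.
func PartitionFor(entityID string, partitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(entityID))
	return int32(h.Sum32() % uint32(partitions))
}

func main() {
	// Same wallet, same partition, every time.
	fmt.Println(PartitionFor("wallet-42", 6) == PartitionFor("wallet-42", 6)) // true
}
```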


Core Message — DLQRecord (common.proto)

message DLQRecord {
  EventEnvelope original_event = 1;
  int32 attempt_count = 2;                  // Attempts before DLQ
  string last_error = 3;
  google.protobuf.Timestamp dlq_timestamp = 4;
  string consumer_group = 5;
  string status = 6;                        // "pending"|"retrying"|"resolved"|"expired"
}

Publish

Publish a single event to a Kafka domain topic.

Request — PublishRequest

message PublishRequest {
  string topic = 1;  // Required. e.g. "platform.auth.events"
  string key = 2;    // Required. Partition key — use entityId for ordering guarantee
  Event event = 3;   // Required. The event to publish
}

Response — PublishResponse

message PublishResponse {
  string event_id = 1; // Confirmed event ID
}

gRPC errors:

| gRPC status | Condition |
|---|---|
| NOT_FOUND | Topic does not exist |
| PERMISSION_DENIED | Caller lacks events.publish RBAC permission for this topic |
| INVALID_ARGUMENT | event.type does not match a registered schema |
| UNAVAILABLE | Kafka broker unavailable — caller should retry with backoff |
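The retry-with-backoff advice for UNAVAILABLE can be sketched as a small wrapper. This is an illustrative pattern, not generated client code; ErrUnavailable stands in for the gRPC UNAVAILABLE status, and the attempt count and base delay are hypothetical parameters:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrUnavailable stands in for gRPC's UNAVAILABLE status in this sketch.
var ErrUnavailable = errors.New("UNAVAILABLE")

// PublishWithRetry retries a publish call with exponential backoff.
// Non-retryable errors (NOT_FOUND, PERMISSION_DENIED, ...) return
// immediately; only UNAVAILABLE triggers another attempt.
func PublishWithRetry(publish func() error, attempts int, base time.Duration) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = publish(); err == nil || !errors.Is(err, ErrUnavailable) {
			return err // success, or a non-retryable error
		}
		time.Sleep(base * time.Duration(1<<i)) // 1x, 2x, 4x, ...
	}
	return err
}

func main() {
	calls := 0
	err := PublishWithRetry(func() error {
		calls++
		if calls < 3 {
			return ErrUnavailable // broker flapping on the first two tries
		}
		return nil
	}, 5, time.Millisecond)
	fmt.Println(err, calls) // <nil> 3
}
```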

BatchPublish

Publish multiple events in a single Kafka producer batch. More efficient than repeated Publish for bulk event emission (e.g., after a database import).

Request — BatchPublishRequest

message BatchPublishRequest {
  string topic = 1;                     // Required. All items go to the same topic.
  repeated BatchPublishItem items = 2;  // Required.
}

message BatchPublishItem {
  string key = 1;  // Partition key for this item
  Event event = 2;
}

Response — BatchPublishResponse

message BatchPublishResponse {
  int32 published_count = 1; // Number of events successfully published
}
| Limit | Value |
|---|---|
| Max items per batch | 500 |
| Exceeded | INVALID_ARGUMENT |
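Callers with more than 500 items can split the work into several BatchPublish calls instead of tripping INVALID_ARGUMENT. A minimal chunking sketch (ChunkBatch is a hypothetical helper; the element type is illustrative):

```go
package main

import "fmt"

const maxBatchItems = 500 // documented per-batch limit

// ChunkBatch splits items into consecutive slices of at most size
// elements, preserving order, so each slice fits in one BatchPublish.
func ChunkBatch[T any](items []T, size int) [][]T {
	var chunks [][]T
	for len(items) > size {
		chunks = append(chunks, items[:size])
		items = items[size:]
	}
	if len(items) > 0 {
		chunks = append(chunks, items)
	}
	return chunks
}

func main() {
	items := make([]int, 1201) // e.g. events from a database import
	chunks := ChunkBatch(items, maxBatchItems)
	fmt.Println(len(chunks)) // 3 (500 + 500 + 201)
}
```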

Subscribe

Server-streaming RPC. Streams events from a Kafka topic for a given consumer group. The stream remains open until the caller cancels it or the connection is closed.

rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse);

Request — SubscribeRequest

message SubscribeRequest {
  string topic = 1;     // Required. Kafka topic name.
  string group_id = 2;  // Required. Consumer group: <service>.<handler> convention.
}

Response — SubscribeResponse (streamed)

message SubscribeResponse {
  Event event = 1; // One event per stream message
}

Consumer group naming convention:

group_id = "<consuming-service>.<handler-name>"

Examples:
"audit.audit-record-handler"
"billing.subscription-event-handler"
"notify.user-notification-handler"

Tenant filtering: The Event Bus filters events by tenant_id extracted from the gRPC call metadata (set by Gateway). Consumers never receive events belonging to other tenants even on shared Kafka topics.

gRPC errors:

| gRPC status | Condition |
|---|---|
| NOT_FOUND | Topic does not exist |
| PERMISSION_DENIED | Caller lacks events.subscribe RBAC permission for this topic |

Unsubscribe

Request — UnsubscribeRequest

message UnsubscribeRequest {
  string topic = 1;
  string group_id = 2;
}

Response — UnsubscribeResponse

message UnsubscribeResponse {}

ListTopics

Request — ListTopicsRequest

message ListTopicsRequest {}

Response — ListTopicsResponse

message ListTopicsResponse {
  repeated TopicInfo topics = 1;
}

message TopicInfo {
  string name = 1;
  int32 partition_count = 2;
}

Platform topics catalogue:

| Topic | Domain | Partition key | Retention |
|---|---|---|---|
| platform.auth.events | IAM | user_id | 7 days |
| platform.money.events | Money | wallet_id | 7 days |
| platform.module.events | Module Registry | module_id | 7 days |
| platform.files.events | File Storage | file_id | 7 days |
| platform.notify.events | Notify | user_id | 7 days |
| platform.audit.events | Audit | entity_id | 30 days |
| platform.billing.events | Billing | tenant_id | 7 days |
| platform.data.events | Data Layer (CDC) | record_id | 7 days |
Note: retention periods are configured via environment variables:

  • 7 days (Global Default): Controlled by KAFKA_RETENTION_HOURS=168. This is the cluster-wide default and applies to all standard topics (auth, money, module, files, notify, billing, data).
  • 30 days (Audit Override): The platform.audit.events topic requires a longer retention for compliance, overridden specifically via KAFKA_AUDIT_RETENTION_HOURS=720.

GetTopicSchema

Request — GetTopicSchemaRequest

message GetTopicSchemaRequest {
  string topic = 1; // Required.
}

Response — GetTopicSchemaResponse

message GetTopicSchemaResponse {
  string topic = 1;
  string schema = 2;          // Protobuf schema definition (text format)
  string schema_version = 3;  // Current active version
}

ReplayFrom

Seeks a consumer group's offset to a past timestamp, enabling event replay from a specific point in time. Replay is bounded by Kafka's topic retention (KAFKA_RETENTION_HOURS).

Request — ReplayFromRequest

message ReplayFromRequest {
  string topic = 1;                              // Required.
  string group_id = 2;                           // Required.
  google.protobuf.Timestamp from_timestamp = 3;  // Required. Replay starts from this point.
}

Response — ReplayFromResponse

message ReplayFromResponse {
  string message = 1; // Confirmation (e.g. "Seek scheduled for 2026-04-01T00:00:00Z")
}

gRPC errors:

| gRPC status | Condition |
|---|---|
| OUT_OF_RANGE | from_timestamp is older than the topic's retention window |
| NOT_FOUND | Topic or consumer group does not exist |

Bulk import suppression: When replaying large volumes (Kafka lag > 100 000 events), lag alert thresholds are automatically suppressed for 30 minutes (KAFKA_BULK_IMPORT_SUPPRESS_MINUTES=30) to avoid false alerts.


GetConsumerPosition

Returns the current offset, latest offset, and lag for a consumer group on a topic. Used by the Admin UI health dashboard and alerting.

Request — GetConsumerPositionRequest

message GetConsumerPositionRequest {
  string topic = 1;
  string group_id = 2;
}

Response — GetConsumerPositionResponse

message GetConsumerPositionResponse {
  int64 current_offset = 1;  // Consumer's current committed offset
  int64 latest_offset = 2;   // Latest offset in the topic partition
  int64 lag = 3;             // latest_offset − current_offset
}

Lag alert thresholds:

| Lag age | Alert level |
|---|---|
| > 1 minute | Warning |
| > 5 minutes | Error |
| > 15 minutes | Critical (PagerDuty) |

DLQRetry

Retries a single failed DLQ event by re-publishing it to its original topic.

Request — DLQRetryRequest

message DLQRetryRequest {
  string event_id = 1; // Required. DLQ event ID to retry.
}

Response — DLQRetryResponse

message DLQRetryResponse {
  bool success = 1;
  string message = 2; // Human-readable result
}

gRPC errors:

| gRPC status | Condition |
|---|---|
| NOT_FOUND | DLQ event ID does not exist |
| FAILED_PRECONDITION | Event is already in resolved or expired status |

DLQReplayAll

Replays all pending DLQ events. Throttled to 50 events/second to avoid overwhelming the Kafka broker.

Request — DLQReplayAllRequest

message DLQReplayAllRequest {}

Response — DLQReplayAllResponse

message DLQReplayAllResponse {
  int32 replayed = 1;                  // Events re-published to Kafka
  int32 skipped_already_resolved = 2;  // Events in "resolved" status — skipped
}

Idempotency: Financial events (money.*) are deduplicated against the processed_event_ids PostgreSQL table (Valkey TTL is not used for financial events — permanent dedup). Non-financial events use Valkey dedup with a 24-hour TTL.


Domain Event Schemas

Auth Events — platform.auth.events

Topic: platform.auth.events · Partition key: user_id · Source: IAM Service

| Message | Description (Event Type) |
|---|---|
| UserCreatedEvent | New user account created (auth.user.created) |
| UserUpdatedEvent | User profile modified, sparse fields (auth.user.updated) |
| UserDeletedEvent | User soft-deleted (auth.user.deleted) |
| UserLoggedInEvent | Successful authentication (auth.user.logged_in) |
| UserLoggedOutEvent | Session terminated (auth.user.logged_out) |
| RoleChangedEvent | User role changed by admin/owner (auth.role.changed) |
| PasswordChangedEvent | Password changed or reset (auth.password.changed) |
| APIKeyCreatedEvent | New API key generated (auth.api_key.created) |
| APIKeyRevokedEvent | API key revoked (auth.api_key.revoked) |

Key schemas:

message UserCreatedEvent {
  string user_id = 1;
  string email = 2;
  string display_name = 3;
  string role = 4;  // "member" | "admin" | "owner"
  string tenant_id = 5;
  google.protobuf.Timestamp created_at = 6;
}

message UserUpdatedEvent {
  string user_id = 1;
  optional string email = 2;  // Only changed fields are populated
  optional string display_name = 3;
  optional bool is_active = 4;
  string updated_by = 5;
  google.protobuf.Timestamp updated_at = 6;
}

message RoleChangedEvent {
  string user_id = 1;
  string previous_role = 2;
  string new_role = 3;
  string changed_by = 4;
  string tenant_id = 5;
  google.protobuf.Timestamp changed_at = 6;
}

message UserLoggedInEvent {
  string user_id = 1;
  string session_id = 2;
  string ip_address = 3;
  string user_agent = 4;
  string auth_method = 5;  // "password" | "oauth" | "api_key"
  google.protobuf.Timestamp logged_in_at = 6;
}

Money Events — platform.money.events

Topic: platform.money.events · Partition key: wallet_id · Source: Money Service

| Message | Description (Event Type) |
|---|---|
| WalletCreatedEvent | New wallet created (money.wallet.created) |
| WalletCreditedEvent | Funds added to wallet (money.wallet.credited) |
| WalletDebitedEvent | Funds withdrawn from wallet (money.wallet.debited) |
| TransferCompletedEvent | Wallet-to-wallet transfer success (money.transfer.completed) |
| TransferFailedEvent | Wallet-to-wallet transfer failure (money.transfer.failed) |
| PaymentProcessedEvent | External payment processed (money.payment.processed) |
| WalletFrozenEvent | Wallet frozen for compliance (money.wallet.frozen) |
| WalletUnfrozenEvent | Frozen wallet restored (money.wallet.unfrozen) |

Key schemas:

message WalletCreditedEvent {
  string wallet_id = 1;
  string transaction_id = 2;  // Idempotency key
  string tenant_id = 3;
  int64 amount = 4;           // Minor units (cents). int64 — no float precision issues
  string currency = 5;        // ISO 4217
  int64 balance_after = 6;    // Running balance after credit (minor units)
  string source = 7;          // "deposit"|"refund"|"transfer_in"|"bonus"
  string reference_id = 8;    // External payment processor reference
  string description = 9;
  google.protobuf.Timestamp credited_at = 10;
}
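The int64 minor-units convention avoids float precision issues entirely; display formatting becomes pure integer arithmetic. A sketch for 2-decimal ISO 4217 currencies (formatMinorUnits is a hypothetical helper; currencies with 0 or 3 decimals would need the exponent parameterized):

```go
package main

import "fmt"

// formatMinorUnits renders an int64 amount in minor units (cents) as a
// decimal string without ever touching floating point. Assumes a
// 2-decimal currency such as USD or EUR.
func formatMinorUnits(amount int64) string {
	sign := ""
	if amount < 0 {
		sign, amount = "-", -amount
	}
	return fmt.Sprintf("%s%d.%02d", sign, amount/100, amount%100)
}

func main() {
	fmt.Println(formatMinorUnits(123456)) // 1234.56
	fmt.Println(formatMinorUnits(-5))     // -0.05
}
```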

message TransferCompletedEvent {
  string transfer_id = 1;  // Idempotency key
  string from_wallet_id = 2;
  string to_wallet_id = 3;
  string tenant_id = 4;
  int64 amount = 5;
  string currency = 6;
  string initiated_by = 7;
  string description = 8;
  google.protobuf.Timestamp completed_at = 9;
}

message TransferFailedEvent {
  string transfer_id = 1;
  string from_wallet_id = 2;
  string to_wallet_id = 3;
  string tenant_id = 4;
  int64 amount = 5;
  string currency = 6;
  string failure_reason = 7;  // "insufficient_funds"|"wallet_frozen"|"limit_exceeded"
  string initiated_by = 8;
  google.protobuf.Timestamp failed_at = 9;
}

Module Events — platform.module.events

Topic: platform.module.events · Partition key: module_id · Source: Module Registry

| Message | Description (Event Type) |
|---|---|
| ModuleRegisteredEvent | New module registered (module.registry.registered) |
| ModuleActivatedEvent | Module activated for tenant (module.registry.activated) |
| ModuleDeactivatedEvent | Module deactivated (module.registry.deactivated) |
| ModuleUpdatedEvent | Module version upgraded (module.registry.updated) |
| ModuleConfigChangedEvent | Module config changed (module.registry.config_changed) |
| ModuleHealthChangedEvent | Module health status changed (module.registry.health_changed) |

Key schemas:

message ModuleRegisteredEvent {
  string module_id = 1;
  string name = 2;
  string version = 3;        // Semantic version (e.g. "1.2.0")
  string manifest_hash = 4;  // SHA-256 of manifest.json
  string tenant_id = 5;
  string registered_by = 6;
  google.protobuf.Timestamp registered_at = 7;
}

message ModuleUpdatedEvent {
  string module_id = 1;
  string previous_version = 2;
  string new_version = 3;
  string tenant_id = 4;
  string updated_by = 5;
  bool requires_migration = 6;  // true → DB schema migration needed
  google.protobuf.Timestamp updated_at = 7;
}

Schema Versioning Policy

| Rule | Detail |
|---|---|
| Backward-compatible changes | Adding optional fields — allowed without version change |
| Breaking changes | New event_type required — never change existing field semantics |
| Schema CI enforcement | buf breaking --use WIRE_JSON on every PR — wire-breaking changes fail CI |
| Consumer contract | Old consumers ignore unknown fields (proto3 default) |
| Deprecation | Old event_type kept for 2 full Kafka retention windows (14 days) before removal |