Events gRPC Service Reference
Package platform.events.v1 contains the EventService — the gRPC
interface for the Event Bus primitive. It covers publishing, server-streaming
subscription, topic management, consumer position monitoring, and DLQ
operations.
Additionally, the package includes schemas/v1/ — typed Protobuf definitions
for every domain event schema. These schemas are the single source of truth
for event payloads; any breaking schema change requires a new event type
(not a version bump on the same type).
See the gRPC Overview for buf configuration, Go package conventions, deadlines, mTLS, and error mapping.
Proto files:
- proto/platform/events/v1/events.proto — service + Event + RPC messages
- proto/platform/events/v1/schemas/v1/common.proto — EventEnvelope + DLQRecord
- proto/platform/events/v1/schemas/v1/auth.proto — Auth domain events (9 types)
- proto/platform/events/v1/schemas/v1/money.proto — Money domain events (8 types)
- proto/platform/events/v1/schemas/v1/module.proto — Module domain events (6 types)
Go package: kernel.internal/platform-kernel/gen/go/platform/events/v1;eventsv1
Go schema package: kernel.internal/platform-kernel/gen/go/platform/events/v1/schemas/v1;eventschemas
EventService — All RPCs
service EventService {
rpc Publish(PublishRequest) returns (PublishResponse);
rpc BatchPublish(BatchPublishRequest) returns (BatchPublishResponse);
rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse);
rpc Unsubscribe(UnsubscribeRequest) returns (UnsubscribeResponse);
rpc ListTopics(ListTopicsRequest) returns (ListTopicsResponse);
rpc GetTopicSchema(GetTopicSchemaRequest) returns (GetTopicSchemaResponse);
rpc ReplayFrom(ReplayFromRequest) returns (ReplayFromResponse);
rpc GetConsumerPosition(GetConsumerPositionRequest) returns (GetConsumerPositionResponse);
rpc DLQRetry(DLQRetryRequest) returns (DLQRetryResponse);
rpc DLQReplayAll(DLQReplayAllRequest) returns (DLQReplayAllResponse);
}
| RPC | Type | Description |
|---|---|---|
| Publish | Unary | Publish a single event to a domain topic |
| BatchPublish | Unary | Publish multiple events in one call |
| Subscribe | Server-streaming | Stream events from a topic by consumer group |
| Unsubscribe | Unary | Remove a consumer group subscription |
| ListTopics | Unary | List all available event topics and partition counts |
| GetTopicSchema | Unary | Get the Protobuf schema definition for a topic |
| ReplayFrom | Unary | Seek a consumer group to a past timestamp |
| GetConsumerPosition | Unary | Get current offset, latest offset, and lag |
| DLQRetry | Unary | Retry a single failed DLQ event |
| DLQReplayAll | Unary | Replay all pending DLQ events (throttled: 50 events/sec) |
Core Message — Event
The Event message is the wire format for individual events published and
received through the gRPC API. Domain schemas are packed into the data field
as serialized Protobuf bytes.
message Event {
string id = 1; // UUID v7 — idempotency key
string type = 2; // domain.entity.action (e.g. "auth.user.created")
string source = 3; // Module that published the event
string tenant_id = 4; // Injected by Gateway; never trust client value
bytes data = 5; // Serialized Protobuf payload
string schema_version = 6; // Semantic version (e.g. "1.0.0")
google.protobuf.Timestamp timestamp = 7; // When the event occurred
string trace_id = 8; // OpenTelemetry trace ID
}
| Field | Description |
|---|---|
| id | UUID v7. Used as Kafka message key for idempotency dedup on replay. |
| type | Dot-notation: <domain>.<entity>.<action>. Must match a registered schema. |
| tenant_id | Set by Gateway from JWT tenantId claim. SDK-level consumers filter by this field. |
| data | Raw Protobuf bytes of the typed domain event (see schemas below). |
| schema_version | "1.0.0". Breaking changes require a new type, not a version bump. |
| trace_id | Propagated from the originating HTTP request for end-to-end trace correlation. |
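The dot-notation constraint on type can be checked client-side before publishing. A minimal sketch in Go; the helper name and the exact segment character set are assumptions for illustration, not part of the API:

```go
package main

import (
	"fmt"
	"regexp"
)

// eventTypeRe matches the <domain>.<entity>.<action> convention described
// above, e.g. "auth.user.created". The per-segment charset here is an
// assumption; the schema registry may be stricter.
var eventTypeRe = regexp.MustCompile(`^[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*$`)

// ValidEventType reports whether t follows the dot-notation convention.
func ValidEventType(t string) bool {
	return eventTypeRe.MatchString(t)
}

func main() {
	fmt.Println(ValidEventType("auth.user.created"))   // true
	fmt.Println(ValidEventType("auth.user"))           // false: only two segments
	fmt.Println(ValidEventType("money.wallet.frozen")) // true
}
```

Validating locally turns a round-trip INVALID_ARGUMENT into an immediate client-side error.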
Core Message — EventEnvelope (common.proto)
EventEnvelope is the higher-level wrapper used internally by the Event Bus
when publishing via kernel.events() SDK. It carries additional metadata that
is not present in the raw Event gRPC message.
message EventEnvelope {
string event_id = 1; // UUID v7
string event_type = 2; // domain.entity.action
string source = 3;
string tenant_id = 4;
string schema_version = 5;
google.protobuf.Timestamp timestamp = 6;
string trace_id = 7;
string correlation_id = 8; // Links related events (saga pattern)
google.protobuf.Any payload = 9; // Typed domain event (Any.pack)
string entity_id = 10; // Partition key (ordering guarantee per entity)
}
entity_id as partition key: Kafka partition assignment is deterministic on entity_id. All events for the same entity (e.g., the same wallet_id) land on the same partition, guaranteeing processing order per entity within a consumer group.
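The principle behind the ordering guarantee can be sketched as a deterministic hash of the key modulo the partition count. Kafka's default partitioner actually uses murmur2; FNV-1a below is only to illustrate the idea:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor illustrates deterministic partition assignment: the same
// entity_id always maps to the same partition, so per-entity ordering holds.
// Kafka's default partitioner uses murmur2; FNV-1a here is illustrative only.
func partitionFor(entityID string, partitionCount int) int {
	h := fnv.New32a()
	h.Write([]byte(entityID))
	return int(h.Sum32() % uint32(partitionCount))
}

func main() {
	p1 := partitionFor("wallet-42", 12)
	p2 := partitionFor("wallet-42", 12)
	fmt.Println(p1 == p2) // always true: same entity, same partition
}
```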
Core Message — DLQRecord (common.proto)
message DLQRecord {
EventEnvelope original_event = 1;
int32 attempt_count = 2; // Attempts before DLQ
string last_error = 3;
google.protobuf.Timestamp dlq_timestamp = 4;
string consumer_group = 5;
string status = 6; // "pending"|"retrying"|"resolved"|"expired"
}
Publish
Publish a single event to a Kafka domain topic.
Request — PublishRequest
message PublishRequest {
string topic = 1; // Required. e.g. "platform.auth.events"
string key = 2; // Required. Partition key — use entityId for ordering guarantee
Event event = 3; // Required. The event to publish
}
Response — PublishResponse
message PublishResponse {
string event_id = 1; // Confirmed event ID
}
gRPC errors:
| gRPC status | Condition |
|---|---|
| NOT_FOUND | Topic does not exist |
| PERMISSION_DENIED | Caller lacks events.publish RBAC permission for this topic |
| INVALID_ARGUMENT | event.type does not match a registered schema |
| UNAVAILABLE | Kafka broker unavailable — caller should retry with backoff |
BatchPublish
Publish multiple events in a single Kafka producer batch. More efficient than
repeated Publish for bulk event emission (e.g., after a database import).
Request — BatchPublishRequest
message BatchPublishRequest {
string topic = 1; // Required. All items go to the same topic.
repeated BatchPublishItem items = 2; // Required.
}
message BatchPublishItem {
string key = 1; // Partition key for this item
Event event = 2;
}
Response — BatchPublishResponse
message BatchPublishResponse {
int32 published_count = 1; // Number of events successfully published
}
| Limit | Value |
|---|---|
| Max items per batch | 500 |
| Exceeded | INVALID_ARGUMENT |
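A bulk emission larger than the 500-item limit has to be split across several BatchPublish calls. A minimal chunking sketch (the helper name is illustrative):

```go
package main

import "fmt"

// chunk splits items into batches of at most size elements, so callers can
// stay under the 500-item BatchPublish limit instead of hitting
// INVALID_ARGUMENT.
func chunk[T any](items []T, size int) [][]T {
	var batches [][]T
	for len(items) > size {
		batches = append(batches, items[:size])
		items = items[size:]
	}
	if len(items) > 0 {
		batches = append(batches, items)
	}
	return batches
}

func main() {
	ids := make([]int, 1200)
	batches := chunk(ids, 500)
	fmt.Println(len(batches), len(batches[0]), len(batches[2])) // 3 500 200
}
```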
Subscribe
Server-streaming RPC. Streams events from a Kafka topic for a given consumer group. The stream remains open until the caller cancels it or the connection is closed.
rpc Subscribe(SubscribeRequest) returns (stream SubscribeResponse);
Request — SubscribeRequest
message SubscribeRequest {
string topic = 1; // Required. Kafka topic name.
string group_id = 2; // Required. Consumer group: <service>.<handler> convention.
}
Response — SubscribeResponse (streamed)
message SubscribeResponse {
Event event = 1; // One event per stream message
}
Consumer group naming convention:
group_id = "<consuming-service>.<handler-name>"
Examples:
"audit.audit-record-handler"
"billing.subscription-event-handler"
"notify.user-notification-handler"
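The naming convention above can be enforced with a small check before calling Subscribe. A sketch; the function name is illustrative and the real service may apply stricter rules:

```go
package main

import (
	"fmt"
	"strings"
)

// validGroupID checks the <consuming-service>.<handler-name> convention:
// a non-empty service segment, a dot, and a non-empty handler segment.
func validGroupID(id string) bool {
	parts := strings.SplitN(id, ".", 2)
	return len(parts) == 2 && parts[0] != "" && parts[1] != ""
}

func main() {
	fmt.Println(validGroupID("audit.audit-record-handler")) // true
	fmt.Println(validGroupID("audit"))                      // false: no handler segment
}
```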
Tenant filtering: The Event Bus filters events by tenant_id extracted
from the gRPC call metadata (set by Gateway). Consumers never receive events
belonging to other tenants even on shared Kafka topics.
gRPC errors:
| gRPC status | Condition |
|---|---|
| NOT_FOUND | Topic does not exist |
| PERMISSION_DENIED | Caller lacks events.subscribe RBAC permission for this topic |
Unsubscribe
Request — UnsubscribeRequest
message UnsubscribeRequest {
string topic = 1;
string group_id = 2;
}
Response — UnsubscribeResponse
message UnsubscribeResponse {}
ListTopics
Request — ListTopicsRequest
message ListTopicsRequest {}
Response — ListTopicsResponse
message ListTopicsResponse {
repeated TopicInfo topics = 1;
}
message TopicInfo {
string name = 1;
int32 partition_count = 2;
}
Platform topics catalogue:
| Topic | Domain | Partition key | Retention |
|---|---|---|---|
| platform.auth.events | IAM | user_id | 7 days |
| platform.money.events | Money | wallet_id | 7 days |
| platform.module.events | Module Registry | module_id | 7 days |
| platform.files.events | File Storage | file_id | 7 days |
| platform.notify.events | Notify | user_id | 7 days |
| platform.audit.events | Audit | entity_id | 30 days |
| platform.billing.events | Billing | tenant_id | 7 days |
| platform.data.events | Data Layer (CDC) | record_id | 7 days |
Retention periods are configured via environment variables:
- 7 days (Global Default): controlled by KAFKA_RETENTION_HOURS=168. This is the cluster-wide default and applies to all standard topics (auth, money, module, files, notify, billing, data).
- 30 days (Audit Override): the platform.audit.events topic requires a longer retention for compliance, overridden specifically via KAFKA_AUDIT_RETENTION_HOURS=720.
GetTopicSchema
Request — GetTopicSchemaRequest
message GetTopicSchemaRequest {
string topic = 1; // Required.
}
Response — GetTopicSchemaResponse
message GetTopicSchemaResponse {
string topic = 1;
string schema = 2; // Protobuf schema definition (text format)
string schema_version = 3; // Current active version
}
ReplayFrom
Seeks a consumer group's offset to a past timestamp, enabling event replay
from a specific point in time. Replay is bounded by Kafka's topic retention
(KAFKA_RETENTION_HOURS).
Request — ReplayFromRequest
message ReplayFromRequest {
string topic = 1; // Required.
string group_id = 2; // Required.
google.protobuf.Timestamp from_timestamp = 3; // Required. Replay starts from this point.
}
Response — ReplayFromResponse
message ReplayFromResponse {
string message = 1; // Confirmation (e.g. "Seek scheduled for 2026-04-01T00:00:00Z")
}
gRPC errors:
| gRPC status | Condition |
|---|---|
| OUT_OF_RANGE | from_timestamp is older than the topic's retention window |
| NOT_FOUND | Topic or consumer group does not exist |
Bulk import suppress: When replaying large volumes (Kafka lag > 100 000 events), lag alert thresholds are automatically suppressed for 30 minutes (KAFKA_BULK_IMPORT_SUPPRESS_MINUTES=30) to avoid false alerting.
GetConsumerPosition
Returns the current offset, latest offset, and lag for a consumer group on a topic. Used by the Admin UI health dashboard and alerting.
Request — GetConsumerPositionRequest
message GetConsumerPositionRequest {
string topic = 1;
string group_id = 2;
}
Response — GetConsumerPositionResponse
message GetConsumerPositionResponse {
int64 current_offset = 1; // Consumer's current committed offset
int64 latest_offset = 2; // Latest offset in the topic partition
int64 lag = 3; // latest_offset − current_offset
}
Lag alert thresholds:
| Lag age | Alert level |
|---|---|
| > 1 minute | Warning |
| > 5 minutes | Error |
| > 15 minutes | Critical (PagerDuty) |
DLQRetry
Retries a single failed DLQ event by re-publishing it to its original topic.
Request — DLQRetryRequest
message DLQRetryRequest {
string event_id = 1; // Required. DLQ event ID to retry.
}
Response — DLQRetryResponse
message DLQRetryResponse {
bool success = 1;
string message = 2; // Human-readable result
}
gRPC errors:
| gRPC status | Condition |
|---|---|
| NOT_FOUND | DLQ event ID does not exist |
| FAILED_PRECONDITION | Event is already in resolved or expired status |
DLQReplayAll
Replays all pending DLQ events. Throttled to 50 events/second to avoid
overwhelming the Kafka broker.
Request — DLQReplayAllRequest
message DLQReplayAllRequest {}
Response — DLQReplayAllResponse
message DLQReplayAllResponse {
int32 replayed = 1; // Events re-published to Kafka
int32 skipped_already_resolved = 2; // Events in "resolved" status — skipped
}
Idempotency: Financial events (money.*) are deduplicated against the
processed_event_ids PostgreSQL table (Valkey TTL is not used for financial
events — permanent dedup). Non-financial events use Valkey dedup with a
24-hour TTL.
Domain Event Schemas
Auth Events — platform.auth.events
Topic: platform.auth.events · Partition key: user_id · Source: IAM Service
| Message | Description (Event Type) |
|---|---|
| UserCreatedEvent | New user account created (auth.user.created) |
| UserUpdatedEvent | User profile modified, sparse fields (auth.user.updated) |
| UserDeletedEvent | User soft-deleted (auth.user.deleted) |
| UserLoggedInEvent | Successful authentication (auth.user.logged_in) |
| UserLoggedOutEvent | Session terminated (auth.user.logged_out) |
| RoleChangedEvent | User role changed by admin/owner (auth.role.changed) |
| PasswordChangedEvent | Password changed or reset (auth.password.changed) |
| APIKeyCreatedEvent | New API key generated (auth.api_key.created) |
| APIKeyRevokedEvent | API key revoked (auth.api_key.revoked) |
Key schemas:
message UserCreatedEvent {
string user_id = 1;
string email = 2;
string display_name = 3;
string role = 4; // "member" | "admin" | "owner"
string tenant_id = 5;
google.protobuf.Timestamp created_at = 6;
}
message UserUpdatedEvent {
string user_id = 1;
optional string email = 2; // Only changed fields are populated
optional string display_name = 3;
optional bool is_active = 4;
string updated_by = 5;
google.protobuf.Timestamp updated_at = 6;
}
message RoleChangedEvent {
string user_id = 1;
string previous_role = 2;
string new_role = 3;
string changed_by = 4;
string tenant_id = 5;
google.protobuf.Timestamp changed_at = 6;
}
message UserLoggedInEvent {
string user_id = 1;
string session_id = 2;
string ip_address = 3;
string user_agent = 4;
string auth_method = 5; // "password" | "oauth" | "api_key"
google.protobuf.Timestamp logged_in_at = 6;
}
Money Events — platform.money.events
Topic: platform.money.events · Partition key: wallet_id · Source: Money Service
| Message | Description (Event Type) |
|---|---|
| WalletCreatedEvent | New wallet created (money.wallet.created) |
| WalletCreditedEvent | Funds added to wallet (money.wallet.credited) |
| WalletDebitedEvent | Funds withdrawn from wallet (money.wallet.debited) |
| TransferCompletedEvent | Wallet-to-wallet transfer success (money.transfer.completed) |
| TransferFailedEvent | Wallet-to-wallet transfer failure (money.transfer.failed) |
| PaymentProcessedEvent | External payment processed (money.payment.processed) |
| WalletFrozenEvent | Wallet frozen for compliance (money.wallet.frozen) |
| WalletUnfrozenEvent | Frozen wallet restored (money.wallet.unfrozen) |
Key schemas:
message WalletCreditedEvent {
string wallet_id = 1;
string transaction_id = 2; // Idempotency key
string tenant_id = 3;
int64 amount = 4; // Minor units (cents). int64 — no float precision issues
string currency = 5; // ISO 4217
int64 balance_after = 6; // Running balance after credit (minor units)
string source = 7; // "deposit"|"refund"|"transfer_in"|"bonus"
string reference_id = 8; // External payment processor reference
string description = 9;
google.protobuf.Timestamp credited_at = 10;
}
message TransferCompletedEvent {
string transfer_id = 1; // Idempotency key
string from_wallet_id = 2;
string to_wallet_id = 3;
string tenant_id = 4;
int64 amount = 5;
string currency = 6;
string initiated_by = 7;
string description = 8;
google.protobuf.Timestamp completed_at = 9;
}
message TransferFailedEvent {
string transfer_id = 1;
string from_wallet_id = 2;
string to_wallet_id = 3;
string tenant_id = 4;
int64 amount = 5;
string currency = 6;
string failure_reason = 7; // "insufficient_funds"|"wallet_frozen"|"limit_exceeded"
string initiated_by = 8;
google.protobuf.Timestamp failed_at = 9;
}
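Monetary fields above are int64 minor units (cents), avoiding float rounding entirely. A small sketch of display formatting under that convention (the helper name is illustrative, and it assumes non-negative amounts):

```go
package main

import "fmt"

// formatMinor renders an int64 minor-unit amount (cents) for display.
// Arithmetic stays in integers end to end; only formatting divides by 100.
// This sketch assumes a non-negative amount and a 2-decimal currency.
func formatMinor(amount int64, currency string) string {
	return fmt.Sprintf("%d.%02d %s", amount/100, amount%100, currency)
}

func main() {
	var balance int64 = 10_050 // 100.50 in minor units
	balance += 999             // credit 9.99 — exact, no float drift
	fmt.Println(formatMinor(balance, "USD")) // 110.49 USD
}
```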
Module Events — platform.module.events
Topic: platform.module.events · Partition key: module_id · Source: Module Registry
| Message | Description (Event Type) |
|---|---|
| ModuleRegisteredEvent | New module registered (module.registry.registered) |
| ModuleActivatedEvent | Module activated for tenant (module.registry.activated) |
| ModuleDeactivatedEvent | Module deactivated (module.registry.deactivated) |
| ModuleUpdatedEvent | Module version upgraded (module.registry.updated) |
| ModuleConfigChangedEvent | Module config changed (module.registry.config_changed) |
| ModuleHealthChangedEvent | Module health status changed (module.registry.health_changed) |
Key schemas:
message ModuleRegisteredEvent {
string module_id = 1;
string name = 2;
string version = 3; // Semantic version (e.g. "1.2.0")
string manifest_hash = 4; // SHA-256 of manifest.json
string tenant_id = 5;
string registered_by = 6;
google.protobuf.Timestamp registered_at = 7;
}
message ModuleUpdatedEvent {
string module_id = 1;
string previous_version = 2;
string new_version = 3;
string tenant_id = 4;
string updated_by = 5;
bool requires_migration = 6; // true → DB schema migration needed
google.protobuf.Timestamp updated_at = 7;
}
Schema Versioning Policy
| Rule | Detail |
|---|---|
| Backward-compatible changes | Adding optional fields — allowed without version change |
| Breaking changes | New event_type required — never change existing field semantics |
| Schema CI enforcement | buf breaking --use WIRE_JSON on every PR — wire-breaking changes fail CI |
| Consumer contract | Old consumers ignore unknown fields (proto3 default) |
| Deprecation | Old event_type kept for 2 full Kafka retention windows (14 days) before removal |
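Breaking-change rules for buf are typically declared in buf.yaml rather than passed ad hoc. A minimal fragment matching the WIRE_JSON rule category named above (the surrounding module configuration is omitted and would depend on this repository's layout):

```yaml
version: v1
breaking:
  use:
    - WIRE_JSON
```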