On this page
- Overview
- Architecture Layers
- Design Decisions
- GRPCBroker and Reverse Channels (F092)
- Key Components
- F092 Integration: StreamManager and EventDeliverer
- Wiring (Interfaces Layer)
- Event Flow (End-to-End)
- Lifecycle & Cleanup
- Non-Functional Requirements
- Testing Strategy
- Recent Enhancements (F092)
- Future Extensions
- See Also
Plugin Event System Architecture
Overview
The Plugin Event System (F090 + F092) enables real-time event-driven communication between AWF core and plugins, and between plugins themselves. It’s built on gRPC and implements a fan-out pattern with per-plugin buffered channels for isolation. F092 extends the system with GRPCBroker-based plugin-to-host event emission and persistent gRPC streaming for optimized host-to-plugin delivery.
Architecture Layers
┌──────────────────────────────────────────────────────────┐
│ Plugin SDK Layer (pkg/plugin/sdk) │
│ EventSubscriber interface │ HostClient (emit via │
│ + defaults │ broker) │ StreamEvents │
│ │ handler (recv via stream) │
└──────────────┬──────────────┴──────────┬─────────────────┘
│ (gRPC HandleEvent / │ (GRPCBroker
│ StreamEvents RPC) │ HostEventService.Emit)
┌──────────────▼──────────────────────────▼────────────────┐
│ StreamManager + gRPC Adapter (pluginmgr/) │
│ • StreamManager: per-plugin stream tracking │
│ • Implements EventDeliverer (stream or unary) │
│ • Automatic fallback on Unimplemented/broken stream │
│ • grpcEventAdapter: proto ↔ domain conversions │
└──────────────┬───────────────────────────────────────────┘
│
┌──────────────▼───────────────────────────────────────────┐
│ EventBus Infrastructure │
│ • Pattern matching (glob) │
│ • Per-plugin buffered channels (256) │
│ • Async delivery goroutines │
│ • Cycle detection (depth 3) │
│ ┌──────────────────────────────────┐ │
│ │ HostEventService (broker-served) │ │
│ │ • Receives plugin Emit() calls │ │
│ │ • Validates against manifest │ │
│ │ • Publishes to EventBus │ │
│ └──────────────────────────────────┘ │
└──────────────┬───────────────────────────────────────────┘
│
┌──────────────▼───────────────────────────────────────────┐
│ ExecutionService / Domain │
│ Emits lifecycle events (workflow.started) │
│ Manages EventPublisher port │
└──────────────────────────────────────────────────────────┘
Design Decisions
1. Unary RPCs vs Streaming
Decision: Use unary HandleEvent RPC instead of server-side streaming Subscribe (the F090 baseline; F092 later adds an optional StreamEvents path with unary fallback, described below).
Rationale:
- All existing gRPC services in codebase use unary RPCs on plugin side
- GRPCBroker reverse connections untested in codebase
- Functionally equivalent for fire-and-forget events
- Simpler error handling and timeout management
Trade-off:
- No persistent streaming connection (acceptable because go-plugin manages plugin lifecycle)
- Connection breaks cause plugin restart, which re-establishes subscriptions from manifest
2. Manifest-Based Subscriptions
Decision: Read subscription patterns from plugin manifest at Init time, not via RPC.
Rationale:
- Host already parses and validates manifests
- Gating by capability already established pattern
- No additional RPC round-trip
- Consistent with validator/step_type discovery
Implementation:
# plugin.yaml
capabilities:
- events
events:
subscribe:
- "workflow.*"
- "step.failed"3. Custom Glob Matching
3. Custom Glob Matching
Decision: Implement custom dot-segment glob matching, not filepath.Match.
Rationale:
- Spec requires "." as the segment separator: "workflow.*" must match "workflow.started" but NOT "workflow.step.started"
- filepath.Match uses "/" as the separator, so it doesn't apply to dot-separated event names
- Custom implementation is simple and clear
Implementation:
Pattern: "workflow.*"
Event: "workflow.started"
Split: ["workflow", "*"] vs ["workflow", "started"]
Match!
Event: "workflow.step.started"
Split: ["workflow", "step", "started"] (3 segments vs 2)
No match!
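A dot-segment matcher along those lines fits in a few lines of Go. The sketch below reuses the matchEventPattern name that appears later in this document; the actual implementation may differ:
import "strings"

// matchEventPattern reports whether an event type matches a subscription pattern.
// Segments are compared one by one; "*" matches exactly one segment, so
// "workflow.*" matches "workflow.started" but not "workflow.step.started".
func matchEventPattern(pattern, eventType string) bool {
    if pattern == "" || eventType == "" {
        return false
    }
    p := strings.Split(pattern, ".")
    e := strings.Split(eventType, ".")
    if len(p) != len(e) {
        return false // different segment counts never match
    }
    for i := range p {
        if p[i] != "*" && p[i] != e[i] {
            return false
        }
    }
    return true
}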
4. Per-Plugin Buffered Channels
Decision: Each plugin has its own 256-event buffered channel.
Rationale:
- Isolation: slow plugin doesn’t block others
- Non-blocking delivery: events dropped on full buffer (logged as warning)
- Transport back-pressure: gRPC HTTP/2 flow control beneath application layer
- 256 events sufficient for typical workflow patterns
If Buffer Overflows:
Event published to slow plugin
│
├─ Non-blocking send to channel (cap 256)
│
└─ If full: Drop event, log warning
(Other plugins still receive)
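The overflow behaviour is the standard Go select-with-default idiom. A minimal sketch, with field names taken from the eventSubscription struct shown under Key Components (trySend is an illustrative helper name):
// trySend queues an event for one subscription without ever blocking the bus.
func (b *EventBus) trySend(sub *eventSubscription, event *DomainEvent) {
    select {
    case sub.eventCh <- event:
        // queued; the plugin's delivery goroutine picks it up
    default:
        // buffer (cap 256) is full: drop the event, warn, keep serving others
        b.logger.Warn("event buffer full for plugin %s, dropping %s", sub.pluginName, event.Type)
    }
}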
5. Async Delivery Goroutines
Decision: One delivery goroutine per plugin subscription.
Rationale:
- gRPC HandleEvent RPC runs async from event publication
- Non-blocking: slow RPC doesn’t block EventBus or other plugins
- Clean cleanup: done channel signals goroutine exit
Flow:
Publish(event)
│
├─ For each subscription matching event pattern:
│ │
│ └─ Send to channel (non-blocking)
│
└─ Delivery goroutine (for each plugin):
│
├─ Read from channel
│
└─ Call plugin's HandleEvent RPC
├─ On success: process emitted events
├─ On error: log, continue (don't break)
└─ On timeout: abort gracefully
6. Propagation Depth Limiting
Decision: Limit event propagation to depth 3.
Rationale:
- Prevent infinite loops from circular event chains
- Plugin A emits → triggers Plugin B → emits → triggers Plugin A (caught at depth 3)
- Hard limit prevents stack overflow
- Warnings logged for visibility
Implementation:
type DomainEvent struct {
PropagationDepth int // Incremented on each plugin emit
}
// In EventBus delivery goroutine:
if event.PropagationDepth >= 3 {
logger.Warn("propagation depth exceeded for event %s", event.Type)
return // Stop propagation
}
7. DomainEvent in pluginmodel/ Package
Decision: Place event entity in internal/domain/pluginmodel/, not internal/domain/workflow/.
Rationale:
- Events are plugin-specific concerns
- Colocated with capability constants and manifest types
- Avoids coupling workflow domain to plugin event system
- Clear separation of concerns
GRPCBroker and Reverse Channels (F092)
F092 activates the GRPCBroker to enable two capabilities:
- Plugin-to-host event emission — Plugins emit events via HostEventService exposed through the broker
- Optimized event delivery — Persistent client-side gRPC streams replace repeated unary RPC calls
HostEventService
The host exposes a HostEventService on the GRPCBroker that plugins can dial to emit events:
// Host side: Expose HostEventService on broker
service HostEventService {
rpc Emit(EmitRequest) returns (EmitResponse);
}
// Plugin side: Dial and use HostEventService
hostClient := NewHostClient(broker, "plugin-name")
hostClient.Emit(ctx, "event.type", payload, metadata)
Permission Model:
Emit requests are validated against the plugin’s events.emit manifest patterns:
# plugin.yaml
events:
emit:
- "custom.analysis.*"Attempting to emit undeclared event types returns an authorization error:
Plugin emits "custom.analysis.complete" → Manifest allows "custom.analysis.*" → ✓ Accepted
Plugin emits "workflow.failed" → Not in manifest → ✗ Denied (BROKER_EMIT_DENIED)Event Flow:
Event Flow:
Plugin → HostClient.Emit(event)
│
└─ gRPC Emit() call via broker
│
└─ HostEventService receives request
│
├─ Validate: Is emitted event type in plugin's events.emit patterns?
│
├─ Validate: Does plugin have 'events' capability?
│
└─ Publish to EventBus (same path as core-emitted events)
│
└─ EventBus routes to subscribers
└─ Each subscriber receives via HandleEvent RPC or streaming
StreamManager and Streaming Delivery
While plugin→host emission goes through the broker’s HostEventService, the reverse (host→plugin event delivery) is optimized with client-side streaming:
Streaming RPC Definition:
service EventService {
rpc HandleEvent(HandleEventRequest) returns (HandleEventResponse); // Unary (existing)
rpc StreamEvents(stream EventStreamMessage) returns (StreamEventsResponse); // Streaming (F092+)
}
Note the asymmetry: HandleEvent is unary (one request), StreamEvents is client-side streaming (multiple messages from client). This is because:
- Plugin is gRPC server, host is gRPC client
- Host needs to push events to plugin = host (client) sends stream
- Typical gRPC pattern for client→server push
StreamManager Workflow:
wireEventSubscriptions() called for plugin
│
├─ Check if plugin supports StreamEvents RPC
│
├─ If YES: Register stream via broker
│ │
│ └─ StreamManager.RegisterStream(pluginName, stream)
│ │
│ └─ Future events routed via stream (low latency, fewer RPC calls)
│
└─ If NO: Use unary adapter (fallback)
│
└─ Each event triggers a separate HandleEvent RPC call
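On the host side, the check-and-register step might look like the sketch below, using the generated client for the EventService proto above. If the stream cannot be opened, or a later Send reports Unimplemented (handled by the streamDeliverer shown in the F092 integration section), delivery stays on the unary path; openEventStream is an illustrative name:
// Sketch only; error handling trimmed, names follow the proto and types above.
func openEventStream(ctx context.Context, client pluginv1.EventServiceClient,
    sm *StreamManager, pluginName string, unary EventDeliverer) EventDeliverer {

    stream, err := client.StreamEvents(ctx) // client-side streaming RPC
    if err != nil {
        return unary // no stream available: stay on the unary adapter
    }
    sm.RegisterStream(pluginName, stream)
    return sm.GetDeliverer(pluginName, unary) // streamDeliverer with unary fallback
}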
Benefits:
- Lower latency: One persistent connection vs per-event RPC setup
- Fewer round-trips: 100 events = 1 stream + 100 Sends vs 100 HandleEvent calls
- Transparent fallback: Plugins without streaming support continue to work unchanged
- Automatic recovery: If stream breaks, fallback to unary (no manual intervention)
Per-Event Sequence Numbers:
Messages on the stream include a sequence number for ordering and debugging:
EventStreamMessage {
sequence_number: 1 // First event
id: "evt-123"
type: "workflow.started"
...
}
EventStreamMessage {
sequence_number: 2 // Second event
...
}
Sequence numbers are per-plugin (reset on stream reconnect).
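Assigning them can be as simple as an atomic per-plugin counter. A sketch, assuming a hypothetical pluginStream wrapper around each registered stream:
import "sync/atomic"

// pluginStream is a hypothetical wrapper holding one plugin's registered stream.
type pluginStream struct {
    seq uint64 // last assigned sequence number; zeroed on re-registration
}

// nextSeq returns the next per-plugin sequence number (1, 2, 3, ...).
func (p *pluginStream) nextSeq() uint64 {
    return atomic.AddUint64(&p.seq, 1)
}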
Key Components
DomainEvent (Domain)
type DomainEvent struct {
ID string // UUID for idempotency
Type string // "workflow.started", "deploy.completed", etc.
Timestamp time.Time // When event occurred
Source string // "core" or plugin name
Metadata map[string]string
Payload []byte // JSON-serializable data
PropagationDepth int // Incremented on inter-plugin routing
}
Properties:
- UUID enables idempotent processing after reconnection
- Metadata contains context (workflow_id, step_name, etc.)
- Payload for structured event-specific data
- PropagationDepth prevents cycles
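A constructor along these lines keeps ID and timestamp handling in one place. The sketch matches the NewDomainEvent calls used later in this document; the uuid dependency is an assumption:
import (
    "time"

    "github.com/google/uuid"
)

// NewDomainEvent builds an event with a fresh ID and timestamp. PropagationDepth
// starts at 0 and is only incremented when a plugin re-emits the event.
func NewDomainEvent(eventType, source string, metadata map[string]string, payload []byte) *DomainEvent {
    return &DomainEvent{
        ID:        uuid.NewString(),
        Type:      eventType,
        Timestamp: time.Now().UTC(),
        Source:    source,
        Metadata:  metadata,
        Payload:   payload,
    }
}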
EventPublisher Port (Domain)
type EventPublisher interface {
Publish(ctx context.Context, event *DomainEvent) error
Close() error
}
Implementations:
- EventBus in infrastructure
- MockEventPublisher in tests
EventBus (Infrastructure)
type EventBus struct {
subscriptions map[string]*eventSubscription // pluginName → subscription
mu sync.RWMutex
logger ports.Logger
}
type eventSubscription struct {
patterns []string
pluginName string
eventCh chan *DomainEvent // Buffered
done chan struct{} // Signal goroutine exit
deliverer EventDeliverer // Calls plugin RPC
}
Responsibilities:
- Pattern matching (glob on event types)
- Per-plugin channel management
- Async delivery goroutines
- Cycle detection
- Back-pressure handling
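Putting the pieces together, the per-subscription delivery loop might look like the sketch below (deliverLoop is an illustrative name; field names come from the structs above):
import "context"

// deliverLoop runs once per subscription until the done channel is closed.
func (b *EventBus) deliverLoop(sub *eventSubscription) {
    for {
        select {
        case <-sub.done:
            return // Unsubscribe/Close signalled; goroutine exits cleanly
        case event := <-sub.eventCh:
            if event.PropagationDepth >= 3 {
                b.logger.Warn("propagation depth exceeded for event %s", event.Type)
                continue // break the cycle, keep serving other events
            }
            emitted, err := sub.deliverer.DeliverEvent(context.Background(), event)
            if err != nil {
                b.logger.Warn("delivery to %s failed: %v", sub.pluginName, err)
                continue // log and move on; never break the loop
            }
            for _, e := range emitted {
                e.PropagationDepth = event.PropagationDepth + 1
                b.Publish(context.Background(), e) // route re-emitted events to other subscribers
            }
        }
    }
}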
gRPC Event Adapter (Infrastructure)
type grpcEventAdapter struct {
client pluginv1.EventServiceClient
pluginName string
timeout time.Duration
logger ports.Logger
}
func (a *grpcEventAdapter) DeliverEvent(ctx context.Context, event *DomainEvent) ([]*DomainEvent, error) {
req := domainEventToProto(event)
resp, err := a.client.HandleEvent(ctx, req)
if err != nil {
return nil, err
}
return protoToDomainEvents(resp.EmittedEvents), nil
}
Responsibilities:
- Domain ↔ proto conversions
- Timeout handling
- Emitted event extraction
EventSubscriber Interface (SDK)
type EventSubscriber interface {
Patterns() []string
HandleEvent(ctx context.Context, event Event) ([]Event, error)
}
Plugin Implementation:
func (p *MyPlugin) Patterns() []string {
return []string{"workflow.*", "step.failed"}
}
func (p *MyPlugin) HandleEvent(ctx context.Context, event Event) ([]Event, error) {
// Handle event, optionally emit new events
return emittedEvents, err
}
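The no-op defaults referenced under NFR-006 might look like the sketch below, so that plugins embedding BasePlugin compile and run unchanged when they ignore events:
// No-op EventSubscriber defaults on the SDK's BasePlugin (sketch).
func (b *BasePlugin) Patterns() []string { return nil }

func (b *BasePlugin) HandleEvent(ctx context.Context, event Event) ([]Event, error) {
    return nil, nil // nothing emitted, no error
}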
F092 Integration: StreamManager and EventDeliverer
F092 introduces minimal coupling by leveraging the existing EventDeliverer interface seam:
type EventDeliverer interface {
DeliverEvent(ctx context.Context, event *DomainEvent) ([]*DomainEvent, error)
}
Before F092:
- EventBus.Subscribe() takes a grpcEventAdapter (implements EventDeliverer)
- grpcEventAdapter calls the HandleEvent unary RPC
After F092:
- EventBus.Subscribe() takes either:
  - grpcEventAdapter (unary, fallback)
  - streamDeliverer (streaming, preferred)
- StreamManager decides which deliverer to use per plugin
- No changes to EventBus internals
Selection Logic:
func (sm *StreamManager) GetDeliverer(pluginName string, unaryFallback EventDeliverer) EventDeliverer {
if sm.HasStream(pluginName) {
return &streamDeliverer{
stream: sm.streams[pluginName],
fallback: unaryFallback,
}
}
return unaryFallback
}
Fallback Handling:
If a streaming send fails (stream broken, timeout, or plugin doesn’t support streaming), the streamDeliverer automatically falls back to unary:
func (d *streamDeliverer) DeliverEvent(ctx context.Context, event *DomainEvent) ([]*DomainEvent, error) {
if err := d.stream.Send(eventStreamMessage); err != nil {
// Detect "Unimplemented" gRPC status (plugin doesn't support StreamEvents)
if isUnimplemented(err) {
d.sm.UnregisterStream(d.pluginName) // Mark stream as unavailable
}
// Fall back to unary delivery
return d.fallback.DeliverEvent(ctx, event)
}
return nil, nil
}
This design ensures:
- Backward compatibility: Existing unary path unchanged
- Minimal surface area: No EventBus modifications
- Graceful degradation: Streaming failure → unary (always works)
- Per-plugin granularity: Each plugin independently uses streaming if available
Wiring (Interfaces Layer)
Initialization:
InitSystem()
│
├─ Create EventBus
│
└─ Set on RPCPluginManager
│
└─ After plugin Init:
└─ If plugin has "events" capability:
└─ Create grpcEventAdapter
└─ Call EventBus.Subscribe()
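The wiring step itself is small. A sketch under the assumption that the loaded-plugin handle exposes its name, manifest, logger, and gRPC event client (loadedPlugin and the 5-second timeout are illustrative):
import "time"

// wireEventSubscriptions subscribes one plugin, mirroring the flow above.
func wireEventSubscriptions(bus *EventBus, p loadedPlugin) error {
    patterns := subscriptionPatterns(p.Manifest) // empty unless the events capability is declared
    if len(patterns) == 0 {
        return nil
    }
    adapter := &grpcEventAdapter{
        client:     p.EventClient,
        pluginName: p.Name,
        timeout:    5 * time.Second, // illustrative value
        logger:     p.Logger,
    }
    return bus.Subscribe(p.Name, patterns, adapter)
}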
Execution:
run.go
│
├─ initPluginSystem()
│ └─ Get EventBus from SystemResult
│
└─ Create ExecutionSetup
│
└─ WithEventPublisher(eventBus)
│
└─ ExecutionService.SetEventPublisher(eventBus)
│
└─ On each lifecycle point:
└─ Emit domain event via EventPublisher.Publish()
Event Flow (End-to-End)
1. Workflow step completes
└─ ExecutionService.Run() completes step
2. ExecutionService emits event
└─ event := NewDomainEvent("step.completed", "core", metadata)
└─ eventPublisher.Publish(ctx, event)
3. EventBus.Publish() routes event
└─ For each subscription:
└─ matchEventPattern("step.*", "step.completed") → true
└─ Send event to plugin's channel (non-blocking)
4. Delivery goroutine processes event
└─ Read from eventCh
└─ Call plugin's HandleEvent RPC
5. Plugin's HandleEvent runs
└─ Receives sdk.Event
└─ Returns []sdk.Event (custom events to emit)
6. Emitted events routed
└─ Increment PropagationDepth
└─ Send to other subscribed plugins
└─ If depth ≥ 3: halt propagation (log warning)
Lifecycle & Cleanup
Plugin Initialization:
plugin process starts
│
├─ Plugin.Init() called
│
└─ Plugin has "events" capability:
└─ EventBus.Subscribe(pluginName, patterns, adapter)
└─ Start delivery goroutine
Plugin Shutdown:
Plugin process dies / removed via `awf plugin remove`
│
└─ EventBus.Unsubscribe(pluginName)
│
├─ Close done channel
│
└─ Delivery goroutine exits (receives on done)
│
└─ Channel cleanup
└─ Goroutine count back to baseline (5s max)
Graceful Shutdown:
awf run completes
│
└─ ExecutionService.Run() returns
└─ run.go defers cleanup
│
└─ pluginResult.Cleanup()
│
└─ EventBus.Close()
│
└─ Unsubscribe all plugins
└─ Wait for delivery goroutines
Non-Functional Requirements
Latency (NFR-001)
Target: Event delivery to plugin HandleEvent < 10ms (p95) for idle streams.
Achieved via:
- In-process EventBus (no network hop)
- Buffered channels (no allocation per event)
- Immediate async delivery (no queuing beyond buffer)
Isolation (NFR-002)
Slow plugin must not block event delivery to other plugins.
Achieved via:
- Per-plugin buffered channels (independent)
- Non-blocking send (drops on full)
- Async delivery goroutine per plugin
Cleanup (NFR-003)
Stream disconnection cleanup within 5 seconds, no goroutine leaks.
Achieved via:
- done channel signals goroutine exit
- Delivery goroutine reads from done in select
- Channel cleanup on exit
Back-Pressure (NFR-004)
Slow/crashed plugins don’t cause buffer overflow in EventBus core.
Achieved via:
- Non-blocking send to per-plugin channel
- 256-event buffer per plugin
- Drops logged as warnings
- EventBus core never blocks on plugin channels
Secret Masking (NFR-005)
Event metadata/payload must mask secrets in logs.
Implementation:
func emitEvent(ctx context.Context, eventType, source string, metadata map[string]string) {
// Mask secrets in metadata
masked := logger.SecretMasker.Mask(metadata)
event := NewDomainEvent(eventType, source, masked, nil)
eventPublisher.Publish(ctx, event)
}
Backward Compatibility (NFR-006)
Existing plugins without events capability unaffected.
Achieved via:
- Optional capability gating
- EventSubscriber on BasePlugin has no-op defaults
- Manifest validation allows empty events field
Testing Strategy
Unit Tests
- Pattern Matching (11 cases): exact, wildcard single/multi-segment, empty inputs (see the sketch after this list)
- EventBus Publishing: delivery to matching subscribers, no-match skip
- Buffer Management: drop on full, warning logged
- Cycle Detection: depth 3 halt, propagation prevented
- Goroutine Cleanup: baseline count restored after unsubscribe
- gRPC Adapter: proto conversion, RPC calls
- SDK Event Defaults: BasePlugin no-op, custom subscriber dispatch
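A typical shape for those pattern-matching cases is a table-driven test. A short sketch, not the actual test file:
import "testing"

func TestMatchEventPattern(t *testing.T) {
    cases := []struct {
        pattern, eventType string
        want               bool
    }{
        {"workflow.*", "workflow.started", true},
        {"workflow.*", "workflow.step.started", false}, // "*" spans exactly one segment
        {"step.failed", "step.failed", true},
        {"", "workflow.started", false}, // empty inputs never match
    }
    for _, c := range cases {
        if got := matchEventPattern(c.pattern, c.eventType); got != c.want {
            t.Errorf("matchEventPattern(%q, %q) = %v, want %v", c.pattern, c.eventType, got, c.want)
        }
    }
}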
Integration Tests
- Plugin with events capability receives workflow.completed
- Two plugins: A emits → B receives with source attribution
- Goroutine baseline restored after full cleanup
- All existing plugin tests pass without modification
Recent Enhancements (F092)
F092 introduced two major enhancements:
- Plugin-to-Host Event Emission — Plugins can now emit events via HostClient, not just return events from HandleEvent
- Optimized Event Delivery — Persistent client-side gRPC streams reduce latency and RPC overhead
These features completed the bidirectional event system (host→plugin via EventBus, plugin→host via broker) and optimized the high-throughput case.
Future Extensions
Persistent Event Log
Status: Deferred
Rationale: Fire-and-forget sufficient for v0.8+; replay adds complexity (durable storage, message ordering guarantees)
Fine-Grained Filters
Status: Deferred
Rationale: Glob on event type covers 90% of use cases; field-level filtering adds protocol complexity
OTel Correlation
Status: Deferred pending OTel roadmap progress
Rationale: Events carry metadata for manual correlation; automatic integration requires OTel lib decisions
Bidirectional Streaming
Status: Not planned (client-side streaming sufficient)
Rationale: F092 implements client-side streaming (host→plugin) for optimal push delivery. Bidirectional streaming adds complexity without demonstrated benefit — plugin→host uses broker’s separate Emit RPC instead
See Also
- Plugin Events Guide - User-facing documentation
- Plugins Guide - Plugin development reference
- Architecture - Overall AWF architecture