Overview

The Plugin Event System (F090 + F092) enables real-time event-driven communication between AWF core and plugins, and between plugins themselves. It’s built on gRPC and implements a fan-out pattern with per-plugin buffered channels for isolation. F092 extends the system with GRPCBroker-based plugin-to-host event emission and persistent gRPC streaming for optimized host-to-plugin delivery.

Architecture Layers

┌──────────────────────────────────────────────────────────┐
│            Plugin SDK Layer (pkg/plugin/sdk)             │
│   EventSubscriber interface │ HostClient (emit via       │
│   + defaults                │ broker)   │ StreamEvents   │
│                             │ handler (recv via stream)  │
└──────────────┬──────────────┴───────────┬────────────────┘
               │ (gRPC HandleEvent /      │ (GRPCBroker
               │  StreamEvents RPC)       │  HostEventService.Emit)
┌──────────────▼──────────────────────────▼────────────────┐
│   StreamManager + gRPC Adapter (pluginmgr/)              │
│   • StreamManager: per-plugin stream tracking            │
│   • Implements EventDeliverer (stream or unary)          │
│   • Automatic fallback on Unimplemented/broken stream    │
│   • grpcEventAdapter: proto ↔ domain conversions         │
└──────────────┬───────────────────────────────────────────┘
               │
┌──────────────▼───────────────────────────────────────────┐
│      EventBus Infrastructure                             │
│   • Pattern matching (glob)                              │
│   • Per-plugin buffered channels (256)                   │
│   • Async delivery goroutines                            │
│   • Cycle detection (depth 3)                            │
│   ┌──────────────────────────────────┐                   │
│   │ HostEventService (broker-served) │                   │
│   │ • Receives plugin Emit() calls   │                   │
│   │ • Validates against manifest     │                   │
│   │ • Publishes to EventBus          │                   │
│   └──────────────────────────────────┘                   │
└──────────────┬───────────────────────────────────────────┘
               │
┌──────────────▼───────────────────────────────────────────┐
│   ExecutionService / Domain                              │
│   Emits lifecycle events (workflow.started)              │
│   Manages EventPublisher port                            │
└──────────────────────────────────────────────────────────┘

Design Decisions

1. Unary RPCs vs Streaming

Decision: Use unary HandleEvent RPC instead of server-side streaming Subscribe.

Rationale:

  • All existing gRPC services in the codebase use unary RPCs on the plugin side
  • GRPCBroker reverse connections were untested in the codebase
  • Functionally equivalent for fire-and-forget events
  • Simpler error handling and timeout management

Trade-off:

  • No persistent streaming connection (acceptable because go-plugin manages the plugin lifecycle)
  • A broken connection causes a plugin restart, which re-establishes subscriptions from the manifest

2. Manifest-Based Subscriptions

Decision: Read subscription patterns from plugin manifest at Init time, not via RPC.

Rationale:

  • The host already parses and validates manifests
  • Gating by capability is an established pattern
  • No additional RPC round-trip
  • Consistent with validator/step_type discovery

Implementation:

# plugin.yaml
capabilities:
  - events
events:
  subscribe:
    - "workflow.*"
    - "step.failed"

3. Custom Glob Matching

Decision: Implement custom dot-segment glob matching, not filepath.Match.

Rationale:

  • The spec requires . as the segment separator: workflow.* must match workflow.started but NOT workflow.step.started
  • filepath.Match uses / as the separator and doesn't apply to dot-separated names
  • A custom implementation is simple and clear

Implementation:

Pattern: "workflow.*"
Event:   "workflow.started"

Split: ["workflow", "*"]  vs  ["workflow", "started"]
       Match!

Event:   "workflow.step.started"
Split:   ["workflow", "step", "started"]  (3 segments vs 2)
         No match!
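The matching above can be sketched in Go. This is a minimal sketch — the function name and exact edge-case handling are assumptions, not the production implementation:

```go
package main

import (
	"fmt"
	"strings"
)

// matchEventPattern reports whether a dot-separated event type matches a
// dot-separated glob pattern. "*" matches exactly one segment, so the
// segment counts must agree: "workflow.*" matches "workflow.started"
// but not "workflow.step.started".
func matchEventPattern(pattern, eventType string) bool {
	if pattern == "" || eventType == "" {
		return false
	}
	pSegs := strings.Split(pattern, ".")
	eSegs := strings.Split(eventType, ".")
	if len(pSegs) != len(eSegs) {
		return false // different segment counts can never match
	}
	for i, seg := range pSegs {
		if seg != "*" && seg != eSegs[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(matchEventPattern("workflow.*", "workflow.started"))      // true
	fmt.Println(matchEventPattern("workflow.*", "workflow.step.started")) // false
}
```

Because `*` binds to a single segment, subscribers wanting nested events must declare deeper patterns (e.g. `workflow.step.*`) explicitly.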

4. Per-Plugin Buffered Channels

Decision: Each plugin has its own 256-event buffered channel.

Rationale:

  • Isolation: a slow plugin doesn't block others
  • Non-blocking delivery: events are dropped when the buffer is full (logged as a warning)
  • Transport back-pressure: gRPC HTTP/2 flow control operates beneath the application layer
  • A 256-event buffer is sufficient for typical workflow patterns

If Buffer Overflows:

Event published to slow plugin
│
├─ Non-blocking send to channel (cap 256)
│
└─ If full: Drop event, log warning
   (Other plugins still receive)
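In Go, the non-blocking send is a `select` with a `default` branch. A minimal sketch (the tiny buffer here is for illustration; the real capacity is 256, and the helper name is hypothetical):

```go
package main

import "fmt"

type DomainEvent struct {
	Type string
}

// trySend attempts a non-blocking send to a plugin's buffered channel.
// It returns false when the buffer is full, in which case the caller
// drops the event and logs a warning instead of blocking the EventBus.
func trySend(ch chan<- *DomainEvent, ev *DomainEvent) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false // buffer full: drop
	}
}

func main() {
	ch := make(chan *DomainEvent, 2) // illustration only; the real buffer is 256
	for i := 0; i < 3; i++ {
		if !trySend(ch, &DomainEvent{Type: "step.completed"}) {
			fmt.Println("dropped event: buffer full") // third send overflows
		}
	}
}
```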

5. Async Delivery Goroutines

Decision: One delivery goroutine per plugin subscription.

Rationale:

  • The gRPC HandleEvent RPC runs asynchronously from event publication
  • Non-blocking: a slow RPC doesn't block the EventBus or other plugins
  • Clean cleanup: the done channel signals goroutine exit

Flow:

Publish(event)
│
├─ For each subscription matching event pattern:
│  │
│  └─ Send to channel (non-blocking)
│
└─ Delivery goroutine (for each plugin):
   │
   ├─ Read from channel
   │
   └─ Call plugin's HandleEvent RPC
      ├─ On success: process emitted events
      ├─ On error: log, continue (don't break)
      └─ On timeout: abort gracefully
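The delivery goroutine loop can be sketched as a `select` over the done and event channels. This is a simplified shape under the assumptions in this document (the real loop also routes emitted events and checks propagation depth):

```go
package main

import "fmt"

type DomainEvent struct {
	Type string
}

// deliveryLoop drains one plugin's event channel until done is closed.
// A failing delivery is logged and skipped so later events still flow.
func deliveryLoop(eventCh <-chan *DomainEvent, done <-chan struct{}, deliver func(*DomainEvent) error) {
	for {
		select {
		case <-done:
			return // unsubscribe or shutdown: exit cleanly
		case ev := <-eventCh:
			if err := deliver(ev); err != nil {
				fmt.Printf("delivery failed for %s: %v (continuing)\n", ev.Type, err)
			}
		}
	}
}

func main() {
	eventCh := make(chan *DomainEvent, 4)
	done := make(chan struct{})
	delivered := make(chan string, 1)

	go deliveryLoop(eventCh, done, func(ev *DomainEvent) error {
		delivered <- ev.Type // stand-in for the HandleEvent RPC
		return nil
	})

	eventCh <- &DomainEvent{Type: "workflow.started"}
	fmt.Println("delivered:", <-delivered)
	close(done) // signal goroutine exit, as Unsubscribe would
}
```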

6. Propagation Depth Limiting

Decision: Limit event propagation to depth 3.

Rationale:

  • Prevent infinite loops from circular event chains
  • Plugin A emits → triggers Plugin B → emits → triggers Plugin A (caught at depth 3)
  • Hard limit prevents stack overflow
  • Warnings logged for visibility

Implementation:

type DomainEvent struct {
    PropagationDepth int  // Incremented on each plugin emit
}

// In EventBus delivery goroutine:
if event.PropagationDepth >= 3 {
    logger.Warn("propagation depth exceeded for event %s", event.Type)
    return  // Stop propagation
}

7. DomainEvent in pluginmodel/ Package

Decision: Place event entity in internal/domain/pluginmodel/, not internal/domain/workflow/.

Rationale:

  • Events are plugin-specific concerns
  • Colocated with capability constants and manifest types
  • Avoids coupling workflow domain to plugin event system
  • Clear separation of concerns

GRPCBroker and Reverse Channels (F092)

F092 activates the GRPCBroker to enable two capabilities:

  1. Plugin-to-host event emission — Plugins emit events via HostEventService exposed through the broker
  2. Optimized event delivery — Persistent client-side gRPC streams replace repeated unary RPC calls

HostEventService

The host exposes a HostEventService on the GRPCBroker that plugins can dial to emit events:

// Host side: Expose HostEventService on broker
service HostEventService {
  rpc Emit(EmitRequest) returns (EmitResponse);
}

// Plugin side: Dial and use HostEventService
hostClient := NewHostClient(broker, "plugin-name")
hostClient.Emit(ctx, "event.type", payload, metadata)

Permission Model:

Emit requests are validated against the plugin’s events.emit manifest patterns:

# plugin.yaml
events:
  emit:
    - "custom.analysis.*"

Attempting to emit undeclared event types returns an authorization error:

Plugin emits "custom.analysis.complete" → Manifest allows "custom.analysis.*" → ✓ Accepted
Plugin emits "workflow.failed" → Not in manifest → ✗ Denied (BROKER_EMIT_DENIED)
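The manifest check is a scan over the declared emit patterns using the same dot-segment matching as subscriptions. A sketch (function names and the error shape are hypothetical):

```go
package main

import (
	"fmt"
	"strings"
)

// matchesPattern is the same dot-segment glob match used for
// subscriptions: "*" matches exactly one segment.
func matchesPattern(pattern, eventType string) bool {
	p := strings.Split(pattern, ".")
	e := strings.Split(eventType, ".")
	if len(p) != len(e) {
		return false
	}
	for i := range p {
		if p[i] != "*" && p[i] != e[i] {
			return false
		}
	}
	return true
}

// authorizeEmit validates an emitted event type against the plugin's
// declared events.emit patterns before publishing to the EventBus.
func authorizeEmit(emitPatterns []string, eventType string) error {
	for _, p := range emitPatterns {
		if matchesPattern(p, eventType) {
			return nil // declared: accept
		}
	}
	return fmt.Errorf("BROKER_EMIT_DENIED: %q not declared in events.emit", eventType)
}

func main() {
	patterns := []string{"custom.analysis.*"}
	fmt.Println(authorizeEmit(patterns, "custom.analysis.complete")) // accepted (nil error)
	fmt.Println(authorizeEmit(patterns, "workflow.failed"))          // denied
}
```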

Event Flow:

Plugin→HostClient.Emit(event)
        │
        └─ gRPC Emit() call via broker
           │
           └─ HostEventService receives request
              │
              ├─ Validate: Is emitted event type in plugin's events.emit patterns?
              │
              ├─ Validate: Does plugin have 'events' capability?
              │
              └─ Publish to EventBus (same path as core-emitted events)
                 │
                 └─ EventBus routes to subscribers
                    └─ Each subscriber receives via HandleEvent RPC or streaming

StreamManager and Streaming Delivery

While plugin→host emission goes through the broker’s HostEventService, the reverse (host→plugin event delivery) is optimized with client-side streaming:

Streaming RPC Definition:

service EventService {
  rpc HandleEvent(HandleEventRequest) returns (HandleEventResponse);  // Unary (existing)
  rpc StreamEvents(stream EventStreamMessage) returns (StreamEventsResponse);  // Streaming (F092+)
}

Note the asymmetry: HandleEvent is unary (one request), StreamEvents is client-side streaming (multiple messages from client). This is because:

  • The plugin is the gRPC server; the host is the gRPC client
  • The host pushes events to the plugin, so the host (the client) sends the stream
  • This is the typical gRPC pattern for client→server push

StreamManager Workflow:

wireEventSubscriptions() called for plugin
    │
    ├─ Check if plugin supports StreamEvents RPC
    │
    ├─ If YES: Register stream via broker
    │  │
    │  └─ StreamManager.RegisterStream(pluginName, stream)
    │     │
    │     └─ Future events routed via stream (low latency, fewer RPC calls)
    │
    └─ If NO: Use unary adapter (fallback)
       │
       └─ Each event triggers a separate HandleEvent RPC call

Benefits:

  • Lower latency: One persistent connection vs per-event RPC setup
  • Fewer round-trips: 100 events = 1 stream + 100 Sends vs 100 HandleEvent calls
  • Transparent fallback: Plugins without streaming support continue to work unchanged
  • Automatic recovery: If stream breaks, fallback to unary (no manual intervention)

Per-Event Sequence Numbers:

Messages on the stream include a sequence number for ordering and debugging:

EventStreamMessage {
  sequence_number: 1  // First event
  id: "evt-123"
  type: "workflow.started"
  ...
}

EventStreamMessage {
  sequence_number: 2  // Second event
  ...
}

Sequence numbers are per-plugin (reset on stream reconnect).
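Per-plugin sequencing can be as simple as an atomic counter owned by the stream; creating a fresh counter on reconnect is what resets the numbering. A sketch (type and method names are assumptions):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// streamSequencer numbers messages on one plugin's stream. A fresh
// sequencer is created when the stream reconnects, so numbering
// restarts at 1 per connection.
type streamSequencer struct {
	n uint64
}

func (s *streamSequencer) next() uint64 {
	return atomic.AddUint64(&s.n, 1) // safe under concurrent Sends
}

func main() {
	seq := &streamSequencer{}
	fmt.Println(seq.next(), seq.next(), seq.next()) // 1 2 3

	seq = &streamSequencer{} // simulated reconnect
	fmt.Println(seq.next())  // 1
}
```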

Key Components

DomainEvent (Domain)

type DomainEvent struct {
    ID               string    // UUID for idempotency
    Type             string    // "workflow.started", "deploy.completed", etc.
    Timestamp        time.Time // When event occurred
    Source           string    // "core" or plugin name
    Metadata         map[string]string
    Payload          []byte    // JSON-serializable data
    PropagationDepth int       // Incremented on inter-plugin routing
}

Properties:

  • UUID enables idempotent processing after reconnection
  • Metadata contains context (workflow_id, step_name, etc.)
  • Payload for structured event-specific data
  • PropagationDepth prevents cycles

EventPublisher Port (Domain)

type EventPublisher interface {
    Publish(ctx context.Context, event *DomainEvent) error
    Close() error
}

Implementations:

  • EventBus in infrastructure
  • MockEventPublisher in tests

EventBus (Infrastructure)

type EventBus struct {
    subscriptions map[string]*eventSubscription  // pluginName → subscription
    mu            sync.RWMutex
    logger        ports.Logger
}

type eventSubscription struct {
    patterns   []string
    pluginName string
    eventCh    chan *DomainEvent // Buffered
    done       chan struct{}     // Signals goroutine exit
    deliverer  EventDeliverer    // Calls plugin RPC
}

Responsibilities:

  • Pattern matching (glob on event types)
  • Per-plugin channel management
  • Async delivery goroutines
  • Cycle detection
  • Back-pressure handling

gRPC Event Adapter (Infrastructure)

type grpcEventAdapter struct {
    client     pluginv1.EventServiceClient
    pluginName string
    timeout    time.Duration
    logger     ports.Logger
}

func (a *grpcEventAdapter) DeliverEvent(ctx context.Context, event *DomainEvent) ([]*DomainEvent, error) {
    ctx, cancel := context.WithTimeout(ctx, a.timeout)
    defer cancel()

    req := domainEventToProto(event)
    resp, err := a.client.HandleEvent(ctx, req)
    if err != nil {
        return nil, err
    }
    return protoToDomainEvents(resp.EmittedEvents), nil
}

Responsibilities:

  • Domain ↔ proto conversions
  • Timeout handling
  • Emitted event extraction

EventSubscriber Interface (SDK)

type EventSubscriber interface {
    Patterns() []string
    HandleEvent(ctx context.Context, event Event) ([]Event, error)
}

Plugin Implementation:

func (p *MyPlugin) Patterns() []string {
    return []string{"workflow.*", "step.failed"}
}

func (p *MyPlugin) HandleEvent(ctx context.Context, event Event) ([]Event, error) {
    // Handle event, optionally emit new events
    return emittedEvents, err
}

F092 Integration: StreamManager and EventDeliverer

F092 introduces minimal coupling by leveraging the existing EventDeliverer interface seam:

type EventDeliverer interface {
    DeliverEvent(ctx context.Context, event *DomainEvent) ([]*DomainEvent, error)
}

Before F092:

  • EventBus.Subscribe() takes a grpcEventAdapter (implements EventDeliverer)
  • grpcEventAdapter calls HandleEvent unary RPC

After F092:

  • EventBus.Subscribe() takes either:
    • grpcEventAdapter (unary, fallback)
    • streamDeliverer (streaming, preferred)
  • StreamManager decides which deliverer to use per plugin
  • No changes to EventBus internals

Selection Logic:

func (sm *StreamManager) GetDeliverer(pluginName string, unaryFallback EventDeliverer) EventDeliverer {
    if sm.HasStream(pluginName) {
        return &streamDeliverer{
            sm:         sm,
            pluginName: pluginName,
            stream:     sm.streams[pluginName],
            fallback:   unaryFallback,
        }
    }
    return unaryFallback
}

Fallback Handling:

If a streaming send fails (stream broken, timeout, or plugin doesn’t support streaming), the streamDeliverer automatically falls back to unary:

func (d *streamDeliverer) DeliverEvent(ctx context.Context, event *DomainEvent) ([]*DomainEvent, error) {
    if err := d.stream.Send(eventStreamMessage); err != nil {
        // Detect "Unimplemented" gRPC status (plugin doesn't support StreamEvents)
        if isUnimplemented(err) {
            d.sm.UnregisterStream(d.pluginName)  // Mark stream as unavailable
        }
        // Fall back to unary delivery
        return d.fallback.DeliverEvent(ctx, event)
    }
    return nil, nil
}

This design ensures:

  • Backward compatibility: Existing unary path unchanged
  • Minimal surface area: No EventBus modifications
  • Graceful degradation: Streaming failure → unary (always works)
  • Per-plugin granularity: Each plugin independently uses streaming if available

Wiring (Interfaces Layer)

Initialization:

InitSystem()
    │
    ├─ Create EventBus
    │
    └─ Set on RPCPluginManager
        │
        └─ After plugin Init:
           └─ If plugin has "events" capability:
              └─ Create grpcEventAdapter
              └─ Call EventBus.Subscribe()

Execution:

run.go
    │
    ├─ initPluginSystem()
    │   └─ Get EventBus from SystemResult
    │
    └─ Create ExecutionSetup
        │
        └─ WithEventPublisher(eventBus)
            │
            └─ ExecutionService.SetEventPublisher(eventBus)
                │
                └─ On each lifecycle point:
                   └─ Emit domain event via EventPublisher.Publish()

Event Flow (End-to-End)

1. Workflow step completes
   └─ ExecutionService.Run() completes step
   
2. ExecutionService emits event
   └─ event := NewDomainEvent("step.completed", "core", metadata)
   └─ eventPublisher.Publish(ctx, event)
   
3. EventBus.Publish() routes event
   └─ For each subscription:
      └─ matchEventPattern("step.*", "step.completed") → true
      └─ Send event to plugin's channel (non-blocking)
   
4. Delivery goroutine processes event
   └─ Read from eventCh
   └─ Call plugin's HandleEvent RPC
   
5. Plugin's HandleEvent runs
   └─ Receives sdk.Event
   └─ Returns []sdk.Event (custom events to emit)
   
6. Emitted events routed
   └─ Increment PropagationDepth
   └─ Send to other subscribed plugins
   └─ If depth ≥ 3: halt propagation (log warning)

Lifecycle & Cleanup

Plugin Initialization:

plugin process starts
    │
    ├─ Plugin.Init() called
    │
    └─ Plugin has "events" capability:
       └─ EventBus.Subscribe(pluginName, patterns, adapter)
          └─ Start delivery goroutine

Plugin Shutdown:

Plugin process dies / removed via `awf plugin remove`
    │
    └─ EventBus.Unsubscribe(pluginName)
       │
       ├─ Close done channel
       │
       └─ Delivery goroutine exits (receives on done)
          │
          └─ Channel cleanup
          └─ Goroutine count back to baseline (5s max)

Graceful Shutdown:

awf run completes
    │
    └─ ExecutionService.Run() returns
    
    └─ run.go defers cleanup
       │
       └─ pluginResult.Cleanup()
          │
          └─ EventBus.Close()
             │
             └─ Unsubscribe all plugins
             └─ Wait for delivery goroutines

Non-Functional Requirements

Latency (NFR-001)

Target: Event delivery to plugin HandleEvent < 10ms (p95) for idle streams.

Achieved via:

  • In-process EventBus (no network hop)
  • Buffered channels (no allocation per event)
  • Immediate async delivery (no queuing beyond buffer)

Isolation (NFR-002)

Slow plugin must not block event delivery to other plugins.

Achieved via:

  • Per-plugin buffered channels (independent)
  • Non-blocking send (drops on full)
  • Async delivery goroutine per plugin

Cleanup (NFR-003)

Stream disconnection cleanup within 5 seconds, no goroutine leaks.

Achieved via:

  • done channel signals goroutine exit
  • Delivery goroutine reads from done in select
  • Channel cleanup on exit

Back-Pressure (NFR-004)

Slow/crashed plugins don’t cause buffer overflow in EventBus core.

Achieved via:

  • Non-blocking send to per-plugin channel
  • 256-event buffer per plugin
  • Drops logged as warnings
  • EventBus core never blocks on plugin channels

Secret Masking (NFR-005)

Event metadata/payload must mask secrets in logs.

Implementation:

func emitEvent(ctx context.Context, eventType, source string, metadata map[string]string) {
    // Mask secrets in metadata
    masked := logger.SecretMasker.Mask(metadata)
    event := NewDomainEvent(eventType, source, masked, nil)
    eventPublisher.Publish(ctx, event)
}

Backward Compatibility (NFR-006)

Existing plugins without the events capability are unaffected.

Achieved via:

  • Optional capability gating
  • EventSubscriber on BasePlugin has no-op defaults
  • Manifest validation allows empty events field

Testing Strategy

Unit Tests

  • Pattern Matching (11 cases): exact, wildcard single/multi-segment, empty inputs
  • EventBus Publishing: delivery to matching subscribers, no-match skip
  • Buffer Management: drop on full, warning logged
  • Cycle Detection: depth 3 halt, propagation prevented
  • Goroutine Cleanup: baseline count restored after unsubscribe
  • gRPC Adapter: proto conversion, RPC calls
  • SDK Event Defaults: BasePlugin no-op, custom subscriber dispatch

Integration Tests

  • Plugin with events capability receives workflow.completed
  • Two plugins: A emits → B receives with source attribution
  • Goroutine baseline restored after full cleanup
  • All existing plugin tests pass without modification

Recent Enhancements (F092)

F092 introduced two major enhancements:

  1. Plugin-to-Host Event Emission — Plugins can now emit events via HostClient, not just return events from HandleEvent
  2. Optimized Event Delivery — Persistent client-side gRPC streams reduce latency and RPC overhead

These features completed the bidirectional event system (host→plugin via EventBus, plugin→host via broker) and optimized the high-throughput case.

Future Extensions

Persistent Event Log

Status: Deferred

Rationale: Fire-and-forget sufficient for v0.8+; replay adds complexity (durable storage, message ordering guarantees)

Fine-Grained Filters

Status: Deferred

Rationale: Glob on event type covers 90% of use cases; field-level filtering adds protocol complexity

OTel Correlation

Status: Deferred to OTel roadmap progress

Rationale: Events carry metadata for manual correlation; automatic integration requires OTel lib decisions

Bidirectional Streaming

Status: Not planned (client-side streaming sufficient)

Rationale: F092 implements client-side streaming (host→plugin) for optimal push delivery. Bidirectional streaming adds complexity without demonstrated benefit — plugin→host uses broker’s separate Emit RPC instead

See Also