Plugin Events
Plugin Events enable real-time communication between AWF core and plugins, and between plugins themselves. This guide covers subscribing to workflow lifecycle events, emitting custom events, and building event-driven plugins.
Overview
AWF emits structured events at each step of workflow execution:
awf run workflow
├─ workflow.started (core event)
├─ step.started (core event)
├─ step.completed (core event)
├─ step.started (core event)
├─ step.completed (core event)
├─ workflow.completed (core event)
└─ [custom events from plugins]Plugins can:
- Subscribe to core events — react to workflow/step lifecycle (
workflow.started,step.failed, etc.) - Subscribe to custom events — react to events from other plugins
- Emit custom events — notify other plugins of plugin-specific milestones, either by returning events from
HandleEventor by callingHostClient.Emit()at any time during execution - Use glob patterns — subscribe to event families (
workflow.*,step.*) - Receive events via streaming — opt into persistent gRPC streaming for lower-latency delivery (automatic fallback to unary RPCs)
All communication happens in real-time via gRPC without the plugin polling or managing connections. The host uses GRPCBroker to expose a reverse channel that plugins can use to emit events back at runtime.
Subscribing to Events
1. Implement EventSubscriber
Add the EventSubscriber interface to your plugin:
type NotificationPlugin struct {
sdk.BasePlugin
config map[string]string
}
// Patterns declares which events this plugin cares about
func (p *NotificationPlugin) Patterns() []string {
return []string{
"workflow.completed", // Exact match
"step.failed", // Specific event
"step.*", // All step events
}
}
// HandleEvent is invoked when a matching event occurs
func (p *NotificationPlugin) HandleEvent(ctx context.Context, event sdk.Event) ([]sdk.Event, error) {
switch event.Type {
case "workflow.completed":
return p.notifySuccess(ctx, event)
case "workflow.failed":
return p.notifyFailure(ctx, event)
case "step.failed":
return p.logError(ctx, event)
}
return nil, nil
}2. Declare Events in Manifest
Update plugin.yaml to declare subscription patterns:
name: awf-plugin-notify
version: 1.0.0
awf_version: ">=0.7.0"
capabilities:
- events
events:
subscribe:
- "workflow.completed"
- "workflow.failed"
- "step.failed"3. Test with awf run
Install your plugin and run a workflow:
make install
awf plugin enable awf-plugin-notify
awf run example-workflowAWF will invoke HandleEvent on your plugin each time a matching event occurs.
Core Events Reference
Workflow Events
workflow.started — Emitted when workflow execution begins
Event{
ID: "uuid-1234",
Type: "workflow.started",
Source: "core",
Timestamp: time.Now(),
Metadata: map[string]string{
"workflow_id": "wf-abc-123",
"workflow_name": "deploy-app",
"version": "1.0.0",
},
}workflow.completed — Emitted when workflow succeeds
Metadata: map[string]string{
"workflow_id": "wf-abc-123",
"workflow_name": "deploy-app",
"duration": "45s",
"status": "success",
}workflow.failed — Emitted when workflow fails
Metadata: map[string]string{
"workflow_id": "wf-abc-123",
"workflow_name": "deploy-app",
"error_message": "step 'deploy' failed: exit code 1",
"failed_step": "deploy",
}Step Events
step.started — Emitted before step execution
Metadata: map[string]string{
"workflow_id": "wf-abc-123",
"step_name": "validate",
"step_type": "step",
}step.completed — Emitted after successful step
Metadata: map[string]string{
"workflow_id": "wf-abc-123",
"step_name": "validate",
"duration": "2s",
}step.failed — Emitted after failed step
Metadata: map[string]string{
"workflow_id": "wf-abc-123",
"step_name": "validate",
"error_message": "validation failed: missing version",
"exit_code": "1",
}step.retrying — Emitted before retry attempt
Metadata: map[string]string{
"workflow_id": "wf-abc-123",
"step_name": "deploy",
"attempt": "2",
"max_attempts": "3",
}Emitting Custom Events
Plugins can emit events in two ways:
- Via
HandleEventreturn value — Emit events as a response to received events - Via
HostClient— Emit events directly to the host at any time (F092+)
Method 1: Return Events from HandleEvent
Emit events that other plugins subscribe to by returning them from HandleEvent:
Example: Deploy Plugin → Notification Plugin
Deploy Plugin (awf-plugin-deploy):
func (p *DeployPlugin) HandleEvent(ctx context.Context, event sdk.Event) ([]sdk.Event, error) {
if event.Type != "step.completed" {
return nil, nil
}
if event.Metadata["step_name"] == "production-deployment" {
// Emit custom event that notification plugin can react to
return []sdk.Event{
{
Type: "deploy.completed",
Source: p.PluginName, // "awf-plugin-deploy"
Metadata: map[string]string{
"environment": "production",
"version": "v2.1.0",
"status": "success",
},
},
}, nil
}
return nil, nil
}Notification Plugin (awf-plugin-notify) subscribes:
# plugin.yaml
events:
subscribe:
- "deploy.*" # Matches "deploy.completed", "deploy.rolled_back", etc.func (p *NotificationPlugin) HandleEvent(ctx context.Context, event sdk.Event) ([]sdk.Event, error) {
if event.Type == "deploy.completed" {
return []sdk.Event{
{
Type: "notification.sent",
Source: p.PluginName,
Metadata: map[string]string{
"channel": "slack",
"recipient": "team-devops",
},
},
}, nil
}
return nil, nil
}Pattern Matching Rules
Patterns use . as a segment separator. * matches one segment, not multiple:
| Pattern | Matches | Does NOT Match |
|---|---|---|
workflow.started | workflow.started (exact) | workflow.completed |
workflow.* | workflow.started, workflow.completed, workflow.failed | workflow.step.started |
step.* | step.started, step.completed, step.failed, step.retrying | step.database.query |
*.* | All two-segment events | One-segment events |
* | One-segment events only | Multi-segment events |
deploy.* | deploy.completed, deploy.rolled_back | system.deploy.complete |
Subscribing to ALL events:
func (p *AuditPlugin) Patterns() []string {
return []string{"*.*"} // Matches all two-segment events
}Real-World Examples
Audit Logger Plugin
Logs all workflow and step events to a file:
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/awf-project/cli/pkg/plugin/sdk"
)
type AuditPlugin struct {
sdk.BasePlugin
logFile *os.File
}
func (p *AuditPlugin) Patterns() []string {
return []string{"*.*"} // All events
}
func (p *AuditPlugin) HandleEvent(ctx context.Context, event sdk.Event) ([]sdk.Event, error) {
// Log to audit file
entry := fmt.Sprintf("[%s] %s (source: %s) | %v\n",
time.Now().Format(time.RFC3339),
event.Type,
event.Source,
event.Metadata,
)
p.logFile.WriteString(entry)
return nil, nil
}
func main() {
f, _ := os.OpenFile("audit.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
sdk.Serve(&AuditPlugin{
BasePlugin: sdk.BasePlugin{
PluginName: "awf-plugin-audit",
PluginVersion: "1.0.0",
},
logFile: f,
})
}Metrics Plugin
Counts events and exports to Prometheus:
package main
import (
"context"
"sync"
"github.com/awf-project/cli/pkg/plugin/sdk"
"github.com/prometheus/client_golang/prometheus"
)
type MetricsPlugin struct {
sdk.BasePlugin
workflowsCompleted prometheus.Counter
stepsFailed prometheus.Counter
mu sync.Mutex
}
func (p *MetricsPlugin) Patterns() []string {
return []string{"workflow.*", "step.*"}
}
func (p *MetricsPlugin) HandleEvent(ctx context.Context, event sdk.Event) ([]sdk.Event, error) {
p.mu.Lock()
defer p.mu.Unlock()
switch event.Type {
case "workflow.completed":
p.workflowsCompleted.Inc()
case "step.failed":
p.stepsFailed.Inc()
}
return nil, nil
}
func main() {
sdk.Serve(&MetricsPlugin{
BasePlugin: sdk.BasePlugin{
PluginName: "awf-plugin-metrics",
PluginVersion: "1.0.0",
},
workflowsCompleted: prometheus.NewCounter(prometheus.CounterOpts{
Name: "awf_workflows_completed_total",
}),
stepsFailed: prometheus.NewCounter(prometheus.CounterOpts{
Name: "awf_steps_failed_total",
}),
})
}Slack Notifier Plugin
Sends notifications to Slack channels:
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"github.com/awf-project/cli/pkg/plugin/sdk"
)
type SlackNotifier struct {
sdk.BasePlugin
webhookURL string
}
func (p *SlackNotifier) Patterns() []string {
return []string{"workflow.completed", "workflow.failed"}
}
func (p *SlackNotifier) HandleEvent(ctx context.Context, event sdk.Event) ([]sdk.Event, error) {
message := fmt.Sprintf(":wave: Workflow `%s` %s",
event.Metadata["workflow_name"],
event.Type,
)
if event.Type == "workflow.failed" {
message = fmt.Sprintf(":x: Workflow `%s` failed: %s",
event.Metadata["workflow_name"],
event.Metadata["error_message"],
)
}
payload := map[string]string{"text": message}
data, _ := json.Marshal(payload)
http.Post(p.webhookURL, "application/json", bytes.NewReader(data))
return nil, nil
}
func main() {
sdk.Serve(&SlackNotifier{
BasePlugin: sdk.BasePlugin{
PluginName: "awf-plugin-slack",
PluginVersion: "1.0.0",
},
webhookURL: os.Getenv("SLACK_WEBHOOK_URL"),
})
}Method 2: Emit Directly via HostClient
For long-running operations, async work, or independent event emission, use HostClient to emit events directly to the host at any time:
import (
"context"
"encoding/json"
"github.com/awf-project/cli/pkg/plugin/sdk"
)
type AnalysisPlugin struct {
sdk.BasePlugin
hostClient *sdk.HostClient
}
// Plugin receives broker connection during initialization
func (p *AnalysisPlugin) SetHostClient(client *sdk.HostClient) {
p.hostClient = client
}
func (p *AnalysisPlugin) Operation(ctx context.Context, req *sdk.OperationRequest) (*sdk.OperationResponse, error) {
// Do analysis work...
result := analyzeCode(req.Input)
// Emit event directly via HostClient (doesn't wait for HandleEvent call)
if p.hostClient != nil {
payload, _ := json.Marshal(map[string]any{
"file": req.Input,
"severity": result.Severity,
"issues": len(result.Issues),
})
p.hostClient.Emit(ctx, "analysis.complete", payload, map[string]string{
"status": result.Status,
})
}
return &sdk.OperationResponse{Output: result.Summary}, nil
}
func main() {
sdk.Serve(&AnalysisPlugin{
BasePlugin: sdk.BasePlugin{
PluginName: "awf-plugin-analysis",
PluginVersion: "1.0.0",
},
})
}Requirements for HostClient.Emit():
- Declare emit patterns in manifest — The
events.emitfield must list all event types your plugin can emit:
name: awf-plugin-analysis
version: 1.0.0
awf_version: ">=0.8.0"
capabilities:
- operations
- events
events:
emit:
- "analysis.*" # Pattern: analysis.complete, analysis.failed, etc.
- "code.scanned" # Specific event- Implement
SetHostClient— The framework calls this during plugin initialization to pass the broker connection:
func (p *MyPlugin) SetHostClient(client *sdk.HostClient) {
p.hostClient = client
}- Check for nil —
HostClientis only available if the host supports broker communication (AWF v0.8.0+). Always check before using:
if p.hostClient != nil {
p.hostClient.Emit(ctx, "event.type", payload, metadata)
}Error Handling:
Emit calls can fail if:
- Plugin doesn’t declare the event type in
events.emit(authorization denied) - Event type not correctly declared (misspelled pattern)
- Host’s event system is temporarily unavailable (rare)
Handle errors gracefully — emit failures shouldn’t break your operation:
if p.hostClient != nil {
if err := p.hostClient.Emit(ctx, eventType, payload, metadata); err != nil {
p.logger.Warn("emit failed (continuing anyway)", "event", eventType, "error", err)
}
}Handling Errors
If HandleEvent returns an error, the event is logged but doesn’t block event delivery to other plugins:
func (p *MyPlugin) HandleEvent(ctx context.Context, event sdk.Event) ([]sdk.Event, error) {
if event.Type == "step.failed" {
// Error doesn't break the pipeline
return nil, fmt.Errorf("notification service temporarily unavailable")
}
return nil, nil
}Plugins should handle timeouts gracefully by using context:
func (p *MyPlugin) HandleEvent(ctx context.Context, event sdk.Event) ([]sdk.Event, error) {
// Respect context cancellation
select {
case <-ctx.Done():
return nil, ctx.Err()
case result := <-p.doAsyncWork():
return []sdk.Event{result}, nil
}
}Streaming Event Delivery
By default, AWF delivers events to plugins via individual unary HandleEvent RPCs — one RPC per event. For event-heavy workflows, this creates overhead from repeated connection round-trips.
Plugins that implement the StreamEvents RPC receive events over a persistent gRPC stream instead. The host (gRPC client) pushes events via Send(), and the plugin (gRPC server) receives them in a Recv() loop. This is automatic — plugins that support streaming get it; those that don’t continue using unary delivery.
How It Works
Host detects plugin supports StreamEvents
│
├─ Opens persistent stream connection
│
└─ All subsequent events use stream.Send()
instead of individual HandleEvent RPCsAutomatic Fallback
If a plugin does not implement StreamEvents, the host detects the gRPC Unimplemented status and falls back to unary HandleEvent transparently. No configuration needed.
If an active stream breaks (plugin crash, network issue), the StreamManager detects the disconnect within 5 seconds and falls back to unary delivery. Three consecutive send timeouts also trigger stream teardown and fallback.
When to Use Streaming
Streaming is beneficial when:
- Your plugin receives many events in rapid succession (parallel step execution)
- Latency between event emission and handling is critical
- A workflow emits 100+ events to your plugin per run
For plugins that handle a few events per workflow run, unary delivery is equally effective.
Performance Considerations
Event Buffer Limits:
- Each plugin has a 256-event buffer
- If buffer fills, new events are dropped with a warning
- Slow
HandleEventimplementations can cause buffer overflow
Solutions:
- Keep
HandleEventimplementations fast (< 100ms ideal) - Use goroutines for blocking I/O:
func (p *MyPlugin) HandleEvent(ctx context.Context, event sdk.Event) ([]sdk.Event, error) { go p.sendAsync(event) // Don't block HandleEvent return nil, nil }
Cycle Prevention:
- Event propagation limited to depth 3
- Circular event chains automatically halt with a warning
- No manual cycle detection needed
Debugging Events
Logging All Events
Create a debug plugin to inspect event flow:
func (p *DebugPlugin) Patterns() []string {
return []string{"*.*"}
}
func (p *DebugPlugin) HandleEvent(ctx context.Context, event sdk.Event) ([]sdk.Event, error) {
fmt.Printf("Event: %s (from %s)\n", event.Type, event.Source)
fmt.Printf(" ID: %s\n", event.ID)
fmt.Printf(" Metadata: %v\n", event.Metadata)
return nil, nil
}Checking Plugin Status
awf plugin list # Shows enabled plugins with capabilitiesSee Also
- Plugins Guide - Complete plugin reference
- Plugin Event Architecture - EventBus, gRPC adapter, wiring, and design decisions
- Architecture - Overall AWF architecture