Agent skill

interactor-webhooks

Receive real-time updates from Interactor via webhooks (push) or Server-Sent Events (pull). Use when building real-time UIs, monitoring credential changes, tracking workflow progress, or streaming AI chat responses.

Stars 163
Forks 31

Install this agent skill to your Project

npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/interactor-webhooks

SKILL.md

Interactor Webhooks and Streaming Skill

Receive real-time updates from Interactor via webhooks (push to your server) or Server-Sent Events (pull from browser/client).

When to Use

  • Credential Monitoring: React to credential status changes (expired, revoked)
  • Workflow Notifications: Get notified when workflows complete or need input
  • Real-time Chat: Stream AI assistant responses to users
  • Live Dashboards: Display real-time workflow progress
  • Event-Driven Architecture: Trigger actions based on Interactor events

Prerequisites

  • Interactor authentication configured (see interactor-auth skill)
  • HTTPS endpoint for webhooks (required for production)
  • Understanding of webhook security (signature verification)

Webhooks vs SSE: When to Use Each

Use Case Recommended Reason
Backend notifications Webhooks Server-to-server, reliable delivery
Credential status changes Webhooks Background processing, no UI needed
Workflow completion Webhooks Trigger backend actions
Real-time chat UI SSE Low latency, browser-native
Live workflow progress SSE Visual feedback for users
Streaming AI responses SSE Token-by-token display

General Rule: Use webhooks for backend-to-backend, SSE for frontend real-time updates.


Webhooks

Webhooks push events to your server when things happen in Interactor.

Available Event Types

bash
curl https://core.interactor.com/api/v1/webhooks/event-types \
  -H "Authorization: Bearer <token>"

Response:

json
{
  "data": {
    "event_types": [
      "credential.created",
      "credential.refreshed",
      "credential.expired",
      "credential.revoked",
      "workflow.instance.created",
      "workflow.instance.completed",
      "workflow.instance.failed",
      "workflow.instance.halted",
      "agent.room.message",
      "agent.room.closed"
    ]
  }
}

Note: Additional events like workflow.instance.resumed and tool invocation events are available via SSE streams only. See the SSE section for details.

Event Categories

Category Webhook Events Description
Credentials credential.created, credential.refreshed, credential.expired, credential.revoked OAuth token lifecycle
Workflows workflow.instance.created, workflow.instance.completed, workflow.instance.failed, workflow.instance.halted Workflow execution status
Agents agent.room.message, agent.room.closed AI chat events

SSE-Only Events: workflow.instance.resumed, tool_use, tool_result are available via Server-Sent Events streams only.

Schema Versioning Policy

Interactor follows these principles for webhook payload changes:

Change Type Versioning Example
New optional fields Non-breaking, no version bump Adding metadata field to events
New event types Non-breaking, subscribe to receive credential.metadata_updated
Field type changes Breaking, announced 90 days ahead amount from string to number
Field removal Breaking, announced 90 days ahead Removing deprecated fields
Payload restructure New API version (v2) Complete payload format change

Best practices for forward compatibility:

  • Ignore unknown fields (don't fail on extra properties)
  • Use optional types for new fields: metadata?: Record<string, unknown>
  • Subscribe to Interactor changelog for breaking change announcements
  • Test against the /webhooks/:id/test endpoint after updates

Complete Event Mapping Table

Event Trigger Delivery Typical Handler Action
credential.created User completes OAuth flow Webhook Log for audit, update UI state
credential.refreshed Token auto-refreshed Webhook Log for audit (usually no action needed)
credential.expired Refresh token failed Webhook Notify user to reconnect, disable features
credential.revoked User revoked via provider Webhook Notify user to reconnect, disable features
workflow.instance.created Workflow started Webhook Track in analytics, show in dashboard
workflow.instance.halted Workflow needs user input Webhook Notify user, show input form
workflow.instance.completed Workflow finished successfully Webhook Process results, update records
workflow.instance.failed Workflow error Webhook Alert ops, log error details
agent.room.message AI sent complete message Webhook Forward to push notification or websocket
agent.room.closed Chat session ended Webhook Log analytics, cleanup resources
state_changed Workflow state transition SSE Update progress UI
workflow_data_updated Workflow data modified SSE Refresh displayed data
halted Workflow needs input SSE Show input form
resumed User provided input SSE Update UI, show processing
completed Workflow finished SSE Show completion, redirect
message Complete message received SSE Display in chat
message_start AI started responding SSE Show typing indicator
message_delta Token received SSE Append to streaming message
message_end AI finished message SSE Finalize message, enable input
tool_use AI invoked a tool SSE Show tool activity indicator
tool_result Tool returned result SSE Display tool result (optional)
heartbeat Connection keepalive SSE Reset connection health timer

Permissions & RBAC

Webhook management requires specific permissions in Interactor:

Action Required Permission Who Has It
List webhooks webhooks:read Admin, Developer
Create webhook webhooks:write Admin, Developer
Update webhook webhooks:write Admin, Developer
Delete webhook webhooks:delete Admin only
Regenerate secret webhooks:write Admin, Developer
View delivery history webhooks:read Admin, Developer

API Token Scopes:

When creating API tokens for webhook management, request these scopes:

  • webhooks - Full webhook management (read + write + delete)
  • webhooks:read - Read-only access to webhook configuration
  • webhooks:write - Create and update (no delete)
bash
# Token with full webhook access
curl -X POST https://core.interactor.com/api/v1/tokens \
  -H "Authorization: Bearer <admin_token>" \
  -d '{"name": "Webhook Manager", "scopes": ["webhooks"]}'

Security Note: Webhook secrets are only shown once at creation and regeneration. Store them securely in environment variables or a secrets manager.


Instructions

Step 1: Create a Webhook

bash
curl -X POST https://core.interactor.com/api/v1/webhooks \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://yourapp.com/webhooks/interactor",
    "events": [
      "credential.expired",
      "credential.revoked",
      "workflow.instance.completed",
      "workflow.instance.halted"
    ],
    "enabled": true
  }'

Response:

json
{
  "data": {
    "id": "wh_abc",
    "url": "https://yourapp.com/webhooks/interactor",
    "secret": "whsec_xyz_SAVE_THIS",
    "events": [
      "credential.expired",
      "credential.revoked",
      "workflow.instance.completed",
      "workflow.instance.halted"
    ],
    "enabled": true,
    "created_at": "2026-01-20T12:00:00Z"
  }
}

CRITICAL: Save the secret - you'll need it to verify webhook signatures. It's only shown once!

Step 2: List Webhooks

bash
curl https://core.interactor.com/api/v1/webhooks \
  -H "Authorization: Bearer <token>"

Response:

json
{
  "data": {
    "webhooks": [
      {
        "id": "wh_abc",
        "url": "https://yourapp.com/webhooks/interactor",
        "events": ["credential.expired", "workflow.instance.completed"],
        "enabled": true,
        "created_at": "2026-01-20T12:00:00Z",
        "last_delivery_at": "2026-01-20T12:30:00Z",
        "last_delivery_status": "delivered"
      }
    ]
  }
}

Step 3: Get Webhook Details

bash
curl https://core.interactor.com/api/v1/webhooks/wh_abc \
  -H "Authorization: Bearer <token>"

Step 4: Update Webhook

bash
curl -X PUT https://core.interactor.com/api/v1/webhooks/wh_abc \
  -H "Authorization: Bearer <token>" \
  -H "Content-Type: application/json" \
  -d '{
    "events": ["credential.created", "credential.expired"],
    "url": "https://yourapp.com/webhooks/v2/interactor"
  }'

Step 5: Toggle Webhook (Enable/Disable)

bash
curl -X POST https://core.interactor.com/api/v1/webhooks/wh_abc/toggle \
  -H "Authorization: Bearer <token>"

Step 6: Delete Webhook

bash
curl -X DELETE https://core.interactor.com/api/v1/webhooks/wh_abc \
  -H "Authorization: Bearer <token>"

Step 7: Regenerate Secret

If your secret is compromised:

bash
curl -X POST https://core.interactor.com/api/v1/webhooks/wh_abc/regenerate-secret \
  -H "Authorization: Bearer <token>"

Response:

json
{
  "data": {
    "id": "wh_abc",
    "secret": "whsec_NEW_SECRET_SAVE_THIS",
    "regenerated_at": "2026-01-20T12:00:00Z"
  }
}

CRITICAL: The new secret is only shown once. Update your webhook handler immediately with the new secret.

Step 8: View Recent Events

See delivery history and debug issues:

bash
curl https://core.interactor.com/api/v1/webhooks/wh_abc/events \
  -H "Authorization: Bearer <token>"

Response:

json
{
  "data": {
    "events": [
      {
        "id": "evt_123",
        "type": "workflow.instance.completed",
        "delivered_at": "2026-01-20T12:00:00Z",
        "status": "delivered",
        "response_code": 200,
        "response_time_ms": 145
      },
      {
        "id": "evt_122",
        "type": "credential.expired",
        "delivered_at": "2026-01-20T11:55:00Z",
        "status": "failed",
        "response_code": 500,
        "retry_count": 2
      }
    ]
  }
}

Step 9: Test Webhook

Send a test event to verify your endpoint:

bash
curl -X POST https://core.interactor.com/api/v1/webhooks/wh_abc/test \
  -H "Authorization: Bearer <token>"

Webhook Payload Format

All webhook events follow this structure:

json
{
  "id": "evt_abc123",
  "type": "workflow.instance.completed",
  "timestamp": "2026-01-20T12:00:00Z",
  "data": {
    "instance_id": "inst_xyz",
    "workflow_name": "approval_workflow",
    "namespace": "user_123",
    "status": "completed",
    "output": {
      "approved": true,
      "amount": 5000
    }
  }
}

Event-Specific Payloads

credential.created:

json
{
  "id": "evt_001",
  "type": "credential.created",
  "timestamp": "2026-01-20T12:00:00Z",
  "data": {
    "credential_id": "cred_abc",
    "service_id": "google_calendar",
    "service_name": "Google Calendar",
    "namespace": "user_123",
    "scopes": ["calendar.readonly", "calendar.events"]
  }
}

credential.refreshed:

json
{
  "id": "evt_002",
  "type": "credential.refreshed",
  "timestamp": "2026-01-20T12:00:00Z",
  "data": {
    "credential_id": "cred_abc",
    "service_id": "google_calendar",
    "service_name": "Google Calendar",
    "namespace": "user_123",
    "expires_at": "2026-01-20T13:00:00Z"
  }
}

credential.expired:

json
{
  "id": "evt_003",
  "type": "credential.expired",
  "timestamp": "2026-01-20T12:00:00Z",
  "data": {
    "credential_id": "cred_abc",
    "service_id": "google_calendar",
    "service_name": "Google Calendar",
    "namespace": "user_123",
    "reason": "refresh_token_invalid"
  }
}

credential.revoked:

json
{
  "id": "evt_004",
  "type": "credential.revoked",
  "timestamp": "2026-01-20T12:00:00Z",
  "data": {
    "credential_id": "cred_abc",
    "service_id": "google_calendar",
    "service_name": "Google Calendar",
    "namespace": "user_123",
    "reason": "user_revoked_access"
  }
}

workflow.instance.created:

json
{
  "id": "evt_005",
  "type": "workflow.instance.created",
  "timestamp": "2026-01-20T12:00:00Z",
  "data": {
    "instance_id": "inst_xyz",
    "workflow_name": "approval_workflow",
    "workflow_id": "wf_abc",
    "namespace": "user_123",
    "initial_input": {
      "request_id": "req_456",
      "amount": 5000
    }
  }
}

workflow.instance.halted:

json
{
  "id": "evt_006",
  "type": "workflow.instance.halted",
  "timestamp": "2026-01-20T12:00:00Z",
  "data": {
    "instance_id": "inst_xyz",
    "workflow_name": "approval_workflow",
    "namespace": "user_123",
    "current_state": "await_approval",
    "halting_presentation": {
      "type": "form",
      "title": "Approval Required",
      "fields": [...]
    }
  }
}

workflow.instance.completed:

json
{
  "id": "evt_007",
  "type": "workflow.instance.completed",
  "timestamp": "2026-01-20T12:00:00Z",
  "data": {
    "instance_id": "inst_xyz",
    "workflow_name": "approval_workflow",
    "namespace": "user_123",
    "final_state": "approved",
    "workflow_data": {
      "request_id": "req_456",
      "approved": true,
      "amount": 5000
    }
  }
}

workflow.instance.failed:

json
{
  "id": "evt_008",
  "type": "workflow.instance.failed",
  "timestamp": "2026-01-20T12:00:00Z",
  "data": {
    "instance_id": "inst_xyz",
    "workflow_name": "approval_workflow",
    "namespace": "user_123",
    "failed_state": "process_payment",
    "error": {
      "code": "payment_declined",
      "message": "Card was declined by issuer"
    }
  }
}

agent.room.message:

json
{
  "id": "evt_009",
  "type": "agent.room.message",
  "timestamp": "2026-01-20T12:00:00Z",
  "data": {
    "room_id": "room_xyz",
    "assistant_id": "asst_abc",
    "namespace": "user_123",
    "message_id": "msg_123",
    "role": "assistant",
    "content": "Here's what I found about your billing question..."
  }
}

agent.room.closed:

json
{
  "id": "evt_010",
  "type": "agent.room.closed",
  "timestamp": "2026-01-20T12:00:00Z",
  "data": {
    "room_id": "room_xyz",
    "assistant_id": "asst_abc",
    "namespace": "user_123",
    "reason": "user_closed",
    "message_count": 15,
    "duration_seconds": 300
  }
}

Note: Not all events require explicit handlers. For example, credential.created and credential.refreshed are often only logged for audit purposes, while workflow.instance.created may only need tracking in analytics systems.


Verifying Webhook Signatures

CRITICAL: Always verify signatures to ensure webhooks came from Interactor.

Signature Header Format

Webhooks include two headers for verification:

X-Interactor-Signature: sha256=<64 hex characters>
X-Interactor-Timestamp: 2026-01-20T12:00:00Z

Example:

X-Interactor-Signature: sha256=a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4e5f6a1b2
X-Interactor-Timestamp: 2026-01-20T12:00:00Z

Format validation: The signature header MUST match the format sha256= followed by exactly 64 lowercase hexadecimal characters. Reject any other format.

Preventing Replay Attacks

CRITICAL: Always validate the timestamp to prevent replay attacks.

  1. Parse X-Interactor-Timestamp as ISO8601
  2. Reject requests where |now - timestamp| > allowed_skew (recommended: 5 minutes)
  3. Verify signature only after timestamp validation passes
typescript
const MAX_TIMESTAMP_SKEW_SECONDS = 300; // 5 minutes

function validateTimestamp(timestampHeader: string | undefined): boolean {
  if (!timestampHeader) return false;

  const timestamp = new Date(timestampHeader);
  if (isNaN(timestamp.getTime())) return false;

  const now = Date.now();
  const diff = Math.abs(now - timestamp.getTime());

  return diff <= MAX_TIMESTAMP_SKEW_SECONDS * 1000;
}

Key Rotation & Multiple Active Secrets

When rotating webhook secrets, you may have a period where both old and new secrets are valid:

typescript
function verifyWithMultipleSecrets(
  payload: string,
  signatureHeader: string,
  secrets: string[]
): boolean {
  for (const secret of secrets) {
    if (isValidSignature(signatureHeader, Buffer.from(payload), secret)) {
      return true;
    }
  }
  return false;
}

// During rotation, configure both secrets:
const WEBHOOK_SECRETS = [
  process.env.INTERACTOR_WEBHOOK_SECRET!,           // Current secret
  process.env.INTERACTOR_WEBHOOK_SECRET_PREVIOUS!,  // Previous secret (optional)
].filter(Boolean);

Rotation procedure:

  1. Generate new secret via /regenerate-secret endpoint
  2. Deploy new secret to INTERACTOR_WEBHOOK_SECRET
  3. Keep old secret in INTERACTOR_WEBHOOK_SECRET_PREVIOUS for 24-48 hours
  4. Remove old secret after all in-flight events have been delivered

TypeScript/Node.js Verification

typescript
import crypto from 'crypto';
import express from 'express';

const app = express();

const MAX_TIMESTAMP_SKEW_MS = 5 * 60 * 1000; // 5 minutes

/**
 * Validates the X-Interactor-Timestamp header to prevent replay attacks.
 * Returns true if the timestamp is within the allowed skew window.
 */
function validateTimestamp(timestampHeader: string | undefined): boolean {
  if (!timestampHeader) return false;

  const timestamp = new Date(timestampHeader);
  if (isNaN(timestamp.getTime())) return false;

  const diff = Math.abs(Date.now() - timestamp.getTime());
  return diff <= MAX_TIMESTAMP_SKEW_MS;
}

/**
 * Validates and verifies the webhook signature using timing-safe comparison.
 * Properly handles the sha256= prefix and validates hex format.
 *
 * IMPORTANT: This function never throws - it returns false for any invalid input.
 */
function isValidSignature(
  signatureHeader: string | undefined,
  payload: Buffer,
  secret: string
): boolean {
  // Guard: header must exist
  if (!signatureHeader) return false;

  // Guard: header must match exact format sha256=<64 hex chars>
  const match = signatureHeader.match(/^sha256=([0-9a-f]{64})$/);
  if (!match) return false;

  const providedSignature = match[1];

  // Compute expected signature
  const expectedSignature = crypto
    .createHmac('sha256', secret)
    .update(payload)
    .digest('hex');

  // Convert to buffers for timing-safe comparison
  // Both are now guaranteed to be 64 hex chars = 32 bytes when decoded
  const providedBuffer = Buffer.from(providedSignature, 'hex');
  const expectedBuffer = Buffer.from(expectedSignature, 'hex');

  // Length check (should always pass given regex, but defense in depth)
  if (providedBuffer.length !== expectedBuffer.length) return false;

  return crypto.timingSafeEqual(providedBuffer, expectedBuffer);
}

// IMPORTANT: Use raw body for signature verification
app.post(
  '/webhooks/interactor',
  express.raw({ type: 'application/json' }),
  (req, res) => {
    const signatureHeader = req.headers['x-interactor-signature'] as string;
    const timestampHeader = req.headers['x-interactor-timestamp'] as string;
    const payload = req.body; // Keep as Buffer

    // Step 1: Validate timestamp (prevent replay attacks)
    if (!validateTimestamp(timestampHeader)) {
      console.warn('Webhook rejected: invalid or stale timestamp');
      return res.status(401).json({ error: 'invalid_timestamp' });
    }

    // Step 2: Verify signature
    if (!isValidSignature(signatureHeader, payload, process.env.INTERACTOR_WEBHOOK_SECRET!)) {
      console.warn('Webhook rejected: invalid signature');
      return res.status(401).json({ error: 'invalid_signature' });
    }

    // Step 3: Parse and handle event
    let event: WebhookEvent;
    try {
      event = JSON.parse(payload.toString());
    } catch {
      return res.status(400).json({ error: 'invalid_json' });
    }

    console.log(`Received event: ${event.type} (${event.id})`);

    // Handle asynchronously - respond immediately
    handleWebhookEvent(event).catch((err) => {
      console.error(`Failed to process event ${event.id}:`, err);
    });

    // Always respond quickly (< 5 seconds)
    res.status(200).json({ received: true });
  }
);

async function handleWebhookEvent(event: WebhookEvent) {
  switch (event.type) {
    case 'credential.expired':
    case 'credential.revoked':
      // Notify user to reconnect their account
      await notifyUserToReconnect(
        event.data.namespace,
        event.data.service_name
      );
      break;

    case 'workflow.instance.halted':
      // Notify user they have a pending approval
      await notifyUserOfPendingApproval(
        event.data.namespace,
        event.data.instance_id,
        event.data.halting_presentation
      );
      break;

    case 'workflow.instance.completed':
      // Process completed workflow
      await processCompletedWorkflow(
        event.data.instance_id,
        event.data.workflow_data
      );
      break;

    case 'workflow.instance.failed':
      // Handle workflow failure
      await handleWorkflowFailure(
        event.data.namespace,
        event.data.instance_id,
        event.data.error,
        event.data.failed_state
      );
      break;

    case 'agent.room.message':
      // Forward message to real-time channel (if not using SSE)
      await forwardMessageToClient(
        event.data.namespace,
        event.data.room_id,
        event.data.message_id,
        event.data.content
      );
      break;
  }
}

// Webhook event types for type safety
type WebhookEventType =
  | 'credential.created'
  | 'credential.refreshed'
  | 'credential.expired'
  | 'credential.revoked'
  | 'workflow.instance.created'
  | 'workflow.instance.completed'
  | 'workflow.instance.failed'
  | 'workflow.instance.halted'
  | 'agent.room.message'
  | 'agent.room.closed';

// SSE-only event types (not available via webhooks)
type SSEEventType =
  | 'state_changed'
  | 'workflow_data_updated'
  | 'halted'
  | 'resumed'
  | 'completed'
  | 'message'
  | 'message_start'
  | 'message_delta'
  | 'message_end'
  | 'tool_use'
  | 'tool_result'
  | 'heartbeat';

interface WebhookEvent<T = Record<string, unknown>> {
  id: string;
  type: WebhookEventType;
  timestamp: string;
  data: T;
}

// Specific payload types for each event
interface CredentialEventData {
  credential_id: string;
  service_id: string;
  service_name?: string;
  namespace: string;
  reason?: string;
  scopes?: string[];
  expires_at?: string; // For credential.refreshed
}

interface WorkflowEventData {
  instance_id: string;
  workflow_name: string;
  workflow_id?: string;
  namespace: string;
  status?: 'created' | 'running' | 'halted' | 'completed' | 'failed';
  current_state?: string;
  final_state?: string;
  failed_state?: string;
  error?: { code: string; message: string };
  initial_input?: Record<string, unknown>;
  workflow_data?: Record<string, unknown>;
  output?: Record<string, unknown>;
  halting_presentation?: Record<string, unknown>;
}

interface AgentMessageEventData {
  room_id: string;
  assistant_id: string;
  namespace: string;
  message_id: string;
  role: 'user' | 'assistant';
  content: string;
}

interface AgentRoomClosedEventData {
  room_id: string;
  assistant_id: string;
  namespace: string;
  reason: 'user_closed' | 'timeout' | 'error';
  message_count: number;
  duration_seconds: number;
}

Python/Flask Verification

python
import hmac
import hashlib
import os
import re
from datetime import datetime, timezone, timedelta
from flask import Flask, request, jsonify

app = Flask(__name__)

MAX_TIMESTAMP_SKEW = timedelta(minutes=5)
SIGNATURE_PATTERN = re.compile(r'^sha256=([0-9a-f]{64})$')


def validate_timestamp(timestamp_header: str | None) -> bool:
    """Validate timestamp to prevent replay attacks."""
    if not timestamp_header:
        return False
    try:
        timestamp = datetime.fromisoformat(timestamp_header.replace('Z', '+00:00'))
        now = datetime.now(timezone.utc)
        return abs(now - timestamp) <= MAX_TIMESTAMP_SKEW
    except (ValueError, TypeError):
        return False


def is_valid_signature(signature_header: str | None, payload: bytes, secret: str) -> bool:
    """
    Validate and verify webhook signature with timing-safe comparison.
    Returns False for any invalid input - never raises exceptions.
    """
    if not signature_header:
        return False

    # Validate format: sha256=<64 hex chars>
    match = SIGNATURE_PATTERN.match(signature_header)
    if not match:
        return False

    provided_signature = match.group(1)
    expected_signature = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()

    return hmac.compare_digest(provided_signature, expected_signature)


@app.route('/webhooks/interactor', methods=['POST'])
def handle_webhook():
    signature_header = request.headers.get('X-Interactor-Signature')
    timestamp_header = request.headers.get('X-Interactor-Timestamp')
    payload = request.get_data()

    # Step 1: Validate timestamp (prevent replay attacks)
    if not validate_timestamp(timestamp_header):
        print('Webhook rejected: invalid or stale timestamp')
        return jsonify({'error': 'invalid_timestamp'}), 401

    # Step 2: Verify signature
    if not is_valid_signature(signature_header, payload, os.environ['INTERACTOR_WEBHOOK_SECRET']):
        print('Webhook rejected: invalid signature')
        return jsonify({'error': 'invalid_signature'}), 401

    # Step 3: Parse and handle event
    try:
        event = request.get_json()
    except Exception:
        return jsonify({'error': 'invalid_json'}), 400

    print(f"Received event: {event['type']} ({event['id']})")

    # Handle asynchronously (use Celery, RQ, or similar in production)
    handle_webhook_event(event)

    # Always respond quickly (< 5 seconds)
    return jsonify({'received': True}), 200

def handle_webhook_event(event: dict):
    event_type = event['type']
    data = event['data']

    if event_type in ['credential.expired', 'credential.revoked']:
        notify_user_to_reconnect(data['namespace'], data.get('service_name'))

    elif event_type == 'workflow.instance.halted':
        notify_user_of_pending_approval(
            data['namespace'],
            data['instance_id'],
            data.get('halting_presentation')
        )

    elif event_type == 'workflow.instance.completed':
        process_completed_workflow(data['instance_id'], data.get('workflow_data'))

    elif event_type == 'workflow.instance.failed':
        handle_workflow_failure(
            data['namespace'],
            data['instance_id'],
            data.get('error'),
            data.get('failed_state')
        )

    elif event_type == 'agent.room.message':
        forward_message_to_client(
            data['namespace'],
            data['room_id'],
            data['message_id'],
            data['content']
        )

Elixir/Phoenix Verification

elixir
defmodule MyAppWeb.WebhookController do
  use MyAppWeb, :controller

  import Plug.Conn, only: [get_req_header: 2]

  # Maximum body size for webhooks (1MB should be plenty)
  @max_body_length 1_048_576
  # Maximum timestamp skew (5 minutes in seconds)
  @max_timestamp_skew 300
  # Regex to validate signature format: sha256=<64 hex chars>
  @signature_pattern ~r/^sha256=([0-9a-f]{64})$/

  def interactor(conn, _params) do
    signature_header = get_req_header(conn, "x-interactor-signature") |> List.first()
    timestamp_header = get_req_header(conn, "x-interactor-timestamp") |> List.first()

    with {:ok, payload, conn} <- read_body(conn, length: @max_body_length),
         :ok <- validate_timestamp(timestamp_header),
         secret <- Application.fetch_env!(:my_app, :interactor_webhook_secret),
         :ok <- verify_signature(payload, signature_header, secret),
         {:ok, event} <- Jason.decode(payload) do
      # Handle asynchronously to respond quickly
      Task.start(fn -> handle_event(event) end)

      conn
      |> put_status(200)
      |> json(%{received: true})
    else
      {:more, _partial, conn} ->
        conn
        |> put_status(413)
        |> json(%{error: "payload_too_large"})

      {:error, :invalid_timestamp} ->
        conn
        |> put_status(401)
        |> json(%{error: "invalid_timestamp"})

      {:error, :invalid_signature} ->
        conn
        |> put_status(401)
        |> json(%{error: "invalid_signature"})

      {:error, _reason} ->
        conn
        |> put_status(400)
        |> json(%{error: "invalid_json"})
    end
  end

  defp validate_timestamp(nil), do: {:error, :invalid_timestamp}

  defp validate_timestamp(timestamp_header) do
    case DateTime.from_iso8601(timestamp_header) do
      {:ok, timestamp, _offset} ->
        now = DateTime.utc_now()
        diff = abs(DateTime.diff(now, timestamp, :second))

        if diff <= @max_timestamp_skew do
          :ok
        else
          {:error, :invalid_timestamp}
        end

      {:error, _} ->
        {:error, :invalid_timestamp}
    end
  end

  defp verify_signature(_payload, nil, _secret), do: {:error, :invalid_signature}

  defp verify_signature(payload, signature_header, secret) do
    case Regex.run(@signature_pattern, signature_header) do
      [_, provided_hex] ->
        expected_hex =
          :crypto.mac(:hmac, :sha256, secret, payload)
          |> Base.encode16(case: :lower)

        if Plug.Crypto.secure_compare(provided_hex, expected_hex) do
          :ok
        else
          {:error, :invalid_signature}
        end

      _ ->
        {:error, :invalid_signature}
    end
  end

  defp handle_event(%{"type" => "credential.expired", "data" => data}) do
    MyApp.Notifications.notify_reconnect(data["namespace"], data["service_name"])
  end

  defp handle_event(%{"type" => "workflow.instance.halted", "data" => data}) do
    MyApp.Notifications.notify_pending_approval(
      data["namespace"],
      data["instance_id"],
      data["halting_presentation"]
    )
  end

  defp handle_event(%{"type" => "workflow.instance.completed", "data" => data}) do
    MyApp.Workflows.process_completed(data["instance_id"], data["workflow_data"])
  end

  defp handle_event(%{"type" => "workflow.instance.failed", "data" => data}) do
    MyApp.Workflows.handle_failure(
      data["namespace"],
      data["instance_id"],
      data["error"],
      data["failed_state"]
    )
  end

  defp handle_event(%{"type" => "agent.room.message", "data" => data}) do
    MyApp.Chat.forward_message(
      data["namespace"],
      data["room_id"],
      data["message_id"],
      data["content"]
    )
  end

  defp handle_event(_event), do: :ok
end

Retry Policy

Interactor retries failed webhook deliveries with exponential backoff:

Attempt Delay Total Time
1 Immediate 0
2 1 minute 1 min
3 5 minutes 6 min
4 30 minutes 36 min
5 2 hours 2h 36min

After 5 failed attempts, the webhook is disabled. Re-enable via the toggle endpoint.

HTTP Response Semantics

Your webhook handler's HTTP response determines Interactor's retry behavior:

HTTP Status Interactor Behavior Your Action
200-299 ✅ Success - no retry Event processed successfully
400 ❌ Permanent failure - no retry Bad request, fix your handler
401 ❌ Permanent failure - no retry Signature invalid, check secret
403 ❌ Permanent failure - no retry Forbidden, check permissions
404 ❌ Permanent failure - no retry Endpoint not found, check URL
408 🔄 Retry with backoff Request timeout, respond faster
429 🔄 Retry with backoff Rate limited, will retry later
500 🔄 Retry with backoff Server error, will retry
502-504 🔄 Retry with backoff Gateway/timeout, will retry
Timeout (>30s) 🔄 Retry with backoff No response received, will retry
Connection refused 🔄 Retry with backoff Server unreachable, will retry

Important: Return 200 OK immediately, then process asynchronously. If you return 4xx errors for transient issues, Interactor won't retry.

Best Practices for Reliability

  1. Respond quickly - Return 200 within 5 seconds
  2. Process asynchronously - Queue events for background processing
  3. Be idempotent - Handle duplicate deliveries gracefully
  4. Log event IDs - Track which events you've processed

Idempotent Event Processing

typescript
// Example: Idempotent event processing with Redis
const IDEMPOTENCY_TTL = 7 * 24 * 60 * 60; // 7 days in seconds

async function handleWebhookEvent(event: WebhookEvent) {
  const idempotencyKey = `webhook:processed:${event.id}`;

  // Atomic check-and-set to prevent race conditions
  const wasSet = await redis.set(idempotencyKey, Date.now(), 'NX', 'EX', IDEMPOTENCY_TTL);

  if (!wasSet) {
    console.log(`Event ${event.id} already processed, skipping`);
    return;
  }

  try {
    await processEvent(event);
    console.log(`Successfully processed event ${event.id}`);
  } catch (error) {
    // Delete the key so retry can process it
    await redis.del(idempotencyKey);
    throw error;
  }
}

Idempotency Storage Recommendations:

Storage TTL Use Case
Redis 7 days High-throughput, distributed systems
PostgreSQL 30 days Audit trail needed, lower throughput
In-memory Session Development/testing only

Dead-Letter Queue (DLQ) Strategy

For events that repeatedly fail processing, implement a DLQ:

typescript
const MAX_PROCESS_ATTEMPTS = 3;
const DLQ_KEY = 'webhook:dlq';

async function handleWebhookEvent(event: WebhookEvent) {
  const attemptKey = `webhook:attempts:${event.id}`;
  const attempts = await redis.incr(attemptKey);
  await redis.expire(attemptKey, 24 * 60 * 60); // 24 hour window

  try {
    await processEvent(event);
    await redis.del(attemptKey);
  } catch (error) {
    if (attempts >= MAX_PROCESS_ATTEMPTS) {
      // Move to DLQ for manual review
      await redis.rpush(DLQ_KEY, JSON.stringify({
        event,
        error: error.message,
        failedAt: new Date().toISOString(),
        attempts
      }));
      await redis.del(attemptKey);
      console.error(`Event ${event.id} moved to DLQ after ${attempts} attempts`);

      // Alert operations team
      await alertOps(`Webhook event ${event.id} failed ${attempts} times`);
    } else {
      console.warn(`Event ${event.id} failed (attempt ${attempts}/${MAX_PROCESS_ATTEMPTS})`);
      throw error; // Will be retried by Interactor
    }
  }
}

// Periodic DLQ processor (run via cron)
async function processDLQ() {
  while (true) {
    const item = await redis.lpop(DLQ_KEY);
    if (!item) break;

    const { event, error, failedAt } = JSON.parse(item);
    console.log(`DLQ item: ${event.id} failed at ${failedAt}: ${error}`);
    // Manual review or automated recovery logic
  }
}

Monitoring & Observability

Track webhook health with these metrics:

Prometheus Metrics Example:

typescript
import { Counter, Histogram, Gauge } from 'prom-client';

// Webhook metrics
const webhookReceived = new Counter({
  name: 'interactor_webhook_received_total',
  help: 'Total webhooks received',
  labelNames: ['event_type', 'status'] // status: success, invalid_signature, invalid_timestamp, processing_error
});

const webhookProcessingDuration = new Histogram({
  name: 'interactor_webhook_processing_duration_seconds',
  help: 'Webhook processing duration in seconds',
  labelNames: ['event_type'],
  buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5]
});

const webhookDLQSize = new Gauge({
  name: 'interactor_webhook_dlq_size',
  help: 'Current size of the dead-letter queue'
});

// Usage in handler
app.post('/webhooks/interactor', async (req, res) => {
  const timer = webhookProcessingDuration.startTimer();

  try {
    // ... validation ...

    if (!isValidSignature(...)) {
      webhookReceived.inc({ event_type: 'unknown', status: 'invalid_signature' });
      return res.status(401).json({ error: 'invalid_signature' });
    }

    const event = JSON.parse(payload.toString());
    webhookReceived.inc({ event_type: event.type, status: 'success' });

    await handleWebhookEvent(event);
    timer({ event_type: event.type });

    res.status(200).json({ received: true });
  } catch (error) {
    webhookReceived.inc({ event_type: 'unknown', status: 'processing_error' });
    timer({ event_type: 'error' });
    throw error;
  }
});

Key Metrics to Monitor:

Metric Alert Threshold Description
webhook_received_total{status="invalid_signature"} >5 in 5min Possible secret mismatch or attack
webhook_processing_duration_seconds p99 >5s Risk of timeout, scale handlers
webhook_dlq_size >0 Events need manual review
webhook_received_total{status="processing_error"} >10 in 5min Handler bugs, investigate logs

Server-Sent Events (SSE)

For real-time streaming in browsers and clients.

Workflow Instance Stream

Stream updates for a specific workflow instance:

bash
curl -N https://core.interactor.com/api/v1/workflows/instances/inst_xyz/stream \
  -H "Authorization: Bearer <token>" \
  -H "Accept: text/event-stream"

Events:

event: state_changed
data: {"state": "manager_approval", "previous_state": "submit", "thread_id": "thread_main"}

event: workflow_data_updated
data: {"key": "submitted_at", "value": "2026-01-20T12:00:00Z"}

event: halted
data: {"state": "manager_approval", "presentation": {...}}

event: resumed
data: {"state": "manager_approval", "input": {"approved": true}}

event: completed
data: {"status": "completed", "final_state": "approved", "output": {...}}

Chat Room Stream

Stream messages in a chat room:

bash
curl -N https://core.interactor.com/api/v1/agents/rooms/room_xyz/stream \
  -H "Authorization: Bearer <token>" \
  -H "Accept: text/event-stream"

Events:

event: message
data: {"id": "msg_1", "role": "user", "content": "Hello"}

event: message_start
data: {"id": "msg_2", "role": "assistant"}

event: message_delta
data: {"id": "msg_2", "delta": "Hi there! "}

event: message_delta
data: {"id": "msg_2", "delta": "How can I "}

event: message_delta
data: {"id": "msg_2", "delta": "help you today?"}

event: message_end
data: {"id": "msg_2", "role": "assistant", "content": "Hi there! How can I help you today?"}

event: tool_use
data: {"id": "call_1", "tool": "search_products", "parameters": {"query": "laptop"}}

event: tool_result
data: {"id": "call_1", "tool": "search_products", "result": {"products": [...]}}

event: heartbeat
data: {"timestamp": "2026-01-20T12:00:30Z"}

SSE Security Best Practices

Short-Lived Tokens

Since EventSource doesn't support custom headers, tokens must be passed in the URL. Use short-lived tokens to minimize exposure:

typescript
// Backend: Generate short-lived SSE token (5 minute expiry)
app.post('/api/sse-token', authenticate, async (req, res) => {
  const sseToken = jwt.sign(
    {
      sub: req.user.id,
      purpose: 'sse',
      roomId: req.body.roomId // Scope token to specific resource
    },
    process.env.SSE_TOKEN_SECRET,
    { expiresIn: '5m' }
  );

  res.json({ token: sseToken, expiresIn: 300 });
});

// Frontend: Request token before connecting
async function connectToSSE(roomId: string) {
  const { token } = await fetch('/api/sse-token', {
    method: 'POST',
    headers: { 'Authorization': `Bearer ${authToken}` },
    body: JSON.stringify({ roomId })
  }).then(r => r.json());

  return new EventSource(`/api/sse/rooms/${roomId}?token=${token}`);
}

Server-Side Proxy Pattern

Proxy SSE connections through your backend to avoid exposing Interactor tokens:

typescript
// Your backend proxies SSE from Interactor
app.get('/api/sse/rooms/:roomId', authenticate, (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // Connect to Interactor with server-side token
  const upstream = new EventSource(
    `https://core.interactor.com/api/v1/agents/rooms/${req.params.roomId}/stream`,
    { headers: { 'Authorization': `Bearer ${process.env.INTERACTOR_ACCESS_TOKEN}` } }
  );

  // Forward events to client
  upstream.onmessage = (event) => {
    res.write(`event: ${event.type}\ndata: ${event.data}\n\n`);
  };

  req.on('close', () => upstream.close());
});

CORS Configuration

If connecting directly to Interactor from the browser:

typescript
// Ensure your domain is whitelisted in Interactor settings
// Interactor will include these headers:
// Access-Control-Allow-Origin: https://yourdomain.com
// Access-Control-Allow-Credentials: true

// Your CSP should allow connections:
// connect-src 'self' https://core.interactor.com;

Heartbeat & Connection Health

Interactor sends heartbeat events every 30 seconds. Use them to detect stale connections:

typescript
const HEARTBEAT_TIMEOUT_MS = 45_000; // 30s interval + 15s grace period
let lastHeartbeat = Date.now();
let healthCheckInterval: NodeJS.Timeout;

function setupHealthCheck(eventSource: EventSource, onStale: () => void) {
  eventSource.addEventListener('heartbeat', () => {
    lastHeartbeat = Date.now();
  });

  healthCheckInterval = setInterval(() => {
    const timeSinceHeartbeat = Date.now() - lastHeartbeat;
    if (timeSinceHeartbeat > HEARTBEAT_TIMEOUT_MS) {
      console.warn(`SSE connection stale (${timeSinceHeartbeat}ms since last heartbeat)`);
      onStale();
    }
  }, 10_000); // Check every 10 seconds
}

function cleanup() {
  clearInterval(healthCheckInterval);
  eventSource.close();
}

Health Thresholds:

Condition Threshold Action
Normal Heartbeat within 30s Connection healthy
Warning 30-45s since heartbeat Log warning, prepare reconnect
Stale >45s since heartbeat Force reconnect
Failed 3 consecutive reconnect failures Alert user, escalate to support

SSE Client Implementations

Browser (Native EventSource)

typescript
const token = 'your_access_token';
const roomId = 'room_xyz';

// Note: EventSource doesn't support custom headers
// Pass token as query parameter
const eventSource = new EventSource(
  `https://core.interactor.com/api/v1/agents/rooms/${roomId}/stream?token=${token}`
);

// ⚠️ SECURITY NOTE: Token in URL may appear in server access logs.
// For enhanced security, use short-lived tokens specifically for SSE connections.
// Consider using a dedicated SSE token endpoint that issues time-limited tokens.

// Handle different event types
eventSource.addEventListener('message', (event) => {
  const data = JSON.parse(event.data);
  displayMessage(data);
});

eventSource.addEventListener('message_start', (event) => {
  const data = JSON.parse(event.data);
  startStreamingMessage(data.id);
});

eventSource.addEventListener('message_delta', (event) => {
  const data = JSON.parse(event.data);
  appendToStreamingMessage(data.id, data.delta);
});

eventSource.addEventListener('message_end', (event) => {
  const data = JSON.parse(event.data);
  finalizeStreamingMessage(data.id, data.content);
});

eventSource.addEventListener('tool_use', (event) => {
  const data = JSON.parse(event.data);
  showToolUsage(data.tool, data.parameters);
});

eventSource.addEventListener('tool_result', (event) => {
  const data = JSON.parse(event.data);
  showToolResult(data.tool, data.result);
});

eventSource.addEventListener('heartbeat', (event) => {
  // Connection is alive
  updateLastHeartbeat();
});

eventSource.onerror = (error) => {
  console.error('SSE error:', error);
  // Implement reconnection logic
  if (eventSource.readyState === EventSource.CLOSED) {
    setTimeout(() => reconnect(), 5000);
  }
};

// Clean up when done
function cleanup() {
  eventSource.close();
}

React Hook for Chat Streaming

tsx
import { useEffect, useState, useRef, useCallback } from 'react';

interface Message {
  id: string;
  role: 'user' | 'assistant';
  content: string;
  isStreaming?: boolean;
}

interface UseChatStreamOptions {
  roomId: string;
  token: string;
  onError?: (error: Error) => void;
}

export function useChatStream({ roomId, token, onError }: UseChatStreamOptions) {
  const [messages, setMessages] = useState<Message[]>([]);
  const [isConnected, setIsConnected] = useState(false);
  const [isStreaming, setIsStreaming] = useState(false);
  const eventSourceRef = useRef<EventSource | null>(null);
  const streamingContentRef = useRef<Map<string, string>>(new Map());

  // Use ref for onError to prevent infinite re-renders
  // when onError is an inline function
  const onErrorRef = useRef(onError);
  useEffect(() => {
    onErrorRef.current = onError;
  }, [onError]);

  const connect = useCallback(() => {
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
    }

    const url = `https://core.interactor.com/api/v1/agents/rooms/${roomId}/stream?token=${token}`;
    const eventSource = new EventSource(url);
    eventSourceRef.current = eventSource;

    eventSource.onopen = () => {
      setIsConnected(true);
    };

    eventSource.addEventListener('message', (event) => {
      const data = JSON.parse(event.data);
      setMessages(prev => [...prev, data]);
    });

    eventSource.addEventListener('message_start', (event) => {
      const data = JSON.parse(event.data);
      setIsStreaming(true);
      streamingContentRef.current.set(data.id, '');
      setMessages(prev => [...prev, {
        id: data.id,
        role: 'assistant',
        content: '',
        isStreaming: true
      }]);
    });

    eventSource.addEventListener('message_delta', (event) => {
      const data = JSON.parse(event.data);
      const currentContent = streamingContentRef.current.get(data.id) || '';
      const newContent = currentContent + data.delta;
      streamingContentRef.current.set(data.id, newContent);

      setMessages(prev => prev.map(msg =>
        msg.id === data.id
          ? { ...msg, content: newContent }
          : msg
      ));
    });

    eventSource.addEventListener('message_end', (event) => {
      const data = JSON.parse(event.data);
      setIsStreaming(false);
      streamingContentRef.current.delete(data.id);

      setMessages(prev => prev.map(msg =>
        msg.id === data.id
          ? { ...msg, content: data.content, isStreaming: false }
          : msg
      ));
    });

    eventSource.onerror = () => {
      setIsConnected(false);
      onErrorRef.current?.(new Error('SSE connection error'));

      // Auto-reconnect after 5 seconds
      setTimeout(() => {
        if (eventSourceRef.current?.readyState === EventSource.CLOSED) {
          connect();
        }
      }, 5000);
    };
  }, [roomId, token]); // Note: onError removed, using ref instead

  const disconnect = useCallback(() => {
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      eventSourceRef.current = null;
    }
    // Clean up streaming content map to prevent memory leaks
    streamingContentRef.current.clear();
    setIsConnected(false);
    setIsStreaming(false);
  }, []);

  useEffect(() => {
    connect();
    return () => disconnect();
  }, [connect, disconnect]);

  return {
    messages,
    isConnected,
    isStreaming,
    reconnect: connect,
    disconnect
  };
}

// Usage in component
function ChatRoom({ roomId, token }: { roomId: string; token: string }) {
  const { messages, isConnected, isStreaming } = useChatStream({
    roomId,
    token,
    onError: (error) => console.error('Chat error:', error)
  });

  return (
    <div className="chat-room">
      <div className="status">
        {isConnected ? '🟢 Connected' : '🔴 Disconnected'}
        {isStreaming && ' (typing...)'}
      </div>

      <div className="messages">
        {messages.map((msg) => (
          <div key={msg.id} className={`message ${msg.role}`}>
            <div className="content">
              {msg.content}
              {msg.isStreaming && <span className="cursor">▊</span>}
            </div>
          </div>
        ))}
      </div>
    </div>
  );
}

Node.js SSE Client

typescript
import EventSource from 'eventsource';

class InteractorSSEClient {
  private eventSource: EventSource | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;

  constructor(
    private baseUrl: string,
    private token: string
  ) {}

  connectToRoom(roomId: string, handlers: {
    onMessage?: (message: any) => void;
    onMessageStart?: (data: any) => void;
    onMessageDelta?: (data: any) => void;
    onMessageEnd?: (data: any) => void;
    onToolUse?: (data: any) => void;
    onToolResult?: (data: any) => void;
    onHeartbeat?: (data: any) => void;
    onError?: (error: Error) => void;
  }) {
    const url = `${this.baseUrl}/agents/rooms/${roomId}/stream`;

    this.eventSource = new EventSource(url, {
      headers: {
        'Authorization': `Bearer ${this.token}`
      }
    });

    this.eventSource.onopen = () => {
      console.log('SSE connected to room:', roomId);
      this.reconnectAttempts = 0;
    };

    if (handlers.onMessage) {
      this.eventSource.addEventListener('message', (event) => {
        handlers.onMessage!(JSON.parse(event.data));
      });
    }

    if (handlers.onMessageStart) {
      this.eventSource.addEventListener('message_start', (event) => {
        handlers.onMessageStart!(JSON.parse(event.data));
      });
    }

    if (handlers.onMessageDelta) {
      this.eventSource.addEventListener('message_delta', (event) => {
        handlers.onMessageDelta!(JSON.parse(event.data));
      });
    }

    if (handlers.onMessageEnd) {
      this.eventSource.addEventListener('message_end', (event) => {
        handlers.onMessageEnd!(JSON.parse(event.data));
      });
    }

    if (handlers.onToolUse) {
      this.eventSource.addEventListener('tool_use', (event) => {
        handlers.onToolUse!(JSON.parse(event.data));
      });
    }

    if (handlers.onToolResult) {
      this.eventSource.addEventListener('tool_result', (event) => {
        handlers.onToolResult!(JSON.parse(event.data));
      });
    }

    if (handlers.onHeartbeat) {
      this.eventSource.addEventListener('heartbeat', (event) => {
        handlers.onHeartbeat!(JSON.parse(event.data));
      });
    }

    this.eventSource.onerror = (error) => {
      console.error('SSE error:', error);
      handlers.onError?.(new Error('SSE connection error'));

      // Auto-reconnect with backoff
      if (this.reconnectAttempts < this.maxReconnectAttempts) {
        const delay = Math.pow(2, this.reconnectAttempts) * 1000;
        this.reconnectAttempts++;
        setTimeout(() => this.connectToRoom(roomId, handlers), delay);
      }
    };

    return this;
  }

  connectToWorkflow(instanceId: string, handlers: {
    onStateChanged?: (data: any) => void;
    onWorkflowDataUpdated?: (data: any) => void;
    onHalted?: (data: any) => void;
    onResumed?: (data: any) => void;
    onCompleted?: (data: any) => void;
    onHeartbeat?: (data: any) => void;
    onError?: (error: Error) => void;
  }) {
    const url = `${this.baseUrl}/workflows/instances/${instanceId}/stream`;

    this.eventSource = new EventSource(url, {
      headers: {
        'Authorization': `Bearer ${this.token}`
      }
    });

    this.eventSource.onopen = () => {
      console.log('SSE connected to workflow:', instanceId);
      this.reconnectAttempts = 0;
    };

    if (handlers.onStateChanged) {
      this.eventSource.addEventListener('state_changed', (event) => {
        handlers.onStateChanged!(JSON.parse(event.data));
      });
    }

    if (handlers.onWorkflowDataUpdated) {
      this.eventSource.addEventListener('workflow_data_updated', (event) => {
        handlers.onWorkflowDataUpdated!(JSON.parse(event.data));
      });
    }

    if (handlers.onHalted) {
      this.eventSource.addEventListener('halted', (event) => {
        handlers.onHalted!(JSON.parse(event.data));
      });
    }

    if (handlers.onResumed) {
      this.eventSource.addEventListener('resumed', (event) => {
        handlers.onResumed!(JSON.parse(event.data));
      });
    }

    if (handlers.onCompleted) {
      this.eventSource.addEventListener('completed', (event) => {
        handlers.onCompleted!(JSON.parse(event.data));
      });
    }

    if (handlers.onHeartbeat) {
      this.eventSource.addEventListener('heartbeat', (event) => {
        handlers.onHeartbeat!(JSON.parse(event.data));
      });
    }

    this.eventSource.onerror = (error) => {
      console.error('SSE error:', error);
      handlers.onError?.(new Error('SSE connection error'));

      // Auto-reconnect with backoff (same as room connection)
      if (this.reconnectAttempts < this.maxReconnectAttempts) {
        const delay = Math.pow(2, this.reconnectAttempts) * 1000;
        this.reconnectAttempts++;
        setTimeout(() => this.connectToWorkflow(instanceId, handlers), delay);
      }
    };

    return this;
  }

  disconnect() {
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }
}

// Usage
const sseClient = new InteractorSSEClient(
  'https://core.interactor.com/api/v1',
  process.env.INTERACTOR_ACCESS_TOKEN!
);

sseClient.connectToRoom('room_xyz', {
  onMessageDelta: (data) => {
    process.stdout.write(data.delta);
  },
  onMessageEnd: (data) => {
    console.log('\n--- Message complete ---');
  },
  onError: (error) => {
    console.error('Error:', error);
  }
});

Rate Limits

Resource Limit
Webhooks per account 50
SSE connections per account 10 concurrent
Events per webhook Unlimited

Best Practices

Webhooks

  1. Always verify signatures - Reject requests with invalid signatures
  2. Respond quickly - Return 200 within 5 seconds, process asynchronously
  3. Handle duplicates - Events may be delivered more than once
  4. Use idempotent processing - Track event IDs to prevent double-processing
  5. Monitor delivery - Check webhook events list for failures
  6. Use HTTPS - Required for production webhooks

SSE

  1. Handle reconnection - SSE connections may drop; implement auto-reconnect
  2. Watch for heartbeats - Detect stale connections
  3. Close when done - Close connections when leaving pages/screens
  4. Limit connections - Max 10 concurrent SSE connections per account
  5. Use for frontend only - For backend, prefer webhooks

Local Development & Testing

Testing Webhooks Locally

Webhooks require a publicly accessible URL. For local development:

Option 1: ngrok (Recommended)

bash
# Install ngrok: https://ngrok.com/download
ngrok http 4000  # For Phoenix default port

# Use the generated URL for your webhook
# Example: https://abc123.ngrok.io/webhooks/interactor

Option 2: localtunnel

bash
npm install -g localtunnel
lt --port 4000

# Use the generated URL for your webhook

Option 3: Cloudflare Tunnel

bash
cloudflared tunnel --url http://localhost:4000

Testing Webhook Signature Verification

Create a test script to verify your signature implementation:

typescript
// test-webhook-signature.ts
import crypto from 'crypto';

const secret = 'whsec_your_test_secret';
const payload = JSON.stringify({
  id: 'evt_test',
  type: 'workflow.instance.completed',
  timestamp: new Date().toISOString(),
  data: { instance_id: 'inst_test' }
});

const signature = 'sha256=' + crypto
  .createHmac('sha256', secret)
  .update(payload)
  .digest('hex');

console.log('Test payload:', payload);
console.log('Test signature:', signature);

// Use curl to test:
// curl -X POST http://localhost:4000/webhooks/interactor \
//   -H "Content-Type: application/json" \
//   -H "X-Interactor-Signature: ${signature}" \
//   -d '${payload}'

Use the Test Endpoint

Interactor provides a test endpoint to send sample events:

bash
curl -X POST https://core.interactor.com/api/v1/webhooks/wh_abc/test \
  -H "Authorization: Bearer <token>"

This sends a test event to your webhook URL to verify it's working.

Postman Collection

Import this collection to test webhook handling:

json
{
  "info": { "name": "Interactor Webhooks", "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json" },
  "variable": [
    { "key": "webhook_secret", "value": "whsec_your_test_secret" },
    { "key": "webhook_url", "value": "http://localhost:4000/webhooks/interactor" }
  ],
  "item": [
    {
      "name": "Test Webhook - Credential Expired",
      "request": {
        "method": "POST",
        "url": "{{webhook_url}}",
        "header": [
          { "key": "Content-Type", "value": "application/json" },
          { "key": "X-Interactor-Signature", "value": "sha256={{signature}}" },
          { "key": "X-Interactor-Timestamp", "value": "{{timestamp}}" }
        ],
        "body": {
          "mode": "raw",
          "raw": "{\"id\":\"evt_test_001\",\"type\":\"credential.expired\",\"timestamp\":\"{{timestamp}}\",\"data\":{\"credential_id\":\"cred_test\",\"service_id\":\"google_calendar\",\"namespace\":\"user_123\",\"reason\":\"refresh_token_invalid\"}}"
        }
      }
    }
  ]
}

Pre-request script to generate signature:

javascript
const crypto = require('crypto-js');
const timestamp = new Date().toISOString();
const payload = pm.request.body.raw.replace(/\{\{timestamp\}\}/g, timestamp);
const signature = crypto.HmacSHA256(payload, pm.variables.get('webhook_secret')).toString();

pm.variables.set('timestamp', timestamp);
pm.variables.set('signature', signature);
pm.request.body.raw = payload;

Unit Test Template (TypeScript/Jest)

typescript
import crypto from 'crypto';
import request from 'supertest';
import app from '../app'; // Your Express app

describe('Webhook Handler', () => {
  const WEBHOOK_SECRET = 'whsec_test_secret_123';

  beforeAll(() => {
    process.env.INTERACTOR_WEBHOOK_SECRET = WEBHOOK_SECRET;
  });

  function generateSignature(payload: string): string {
    return 'sha256=' + crypto.createHmac('sha256', WEBHOOK_SECRET).update(payload).digest('hex');
  }

  function generateTimestamp(offsetMs = 0): string {
    return new Date(Date.now() + offsetMs).toISOString();
  }

  it('accepts valid webhook', async () => {
    const payload = JSON.stringify({
      id: 'evt_test_001',
      type: 'workflow.instance.completed',
      timestamp: generateTimestamp(),
      data: { instance_id: 'inst_123' }
    });

    const res = await request(app)
      .post('/webhooks/interactor')
      .set('Content-Type', 'application/json')
      .set('X-Interactor-Signature', generateSignature(payload))
      .set('X-Interactor-Timestamp', generateTimestamp())
      .send(payload);

    expect(res.status).toBe(200);
    expect(res.body.received).toBe(true);
  });

  it('rejects invalid signature', async () => {
    const payload = JSON.stringify({ id: 'evt_test', type: 'test' });

    const res = await request(app)
      .post('/webhooks/interactor')
      .set('Content-Type', 'application/json')
      .set('X-Interactor-Signature', 'sha256=invalid')
      .set('X-Interactor-Timestamp', generateTimestamp())
      .send(payload);

    expect(res.status).toBe(401);
    expect(res.body.error).toBe('invalid_signature');
  });

  it('rejects malformed signature header', async () => {
    const payload = JSON.stringify({ id: 'evt_test', type: 'test' });

    const res = await request(app)
      .post('/webhooks/interactor')
      .set('Content-Type', 'application/json')
      .set('X-Interactor-Signature', 'not_sha256_format')
      .set('X-Interactor-Timestamp', generateTimestamp())
      .send(payload);

    expect(res.status).toBe(401);
  });

  it('rejects stale timestamp (replay attack)', async () => {
    const payload = JSON.stringify({ id: 'evt_test', type: 'test' });
    const staleTimestamp = generateTimestamp(-10 * 60 * 1000); // 10 minutes ago

    const res = await request(app)
      .post('/webhooks/interactor')
      .set('Content-Type', 'application/json')
      .set('X-Interactor-Signature', generateSignature(payload))
      .set('X-Interactor-Timestamp', staleTimestamp)
      .send(payload);

    expect(res.status).toBe(401);
    expect(res.body.error).toBe('invalid_timestamp');
  });

  it('handles duplicate events idempotently', async () => {
    const eventId = 'evt_duplicate_test';
    const payload = JSON.stringify({
      id: eventId,
      type: 'workflow.instance.completed',
      timestamp: generateTimestamp(),
      data: { instance_id: 'inst_123' }
    });

    // First request
    await request(app)
      .post('/webhooks/interactor')
      .set('Content-Type', 'application/json')
      .set('X-Interactor-Signature', generateSignature(payload))
      .set('X-Interactor-Timestamp', generateTimestamp())
      .send(payload);

    // Second request (duplicate)
    const res = await request(app)
      .post('/webhooks/interactor')
      .set('Content-Type', 'application/json')
      .set('X-Interactor-Signature', generateSignature(payload))
      .set('X-Interactor-Timestamp', generateTimestamp())
      .send(payload);

    expect(res.status).toBe(200); // Should still succeed, just skip processing
  });
});

Event Replay Script

Replay historical events for debugging or recovery:

typescript
#!/usr/bin/env npx ts-node
// scripts/replay-webhook-events.ts
import crypto from 'crypto';
import fetch from 'node-fetch';

interface ReplayOptions {
  webhookUrl: string;
  webhookSecret: string;
  events: Array<{ id: string; type: string; data: unknown }>;
  delayMs?: number;
}

async function replayEvents({ webhookUrl, webhookSecret, events, delayMs = 100 }: ReplayOptions) {
  for (const event of events) {
    const timestamp = new Date().toISOString();
    const payload = JSON.stringify({
      ...event,
      timestamp,
      _replayed: true, // Mark as replayed for debugging
      _originalTimestamp: event.timestamp
    });

    const signature = 'sha256=' + crypto
      .createHmac('sha256', webhookSecret)
      .update(payload)
      .digest('hex');

    try {
      const res = await fetch(webhookUrl, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'X-Interactor-Signature': signature,
          'X-Interactor-Timestamp': timestamp
        },
        body: payload
      });

      console.log(`[${event.id}] ${event.type}: ${res.status} ${res.statusText}`);
    } catch (error) {
      console.error(`[${event.id}] FAILED:`, error);
    }

    await new Promise(r => setTimeout(r, delayMs));
  }
}

// Usage: Fetch events from Interactor and replay locally
const events = [
  { id: 'evt_001', type: 'credential.expired', data: { credential_id: 'cred_abc', namespace: 'user_123' } },
  { id: 'evt_002', type: 'workflow.instance.completed', data: { instance_id: 'inst_xyz' } }
];

replayEvents({
  webhookUrl: 'http://localhost:4000/webhooks/interactor',
  webhookSecret: process.env.INTERACTOR_WEBHOOK_SECRET!,
  events
});

Error Handling

Webhook API Errors

Error Code HTTP Status Description Resolution
webhook_not_found 404 Webhook ID doesn't exist Verify webhook ID, may have been deleted
invalid_url 400 URL not valid HTTPS Use https:// URL (HTTP only in dev)
invalid_events 400 Unknown event types in subscription Check /event-types for valid events
webhook_disabled 400 Webhook disabled after failures Fix endpoint issues, then toggle to re-enable
max_webhooks_exceeded 400 Account webhook limit reached Delete unused webhooks or contact support
url_unreachable 400 Cannot reach webhook URL Ensure URL is publicly accessible
invalid_secret_format 500 Internal error generating secret Retry request, contact support if persists
rate_limited 429 Too many API requests Wait and retry with exponential backoff
unauthorized 401 Invalid or expired token Refresh authentication token
forbidden 403 Insufficient permissions Check API token scopes

Webhook Delivery Errors

Your endpoint may receive these error scenarios:

Scenario Your Response Interactor Behavior
Signature mismatch Return 401 Logged as authentication failure
Timestamp too old Return 401 Logged as authentication failure
Unknown event type Return 200 Treated as success (forward compatible)
Processing error (recoverable) Return 500 Retried with backoff
Processing error (permanent) Return 400 Not retried, logged as permanent failure
Timeout (no response) N/A Retried with backoff

SSE Errors

Error HTTP Status Cause Resolution
Connection refused 401 Invalid or expired token Refresh token and reconnect
Connection refused 403 No access to resource Check permissions for room/workflow
Resource not found 404 Invalid room_id or instance_id Verify resource exists
Connection dropped N/A Network issues Implement auto-reconnect with backoff
Rate limited 429 Too many connections Close unused connections, respect limits
Server error 500 Interactor service issue Retry with backoff, check status page

Rate Limit Exceeded Behavior

When you exceed rate limits, Interactor returns:

http
HTTP/1.1 429 Too Many Requests
Retry-After: 60
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1706176800

{
  "error": "rate_limited",
  "message": "Too many requests. Please retry after 60 seconds.",
  "retry_after": 60
}

Rate limit headers:

  • X-RateLimit-Limit: Maximum requests per window
  • X-RateLimit-Remaining: Requests remaining in current window
  • X-RateLimit-Reset: Unix timestamp when the window resets
  • Retry-After: Seconds to wait before retrying

Handling rate limits:

typescript
async function callInteractorAPI(endpoint: string, options: RequestInit) {
  const response = await fetch(`https://core.interactor.com/api/v1${endpoint}`, options);

  if (response.status === 429) {
    const retryAfter = parseInt(response.headers.get('Retry-After') || '60', 10);
    console.warn(`Rate limited. Retrying after ${retryAfter}s`);
    await new Promise(r => setTimeout(r, retryAfter * 1000));
    return callInteractorAPI(endpoint, options); // Retry once
  }

  return response;
}

Output Format

When implementing webhooks/streaming, provide this summary:

markdown
## Webhooks & Streaming Implementation Report

**Date**: YYYY-MM-DD

### Webhooks Configured
| Webhook ID | URL | Events | Status |
|------------|-----|--------|--------|
| wh_abc | https://app.com/webhooks | credential.*, workflow.* | ✓ Active |

### Event Handlers
| Event | Handler | Status |
|-------|---------|--------|
| credential.expired | notifyUserToReconnect() | ✓ Implemented |
| credential.revoked | notifyUserToReconnect() | ✓ Implemented |
| workflow.instance.halted | notifyPendingApproval() | ✓ Implemented |
| workflow.instance.completed | processWorkflowResult() | ✓ Implemented |
| workflow.instance.failed | handleWorkflowFailure() | ✓ Implemented |
| agent.room.message | forwardToRealtime() | ✓ Implemented |

### SSE Streams
| Stream | Purpose | Status |
|--------|---------|--------|
| Room stream | Real-time chat UI | ✓ Implemented |
| Workflow stream | Progress tracking | ✓ Implemented |

### Implementation Checklist

**Security**
- [ ] Webhook endpoint uses HTTPS (required for production)
- [ ] Signature verification with timing-safe comparison
- [ ] Signature header format validated (`sha256=` + 64 hex chars)
- [ ] Timestamp validation to prevent replay attacks (5 min window)
- [ ] Webhook secret stored in environment variable (not in code)
- [ ] Key rotation procedure documented and tested
- [ ] SSE tokens are short-lived (5 min) or proxied through backend

**Reliability**
- [ ] Respond to webhooks within 5 seconds
- [ ] Async processing with background job queue
- [ ] Idempotent processing (track event IDs in Redis/DB)
- [ ] Dead-letter queue for failed events
- [ ] Event handlers for all subscribed events
- [ ] Unknown event types handled gracefully (ignore, don't fail)

**Observability**
- [ ] Webhook received counter (by event_type, status)
- [ ] Processing duration histogram
- [ ] DLQ size gauge
- [ ] Error rate alerting configured
- [ ] Signature validation failure alerting

**SSE**
- [ ] Auto-reconnect with exponential backoff
- [ ] Heartbeat monitoring (45s timeout)
- [ ] Connection cleanup on component unmount
- [ ] Connection health indicator in UI
- [ ] Max reconnect attempts with user notification

**Testing**
- [ ] Unit tests for signature verification
- [ ] Unit tests for timestamp validation
- [ ] Integration tests with test webhook endpoint
- [ ] Replay script available for debugging

Related Skills

  • interactor-auth: Setup authentication (prerequisite)
  • interactor-credentials: Credential events to monitor
  • interactor-agents: Chat streaming events
  • interactor-workflows: Workflow status events

Didn't find tool you were looking for?

Be as detailed as possible for better results