Agent skill

drasi-queries

Guide for writing Drasi continuous queries using Cypher. Use this when asked to create, modify, or troubleshoot Drasi queries.

Install this agent skill to your Project

npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/drasi-queries

SKILL.md

Drasi Query Development Skill

Use this skill when developing Drasi ContinuousQuery definitions with Cypher. If the user is not working with Drasi, do not apply this skill.

IMPORTANT: Use the context7 MCP server with library ID /drasi-project/docs to get the latest Drasi documentation and Cypher syntax. Do not assume—verify current supported features.

Before relying on any Drasi capability or function behavior that may be version-dependent, verify it and record the evidence in this form:

text
[VERIFY]
EvidenceType = Docs | ReleaseNotes | Issue | Repro
WhereToCheck = <URL, repo, command, or repro steps>

Capability & Version Notes

  • Tested against Drasi X.Y (record drasi version output and keep this line current; version not verified in this repo).
  • Cypher support follows Drasi's documented subset; GQL support is additive and must be explicitly enabled via queryLanguage: GQL.
  • Verified against Drasi Cypher docs: features not listed in the supported subset (e.g., collect, DISTINCT, ORDER BY, LIMIT) should be treated as unsupported unless release notes state otherwise.

Drasi Function Guidance (Select the Right Tool)

| Task type | Function | Notes / When to use |
| --- | --- | --- |
| Detect state transition | drasi.previousDistinctValue() | Ignores repeated identical values. |
| Compare last-known values | drasi.previousValue() | Detect any change between revisions. |
| Evaluate sustained truth | drasi.trueFor() | Requires continuous "true" state for duration. |
| Evaluate timed condition | drasi.trueLater() | Pending result until specified timestamp. |
| Retrieve version snapshot | drasi.getVersionByTimestamp() | Requires temporal index. |
| Compute trend | drasi.linearGradient() | Aggregate slope detection (telemetry, rate rise). |
| Change timestamps | drasi.changeDateTime() | Windowing and recency checks against element's last change time. Evidence: Drasi custom functions docs (link below). |
| Historical lookup (range) | drasi.getVersionsByTimeRange() | Requires temporal index. Evidence: Drasi custom functions docs (link below). |

Evidence: Drasi custom functions docs (https://github.com/drasi-project/docs/blob/main/docs/content/reference/query-language/drasi-custom-functions.md).

When to Use Drasi vs Brokers

  • Use Drasi for continuous queries over changing graphs, temporal reasoning (e.g., trueFor, historical versions), and when you need query results that reflect graph state transitions.
  • Use Event Hubs/Kafka for high-throughput event streaming with replay and retention when you want to reprocess raw events or build multiple downstream projections.
  • Use Service Bus/queues for command-style workflows, strict ordering per key, or guaranteed processing with dead-letter workflows.
  • If you need change-graph outputs with declarative querying, Drasi is the right fit; if you need raw event replay and fanout without query semantics, prefer a broker.

Query Performance & Reliability Guardrails

  • Avoid Cartesian joins (MATCH (a), (b)) unless absolutely required; they scale poorly.
  • Filter early with WHERE before aggregations to reduce per-diff work.
  • Prefer indexed properties in Source data where possible; query cost is driven by change volume + aggregation.
  • Reactions are at-least-once: design consumers to be idempotent and safe on duplicates.
  • Document a replay plan when queries drive external side effects (e.g., Service Bus/webhooks).

Supported Cypher Features (Safe to Use)

| Feature | Example |
| --- | --- |
| MATCH, WHERE, RETURN | MATCH (n:Node) WHERE n.id > 5 RETURN n |
| WITH ... count() | WITH n.category AS cat, count(*) AS cnt |
| Aggregations | max(), min(), sum(), avg(), count() |
| coalesce() | RETURN coalesce(n.value, 0) |
| drasi.changeDateTime() | WHERE drasi.changeDateTime(n) > datetime() - duration('PT24H') |
| drasi.previousValue() | WHERE i.amount > drasi.previousValue(i.amount) |
| drasi.previousDistinctValue() | WHERE drasi.previousDistinctValue(c.status) = 'pending' |
| Graph relationships | MATCH (a)-[:LIKES]->(b) |
| Identifier escaping | Use backticks for special chars: MATCH (n:`Special-Label`) |

Unsupported Features (Will Fail)

| Feature | Status | Workaround |
| --- | --- | --- |
| collect() | ⚠️ Not in supported subset | Use WITH ... count() pattern |
| DISTINCT | ❌ Not in supported subset | Use WITH ... count() for grouping |
| ORDER BY | ❌ Not in supported subset | Sort client-side |
| LIMIT | ❌ Not in supported subset | Filter client-side |

Source: Drasi Cypher support docs (https://github.com/drasi-project/docs/blob/main/docs/content/reference/query-language/cypher.md)
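
Because ORDER BY and LIMIT sit outside the supported subset, ordering and truncation move to the consumer. The following is a minimal sketch, assuming result rows reach the consumer as dictionaries keyed by the RETURN column names. The row shape and the top_n helper are illustrative assumptions, not part of any Drasi API.

```python
from typing import Any


def top_n(rows: list[dict[str, Any]], key: str, n: int) -> list[dict[str, Any]]:
    """Client-side stand-in for ORDER BY <key> DESC LIMIT <n>."""
    # Rows that lack the key are placed last instead of raising.
    present = [r for r in rows if r.get(key) is not None]
    missing = [r for r in rows if r.get(key) is None]
    ordered = sorted(present, key=lambda r: r[key], reverse=True) + missing
    return ordered[:n]


# Hypothetical rows shaped like the aggregation pattern's RETURN item, frequency.
rows = [
    {"item": "robot", "frequency": 3},
    {"item": "kite", "frequency": 7},
    {"item": "puzzle", "frequency": 5},
]
top_two = top_n(rows, key="frequency", n=2)
```

Keeping the sort in the consumer also keeps the query result set stable under reordering, so only real count changes produce diffs.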

Query Patterns

Aggregation Pattern (Correct)

cypher
MATCH (w:WishlistItem)
WITH w.text AS item, count(*) AS frequency
WHERE frequency > 0
RETURN item, frequency

Duplicate Detection Pattern

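Comparing childId with < rather than <> reports each duplicate pair once instead of once per ordering. The disconnected MATCH is a Cartesian join, so reserve this pattern for small label sets.

cypher
MATCH (w1:WishlistItem), (w2:WishlistItem)
WHERE w1.toyName = w2.toyName
  AND w1.childId < w2.childId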
RETURN w1.childId AS child1,
       w2.childId AS child2,
       w1.toyName AS duplicate

Time-Windowed Pattern

cypher
MATCH (w:WishlistItem)
WHERE drasi.changeDateTime(w) > datetime() - duration('PT24H')
WITH w.toyName AS toy, count(*) AS requests
WHERE requests >= 5
RETURN toy, requests

Critical: Label Configuration

Queries must use the middleware label, NOT the event hub name:

yaml
# In Source definition
middleware:
  - kind: map
    my-event-hub:
      insert:
        - label: WishlistItem # <-- Use THIS in queries

yaml
# Map event types to labels (event type != hub/topic name)
middleware:
  - kind: map
    toy-events-hub:
      insert:
        - label: WishlistItem
          when:
            eventType: wishlist.item.created
        - label: ChildProfile
          when:
            eventType: child.profile.updated

cypher
// ✅ CORRECT
MATCH (w:WishlistItem)

// ❌ WRONG
MATCH (w:`my-event-hub`)
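
A label mismatch can be caught before deployment. The following is a rough sketch, assuming the configured middleware labels are available as a set of strings. The regular expression only recognizes the first label in a node pattern such as (w:WishlistItem), and the helper names are hypothetical.

```python
import re

# Matches a node pattern such as (w:WishlistItem) or (n:`Special-Label`).
# Relationship patterns like [:LIKES] use brackets and are not matched.
_NODE_LABEL = re.compile(r"\(\s*\w*\s*:\s*(`[^`]+`|\w+)")


def labels_in_query(query: str) -> set[str]:
    """Return the node labels referenced in the query text."""
    return {label.strip("`") for label in _NODE_LABEL.findall(query)}


def unknown_labels(query: str, middleware_labels: set[str]) -> set[str]:
    """Return labels used in the query that no middleware rule produces."""
    return labels_in_query(query) - middleware_labels


configured = {"WishlistItem", "ChildProfile"}
ok = unknown_labels("MATCH (w:WishlistItem) RETURN w.toyName", configured)
bad = unknown_labels("MATCH (w:`my-event-hub`) RETURN w", configured)
```

An empty result means every label in the query is produced by the middleware. A non-empty result names the labels to fix.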

Deployment Commands

Always use Drasi CLI, NOT kubectl. Deploy in this order and verify each step before moving to the next, so that every resource exists before something references it:

  1. Namespace (if used)
  2. Source
  3. ContinuousQuery
  4. Reaction

bash
drasi apply -f queries.yaml -n drasi-system
drasi describe query my-query -n drasi-system

GQL Support

New Feature: Drasi now supports Graph Query Language (GQL) in addition to Cypher!

yaml
kind: ContinuousQuery
spec:
  queryLanguage: GQL  # or "Cypher" (default)
  query: |
    MATCH (v:Vehicle)
    WHERE v.color = 'Red'
    RETURN v.color

GQL-Specific Features:

  • FILTER - Post-query filtering (like SQL HAVING)
  • LET - Create computed variables
  • YIELD - Project and rename columns
  • NEXT - Chain multiple query statements
  • GROUP BY - Explicit aggregation grouping

Example with FILTER (post-aggregation):

gql
MATCH (v:Vehicle)
RETURN v.color AS color, count(v) AS vehicle_count
GROUP BY color
NEXT FILTER vehicle_count > 5
RETURN color, vehicle_count

Common Parser Errors:

  • "Identifier not found in scope" - Check variable names match your MATCH clause
  • Unexpected token errors - Ensure proper escaping with backticks for special characters

Security Best Practices

Input Validation

  • Never concatenate user input directly into Cypher queries
  • Validate and sanitize all external inputs before use in queries
  • Use typed parameters where possible to prevent injection attacks

Principle of Least Privilege

  • Grant Sources only the minimum permissions needed to read data
  • Configure Reactions with minimal write access to target systems
  • Use separate service accounts for different query contexts

Query Safety

  • Avoid runtime construction of query strings; keep queries static and validate inputs before they reach Drasi.
  • Prefer managed identities or workload identities for downstream reactions instead of embedding secrets.

Contract Evolution Guidance

  • Additive schema changes: add new properties/labels without removing existing ones; update queries to tolerate missing fields.
  • Renames: dual-write old and new property names/labels during a transition period, then remove after consumers migrate.
  • Deprecations: annotate query comments with deprecation date and migration path.
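
On the consumer side, a rename under dual-write can be tolerated by reading the new name first and falling back to the old one. This is a minimal sketch. The read_property helper and the text to toyName rename are hypothetical and used only for illustration.

```python
from typing import Any


def read_property(row: dict[str, Any], new_name: str, old_name: str, default: Any = None) -> Any:
    """Prefer the new property name, then the old one, then a default."""
    for name in (new_name, old_name):
        value = row.get(name)
        if value is not None:
            return value
    return default


# During the transition period, rows may carry either name or neither.
migrated = read_property({"toyName": "kite"}, "toyName", "text")
legacy = read_property({"text": "robot"}, "toyName", "text")
absent = read_property({}, "toyName", "text", default="unknown")
```

Once all producers write the new name, the fallback argument can be removed together with the old property.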

YAML Structure Best Practices

Naming Conventions

  • Use descriptive, lowercase names with hyphens: wishlist-trending-items
  • Prefix with domain/feature: inventory-low-stock-alert
  • Include purpose in the name: order-fraud-detection
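
The naming convention above can be enforced in CI. The sketch below encodes this guide's convention (lowercase words joined by single hyphens). It is not a rule taken from Drasi itself, and the function name is hypothetical.

```python
import re

# Lowercase alphanumeric words separated by single hyphens,
# for example wishlist-trending-items.
_NAME = re.compile(r"[a-z0-9]+(?:-[a-z0-9]+)*")


def is_valid_query_name(name: str) -> bool:
    """Return True when the name follows the hyphenated lowercase convention."""
    return _NAME.fullmatch(name) is not None


results = {
    name: is_valid_query_name(name)
    for name in ("wishlist-trending-items", "Wishlist_Trending", "order--fraud")
}
```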

Complete ContinuousQuery Example

yaml
apiVersion: v1
kind: ContinuousQuery
name: wishlist-trending-items # Descriptive, hyphenated name
spec:
  # Specify query language explicitly
  queryLanguage: Cypher

  # Document the query purpose
  # Purpose: Detect trending wishlist items requested by 5+ children in 24h
  # Trigger: New WishlistItem events from event hub
  # Output: List of trending toy names with request counts

  sources:
    subscriptions:
      - id: wishlist-source # Reference to Source definition; verify the field names against current Drasi docs

  query: |
    MATCH (w:WishlistItem)
    WHERE drasi.changeDateTime(w) > datetime() - duration('PT24H')
    WITH w.toyName AS toy, count(*) AS requests
    WHERE requests >= 5
    RETURN toy, requests

Complete Source Example

yaml
apiVersion: v1
kind: Source
name: wishlist-source
spec:
  kind: EventHub # or PostgreSQL, CosmosDB, etc.
  properties:
    connectionString: ${EVENTHUB_CONNECTION_STRING} # Use env vars for secrets
    consumerGroup: drasi-consumer
  middleware:
    - kind: map
      my-event-hub:
        insert:
          - label: WishlistItem # Label used in MATCH clauses
            properties:
              - name: toyName
              - name: childId
              - name: priority

Complete Reaction Example

yaml
apiVersion: v1
kind: Reaction
name: trending-alert-reaction
spec:
  kind: SignalR # or Webhook, StoredProc, etc.
  queries:
    wishlist-trending-items: # Reference to ContinuousQuery (queries is a map keyed by query name; verify against current Drasi docs)
  properties:
    connectionString: ${SIGNALR_CONNECTION_STRING}
    hubName: trending-alerts

Testing Guidance

Local Loop

  1. drasi apply to a test namespace
  2. Inspect output with a debug Reaction and drasi logs
  3. Validate query lag and expected match counts
  4. Adjust and redeploy using versioned YAML

Local Testing

  1. Use representative sample data that covers edge cases
  2. Test with the Drasi debug reaction to inspect query output:
    bash
    drasi apply -f debug-reaction.yaml -n drasi-system
    drasi logs reaction debug-reaction -n drasi-system -f
    

Validation Checklist

  • Query returns expected results with sample data
  • Aggregations produce correct counts/sums
  • Time-windowed queries trigger at appropriate intervals
  • Middleware labels match MATCH clause labels exactly
  • No unsupported Cypher features used
  • Duplicate detection queries include deterministic keys for idempotent reactions
  • Time-window boundaries behave as expected at edge values
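
The "no unsupported Cypher features" item can be checked mechanically before drasi apply. This is a rough sketch that scans the query text for the four features this guide lists as unsupported. It is a text-level heuristic rather than a parser, so a property literally named limit would be flagged. The function name is hypothetical.

```python
import re

# Features this guide treats as unsupported; adjust after checking the Drasi docs.
_UNSUPPORTED = {
    "collect()": re.compile(r"\bcollect\s*\(", re.IGNORECASE),
    "DISTINCT": re.compile(r"\bDISTINCT\b", re.IGNORECASE),
    "ORDER BY": re.compile(r"\bORDER\s+BY\b", re.IGNORECASE),
    "LIMIT": re.compile(r"\bLIMIT\b", re.IGNORECASE),
}

# Single- or double-quoted string literals.
_STRING_LITERAL = re.compile(r"'[^']*'" + "|" + r'"[^"]*"')


def unsupported_features(query: str) -> list[str]:
    """Return the names of unsupported features found in the query text."""
    # Blank out string literals first so values such as 'limit' are not flagged.
    stripped = _STRING_LITERAL.sub("''", query)
    return [name for name, pattern in _UNSUPPORTED.items() if pattern.search(stripped)]


found = unsupported_features(
    "MATCH (w:WishlistItem) RETURN DISTINCT w.toyName ORDER BY w.toyName LIMIT 5"
)
```

Word boundaries keep drasi.previousDistinctValue() from being mistaken for DISTINCT.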

Common Test Scenarios

  • Empty result set handling
  • Single item vs. multiple items
  • Boundary conditions for thresholds
  • Time window edge cases
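
For threshold and window edge cases, it helps to compute the expected result from the sample data independently and compare it with the query output. The sketch below is a reference model of the time-windowed pattern (24-hour window, threshold of 5). It assumes sample events are available as (toy name, change time) pairs and does not call Drasi.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone


def expected_trending(events, now, window=timedelta(hours=24), threshold=5):
    """events: iterable of (toy_name, changed_at). Returns {toy: count} at or above threshold."""
    cutoff = now - window
    # Strictly newer than the cutoff, mirroring the '>' comparison in the query.
    counts = Counter(toy for toy, changed_at in events if changed_at > cutoff)
    return {toy: n for toy, n in counts.items() if n >= threshold}


now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
# Five kite requests inside the window.
events = [("kite", now - timedelta(hours=h)) for h in (1, 2, 3, 4, 5)]
# Four robot requests inside the window and one exactly on the boundary.
events += [("robot", now - timedelta(hours=h)) for h in (1, 2, 3, 4, 24)]
expected = expected_trending(events, now)
```

The robot request made exactly 24 hours ago falls on the boundary and is excluded, so only kite reaches the threshold.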

Replay & Idempotency

  • Reactions are at-least-once: design outputs to be idempotent with deterministic keys (e.g., ${queryId}:${entityId}:${windowStart}).
  • For external side effects, document a replay plan and how duplicates are deduplicated.
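
A deterministic key plus a seen-set is enough to make a side effect safe under at-least-once delivery. This is a minimal sketch. The key layout follows the example above, while the DedupingHandler class and its in-memory set are assumptions for illustration. A production consumer would keep the keys in a durable store.

```python
def idempotency_key(query_id: str, entity_id: str, window_start: str) -> str:
    """Build the deterministic key queryId:entityId:windowStart."""
    return f"{query_id}:{entity_id}:{window_start}"


class DedupingHandler:
    """Runs a side effect at most once per key within this process."""

    def __init__(self, side_effect):
        self._side_effect = side_effect
        self._seen = set()  # assumption: in-memory only; not durable across restarts

    def handle(self, key: str, payload) -> bool:
        """Return True if the side effect ran, False if the key was a duplicate."""
        if key in self._seen:
            return False
        self._side_effect(payload)
        # Record the key only after success so a failed attempt can be retried.
        self._seen.add(key)
        return True


sent = []
handler = DedupingHandler(sent.append)
key = idempotency_key("wishlist-trending-items", "kite", "2024-01-02T00:00:00Z")
first = handler.handle(key, {"toy": "kite", "requests": 5})
second = handler.handle(key, {"toy": "kite", "requests": 5})  # redelivery
```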

Replay checklist:

  • Rebuild source with known input range
  • Clear or version downstream projections
  • Validate idempotency keys and dedupe logic
  • Re-run query and compare counts before/after

Backpressure & Retry Bounds

  • Tighten WHERE filters early to reduce per-change workload and avoid backlog growth.
  • Bound retries in downstream reactions; send poison messages to DLQ after max attempts.
  • Monitor backlog and tune reaction concurrency to match downstream capacity.
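
Bounded retries with a dead-letter handoff can be sketched as follows. The send and dead_letter callables are placeholders for whatever the downstream reaction uses, and the attempt limit and backoff values are arbitrary defaults rather than recommendations from Drasi.

```python
import time


def deliver_with_retry(send, message, dead_letter, max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call send(message) up to max_attempts times, then hand off to dead_letter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            send(message)
            return True
        except Exception as exc:  # a real consumer would catch only transient errors
            if attempt == max_attempts:
                dead_letter(message, exc)
                return False
            # Exponential backoff: base_delay, then 2x, 4x, ...
            sleep(base_delay * 2 ** (attempt - 1))
    return False


def _always_down(message):
    raise RuntimeError("downstream unavailable")


dlq = []
waits = []
delivered = deliver_with_retry(
    _always_down, "msg-1", lambda m, e: dlq.append(m), sleep=waits.append
)
```

Injecting sleep keeps the function testable without real delays. In the run above the recorded waits stand in for the backoff pauses.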

Troubleshooting

If query shows TerminalError:

  1. Check for unsupported Cypher features (see table above)
  2. Verify middleware label matches MATCH clause exactly
  3. Use drasi describe query <name> -n drasi-system for error details

Debug Steps:

bash
# Check query status
drasi list query -n drasi-system

# Get detailed error information
drasi describe query my-query -n drasi-system

# View reaction logs for output inspection
drasi logs reaction my-reaction -n drasi-system -f

Observability & Cost

  • Propagate correlation IDs from sources through reactions (headers or message properties).
  • Emit metrics: query lag, reaction throughput, DLQ counts, and duplicate rate.
  • Maintain a runbook: query purpose, rollback steps, replay steps, and alert thresholds.
  • Minimize payload size in reactions; project only required fields.
  • Avoid high-cardinality aggregations unless explicitly needed.

References

MCP Server

  • Use context7 MCP server with library ID: /drasi-project/docs

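Official Documentation

  • Drasi Cypher support: https://github.com/drasi-project/docs/blob/main/docs/content/reference/query-language/cypher.md
  • Drasi custom functions: https://github.com/drasi-project/docs/blob/main/docs/content/reference/query-language/drasi-custom-functions.md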

Known Issues

  • Issue #308 - drasi apply 500 errors on reapply
  • Issue #43 - collect() function implementation (in progress)
