Agent skill

add-datalake-consumer

Adds an event consumer that writes to Azure Data Lake (Parquet) following BI_SALES_RISK plan. Creates events/consumers/[Name]DataLakeCollector.ts subscribing to RabbitMQ, building Parquet rows, writing to /path_prefix/year=YYYY/month=MM/day=DD/. Use when adding DataLakeCollector in logging or similar “event to Data Lake” pipelines.

Stars 163
Forks 31

Install this agent skill to your Project

npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/add-datalake-consumer

SKILL.md

Add Data Lake Consumer

Event consumer that subscribes to RabbitMQ and writes to Azure Data Lake (Parquet). Pattern: logging’s DataLakeCollector for risk.evaluated (BI_SALES_RISK_IMPLEMENTATION_PLAN §3.5, §9.1). BI Sales Risk: Paths and Parquet columns MUST match documentation/requirements/BI_SALES_RISK_DATA_LAKE_LAYOUT.md (§2.1 risk.evaluated, §2.2 ml_outcomes, §4 config).

1. Consumer

Path: src/events/consumers/[Name]DataLakeCollector.ts

  • EventConsumer with queue, exchange: coder_events, bindings: e.g. ['risk.evaluated','ml.prediction.completed','opportunity.updated','forecast.generated'].
  • Handler: map event to row. For risk.evaluated use columns in Data Lake Layout §2.1. Build path: {path_prefix}/year={YYYY}/month={MM}/day={DD}/... (Layout §1).
  • Write via @azure/storage-blob (BlockBlob) or @azure/storage-blob + parquetjs (or Arrow) for Parquet. Buffer/batch by time or count if needed.
  • Config: data_lake.connection_string, data_lake.container, data_lake.path_prefix (e.g. /risk_evaluations).

2. Config

config/default.yaml:

yaml
data_lake:
  connection_string: ${DATA_LAKE_CONNECTION_STRING}
  container: ${DATA_LAKE_CONTAINER:-risk}
  path_prefix: ${DATA_LAKE_PATH_PREFIX:-/risk_evaluations}

rabbitmq:
  url: ${RABBITMQ_URL}
  exchange: coder_events
  queue: [module]_data_lake
  bindings:
    - risk.evaluated
    - ml.prediction.completed
    # ...

config/schema.json: add data_lake with connection_string, container, path_prefix.

3. Server

In server.ts: await dataLakeCollector.start() after RabbitMQ connect.

4. Checklist

  • Consumer in events/consumers/, subscribe to RabbitMQ (no Azure Service Bus)
  • Path: {path_prefix}/year=.../month=.../day=.../; format Parquet
  • Config: data_lake.* and schema; rabbitmq queue and bindings
  • Start collector in server

Didn't find tool you were looking for?

Be as detailed as possible for better results