Agent skill

ingesting-data

Data ingestion patterns for loading data from cloud storage, APIs, files, and streaming sources into databases. Use when importing CSV/JSON/Parquet files, pulling from S3/GCS buckets, consuming API feeds, or building ETL pipelines.

Install this agent skill to your Project

npx add-skill https://github.com/ancoleman/ai-design-components/tree/main/skills/ingesting-data

SKILL.md

Data Ingestion Patterns

This skill provides patterns for getting data INTO systems from external sources.

When to Use This Skill

  • Importing CSV, JSON, Parquet, or Excel files
  • Loading data from S3, GCS, or Azure Blob storage
  • Consuming REST/GraphQL API feeds
  • Building ETL/ELT pipelines
  • Database migration and CDC (Change Data Capture)
  • Streaming data ingestion from Kafka/Kinesis

Ingestion Pattern Decision Tree

What is your data source?
├── Cloud Storage (S3, GCS, Azure) → See cloud-storage.md
├── Files (CSV, JSON, Parquet) → See file-formats.md
├── REST/GraphQL APIs → See api-feeds.md
├── Streaming (Kafka, Kinesis) → See streaming-sources.md
├── Legacy Database → See database-migration.md
└── Need full ETL framework → See etl-tools.md

Quick Start by Language

Python (Recommended for ETL)

dlt (data load tool) - Modern Python ETL:

python
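import dlt
import requests

# Define a source
@dlt.source
def github_source(repo: str):
    @dlt.resource(write_disposition="merge", primary_key="id")
    def issues():
        # Fetch the first page of issues; fail on HTTP errors instead of loading an error payload
        response = requests.get(f"https://api.github.com/repos/{repo}/issues", timeout=30)
        response.raise_for_status()
        yield response.json()
    return issues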

# Load to destination
pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="postgres",  # or duckdb, bigquery, snowflake
    dataset_name="github_data"
)

load_info = pipeline.run(github_source("owner/repo"))
print(load_info)

Polars for file processing (typically faster than pandas on large files):

python
import polars as pl

# Read CSV with schema inference
df = pl.read_csv("data.csv")

# Read Parquet (columnar, efficient)
df = pl.read_parquet("s3://bucket/data.parquet")

# Read JSON lines
df = pl.read_ndjson("events.jsonl")

# Write to database
df.write_database(
    table_name="events",
    connection="postgresql://user:pass@localhost/db",
    if_table_exists="append"
)

TypeScript/Node.js

S3 ingestion:

typescript
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { parse } from "csv-parse/sync";

const s3 = new S3Client({ region: "us-east-1" });

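async function ingestFromS3(bucket: string, key: string) {
  const response = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
  const body = await response.Body?.transformToString();
  if (body === undefined) {
    throw new Error(`Empty response body for s3://${bucket}/${key}`);
  }

  // Parse CSV into one object per row, keyed by the header line
  const records = parse(body, { columns: true, skip_empty_lines: true });

  // Insert to database (db and eventsTable are assumed to be defined elsewhere)
  await db.insert(eventsTable).values(records);
}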

Webhook receiver (push-based API feed):

typescript
import { Hono } from "hono";

// Webhook receiver for real-time ingestion
const app = new Hono();

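app.post("/webhooks/stripe", async (c) => {
  // Read the raw body first: signature checks are computed over the exact bytes received
  const rawBody = await c.req.text();

  // Validate webhook signature before trusting the payload
  const signature = c.req.header("stripe-signature");
  // ... validation logic (reject the request if the signature is missing or invalid)

  const event = JSON.parse(rawBody);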

  // Ingest event
  await db.insert(stripeEventsTable).values({
    eventId: event.id,
    type: event.type,
    data: event.data,
    receivedAt: new Date()
  });

  return c.json({ received: true });
});

Rust

High-performance file ingestion:

rust
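use std::io::Cursor;

use aws_sdk_s3::Client;
use polars::prelude::*;

// Boxed error type so `?` works for both AWS SDK and Polars errors
type BoxError = Box<dyn std::error::Error + Send + Sync>;

async fn ingest_parquet(client: &Client, bucket: &str, key: &str) -> std::result::Result<DataFrame, BoxError> {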
    // Download from S3
    let resp = client.get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;

    let bytes = resp.body.collect().await?.into_bytes();

    // Parse with Polars
    let df = ParquetReader::new(Cursor::new(bytes))
        .finish()?;

    Ok(df)
}

Go

CSV ingestion from S3:

go
package main

import (
    "context"
    "encoding/csv"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func ingestCSV(ctx context.Context, client *s3.Client, bucket, key string) error {
    resp, err := client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: &bucket,
        Key:    &key,
    })
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    reader := csv.NewReader(resp.Body)
    records, err := reader.ReadAll()
    if err != nil {
        return err
    }

    // Batch insert to database (batchInsert is assumed to be defined elsewhere)
    return batchInsert(ctx, records)
}

Ingestion Patterns

1. Batch Ingestion (Files/Storage)

For periodic bulk loads:

Source → Extract → Transform → Load → Validate
  ↓         ↓          ↓         ↓        ↓
 S3      Download   Clean/Map  Insert   Count check

Key considerations:

  • Use chunked reading for large files (>100MB)
  • Implement idempotency with checksums
  • Track file processing state
  • Handle partial failures
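
The considerations above can be sketched with only the standard library. This is a minimal illustration, not a reference implementation: it assumes a CSV with `id` and `name` columns, uses SQLite as a stand-in target, and the `events` and `processed_files` tables and both function names are hypothetical. The file checksum provides idempotency, the CSV is read in fixed-size chunks, and a single transaction means a partial failure leaves no half-loaded file.

```python
import csv
import hashlib
import sqlite3
from itertools import islice
from pathlib import Path


def file_checksum(path: Path, block_size: int = 1 << 20) -> str:
    """Return the SHA-256 hex digest of a file, read in fixed-size blocks."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for block in iter(lambda: fh.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def ingest_csv(conn: sqlite3.Connection, path: Path, chunk_rows: int = 1000) -> int:
    """Load a CSV into `events`; return the rows loaded (0 if the file was seen before)."""
    conn.execute("CREATE TABLE IF NOT EXISTS events (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_files (checksum TEXT PRIMARY KEY, path TEXT)"
    )

    # Idempotency: skip files whose exact content was already loaded.
    checksum = file_checksum(path)
    seen = conn.execute(
        "SELECT 1 FROM processed_files WHERE checksum = ?", (checksum,)
    ).fetchone()
    if seen:
        return 0

    loaded = 0
    # One transaction per file: an exception rolls back both the rows and the state record.
    with conn:
        with path.open(newline="") as fh:
            reader = csv.DictReader(fh)
            # Chunked reading: at most `chunk_rows` rows are held in memory at a time.
            while chunk := list(islice(reader, chunk_rows)):
                conn.executemany(
                    "INSERT OR REPLACE INTO events (id, name) VALUES (:id, :name)", chunk
                )
                loaded += len(chunk)
        conn.execute(
            "INSERT INTO processed_files (checksum, path) VALUES (?, ?)",
            (checksum, str(path)),
        )
    return loaded
```

Calling `ingest_csv` twice on the same file loads the rows once and returns 0 the second time, which is the property a scheduled batch job needs when it is retried.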

2. Streaming Ingestion (Real-time)

For continuous data flow:

Source → Buffer → Process → Load → Ack
  ↓        ↓         ↓        ↓      ↓
Kafka   In-memory  Transform  DB   Commit offset

Key considerations:

  • At-least-once vs exactly-once semantics
  • Backpressure handling
  • Dead letter queues for failures
  • Checkpoint management
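
The flow above can be simulated without a broker to show the ordering that gives at-least-once delivery. This is a sketch under stated assumptions: `Message`, `Sink`, and `consume` are hypothetical names, messages are JSON objects with an `id` field, and an in-memory dict stands in for the database. The offset is committed only after the batch is loaded, malformed messages go to a dead letter list, and keying rows by `id` makes replays harmless.

```python
import json
from dataclasses import dataclass, field


@dataclass
class Message:
    offset: int
    value: bytes


@dataclass
class Sink:
    """Hypothetical stand-in for the target database and the dead letter queue."""
    rows: dict = field(default_factory=dict)  # keyed by event id, so replays overwrite
    dead_letters: list = field(default_factory=list)

    def load(self, events: list[dict]) -> None:
        for event in events:
            self.rows[event["id"]] = event


def consume(messages: list[Message], sink: Sink, committed: int = -1, batch_size: int = 2) -> int:
    """Process every message after `committed`; return the new committed offset."""
    pending = [m for m in messages if m.offset > committed]
    for start in range(0, len(pending), batch_size):
        # The batch size bounds how many messages are buffered in memory at once.
        batch = pending[start:start + batch_size]
        events = []
        for msg in batch:
            try:
                event = json.loads(msg.value)
                event["id"]  # raises if the payload has no usable id
                events.append(event)
            except (ValueError, KeyError, TypeError):
                # Poison messages are parked instead of blocking the stream.
                sink.dead_letters.append(msg)
        sink.load(events)
        # Commit only after the load succeeded. A crash before this line means the
        # batch is re-read on restart: at-least-once, never silently dropped.
        committed = batch[-1].offset
    return committed
```

With a real Kafka client the same ordering applies: disable auto-commit and commit offsets after the write to the target succeeds.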

3. API Polling (Feeds)

For external API data:

Schedule → Fetch → Dedupe → Load → Update cursor
   ↓         ↓        ↓       ↓         ↓
 Cron     API call  By ID   Insert   Last timestamp

Key considerations:

  • Rate limiting (stay within the API's request quota)
  • Incremental loading (cursors, timestamps)
  • API pagination handling
  • Retry transient failures with exponential backoff
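
A minimal sketch of the polling loop, assuming a paginated API whose pages look like `{"items": [...], "next_page": ...}` and whose items carry `id` and an ISO 8601 `updated_at`. The page shape, `fetch_with_backoff`, and `poll` are hypothetical; the actual HTTP call is injected as `fetch_page` so the retry, dedupe, and cursor logic can be read (and tested) on its own.

```python
import random
import time
from typing import Callable, Optional

# Assumed page shape: {"items": [{"id": ..., "updated_at": ...}], "next_page": int or None}
Page = dict


def fetch_with_backoff(
    fetch: Callable[[], Page],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Page:
    """Call `fetch`, retrying ConnectionError with exponential backoff plus jitter."""
    for attempt in range(max_attempts):
        try:
            return fetch()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the scheduler
            # Delays grow as base, 2*base, 4*base, ... with up to `base_delay` of jitter.
            sleep(base_delay * 2 ** attempt + random.uniform(0, base_delay))
    raise ValueError("max_attempts must be at least 1")


def poll(
    fetch_page: Callable[[Optional[str], int], Page],
    cursor: Optional[str],
    seen_ids: set,
    **backoff_kwargs,
) -> tuple[list[dict], Optional[str]]:
    """Fetch all pages changed since `cursor`; return (new items, updated cursor)."""
    new_items: list[dict] = []
    page: Optional[int] = 1
    while page is not None:
        current = page
        result = fetch_with_backoff(lambda: fetch_page(cursor, current), **backoff_kwargs)
        for item in result["items"]:
            # Dedupe by ID: overlapping pages or re-polls must not load an item twice.
            if item["id"] not in seen_ids:
                seen_ids.add(item["id"])
                new_items.append(item)
        page = result.get("next_page")

    # Advance the cursor to the newest timestamp seen; never move it backwards.
    candidates = [item["updated_at"] for item in new_items]
    if cursor is not None:
        candidates.append(cursor)
    return new_items, max(candidates, default=None)
```

Persist the returned cursor only after the items have been loaded, so a failed load is retried from the old cursor on the next scheduled run.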

4. Change Data Capture (CDC)

For database replication:

Source DB → Capture changes → Transform → Target DB
    ↓             ↓               ↓            ↓
 Postgres    Debezium/WAL      Map schema   Insert/Update

Key considerations:

  • Initial snapshot + streaming changes
  • Schema evolution handling
  • Ordering guarantees
  • Conflict resolution
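
The apply side of this flow can be sketched as follows. The event shape is a simplified assumption loosely modeled on Debezium's change envelope (`op` of `c`, `u`, `d`, or `r` for snapshot reads, plus `before` and `after` row images); placing `lsn` at the top level, the `users` table, and `apply_change` are all hypothetical. SQLite stands in for the target database. Storing the last applied LSN per row lets the consumer skip stale or replayed events, and deletes are kept as tombstones so a late update cannot resurrect a deleted row.

```python
import sqlite3

# Hypothetical target table: one row per source row, plus CDC bookkeeping columns.
SCHEMA = (
    "CREATE TABLE IF NOT EXISTS users ("
    "id INTEGER PRIMARY KEY, email TEXT, "
    "lsn INTEGER NOT NULL, deleted INTEGER NOT NULL DEFAULT 0)"
)


def apply_change(conn: sqlite3.Connection, event: dict) -> bool:
    """Apply one change event to `users`; return False if it was stale and skipped."""
    op, lsn = event["op"], event["lsn"]
    # Deletes only carry the old row image; every other operation carries the new one.
    row = event["before"] if op == "d" else event["after"]

    # Ordering guard: ignore events at or below the LSN already applied to this row.
    current = conn.execute("SELECT lsn FROM users WHERE id = ?", (row["id"],)).fetchone()
    if current is not None and current[0] >= lsn:
        return False

    with conn:
        # Snapshot reads, creates, and updates are all upserts; a delete writes a tombstone.
        conn.execute(
            "INSERT OR REPLACE INTO users (id, email, lsn, deleted) VALUES (?, ?, ?, ?)",
            (row["id"], row.get("email"), lsn, 1 if op == "d" else 0),
        )
    return True
```

Treating snapshot rows and streamed changes as the same upsert keeps the handoff from the initial snapshot to streaming simple: any overlap between the two is absorbed by the LSN check.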

Library Recommendations

| Use Case | Python | TypeScript | Rust | Go |
|---|---|---|---|---|
| ETL Framework | dlt, Meltano, Dagster | - | - | - |
| Cloud Storage | boto3, gcsfs, adlfs | @aws-sdk/, @google-cloud/ | aws-sdk-s3, object_store | aws-sdk-go-v2 |
| File Processing | polars, pandas, pyarrow | papaparse, xlsx, parquetjs | polars-rs, arrow-rs | encoding/csv, parquet-go |
| Streaming | confluent-kafka, aiokafka | kafkajs | rdkafka-rs | franz-go, sarama |
| CDC | Debezium, pg_logical | - | - | - |

Reference Documentation

  • references/cloud-storage.md - S3, GCS, Azure Blob patterns
  • references/file-formats.md - CSV, JSON, Parquet, Excel handling
  • references/api-feeds.md - REST polling, webhooks, GraphQL subscriptions
  • references/streaming-sources.md - Kafka, Kinesis, Pub/Sub
  • references/database-migration.md - Schema migration, CDC patterns
  • references/etl-tools.md - dlt, Meltano, Airbyte, Fivetran

Scripts

  • scripts/validate_csv_schema.py - Validate CSV against expected schema
  • scripts/test_s3_connection.py - Test S3 bucket connectivity
  • scripts/generate_dlt_pipeline.py - Generate dlt pipeline scaffold

Chaining with Database Skills

After ingestion, chain to the appropriate database skill:

| Destination | Chain to Skill |
|---|---|
| PostgreSQL, MySQL | databases-relational |
| MongoDB, DynamoDB | databases-document |
| Qdrant, Pinecone | databases-vector (after embedding) |
| ClickHouse, TimescaleDB | databases-timeseries |
| Neo4j | databases-graph |

For vector databases, chain through ai-data-engineering for embedding:

ingesting-data → ai-data-engineering → databases-vector
