Agent skill
ingesting-data
Data ingestion patterns for loading data from cloud storage, APIs, files, and streaming sources into databases. Use when importing CSV/JSON/Parquet files, pulling from S3/GCS buckets, consuming API feeds, or building ETL pipelines.
Install this agent skill to your Project
npx add-skill https://github.com/ancoleman/ai-design-components/tree/main/skills/ingesting-data
SKILL.md
Data Ingestion Patterns
This skill provides patterns for getting data INTO systems from external sources.
When to Use This Skill
- Importing CSV, JSON, Parquet, or Excel files
- Loading data from S3, GCS, or Azure Blob storage
- Consuming REST/GraphQL API feeds
- Building ETL/ELT pipelines
- Database migration and CDC (Change Data Capture)
- Streaming data ingestion from Kafka/Kinesis
Ingestion Pattern Decision Tree
What is your data source?
├── Cloud Storage (S3, GCS, Azure) → See cloud-storage.md
├── Files (CSV, JSON, Parquet) → See file-formats.md
├── REST/GraphQL APIs → See api-feeds.md
├── Streaming (Kafka, Kinesis) → See streaming-sources.md
├── Legacy Database → See database-migration.md
└── Need full ETL framework → See etl-tools.md
Quick Start by Language
Python (Recommended for ETL)
dlt (data load tool) - Modern Python ETL:
import dlt
import requests

# Define a source
@dlt.source
def github_source(repo: str):
    @dlt.resource(write_disposition="merge", primary_key="id")
    def issues():
        # Fetches the first page only; follow the Link header for full pagination
        response = requests.get(f"https://api.github.com/repos/{repo}/issues")
        response.raise_for_status()
        yield response.json()

    return issues

# Load to destination
pipeline = dlt.pipeline(
    pipeline_name="github_issues",
    destination="postgres",  # or duckdb, bigquery, snowflake
    dataset_name="github_data"
)
load_info = pipeline.run(github_source("owner/repo"))
print(load_info)
Polars for file processing (often faster than pandas on large files):
import polars as pl

# Read CSV with schema inference
df = pl.read_csv("data.csv")

# Read Parquet (columnar, efficient); s3:// paths need cloud credentials configured
df = pl.read_parquet("s3://bucket/data.parquet")

# Read JSON lines
df = pl.read_ndjson("events.jsonl")

# Write to database (requires a database engine such as SQLAlchemy to be installed)
df.write_database(
    table_name="events",
    connection="postgresql://user:pass@localhost/db",
    if_table_exists="append"
)
TypeScript/Node.js
S3 ingestion:
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";
import { parse } from "csv-parse/sync";

const s3 = new S3Client({ region: "us-east-1" });

// `db` and `eventsTable` are assumed to come from your database layer (e.g. a Drizzle ORM setup)
async function ingestFromS3(bucket: string, key: string) {
  const response = await s3.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
  const body = await response.Body?.transformToString();
  if (body === undefined) {
    throw new Error(`Empty response body for s3://${bucket}/${key}`);
  }

  // Parse CSV
  const records = parse(body, { columns: true, skip_empty_lines: true });

  // Insert to database
  await db.insert(eventsTable).values(records);
}
Webhook receiver (push-based API feed):
import { Hono } from "hono";

// Webhook receiver for real-time ingestion
// `db` and `stripeEventsTable` are assumed to come from your database layer
const app = new Hono();

app.post("/webhooks/stripe", async (c) => {
  const event = await c.req.json();

  // Validate webhook signature
  const signature = c.req.header("stripe-signature");
  // ... validation logic (reject the request before ingesting if the signature does not match)

  // Ingest event
  await db.insert(stripeEventsTable).values({
    eventId: event.id,
    type: event.type,
    data: event.data,
    receivedAt: new Date()
  });

  return c.json({ received: true });
});
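The "validation logic" placeholder above is not spelled out in the source. As a rough sketch of what such a check often looks like, the Python snippet below verifies an HMAC-SHA256 signature over the raw request body. The function names, the hex encoding, and the shared-secret scheme are assumptions for illustration; Stripe's real scheme also signs a timestamp, so use the provider's official SDK in practice.

```python
import hashlib
import hmac


def sign(secret: bytes, raw_body: bytes) -> str:
    """Hex HMAC-SHA256 of the raw body (hypothetical scheme, not Stripe's exact format)."""
    return hmac.new(secret, raw_body, hashlib.sha256).hexdigest()


def is_valid_signature(secret: bytes, raw_body: bytes, received_signature: str) -> bool:
    """Compare in constant time so the check does not leak how many characters matched."""
    expected = sign(secret, raw_body)
    return hmac.compare_digest(expected.encode(), received_signature.encode())
```

The signature should be computed over the bytes exactly as received, before any JSON parsing, because re-serializing the payload can change whitespace or key order.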
Rust
High-performance file ingestion:
use std::io::Cursor;

use aws_sdk_s3::Client;
use polars::prelude::*;

// A boxed error unifies the S3 SDK and Polars error types so `?` works on both
async fn ingest_parquet(
    client: &Client,
    bucket: &str,
    key: &str,
) -> Result<DataFrame, Box<dyn std::error::Error + Send + Sync>> {
    // Download from S3
    let resp = client.get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;

    // Buffer the whole object in memory
    let bytes = resp.body.collect().await?.into_bytes();

    // Parse with Polars
    let df = ParquetReader::new(Cursor::new(bytes))
        .finish()?;

    Ok(df)
}
Go
CSV ingestion from S3:
package main

import (
	"context"
	"encoding/csv"

	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// batchInsert is assumed to be defined elsewhere in your database layer.
func ingestCSV(ctx context.Context, client *s3.Client, bucket, key string) error {
	resp, err := client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: &bucket,
		Key:    &key,
	})
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// ReadAll buffers every row in memory; use reader.Read in a loop for large files
	reader := csv.NewReader(resp.Body)
	records, err := reader.ReadAll()
	if err != nil {
		return err
	}

	// Batch insert to database
	return batchInsert(ctx, records)
}
Ingestion Patterns
1. Batch Ingestion (Files/Storage)
For periodic bulk loads:
Source → Extract  → Transform → Load   → Validate
↓        ↓          ↓           ↓        ↓
S3       Download   Clean/Map   Insert   Count check
Key considerations:
- Use chunked reading for large files (>100MB)
- Implement idempotency with checksums
- Track file processing state
- Handle partial failures
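The considerations above can be sketched together in a few lines. This is a minimal illustration using only the Python standard library and SQLite, not a production loader: the table names (`ingested_files`, `events`), the two-column `id,name` CSV layout, and the function names are all assumptions made for the example.

```python
import csv
import hashlib
import sqlite3
from itertools import islice
from pathlib import Path


def file_checksum(path: Path, block_size: int = 1 << 20) -> str:
    """Hash the file in fixed-size blocks so large files never sit fully in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for block in iter(lambda: fh.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()


def ingest_csv(conn: sqlite3.Connection, path: Path, chunk_rows: int = 1000) -> int:
    """Load an id,name CSV once; return rows inserted (0 if this content was already loaded)."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS ingested_files (checksum TEXT PRIMARY KEY, name TEXT)"
    )
    conn.execute("CREATE TABLE IF NOT EXISTS events (id TEXT, name TEXT)")

    checksum = file_checksum(path)
    already = conn.execute(
        "SELECT 1 FROM ingested_files WHERE checksum = ?", (checksum,)
    ).fetchone()
    if already:
        return 0  # idempotency: identical content is never loaded twice

    inserted = 0
    with conn:  # one transaction: a failure rolls back the rows and the state record together
        with path.open(newline="") as fh:
            reader = csv.DictReader(fh)
            # Read the file in chunks of `chunk_rows` rows instead of all at once
            while chunk := list(islice(reader, chunk_rows)):
                rows = [(r["id"], r["name"]) for r in chunk]
                conn.executemany("INSERT INTO events (id, name) VALUES (?, ?)", rows)
                inserted += len(rows)
        conn.execute("INSERT INTO ingested_files VALUES (?, ?)", (checksum, path.name))
    return inserted
```

Recording the checksum in the same transaction as the rows is one way to handle partial failures: either the file is fully loaded and marked as processed, or neither happens and the next run retries it. The returned count can feed the final "count check" validation step.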
2. Streaming Ingestion (Real-time)
For continuous data flow:
Source → Buffer    → Process   → Load → Ack
↓        ↓           ↓           ↓      ↓
Kafka    In-memory   Transform   DB     Commit offset
Key considerations:
- At-least-once vs exactly-once semantics
- Backpressure handling
- Dead letter queues for failures
- Checkpoint management
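To make the at-least-once and dead-letter ideas above concrete, here is a small in-memory sketch. It deliberately avoids any real Kafka or Kinesis client: `Message`, `StreamState`, and `process_batch` are hypothetical names, and the `load` callable stands in for whatever writes to your database.

```python
import json
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Message:
    offset: int
    value: bytes


@dataclass
class StreamState:
    committed_offset: int = -1  # last offset known to be fully handled
    dead_letters: list = field(default_factory=list)


def process_batch(
    batch: list,
    load: Callable[[list], None],
    state: StreamState,
) -> None:
    """Transform, load, then commit. Committing last means a crash replays the batch."""
    rows = []
    for msg in batch:
        if msg.offset <= state.committed_offset:
            continue  # replayed message that was already handled
        try:
            rows.append(json.loads(msg.value))
        except ValueError:  # covers invalid JSON and undecodable bytes
            state.dead_letters.append(msg)  # park it instead of blocking the stream
    load(rows)  # may raise; the offset then stays uncommitted
    if batch:
        state.committed_offset = max(state.committed_offset, batch[-1].offset)
```

Because the offset advances only after `load` returns, a failed load leads to redelivery rather than data loss, which is the at-least-once trade-off: the destination has to tolerate duplicates (for example through upserts keyed on a message ID).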
3. API Polling (Feeds)
For external API data:
Schedule → Fetch    → Dedupe → Load   → Update cursor
↓          ↓          ↓        ↓        ↓
Cron       API call   By ID    Insert   Last timestamp
Key considerations:
- Respecting rate limits (HTTP 429 responses, Retry-After headers)
- Incremental loading (cursors, timestamps)
- API pagination handling
- Retry with exponential backoff
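The fetch, dedupe, and retry steps above might look roughly like the following. This is a sketch under stated assumptions rather than a client for any particular API: `fetch_page` is a hypothetical callable you supply, and the response shape (`items`, `next_cursor`, and an `id` on each record) is invented for the example.

```python
import random
import time
from typing import Callable, Iterator, Optional


def with_backoff(
    call: Callable[[], dict],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Retry `call` on ConnectionError, doubling the delay each attempt and adding jitter."""
    for attempt in range(max_attempts):
        try:
            return call()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            sleep(base_delay * (2 ** attempt) + random.uniform(0, base_delay))
    raise ValueError("max_attempts must be at least 1")


def poll(
    fetch_page: Callable[[Optional[str]], dict],
    cursor: Optional[str],
    seen_ids: set,
) -> Iterator[dict]:
    """Follow `next_cursor` until it runs out, yielding only records not seen before."""
    while True:
        page = with_backoff(lambda: fetch_page(cursor))
        for record in page["items"]:
            if record["id"] not in seen_ids:
                seen_ids.add(record["id"])
                yield record
        cursor = page.get("next_cursor")
        if cursor is None:
            return
```

Persisting the cursor (or the last seen timestamp) between scheduled runs is left out here; in a real pipeline it would be stored only after the load step succeeds, so that a failed run resumes from the previous position.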
4. Change Data Capture (CDC)
For database replication:
Source DB → Capture changes → Transform  → Target DB
↓           ↓                 ↓            ↓
Postgres    Debezium/WAL      Map schema   Insert/Update
Key considerations:
- Initial snapshot + streaming changes
- Schema evolution handling
- Ordering guarantees
- Conflict resolution
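As an illustration of the ordering and conflict-resolution points above, the sketch below applies change events to a SQLite target and ignores any event older than what the target already holds. The event shape (`op`, `id`, `email`, `lsn`) and the `users` table are assumptions for the example; real Debezium events carry a richer envelope.

```python
import sqlite3


def apply_change(conn: sqlite3.Connection, event: dict) -> bool:
    """Apply one change event; return False if it is stale or a duplicate."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, email TEXT, lsn INTEGER)"
    )
    row = conn.execute("SELECT lsn FROM users WHERE id = ?", (event["id"],)).fetchone()
    if row is not None and row[0] >= event["lsn"]:
        return False  # last-writer-wins by log position: never regress a newer row

    with conn:  # commit on success, roll back on error
        if event["op"] == "delete":
            conn.execute("DELETE FROM users WHERE id = ?", (event["id"],))
        else:  # "insert" and "update" are both applied as an upsert
            conn.execute(
                "INSERT OR REPLACE INTO users (id, email, lsn) VALUES (?, ?, ?)",
                (event["id"], event["email"], event["lsn"]),
            )
    return True
```

One known gap in this sketch: once a row is deleted its log position is forgotten, so a late-arriving older update would recreate it. Keeping tombstones (a deleted flag plus the position) is a common way to close that gap.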
Library Recommendations
| Use Case | Python | TypeScript | Rust | Go |
|---|---|---|---|---|
| ETL Framework | dlt, Meltano, Dagster | - | - | - |
| Cloud Storage | boto3, gcsfs, adlfs | @aws-sdk/client-s3, @google-cloud/storage | aws-sdk-s3, object_store | aws-sdk-go-v2 |
| File Processing | polars, pandas, pyarrow | papaparse, xlsx, parquetjs | polars-rs, arrow-rs | encoding/csv, parquet-go |
| Streaming | confluent-kafka, aiokafka | kafkajs | rdkafka-rs | franz-go, sarama |
| CDC | Debezium, pglogical | - | - | - |
Reference Documentation
- references/cloud-storage.md - S3, GCS, Azure Blob patterns
- references/file-formats.md - CSV, JSON, Parquet, Excel handling
- references/api-feeds.md - REST polling, webhooks, GraphQL subscriptions
- references/streaming-sources.md - Kafka, Kinesis, Pub/Sub
- references/database-migration.md - Schema migration, CDC patterns
- references/etl-tools.md - dlt, Meltano, Airbyte, Fivetran
Scripts
- scripts/validate_csv_schema.py - Validate CSV against expected schema
- scripts/test_s3_connection.py - Test S3 bucket connectivity
- scripts/generate_dlt_pipeline.py - Generate dlt pipeline scaffold
Chaining with Database Skills
After ingestion, chain to the appropriate database skill:
| Destination | Chain to Skill |
|---|---|
| PostgreSQL, MySQL | databases-relational |
| MongoDB, DynamoDB | databases-document |
| Qdrant, Pinecone | databases-vector (after embedding) |
| ClickHouse, TimescaleDB | databases-timeseries |
| Neo4j | databases-graph |
For vector databases, chain through ai-data-engineering for embedding:
ingesting-data → ai-data-engineering → databases-vector