Agent skill
run-resource-design
Guide for designing Run resources in OptAIC. Use when creating PipelineRun, ExperimentRun, BacktestRun, PortfolioOptimizationRun, TrainingRun, InferenceRun, or MonitoringRun. Covers execution tracking, metrics, output artifacts, and lineage.
Run Resource Design Patterns
Guide for designing Run resources that track execution results and produce versioned outputs.
When to Use
Apply when:
- Creating execution tracking for pipeline/model/backtest runs
- Designing output artifact storage patterns
- Implementing metrics and status tracking
- Building lineage tracking for reproducibility
Critical Concept: Runs are Activities of Flows
Runs are NOT the same as Flow Execution Resources.
| Concept | Type | Lifecycle | Analogy |
|---|---|---|---|
| Flow Execution Resource | Static | Created with Instance | The "deployment" |
| Run | Dynamic | Created each trigger | The "execution" |
Flow Execution Resource = Prefect Deployment (static, created once)
Run = Prefect Flow Run (dynamic, created each trigger, many per Flow)
Instance.refresh_flow ──┬──> PipelineRun (2024-01-01)
                        ├──> PipelineRun (2024-01-02)
                        └──> PipelineRun (2024-01-03)
Core Concept: Execution Record
Runs track execution activities of Flow Execution Resources:
Run = Execution Activity
├── parent_instance_id     # Which Instance was executed
├── flow_kind              # Which flow type (refresh, preview, backtest, optimize, training, inference, monitoring)
├── orchestrator_run_id    # Prefect flow run ID
├── status                 # pending|running|completed|failed|cancelled
├── started_at / ended_at  # Timing
├── metrics_json           # Computed metrics
├── outputs_ref            # Path to output artifacts
└── input_versions         # Versions of upstream resources used
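As a rough illustration, the field list above could be mirrored by a small dataclass. This is only a sketch: `RunRecord`, the field types, the defaults, and the `duration_seconds` helper are assumptions made for the example, not the actual OptAIC model.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional
from uuid import UUID, uuid4


@dataclass
class RunRecord:
    """Hypothetical in-memory mirror of the Run fields listed above."""

    parent_instance_id: UUID                    # which Instance was executed
    flow_kind: str                              # e.g. "refresh"
    orchestrator_run_id: Optional[str] = None   # filled in once the orchestrator accepts the run
    status: str = "pending"
    started_at: Optional[datetime] = None
    ended_at: Optional[datetime] = None
    metrics_json: Dict[str, Any] = field(default_factory=dict)
    outputs_ref: Optional[str] = None           # path to output artifacts
    input_versions: Dict[str, str] = field(default_factory=dict)
    id: UUID = field(default_factory=uuid4)

    @property
    def duration_seconds(self) -> Optional[float]:
        """Wall-clock duration, or None while the run has not finished."""
        if self.started_at is None or self.ended_at is None:
            return None
        return (self.ended_at - self.started_at).total_seconds()
```

Keeping metrics and input versions as plain dictionaries matches the `_json` style of the field names; a real table would likely store them in JSON columns.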
Run Types
| Type | Parent Instance | Flow Kind | Key Outputs |
|---|---|---|---|
| PipelineRun | DatasetInstance | refresh | rows_added, last_date |
| ExperimentRun | ExperimentInstance | preview | preview_data, statistics |
| BacktestRun | BacktestInstance | backtest | equity_curve, trades, metrics |
| PortfolioOptimizationRun | PortfolioOptimizerInstance | optimize | weights, metrics |
| TrainingRun | ModelInstance | training | model_artifact, metrics |
| InferenceRun | ModelInstance | inference | predictions, confidence |
| MonitoringRun | ModelInstance/DatasetInstance | monitoring | drift_metrics, alerts |
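One way to keep these pairings enforceable is a small lookup table that is checked before a Run is created. The sketch below copies the rows from the table; `RunTypeSpec`, `RUN_TYPES`, and `validate_run_type` are hypothetical names introduced for illustration only.

```python
from typing import Dict, NamedTuple, Tuple


class RunTypeSpec(NamedTuple):
    parent_instances: Tuple[str, ...]  # allowed parent Instance types
    flow_kind: str
    key_outputs: Tuple[str, ...]


# Rows copied from the Run Types table.
RUN_TYPES: Dict[str, RunTypeSpec] = {
    "PipelineRun": RunTypeSpec(("DatasetInstance",), "refresh", ("rows_added", "last_date")),
    "ExperimentRun": RunTypeSpec(("ExperimentInstance",), "preview", ("preview_data", "statistics")),
    "BacktestRun": RunTypeSpec(("BacktestInstance",), "backtest", ("equity_curve", "trades", "metrics")),
    "PortfolioOptimizationRun": RunTypeSpec(("PortfolioOptimizerInstance",), "optimize", ("weights", "metrics")),
    "TrainingRun": RunTypeSpec(("ModelInstance",), "training", ("model_artifact", "metrics")),
    "InferenceRun": RunTypeSpec(("ModelInstance",), "inference", ("predictions", "confidence")),
    "MonitoringRun": RunTypeSpec(("ModelInstance", "DatasetInstance"), "monitoring", ("drift_metrics", "alerts")),
}


def validate_run_type(run_type: str, parent_instance: str, flow_kind: str) -> None:
    """Raise if the combination does not match a row of the table."""
    spec = RUN_TYPES.get(run_type)
    if spec is None:
        raise KeyError(f"unknown run type: {run_type}")
    if parent_instance not in spec.parent_instances:
        raise ValueError(f"{run_type} cannot belong to {parent_instance}")
    if flow_kind != spec.flow_kind:
        raise ValueError(f"{run_type} expects flow_kind={spec.flow_kind!r}, got {flow_kind!r}")
```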
Run Creation via RunExecutionService
Runs are created by triggering a Flow Execution Resource:
```python
# libs/orchestration/run_service.py
from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncSession  # assumed: session is a SQLAlchemy AsyncSession

# ActorContext, DatasetInstance, PipelineRun, UpstreamNotReadyError, lineage_resolver,
# freshness_checker, orchestrator, and emit_activity are project-internal names.


async def submit_pipeline_run(
    session: AsyncSession,
    actor: ActorContext,
    dataset_id: UUID,
    mode: str = "incremental",
) -> PipelineRun:
    # 1. Load Instance and get Flow deployment ID
    instance = await session.get(DatasetInstance, dataset_id)
    deployment_id = instance.prefect_deployment_id

    # 2. Check upstream freshness (flow-to-flow lineage)
    report = await lineage_resolver.check_upstream_freshness(
        session, dataset_id, freshness_checker
    )
    if not report.all_ready:
        raise UpstreamNotReadyError(...)

    # 3. Create Run resource record
    run = PipelineRun(
        parent_id=dataset_id,
        flow_kind="refresh",
        status="pending",
    )
    session.add(run)
    # Assumes run.id is assigned on flush; without it run.id may still be None below.
    await session.flush()

    # 4. Trigger Prefect deployment
    result = await orchestrator.submit_run(
        run_id=run.id,
        deployment_id=deployment_id,
        parameters={"mode": mode},
    )
    run.orchestrator_run_id = result.orchestrator_run_id

    # 5. Emit activity
    await emit_activity("pipeline_run.started", run.id, {...})
    return run
```
Status Flow
pending → running → completed
                  ↘ failed
                  ↘ cancelled
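The status flow can be enforced with an explicit transition table. The sketch below assumes that `failed` and `cancelled` branch off `running` only, as the diagram is read here; if pending runs can also be cancelled, the table would need one more entry. `RunStatus`, `ALLOWED_TRANSITIONS`, `can_transition`, and `transition` are hypothetical names.

```python
from enum import Enum
from typing import Dict, FrozenSet


class RunStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Assumption: terminal states have no outgoing transitions.
ALLOWED_TRANSITIONS: Dict[RunStatus, FrozenSet[RunStatus]] = {
    RunStatus.PENDING: frozenset({RunStatus.RUNNING}),
    RunStatus.RUNNING: frozenset(
        {RunStatus.COMPLETED, RunStatus.FAILED, RunStatus.CANCELLED}
    ),
    RunStatus.COMPLETED: frozenset(),
    RunStatus.FAILED: frozenset(),
    RunStatus.CANCELLED: frozenset(),
}


def can_transition(current: RunStatus, target: RunStatus) -> bool:
    return target in ALLOWED_TRANSITIONS[current]


def transition(current: RunStatus, target: RunStatus) -> RunStatus:
    """Return the new status, or raise ValueError for an illegal move."""
    if not can_transition(current, target):
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target
```

Centralizing the table means every code path that changes `status` goes through the same check, which is what "status transitions with validation" usually amounts to.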
Run Lifecycle
- Submit: Create Run in `pending`
- Start: Transition to `running`, set `started_at`
- Progress: Update `progress_pct`, emit activities
- Complete: Set `ended_at`, store outputs, transition to `completed`
- Fail: Set error info, transition to `failed`
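The lifecycle steps map naturally onto a handful of small helpers that stamp timestamps as the run moves along. This is a minimal sketch: `RunState` and the helper functions are hypothetical, activity emission and output storage are left out, and the choice of UTC timestamps and a 0 to 100 progress range is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class RunState:
    status: str = "pending"          # Submit: a new run starts in pending
    progress_pct: float = 0.0
    started_at: Optional[datetime] = None
    ended_at: Optional[datetime] = None
    error: Optional[str] = None


def _now() -> datetime:
    return datetime.now(timezone.utc)


def start(run: RunState) -> None:
    run.status = "running"
    run.started_at = _now()


def update_progress(run: RunState, pct: float) -> None:
    # Clamp so a noisy reporter cannot push progress outside 0..100.
    run.progress_pct = max(0.0, min(100.0, float(pct)))


def complete(run: RunState) -> None:
    run.ended_at = _now()
    run.progress_pct = 100.0
    run.status = "completed"


def fail(run: RunState, error: str) -> None:
    run.ended_at = _now()
    run.error = error
    run.status = "failed"
```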
Output Artifacts
```python
run_outputs = {
    "metrics_json": {
        "sharpe_ratio": 1.85,
        "max_drawdown": -0.12,
        "total_return": 0.15,
    },
    "artifacts_ref": {
        "equity_curve": "s3://runs/{run_id}/equity_curve.parquet",
        "trades": "s3://runs/{run_id}/trades.parquet",
        "weights_history": "s3://runs/{run_id}/weights.parquet",
    },
}
```
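The `{run_id}` placeholders in the artifact paths have to be filled in per run. A minimal sketch of that step, assuming the paths are stored as `str.format` templates; `ARTIFACT_TEMPLATES` and `resolve_artifact_refs` are hypothetical names.

```python
from typing import Dict, Mapping
from uuid import UUID

# Templates taken from the example above.
ARTIFACT_TEMPLATES: Dict[str, str] = {
    "equity_curve": "s3://runs/{run_id}/equity_curve.parquet",
    "trades": "s3://runs/{run_id}/trades.parquet",
    "weights_history": "s3://runs/{run_id}/weights.parquet",
}


def resolve_artifact_refs(
    run_id: UUID, templates: Mapping[str, str] = ARTIFACT_TEMPLATES
) -> Dict[str, str]:
    """Substitute the run ID into every artifact path template."""
    return {name: tpl.format(run_id=run_id) for name, tpl in templates.items()}
```

Only the references are stored on the Run; the Parquet files themselves stay in object storage, which keeps the run record small.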
Lineage Tracking
Track which versions of upstream resources were used:
```python
input_versions = {
    "signal_instance_id": "uuid",
    "signal_version_id": "version-uuid",
    "price_dataset_version_id": "version-uuid",
    "model_artifact_version": "v1.2.3",
}
```
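Recorded input versions become useful when two runs are compared, for example to explain why a rerun produced different metrics. The helper below is a hypothetical sketch (`diff_input_versions` is not part of the source) that reports every key whose version differs between two runs.

```python
from typing import Dict, Mapping, Optional, Tuple


def diff_input_versions(
    previous: Mapping[str, str], current: Mapping[str, str]
) -> Dict[str, Tuple[Optional[str], Optional[str]]]:
    """Map each changed key to (previous_version, current_version).

    A key present in only one run is reported with None on the other side.
    """
    changed = {}
    for key in sorted(set(previous) | set(current)):
        before, after = previous.get(key), current.get(key)
        if before != after:
            changed[key] = (before, after)
    return changed
```

An empty result means both runs consumed identical upstream versions, which is the precondition for treating them as reproductions of each other.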
Run Completion Updates Flow Status
When a Run completes, it updates the parent Flow's status:
```python
async def _on_run_completed(session: AsyncSession, run: PipelineRun):
    # 1. Update Instance's freshness status
    instance = await session.get(DatasetInstance, run.parent_id)
    instance.freshness_status = "ready"
    instance.last_run_at = run.ended_at  # matches the ended_at field used elsewhere in this guide

    # 2. Update StatusStore for freshness calculations
    await status_store.mark_run_success(
        resource_id=instance.resource_id,
        last_data_date=run.metrics_json.get("last_data_date"),
        rows_processed=run.metrics_json.get("rows_processed"),
    )

    # 3. Propagate staleness to downstream resources
    affected = await lineage_resolver.propagate_staleness(
        session, instance.resource_id
    )

    # 4. Publish real-time status update
    await centrifugo.publish(
        channel=f"instance:{instance.resource_id}:status",
        data={"status": "ready", "last_run_id": str(run.id)},
    )

    # 5. Emit completion activity
    await emit_activity("pipeline_run.completed", run.id, {
        "metrics": run.metrics_json,
        "affected_downstream": [str(r) for r in affected],
    })
```
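Step 3 relies on knowing which resources sit downstream of the one that just refreshed. How `lineage_resolver.propagate_staleness` does this is not shown in the source; the sketch below is one plausible approach, a breadth-first walk over an in-memory adjacency map. `downstream_of` and the graph shape are assumptions for illustration.

```python
from collections import deque
from typing import Dict, List, Set


def downstream_of(graph: Dict[str, List[str]], resource_id: str) -> List[str]:
    """Return every resource reachable from resource_id, in breadth-first order.

    graph maps a resource ID to the IDs that consume it directly.
    """
    seen: Set[str] = set()
    order: List[str] = []
    queue = deque(graph.get(resource_id, []))
    while queue:
        node = queue.popleft()
        if node in seen or node == resource_id:
            continue  # skip duplicates and guard against cycles
        seen.add(node)
        order.append(node)
        queue.extend(graph.get(node, []))
    return order
```

For example, with `{"prices": ["signal"], "signal": ["backtest", "portfolio"]}`, refreshing `prices` would mark `signal`, `backtest`, and `portfolio` as affected.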
Implementation Checklist
- Create extension table with run-specific fields
- Include `flow_kind` and `orchestrator_run_id` fields
- Implement status transitions with validation
- Track timing (`started_at`, `ended_at`)
- Store metrics in `metrics_json`
- Store large outputs externally (`artifacts_ref`)
- Track input versions for lineage
- Emit activities at lifecycle transitions
- Update Flow status on completion (via StatusStore)
- Propagate staleness to downstream (via LineageResolver)
- Publish real-time updates (via Centrifugo)
Reference Files