Agent skill

flowing

Lightweight DAG workflow runner with checkpoint resume and detachable tasks. Use when orchestrating 3+ sequential or parallel tool calls into a single invocation, or when pipelines need resume-from-failure without re-running succeeded steps.


Install this agent skill to your Project

npx add-skill https://github.com/oaustegard/claude-skills/tree/main/flowing

Metadata

Additional technical details for this skill

version
1.0.0

SKILL.md

Flowing — DAG Workflow Runner

Batch independent operations into one python3 invocation. Declare steps, wire dependencies, run once.

Quick Start

python
from flowing import task, Flow

@task
def fetch_data():
    return {"items": [1, 2, 3]}

@task(depends_on=[fetch_data])
def process(fetch_data):
    return sum(fetch_data["items"])

@task(depends_on=[process])
def store(process):
    print(f"Result: {process}")

Flow(store).run()
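
To make the "declare steps, wire dependencies" idea concrete, the following is a minimal sketch of how a dependency map can be grouped into execution layers. It uses the standard library's `graphlib` and does not reflect how flowing schedules tasks internally; the `deps` dictionary and the `execution_layers` helper are hypothetical names chosen for illustration.

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map mirroring the Quick Start: task -> prerequisites.
deps = {
    "fetch_data": set(),
    "process": {"fetch_data"},
    "store": {"process"},
}

def execution_layers(deps):
    """Group tasks into layers; tasks within one layer do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    layers = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose prerequisites are all done
        layers.append(ready)
        sorter.done(*ready)
    return layers

print(execution_layers(deps))  # [['fetch_data'], ['process'], ['store']]
```

Tasks that land in the same layer are the ones a runner could safely execute in parallel.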

Core API

@task decorator

python
@task(
    depends_on=[other_task],  # DAG edges
    retry=2,                  # retry count (0 = no retry)
    retry_backoff_base_ms=1000,
    retry_max_backoff_ms=30_000,
    timeout_s=60.0,
    detached=True,            # non-blocking side-effect
    name="custom_name",       # override function name
)
def my_step(other_task):      # param name = dependency task name; receives its return value
    return other_task         # the return value is passed on to downstream tasks
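
The retry parameters suggest a capped backoff between attempts. The source does not state the exact formula, so the sketch below assumes plain exponential doubling from `retry_backoff_base_ms`, capped at `retry_max_backoff_ms`. `backoff_delay_ms` and `call_with_retry` are hypothetical helpers, not part of flowing.

```python
import time

def backoff_delay_ms(attempt, base_ms=1000, max_ms=30_000):
    """Delay before retry number `attempt` (1-based): doubles each time, capped at max_ms.

    Assumption: exponential doubling; the library's real schedule may differ.
    """
    return min(base_ms * 2 ** (attempt - 1), max_ms)

def call_with_retry(fn, retry=2, base_ms=1000, max_ms=30_000, sleep=time.sleep):
    """Call fn(); on an exception, retry up to `retry` more times with backoff."""
    for attempt in range(retry + 1):
        try:
            return fn()
        except Exception:
            if attempt == retry:  # retries exhausted: surface the last error
                raise
            sleep(backoff_delay_ms(attempt + 1, base_ms, max_ms) / 1000)

print([backoff_delay_ms(n) for n in range(1, 8)])
# [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

Under these assumptions, `retry=2` means at most three calls in total: the first attempt plus two retries.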

Flow class

python
flow = Flow(terminal_task, max_workers=5, fail_fast=True)
results = flow.run()          # execute full DAG
flow.summary()                # human-readable status
flow.value(some_task)         # get succeeded task's return value

Resume from failure

When a step fails mid-pipeline, fix the issue and continue without re-running succeeded steps:

python
flow = Flow(terminal)
results = flow.run()                    # step_3 fails
flow.override(step_3, corrected_value)  # inject fix
results = flow.resume()                 # step_1, step_2 cached; step_4+ runs

  • flow.resume(): resets FAILED and SKIPPED tasks and keeps SUCCEEDED results cached
  • flow.override(task_def, value): manually injects a succeeded result
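
The resume behaviour can be pictured as a cache of succeeded results that is consulted before each step runs. The sketch below is a simplified, sequential stand-in written for illustration only: `run_pipeline`, the `cache` dictionary, and the step functions are hypothetical and are not flowing's implementation.

```python
def run_pipeline(steps, cache):
    """Run steps in order, skipping any whose result is already in `cache`.

    steps: list of (name, fn); each fn receives the dict of prior results.
    Returns the name of the first failing step, or None if everything succeeded.
    """
    for name, fn in steps:
        if name in cache:  # succeeded earlier or was overridden: do not re-run
            continue
        try:
            cache[name] = fn(cache)
        except Exception:
            return name
    return None

calls = []  # records which steps actually executed

def step_1(results):
    calls.append("step_1")
    return 10

def step_2(results):
    calls.append("step_2")
    return results["step_1"] * 2

def step_3(results):
    calls.append("step_3")
    raise RuntimeError("simulated failure")

def step_4(results):
    calls.append("step_4")
    return results["step_3"] + 1

steps = [("step_1", step_1), ("step_2", step_2), ("step_3", step_3), ("step_4", step_4)]
cache = {}
failed = run_pipeline(steps, cache)        # stops at step_3
cache["step_3"] = 99                       # plays the role of flow.override(step_3, 99)
failed_again = run_pipeline(steps, cache)  # plays the role of flow.resume()
print(failed, failed_again, calls, cache["step_4"])
```

In this sketch `step_1` and `step_2` execute only once, and `step_4` runs on the second pass using the injected value.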

Detached tasks (non-blocking side-effects)

python
@task(depends_on=[create_issue], detached=True)
def store_memory(create_issue):
    remember(create_issue["url"], ...)

  • Run in a final layer after the main DAG completes
  • Failures are collected in flow.detached_failures and never trigger fail_fast
  • Dependencies must all be SUCCEEDED (same as normal tasks)
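
The three rules above can be sketched as a two-phase runner: the main steps run first, then the detached steps run in a final pass where exceptions are recorded rather than raised. This is an illustrative approximation only; `run_with_detached`, the step functions, and the example URL are hypothetical.

```python
def run_with_detached(main_steps, detached_steps):
    """Run main steps, then detached steps whose failures are collected, not raised."""
    results = {}
    for name, fn in main_steps:
        results[name] = fn(results)  # a failure here propagates and stops the run

    detached_failures = {}
    for name, fn in detached_steps:  # final layer, after the main steps completed
        try:
            fn(results)
        except Exception as exc:
            detached_failures[name] = exc  # recorded; the main results stay valid
    return results, detached_failures

def create_issue(results):
    return {"url": "https://example.invalid/issues/1"}  # placeholder URL

def store_memory(results):
    raise ConnectionError("memory store unavailable")  # simulated side-effect failure

results, detached_failures = run_with_detached(
    [("create_issue", create_issue)],
    [("store_memory", store_memory)],
)
print(results["create_issue"]["url"], list(detached_failures))
```

The failed side-effect shows up in `detached_failures` while the main result remains available to the caller.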

When to use

  • 3+ independent operations (recall, SQL, web search) that can parallelize
  • Multi-step pipelines where late failures shouldn't waste early work
  • Side-effects (memory storage, notifications) that shouldn't block the critical path
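
As a rough picture of the first case, the sketch below runs three independent operations on a `ThreadPoolExecutor` from the standard library. The `recall`, `sql`, and `search` functions are stand-ins that only sleep and return fixed values; they are assumptions for illustration, not real integrations.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def recall():
    time.sleep(0.1)  # stand-in for a memory lookup
    return "memories"

def sql():
    time.sleep(0.1)  # stand-in for a database query
    return [1, 2]

def search():
    time.sleep(0.1)  # stand-in for a web search
    return ["hit"]

ops = {"recall": recall, "sql": sql, "search": search}

with ThreadPoolExecutor(max_workers=5) as pool:
    futures = {name: pool.submit(fn) for name, fn in ops.items()}  # all start right away
    results = {name: future.result() for name, future in futures.items()}

print(results)
```

Because the three calls spend their time waiting, running them on threads lets the waits overlap instead of adding up.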

When NOT to use

  • The next step depends on reasoning about a prior result (use a think loop instead)
  • A single sequential operation
  • Async or distributed workflows (flowing runs in a single container on a ThreadPoolExecutor)
