Agent skill
Concurrency
This skill should be used when the user asks about "Effect concurrency", "fibers", "Fiber", "forking", "Effect.fork", "Effect.forkDaemon", "parallel execution", "Effect.all concurrency", "Deferred", "Queue", "PubSub", "Semaphore", "Latch", "fiber interruption", "Effect.race", "Effect.raceAll", "concurrent effects", or needs to understand how Effect handles parallel and concurrent execution.
Install this agent skill to your Project
npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/concurrency
SKILL.md
Concurrency in Effect
Overview
Effect provides lightweight fiber-based concurrency:
- Fibers - Lightweight threads managed by Effect runtime
- Structured concurrency - Parent fibers supervise children
- Safe interruption - Clean cancellation with resource cleanup
- Concurrent primitives - Queue, Deferred, Semaphore, PubSub
Basic Parallel Execution
Effect.all with Concurrency
import { Effect } from "effect"
const results = yield* Effect.all(
[fetchUser(1), fetchUser(2), fetchUser(3)],
{ concurrency: "unbounded" }
)
const results = yield* Effect.all(tasks, { concurrency: 5 })
const results = yield* Effect.all(tasks)
Effect.forEach with Concurrency
const users = yield* Effect.forEach(
userIds,
(id) => fetchUser(id),
{ concurrency: 10 }
)
Fibers
Creating Fibers with fork
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(longRunningTask)
yield* doOtherWork()
const result = yield* Fiber.join(fiber)
})
Fork Variants
const fiber = yield* Effect.fork(task)
const fiber = yield* Effect.forkDaemon(task)
const fiber = yield* Effect.forkIn(scope)(task)
const fiber = yield* Effect.forkWithErrorHandler(task, onError)
Fiber Operations
import { Fiber } from "effect"
const result = yield* Fiber.join(fiber)
const exit = yield* Fiber.await(fiber)
yield* Fiber.interrupt(fiber)
const maybeResult = yield* Fiber.poll(fiber)
Racing
Effect.race - First to Complete
const fastest = yield* Effect.race(
fetchFromServer1(),
fetchFromServer2()
)
Effect.raceAll - Race Many
const fastest = yield* Effect.raceAll([
fetchFromCDN1(),
fetchFromCDN2(),
fetchFromCDN3()
])
Effect.raceFirst - Include Failures
const first = yield* Effect.raceFirst(task1, task2)
Deferred - One-Time Promise
import { Deferred } from "effect"
const program = Effect.gen(function* () {
const deferred = yield* Deferred.make<string, never>()
const fiber = yield* Effect.fork(
Effect.gen(function* () {
const value = yield* Deferred.await(deferred)
yield* Effect.log(`Got: ${value}`)
})
)
yield* Deferred.succeed(deferred, "Hello!")
yield* Fiber.join(fiber)
})
Queue - Concurrent Queue
import { Queue } from "effect"
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(100)
yield* Effect.fork(
Effect.forEach(
[1, 2, 3, 4, 5],
(n) => Queue.offer(queue, n)
)
)
const items = yield* Effect.forEach(
Array.from({ length: 5 }),
() => Queue.take(queue)
)
})
Queue Variants
const bounded = yield* Queue.bounded<number>(100)
const unbounded = yield* Queue.unbounded<number>()
const dropping = yield* Queue.dropping<number>(100)
const sliding = yield* Queue.sliding<number>(100)
PubSub - Publish/Subscribe
import { PubSub } from "effect"
const program = Effect.gen(function* () {
const pubsub = yield* PubSub.bounded<string>(100)
const sub1 = yield* PubSub.subscribe(pubsub)
const sub2 = yield* PubSub.subscribe(pubsub)
yield* PubSub.publish(pubsub, "Hello!")
const msg1 = yield* Queue.take(sub1)
const msg2 = yield* Queue.take(sub2)
})
Semaphore - Limit Concurrency
import { Effect } from "effect"
const program = Effect.gen(function* () {
const semaphore = yield* Effect.makeSemaphore(3)
yield* Effect.forEach(
tasks,
(task) => semaphore.withPermits(1)(task),
{ concurrency: "unbounded" }
)
})
Latch - Coordination Point
import { Latch } from "effect"
const program = Effect.gen(function* () {
const latch = yield* Latch.make(false)
yield* Effect.fork(
Effect.forEach(
workers,
(worker) =>
Effect.gen(function* () {
yield* Latch.await(latch)
yield* worker.start()
}),
{ concurrency: "unbounded" }
)
)
yield* Latch.open(latch)
})
Interruption
Interrupting Fibers
const fiber = yield* Effect.fork(longTask)
yield* Fiber.interrupt(fiber)
Uninterruptible Regions
const critical = Effect.uninterruptible(
Effect.gen(function* () {
yield* beginTransaction()
yield* performOperations()
yield* commitTransaction()
})
)
Interruptible Within Uninterruptible
const program = Effect.uninterruptible(
Effect.gen(function* () {
yield* criticalSetup()
// This part can be interrupted
yield* Effect.interruptible(longOperation)
yield* criticalTeardown()
})
)
Supervision
Structured concurrency ensures child fibers are managed:
const parent = Effect.gen(function* () {
const child1 = yield* Effect.fork(task1)
const child2 = yield* Effect.fork(task2)
// If parent fails/interrupts, children are interrupted
yield* failingOperation()
})
// child1 and child2 automatically interrupted
Daemon Fibers
Escape supervision with daemon:
const daemon = yield* Effect.forkDaemon(backgroundTask)
Common Patterns
Timeout with Fallback
const withTimeout = task.pipe(
Effect.timeout("5 seconds"),
Effect.map(Option.getOrElse(() => defaultValue))
)
Worker Pool
const workerPool = Effect.gen(function* () {
const semaphore = yield* Effect.makeSemaphore(numWorkers)
return (task: Effect.Effect<A>) =>
semaphore.withPermits(1)(task)
})
Parallel with Error Collection
const results = yield* Effect.all(
tasks,
{
concurrency: "unbounded",
mode: "either" // Collect all results
}
)
Best Practices
- Use Effect.all concurrency for simple parallelism
- Use Semaphore to limit concurrent operations
- Prefer structured concurrency over daemon fibers
- Handle interruption in long-running effects
- Use Queue for producer/consumer patterns
- Use Deferred for one-time coordination
Additional Resources
For comprehensive concurrency documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.
Search for these sections:
- "Fibers" for fiber management
- "Basic Concurrency" for parallel execution
- "Deferred" for synchronization primitives
- "Queue" for concurrent queues
- "PubSub" for publish/subscribe
- "Semaphore" for concurrency limiting
Recommended Agent Skills
Expand your agent's capabilities with these related and highly-rated skills.
agent-ops-spec
Manage specification documents in .agent/specs/. Use when user provides requirements, acceptance criteria, or feature descriptions that need to be tracked and validated against implementation.
agent-ops-state
Maintain .agent state files. Use at session start, after meaningful steps, and before concluding: read/update constitution/memory/focus/issues/baseline consistently.
agent-ops-spec
Manage specification documents in .agent/specs/. Use when user provides requirements, acceptance criteria, or feature descriptions that need to be tracked and validated against implementation.
agent-ops-testing
Test strategy, execution, and coverage analysis. Use when designing tests, running test suites, or analyzing test results beyond baseline checks.
agent-ops-testing
Test strategy, execution, and coverage analysis. Use when designing tests, running test suites, or analyzing test results beyond baseline checks.
agent-ops-state
Maintain .agent state files. Use at session start, after meaningful steps, and before concluding: read/update constitution/memory/focus/issues/baseline consistently.
Didn't find tool you were looking for?