Agent skill
effect-concurrency
Use when Effect concurrency patterns including fibers, fork, join, parallel execution, and race conditions. Use for concurrent operations in Effect applications.
Install this agent skill to your Project
npx add-skill https://github.com/TheBushidoCollective/han/tree/main/plugins/frameworks/effect/skills/effect-concurrency
SKILL.md
Effect Concurrency
Master concurrent execution in Effect using fibers. This skill covers forking, joining, interruption, parallel execution, and advanced concurrency patterns for building high-performance Effect applications.
Fibers Fundamentals
What are Fibers?
Fibers are lightweight virtual threads that execute effects concurrently:
import { Effect, Fiber } from "effect"
// Every effect runs on a fiber
const effect = Effect.succeed(42)
// When run, this executes on a fiber
// Effects are descriptions - fibers are executions
// Effect: lazy, immutable description
// Fiber: running execution with state
Forking Effects
Create independent concurrent fibers:
import { Effect, Fiber } from "effect"
const task = Effect.gen(function* () {
yield* Effect.sleep("1 second")
yield* Effect.log("Task completed")
return 42
})
const program = Effect.gen(function* () {
// Fork creates a new fiber
const fiber = yield* Effect.fork(task)
// fiber: RuntimeFiber<number, never>
yield* Effect.log("Main fiber continues")
// Join waits for fiber to complete
const result = yield* Fiber.join(fiber)
yield* Effect.log(`Result: ${result}`)
return result
})
Fiber Operations
import { Effect, Fiber } from "effect"
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(longRunningTask)
// Join - wait for result
const result = yield* Fiber.join(fiber)
// Await - get Exit value (success/failure/interruption)
const exit = yield* Fiber.await(fiber)
// Interrupt - cancel execution
yield* Fiber.interrupt(fiber)
// Poll - check if complete (non-blocking)
const status = yield* Fiber.poll(fiber)
})
Parallel Execution
Effect.all - Run Multiple Effects
import { Effect } from "effect"
// Parallel execution (default)
const program = Effect.gen(function* () {
const results = yield* Effect.all([
fetchUser("1"),
fetchUser("2"),
fetchUser("3")
])
// All requests run concurrently
return results
})
// Sequential execution
const sequential = Effect.gen(function* () {
const results = yield* Effect.all([
fetchUser("1"),
fetchUser("2"),
fetchUser("3")
], { concurrency: 1 })
return results
})
// Limited concurrency
const limited = Effect.gen(function* () {
const results = yield* Effect.all(
Array.from({ length: 100 }, (_, i) => fetchUser(`${i}`)),
{ concurrency: 10 } // Max 10 concurrent
)
return results
})
Effect.all with Batching
import { Effect } from "effect"
// Batching for efficiency
const batchFetch = Effect.gen(function* () {
const userIds = Array.from({ length: 1000 }, (_, i) => `${i}`)
const results = yield* Effect.all(
userIds.map(id => fetchUser(id)),
{
concurrency: 50, // 50 concurrent requests
batching: true // Enable batching optimization
}
)
return results
})
Effect.forEach - Concurrent Iteration
import { Effect } from "effect"
const processUsers = (userIds: string[]) =>
Effect.forEach(
userIds,
(id) => Effect.gen(function* () {
const user = yield* fetchUser(id)
const processed = yield* processUser(user)
return processed
}),
{ concurrency: "unbounded" } // No limit
)
// With concurrency limit
const processUsersLimited = (userIds: string[]) =>
Effect.forEach(
userIds,
(id) => processUser(id),
{ concurrency: 10 }
)
Racing Effects
Effect.race - First to Complete
import { Effect } from "effect"
const fetchWithFallback = (id: string) =>
Effect.race(
fetchFromPrimaryDb(id),
fetchFromSecondaryDb(id)
)
// Returns whichever completes first
// Racing multiple effects
const fastestSource = Effect.race(
fetchFromSource1(),
fetchFromSource2(),
fetchFromSource3()
)
Effect.raceAll - Race Multiple Effects
import { Effect } from "effect"
const sources = [
fetchFromSource1(),
fetchFromSource2(),
fetchFromSource3()
]
// First to succeed wins
const fastest = Effect.raceAll(sources)
Timeout Racing
import { Effect } from "effect"
const withTimeout = <A, E, R>(
effect: Effect.Effect<A, E, R>,
duration: Duration.Duration
) =>
Effect.race(
effect,
Effect.sleep(duration).pipe(
Effect.andThen(Effect.fail({ _tag: "Timeout" }))
)
)
const program = Effect.gen(function* () {
const result = yield* withTimeout(
slowOperation(),
Duration.seconds(5)
)
return result
})
Interruption
Fiber Interruption
import { Effect, Fiber } from "effect"
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(longRunningTask)
// Cancel after 1 second
yield* Effect.sleep("1 second")
yield* Fiber.interrupt(fiber)
yield* Effect.log("Task cancelled")
})
// Automatic interruption on parent exit
const autoInterrupt = Effect.gen(function* () {
const fiber = yield* Effect.fork(infiniteLoop)
// fiber will be interrupted when this effect completes
})
Uninterruptible Regions
import { Effect } from "effect"
const criticalSection = Effect.gen(function* () {
// This region cannot be interrupted
yield* Effect.uninterruptible(
Effect.gen(function* () {
yield* beginTransaction()
yield* updateDatabase()
yield* commitTransaction()
})
)
})
// Interruptible regions within uninterruptible
const mixed = Effect.uninterruptible(
Effect.gen(function* () {
yield* criticalOperation1()
// Allow interruption here
yield* Effect.interruptible(
nonCriticalOperation()
)
yield* criticalOperation2()
})
)
Daemon Fibers
Fork Daemon - Independent Fibers
import { Effect } from "effect"
const program = Effect.gen(function* () {
// Regular fork - interrupted when parent exits
const regularFiber = yield* Effect.fork(task)
// Daemon fork - survives parent exit
const daemonFiber = yield* Effect.forkDaemon(backgroundTask)
// Parent exits, regularFiber interrupted, daemonFiber continues
})
// Background worker example
const startBackgroundWorker = Effect.gen(function* () {
yield* Effect.forkDaemon(
Effect.gen(function* () {
while (true) {
yield* processQueue()
yield* Effect.sleep("1 second")
}
})
)
})
Scoped Concurrency
Effect.forkScoped - Fiber Cleanup
import { Effect, Scope } from "effect"
const program = Effect.gen(function* () {
yield* Effect.scoped(
Effect.gen(function* () {
// Fibers are tied to scope
const fiber1 = yield* Effect.forkScoped(task1)
const fiber2 = yield* Effect.forkScoped(task2)
// Do work
yield* doWork()
// Scope exit automatically interrupts fibers
})
)
// fiber1 and fiber2 are interrupted here
})
Fork In Scope
import { Effect } from "effect"
const managedConcurrency = Effect.gen(function* () {
const scope = yield* Scope.make()
// Fork in specific scope
const fiber = yield* Effect.forkIn(task, scope)
// Work continues
yield* doWork()
// Close scope, interrupt fiber
yield* Scope.close(scope, Exit.succeed(undefined))
})
Advanced Patterns
Worker Pool
import { Effect, Queue } from "effect"
interface Task {
id: string
data: unknown
}
const createWorkerPool = (workers: number) =>
Effect.gen(function* () {
const queue = yield* Queue.bounded<Task>(100)
// Start workers
const workerFibers = yield* Effect.all(
Array.from({ length: workers }, () =>
Effect.fork(
Effect.forever(
Effect.gen(function* () {
const task = yield* Queue.take(queue)
yield* processTask(task)
})
)
)
)
)
return {
submit: (task: Task) => Queue.offer(queue, task),
shutdown: () =>
Effect.all(
workerFibers.map(fiber => Fiber.interrupt(fiber))
)
}
})
Parallel Map-Reduce
import { Effect, Chunk } from "effect"
const parallelMapReduce = <A, B, E, R>(
items: A[],
map: (item: A) => Effect.Effect<B, E, R>,
reduce: (acc: B, item: B) => B,
initial: B,
concurrency: number
) =>
Effect.gen(function* () {
const mapped = yield* Effect.forEach(
items,
map,
{ concurrency }
)
return mapped.reduce(reduce, initial)
})
Request Deduplication
import { Effect, Request, RequestResolver } from "effect"
interface GetUser extends Request.Request<User, UserNotFound> {
readonly _tag: "GetUser"
readonly id: string
}
const GetUserResolver = RequestResolver.makeBatched(
(requests: GetUser[]) =>
Effect.gen(function* () {
const ids = requests.map(r => r.id)
const users = yield* fetchUsersBatch(ids)
// Resolve all requests
return Effect.forEach(requests, (request) => {
const user = users.find(u => u.id === request.id)
return user
? Request.complete(request, user)
: Request.fail(request, { _tag: "UserNotFound", id: request.id })
})
})
)
// Multiple concurrent requests for same ID deduplicated
const program = Effect.gen(function* () {
const results = yield* Effect.all([
Effect.request(GetUser({ id: "1" }), GetUserResolver),
Effect.request(GetUser({ id: "1" }), GetUserResolver),
Effect.request(GetUser({ id: "1" }), GetUserResolver)
])
// Only one actual fetch for ID "1"
})
Best Practices
-
Use Effect.all for Parallel Work: Don't fork manually when Effect.all suffices.
-
Limit Concurrency: Set appropriate concurrency limits to avoid resource exhaustion.
-
Handle Interruption: Ensure cleanup code runs in uninterruptible regions.
-
Use Scoped Forks: Tie fiber lifetime to scopes for automatic cleanup.
-
Avoid Infinite Loops: Use Effect.forever with sleep for background tasks.
-
Batch Requests: Use request resolvers to batch and deduplicate.
-
Timeout Long Operations: Add timeouts to prevent hanging.
-
Monitor Fiber Status: Use Fiber.await and Fiber.poll for status checks.
-
Use Daemon Sparingly: Only fork daemons when truly independent.
-
Test Concurrent Code: Write tests for race conditions and interruption.
Common Pitfalls
-
Forgetting to Join: Forking without joining loses results.
-
No Concurrency Limits: Unbounded concurrency can exhaust resources.
-
Not Handling Interruption: Missing cleanup in interruptible regions.
-
Race Conditions: Sharing mutable state between fibers.
-
Deadlocks: Circular dependencies between fibers.
-
Ignoring Failures: Not checking fiber exit status.
-
Memory Leaks: Daemon fibers that never terminate.
-
Over-Forking: Creating too many fibers unnecessarily.
-
Missing Timeouts: Long-running operations without limits.
-
Wrong Execution Mode: Using sequential when parallel is intended.
When to Use This Skill
Use effect-concurrency when you need to:
- Execute multiple operations in parallel
- Build high-performance data pipelines
- Handle concurrent user requests
- Implement background workers
- Race multiple data sources
- Add timeouts to operations
- Build concurrent job processors
- Manage fiber lifecycles
- Implement request deduplication
- Optimize throughput with batching
Resources
Official Documentation
Related Skills
- effect-core-patterns - Basic Effect operations
- effect-resource-management - Resource cleanup with scopes
Didn't find tool you were looking for?