Agent skill
Streams
This skill should be used when the user asks about "Effect Stream", "Stream.from", "Stream.map", "Stream.filter", "Stream.run", "streaming data", "async iteration", "Sink", "Channel", "Stream.concat", "Stream.merge", "backpressure", "Stream.fromIterable", "chunked processing", "real-time data", or needs to understand how Effect handles streaming data processing.
Install this agent skill to your Project
npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/streams
SKILL.md
Streams in Effect
Overview
Effect Streams provide:
- Lazy evaluation - Elements produced on demand
- Resource safety - Automatic cleanup
- Backpressure - Producer/consumer coordination
- Composition - Transform, filter, merge streams
- Error handling - Typed errors in stream pipeline
typescript
Stream<A, E, R>
// Produces values of type A
// May fail with error E
// Requires environment R
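To make the "lazy evaluation" point above concrete, here is a minimal sketch that consumes an infinite stream. It assumes a recent Effect 3.x release and uses `Effect.runSync`, which only works because nothing in this pipeline is asynchronous; the names `naturals` and `firstThreeArray` are illustrative.

```typescript
import { Chunk, Effect, Stream } from "effect"

// An infinite stream: 1, 2, 3, ... Nothing is computed until it is run.
const naturals = Stream.iterate(1, (n) => n + 1)

// Only the elements that take(3) asks for are ever produced,
// so running the infinite stream still terminates.
const firstThree = Effect.runSync(
  naturals.pipe(Stream.take(3), Stream.runCollect)
)

// runCollect yields a Chunk; convert it to a plain array for printing.
const firstThreeArray = Chunk.toReadonlyArray(firstThree)
console.log(firstThreeArray) // [ 1, 2, 3 ]
```

If elements were produced eagerly, this program would never finish.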
Creating Streams
From Values
typescript
import { Stream } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5)
const fromArray = Stream.fromIterable([1, 2, 3])
const empty = Stream.empty
const single = Stream.succeed(42)
const infinite = Stream.iterate(1, (n) => n + 1)
From Effects
typescript
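import { Schedule, Stream } from "effect"
// fetchData() and checkStatus() stand for any functions that return an Effect
const fromEffect = Stream.fromEffect(fetchData())
const polling = Stream.repeatEffect(checkStatus())
const scheduled = Stream.repeatEffectWithSchedule(
  checkStatus(),
  Schedule.spaced("5 seconds")
)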
From Async Sources
typescript
import { Effect, Stream } from "effect"
// From async iterable (the second argument maps a thrown value to a typed error)
const fromAsyncIterable = Stream.fromAsyncIterable(
  asyncGenerator(),
  (error) => new StreamError({ cause: error })
)
// From callback/event emitter
const fromCallback = Stream.async<number, never>((emit) => {
  const handler = (value: number) => emit.single(value)
  eventEmitter.on("data", handler)
  // The returned Effect runs as a finalizer when the stream ends
  return Effect.sync(() => eventEmitter.off("data", handler))
})
// From queue
const fromQueue = Stream.fromQueue(queue)
Generating Streams
typescript
const naturals = Stream.unfold(1, (n) => Option.some([n, n + 1]))
const range = Stream.range(1, 100)
// Stream.repeat expects a Schedule, so use repeatValue for an endless constant
const repeated = Stream.repeatValue("ping").pipe(
  Stream.take(5)
)
Transforming Streams
map - Transform Elements
typescript
const doubled = numbers.pipe(
Stream.map((n) => n * 2)
)
const enriched = users.pipe(
Stream.mapEffect((user) => fetchProfile(user.id))
)
const parallel = items.pipe(
Stream.mapEffect(process, { concurrency: 10 })
)
filter - Select Elements
typescript
const evens = numbers.pipe(
Stream.filter((n) => n % 2 === 0)
)
const valid = items.pipe(
Stream.filterEffect((item) => validate(item))
)
flatMap - Nested Streams
typescript
const expanded = numbers.pipe(
Stream.flatMap((n) => Stream.make(n, n * 10, n * 100))
)
// 1, 10, 100, 2, 20, 200, ...
take/drop
typescript
const first5 = numbers.pipe(Stream.take(5))
const skip5 = numbers.pipe(Stream.drop(5))
const firstWhile = numbers.pipe(Stream.takeWhile((n) => n < 10))
const dropWhile = numbers.pipe(Stream.dropWhile((n) => n < 10))
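The transformations above compose in a single pipeline. The following is a small sketch, assuming Effect 3.x and a purely synchronous stream so that `Effect.runSync` is safe; the variable names are illustrative.

```typescript
import { Chunk, Effect, Stream } from "effect"

const transformed = Effect.runSync(
  Stream.make(1, 2, 3, 4, 5, 6).pipe(
    Stream.filter((n) => n % 2 === 0), // keep 2, 4, 6
    Stream.map((n) => n * 10), // 20, 40, 60
    Stream.flatMap((n) => Stream.make(n, n + 1)), // expand each element into two
    Stream.runCollect
  )
)

const transformedArray = Chunk.toReadonlyArray(transformed)
console.log(transformedArray) // [ 20, 21, 40, 41, 60, 61 ]
```

Without a `concurrency` option, `flatMap` runs the inner streams one after another, which is why the output order is deterministic here.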
Combining Streams
concat - Sequential
typescript
const combined = Stream.concat(stream1, stream2)
// or, in pipeable form
const combinedPiped = stream1.pipe(Stream.concat(stream2))
merge - Interleaved
typescript
// Interleave elements from both
const merged = Stream.merge(stream1, stream2)
// Merge multiple
const allMerged = Stream.mergeAll([s1, s2, s3], { concurrency: 3 })
zip - Pair Elements
typescript
const zipped = Stream.zip(names, ages)
// Stream<[string, number]>
// With function
const combined = Stream.zipWith(
names,
ages,
(name, age) => ({ name, age })
)
interleave
typescript
const interleaved = Stream.interleave(stream1, stream2)
// a1, b1, a2, b2, ...
Consuming Streams
Running to Collection
typescript
// Inside Effect.gen
const collected = yield* Stream.runCollect(numbers) // Chunk<number>, not an array
const first = yield* Stream.runHead(numbers) // Option<number>
const sum = yield* Stream.runFold(
  numbers,
  0,
  (acc, n) => acc + n
)
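The `yield*` snippets in this section only compile inside a generator. As a sketch of the surrounding context, assuming Effect 3.x (where effects can be yielded directly in `Effect.gen`), the three runners can be combined like this; `program` and `summary` are illustrative names.

```typescript
import { Chunk, Effect, Option, Stream } from "effect"

const program = Effect.gen(function* () {
  const numbers = Stream.make(1, 2, 3, 4, 5)

  // Each runner re-runs the stream from the start.
  const all = yield* Stream.runCollect(numbers) // Chunk<number>
  const first = yield* Stream.runHead(numbers) // Option<number>
  const sum = yield* Stream.runFold(numbers, 0, (acc, n) => acc + n)

  return {
    all: Chunk.toReadonlyArray(all),
    first: Option.getOrNull(first),
    sum
  }
})

// runSync is fine here because the stream is purely synchronous.
const summary = Effect.runSync(program)
console.log(summary) // { all: [ 1, 2, 3, 4, 5 ], first: 1, sum: 15 }
```

For streams backed by timers, network calls, or concurrency, `Effect.runPromise` would be the appropriate runner instead.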
Running for Effects
typescript
yield* numbers.pipe(
Stream.runForEach((n) => Effect.log(`Got: ${n}`))
)
yield* numbers.pipe(Stream.runDrain)
Running to Sink
typescript
import { Sink } from "effect"
// Inside Effect.gen
const sum = yield* numbers.pipe(
  Stream.run(Sink.sum)
)
const collected = yield* numbers.pipe(
  Stream.run(Sink.collectAll()) // Chunk<number>
)
Chunking
Streams process elements in chunks for efficiency:
typescript
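import { Chunk, Stream } from "effect"
// Emit groups of up to 10 elements: Stream<Chunk<number>>
const chunked = numbers.pipe(
  Stream.grouped(10)
)
// Transform a whole chunk at a time
const processed = numbers.pipe(
  Stream.mapChunks((chunk) => Chunk.map(chunk, (n) => n * 2))
)
// Change the internal chunk size without changing the elements
const rechunked = numbers.pipe(
  Stream.rechunk(100)
)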
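To show what `Stream.grouped` actually emits, here is a short sketch (assuming Effect 3.x; names are illustrative) that groups seven elements into batches of three. Note that the final group is smaller because the stream ends before it fills up.

```typescript
import { Chunk, Effect, Stream } from "effect"

const groups = Effect.runSync(
  Stream.make(1, 2, 3, 4, 5, 6, 7).pipe(
    Stream.grouped(3), // Stream<Chunk<number>>
    Stream.map((chunk) => Chunk.toReadonlyArray(chunk)), // plain arrays for printing
    Stream.runCollect
  )
)

const groupsArray = Chunk.toReadonlyArray(groups)
console.log(groupsArray) // [ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7 ] ]
```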
Error Handling
typescript
const safe = stream.pipe(
  Stream.catchAll((error) => Stream.succeed(fallbackValue))
)
const handled = stream.pipe(
  Stream.catchTag("NetworkError", (error) =>
    Stream.succeed(cachedValue)
  )
)
const resilient = stream.pipe(
  Stream.retry(Schedule.exponential("1 second"))
)
const withFallback = stream.pipe(
  Stream.orElse(() => fallbackStream)
)
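As an illustration of how recovery interacts with elements already emitted, the sketch below (assuming Effect 3.x; the `"boom"` error and `-1` fallback are made up for the example) lets a stream fail after two elements and recovers with `Stream.catchAll`.

```typescript
import { Chunk, Effect, Stream } from "effect"

// Emits 1 and 2, then fails with a string error.
const failing = Stream.make(1, 2).pipe(
  Stream.concat(Stream.fail("boom"))
)

const recovered = Effect.runSync(
  failing.pipe(
    // Elements emitted before the failure are kept; the fallback stream takes over.
    Stream.catchAll((error) => Stream.succeed(-1)),
    Stream.runCollect
  )
)

const recoveredArray = Chunk.toReadonlyArray(recovered)
console.log(recoveredArray) // [ 1, 2, -1 ]
```

After `catchAll`, the error type of the stream is `never`, so the consumer no longer has to handle the failure.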
Resource Management
typescript
import * as fs from "node:fs"
import { Effect, Option, Stream } from "effect"
// Stream with resource lifecycle
const fileStream = Stream.acquireRelease(
  Effect.sync(() => fs.openSync("data.txt", "r")),
  (fd) => Effect.sync(() => fs.closeSync(fd))
).pipe(
  Stream.flatMap((fd) =>
    // repeatEffectOption ends the stream when the effect fails with Option.none()
    Stream.repeatEffectOption(
      Effect.suspend(() => {
        const buffer = Buffer.alloc(1024)
        const bytes = fs.readSync(fd, buffer)
        return bytes > 0
          ? Effect.succeed(buffer.subarray(0, bytes))
          : Effect.fail(Option.none())
      })
    )
  )
)
// Scoped streams
const scoped = Stream.scoped(
Effect.acquireRelease(openConnection, closeConnection)
)
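The ordering of acquisition, use, and release can be observed with an in-memory stand-in for a real resource. This is a sketch under the assumption of Effect 3.x; the `events` log and the `"handle"` value are hypothetical and only exist to make the lifecycle visible.

```typescript
import { Effect, Stream } from "effect"

// Records what happens, in order.
const events: Array<string> = []

const resource = Stream.acquireRelease(
  Effect.sync(() => {
    events.push("acquire")
    return "handle" // stand-in for a file descriptor or connection
  }),
  () => Effect.sync(() => { events.push("release") })
)

const program = resource.pipe(
  Stream.flatMap((handle) =>
    Stream.make(1, 2).pipe(Stream.map((n) => `${handle}:${n}`))
  ),
  Stream.tap((line) => Effect.sync(() => { events.push(line) })),
  Stream.runDrain
)

Effect.runSync(program)
console.log(events) // [ 'acquire', 'handle:1', 'handle:2', 'release' ]
```

The release step runs after the last element has been consumed, without the consumer having to call it.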
Sinks
Sinks consume stream elements:
typescript
import { Sink } from "effect"
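Sink.sum // total of numeric elements
Sink.count // number of elements
Sink.head() // Option of the first element
Sink.last() // Option of the last element
Sink.collectAll() // Chunk of every element
Sink.forEach(f) // run an effect for each element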
const maxSink = Sink.foldLeft(
Number.NEGATIVE_INFINITY,
(max, n: number) => Math.max(max, n)
)
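A sink does nothing until a stream is run into it. The sketch below, assuming Effect 3.x, runs the same stream into the custom `maxSink` shown above and into the built-in `Sink.sum`; the input values are arbitrary.

```typescript
import { Effect, Sink, Stream } from "effect"

// Custom sink: keep the largest element seen so far.
const maxSink = Sink.foldLeft(
  Number.NEGATIVE_INFINITY,
  (max, n: number) => Math.max(max, n)
)

const readings = Stream.make(3, 9, 4)

const largest = Effect.runSync(readings.pipe(Stream.run(maxSink)))
const total = Effect.runSync(readings.pipe(Stream.run(Sink.sum)))

console.log(largest, total) // 9 16
```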
Common Patterns
Batched Processing
typescript
const batchProcess = stream.pipe(
Stream.grouped(100),
Stream.mapEffect((batch) =>
Effect.tryPromise(() => api.processBatch(Chunk.toArray(batch)))
)
)
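A runnable variant of the batching pattern, with the remote call replaced by a local stand-in, might look like the sketch below. It assumes Effect 3.x; `processBatch` is a hypothetical handler that simply totals its batch and is not part of any real API.

```typescript
import { Chunk, Effect, Stream } from "effect"

// Hypothetical batch handler: stands in for a real API call.
const processBatch = (batch: ReadonlyArray<number>) =>
  Effect.sync(() => batch.reduce((total, n) => total + n, 0))

const batchTotals = Effect.runSync(
  Stream.make(1, 2, 3, 4, 5).pipe(
    Stream.grouped(2), // batches: [1, 2], [3, 4], [5]
    Stream.mapEffect((batch) => processBatch(Chunk.toReadonlyArray(batch))),
    Stream.runCollect,
    Effect.map((totals) => Chunk.toReadonlyArray(totals))
  )
)

console.log(batchTotals) // [ 3, 7, 5 ]
```

With a real asynchronous handler, the program would be run with `Effect.runPromise` rather than `Effect.runSync`.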
Rate Limiting
typescript
import { Chunk, Stream } from "effect"
const rateLimited = stream.pipe(
  Stream.throttle({
    cost: Chunk.size, // each element costs one unit
    units: 1,
    duration: "100 millis",
    strategy: "shape"
  })
)
Debouncing
typescript
const debounced = stream.pipe(
Stream.debounce("500 millis")
)
Windowing
typescript
// Emit a chunk once 1000 elements have arrived or 1 second has passed, whichever comes first
const windows = stream.pipe(
  Stream.groupedWithin(1000, "1 second")
)
Best Practices
- Use chunking for efficiency - Batch operations when possible
- Handle backpressure - Put Stream.buffer between a fast producer and a slow consumer, and choose its capacity deliberately
- Clean up resources - Use acquireRelease for external resources
- Process in parallel - Use concurrency option in mapEffect
- Handle errors early - Catch/retry before final consumption
Additional Resources
For comprehensive stream documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.
Search for these sections:
- "Creating Streams" for stream construction
- "Consuming Streams" for running streams
- "Operations" for transformations
- "Error Handling in Streams" for error patterns
- "Resourceful Streams" for resource management
- "Sink" for custom sinks