Savvi Studio

Stream Usage Patterns

Purpose: Common patterns for using streams
Last Updated: 2024-11-26

Overview

This guide covers common usage patterns for streams with cursors and other data sources.

Source Code Reference

For stream implementation details, see:

Basic Usage

From Cursor

const cursor = getSomeCursor();

const results = await cursor
  .stream()
  .map(item => item.name)
  .filter(name => name.length > 3)
  .collect();

From AsyncIterable

import { BaseStream } from '@/lib/stream';

async function* generator() {
  yield* [1, 2, 3, 4, 5];
}

const stream = new BaseStream(generator());
const doubled = await stream
  .map(n => n * 2)
  .collect();

Chained Transformations

const processed = await cursor
  .stream()
  .map(item => ({ ...item, processed: true }))
  .filter(item => item.isValid)
  .take(100)
  .collect();

Collection Methods

collect() / toArray()

// Collect all items into array
const items = await stream.collect();

reduce()

// Reduce to single value
const total = await cursor
  .stream()
  .map(item => item.value)
  .reduce((sum, val) => sum + val, 0);

forEach()

// Side effects for each item
await cursor
  .stream()
  .forEach(item => console.log(item));

Real-World Examples

Transform and Save

await cursor
  .stream()
  .map(item => transform(item))
  .forEach(async (item) => {
    await database.save(item);
  });

Batch Processing

For batching examples, see src/lib/stream/transform.ts for the batch transform implementation.

Filtering and Mapping

const validEmails = await users
  .stream()
  .filter(user => user.email)
  .map(user => user.email)
  .collect();

Memory-Efficient Patterns

Stream Processing (Good)

// Constant memory usage
await cursor
  .stream()
  .map(transform)
  .forEach(process);

Loading All (Bad)

// Loads everything into memory
const all = await cursor.stream().collect();
all.forEach(process);

Next steps: See Transformations for available operations.