Savvi Studio

Stream Best Practices

Purpose: Guidelines for working with streams
Last Updated: 2024-11-26

Overview

This document collects guidelines for working with streams, along with the anti-patterns to avoid.

Source Code Reference

For stream implementation details, see:

Design Guidelines

✅ Do

Use streams for transformations

  • Keep transformations in streams
  • Compose operations functionally
  • Leverage lazy evaluation
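As a rough illustration of what lazy, composed transformations mean in practice, the sketch below uses plain async generators rather than the project's stream class. The helpers `mapLazy` and `filterLazy` and the `numbers` source are hypothetical names introduced only for this example.

```typescript
// Hypothetical helpers for illustration; not the project's stream API.
async function* mapLazy<T, U>(
  source: AsyncIterable<T>,
  fn: (item: T) => U,
): AsyncGenerator<U> {
  for await (const item of source) {
    yield fn(item);
  }
}

async function* filterLazy<T>(
  source: AsyncIterable<T>,
  predicate: (item: T) => boolean,
): AsyncGenerator<T> {
  for await (const item of source) {
    if (predicate(item)) yield item;
  }
}

// Source that records when each item is actually produced.
async function* numbers(log: string[]): AsyncGenerator<number> {
  for (const n of [1, 2, 3]) {
    log.push(`produce ${n}`);
    yield n;
  }
}

async function runLazyDemo(): Promise<string[]> {
  const log: string[] = [];
  // Composing the pipeline does not pull any items from the source.
  const pipeline = mapLazy(
    filterLazy(numbers(log), (n) => n % 2 === 1),
    (n) => n * 10,
  );
  log.push("pipeline built");
  // Items are produced one at a time, only as the consumer asks for them.
  for await (const value of pipeline) {
    log.push(`consume ${value}`);
  }
  return log;
}
```

In this sketch "pipeline built" is logged before any "produce" entry, and each item is consumed before the next odd item is produced, so no intermediate array ever exists.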

Keep transformations pure

  • Avoid side effects in map/filter
  • Use forEach for side effects
  • Make transformations predictable

Use appropriate collectors

  • Use reduce for aggregation
  • Use forEach for side effects
  • Use collect only when the full result must be held in memory at once
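To show why reduce is usually the better fit for aggregation, here is a minimal sketch of a reducing terminal operation over an async iterable. `reduceStream`, `orderTotals`, and `sumOrders` are hypothetical names; the real stream class may expose reduce with a different signature.

```typescript
// Hypothetical terminal operation: folds items into one value without buffering them.
async function reduceStream<T, A>(
  source: AsyncIterable<T>,
  reducer: (acc: A, item: T) => A,
  initial: A,
): Promise<A> {
  let acc = initial;
  for await (const item of source) {
    acc = reducer(acc, item); // Only the accumulator is retained between items.
  }
  return acc;
}

// Example source, invented for this sketch.
async function* orderTotals(): AsyncGenerator<number> {
  yield* [12.5, 7.5, 30];
}

async function sumOrders(): Promise<number> {
  return reduceStream(orderTotals(), (sum, total) => sum + total, 0);
}
```

Memory use here depends on the size of the accumulator, not on the number of items, which is the main reason to prefer it over collecting and then summing.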

Handle errors properly

  • Wrap async operations in try/catch
  • Propagate errors appropriately
  • Provide meaningful error messages
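The three points above can be sketched together as follows: the async call is wrapped in try/catch, the failure is rethrown so the consumer sees it, and the message names the item that failed. The `Job` type, `parsePayload`, and `parseJobs` are assumptions made up for this example.

```typescript
interface Job {
  id: string;
  payload: string;
}

// Hypothetical async operation that rejects on bad input.
async function parsePayload(job: Job): Promise<number> {
  const value = Number(job.payload);
  if (Number.isNaN(value)) throw new Error("payload is not a number");
  return value;
}

// Transformation stage that adds context to failures and propagates them.
async function* parseJobs(source: AsyncIterable<Job>): AsyncGenerator<number> {
  for await (const job of source) {
    let value: number;
    try {
      value = await parsePayload(job);
    } catch (error) {
      const reason = error instanceof Error ? error.message : String(error);
      // Rethrow with the failing item identified, instead of swallowing the error.
      throw new Error(`Failed to parse job ${job.id}: ${reason}`);
    }
    yield value;
  }
}
```

Whether to rethrow, as here, or to substitute a fallback value depends on whether downstream stages can do anything useful with a partial result.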

❌ Don't

Mix pagination into streams

  • Keep pagination in cursors
  • Use cursors for position tracking
  • Don't manage cursor state in streams
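One possible shape for that separation is sketched below: the cursor is the only component that sees page tokens, and the code consuming its items never touches them. `PageCursor`, `Page`, `FetchPage`, and the fake page data are hypothetical and stand in for whatever the real cursor implementation provides.

```typescript
interface Page<T> {
  items: T[];
  nextToken: string | null;
}

type FetchPage<T> = (token: string | null) => Promise<Page<T>>;

// Hypothetical cursor: owns the page token and nothing else does.
class PageCursor<T> {
  private token: string | null = null;
  private exhausted = false;
  private readonly fetchPage: FetchPage<T>;

  constructor(fetchPage: FetchPage<T>) {
    this.fetchPage = fetchPage;
  }

  // Exposes a flat sequence of items; paging happens behind this boundary.
  async *items(): AsyncGenerator<T> {
    while (!this.exhausted) {
      const page = await this.fetchPage(this.token);
      this.token = page.nextToken;
      this.exhausted = page.nextToken === null;
      yield* page.items;
    }
  }
}

// Fake two-page backend, invented for this sketch.
const fakePages = new Map<string, Page<number>>([
  ["start", { items: [1, 2], nextToken: "p2" }],
  ["p2", { items: [3], nextToken: null }],
]);

const fetchFakePage: FetchPage<number> = async (token) => {
  const page = fakePages.get(token ?? "start");
  if (!page) throw new Error(`Unknown page token: ${token}`);
  return page;
};

async function doubledItems(): Promise<number[]> {
  const cursor = new PageCursor(fetchFakePage);
  const out: number[] = [];
  for await (const n of cursor.items()) {
    out.push(n * 2); // The transformation never sees a page token.
  }
  return out;
}
```

The array in `doubledItems` exists only to make the result easy to inspect; a real consumer would normally keep streaming.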

Create unnecessary intermediate arrays

  • Chain lazy operations instead of materializing each step
  • Stream through data one item at a time
  • Collect only when the whole result is needed at once

Hold large state in transformations

  • Keep transformations stateless when possible
  • If external state is unavoidable, keep it small and bounded
  • Consider memory implications
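When a transformation does need state, one option is to keep that state constant in size. The sketch below drops adjacent duplicates while remembering only the previous item, in contrast to a set of every value seen so far, which grows with the stream. `dedupeAdjacent` and `fromArray` are hypothetical helpers written for this example.

```typescript
// Hypothetical helper: turns an array into an async source for the demo.
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  yield* items;
}

// Drops an item when it equals the one immediately before it.
// State is a single remembered item, regardless of stream length.
async function* dedupeAdjacent<T>(source: AsyncIterable<T>): AsyncGenerator<T> {
  let hasPrevious = false;
  let previous: T | undefined;
  for await (const item of source) {
    if (!hasPrevious || item !== previous) yield item;
    previous = item;
    hasPrevious = true;
  }
}

async function dedupeDemo(): Promise<number[]> {
  const out: number[] = [];
  for await (const n of dedupeAdjacent(fromArray([1, 1, 2, 2, 2, 1]))) {
    out.push(n);
  }
  return out; // [1, 2, 1]: the final 1 is kept because it is not adjacent to the first.
}
```

This only removes neighbouring repeats. Global deduplication needs state proportional to the number of distinct items, which is exactly the memory cost worth weighing before adding it to a pipeline.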

Skip error handling

  • Always handle async errors
  • Don't ignore failure cases
  • Provide fallback behavior

Performance Considerations

Memory Efficiency

Good: Process one item at a time

await cursor
  .stream()
  .map(transform)
  .forEach(process);

Bad: Load all into memory first

const all = await cursor.stream().collect();
all.forEach(process);

Transformation Order

Optimized: Filter early, expensive operations last

await stream
  .filter(item => item.isValid)  // Filter first
  .map(expensiveTransform)       // Transform only valid items
  .collect();

Unoptimized: Expensive operations on all items

await stream
  .map(expensiveTransform)       // Transform everything
  .filter(item => item.isValid)  // Then filter
  .collect();
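The difference between the two orderings can be made concrete by counting how often the expensive step runs. The sketch below does this with a plain loop over an async generator; `Item`, `items`, and `countTransformCalls` are hypothetical names, and the transform is a stand-in for real work.

```typescript
interface Item {
  id: number;
  isValid: boolean;
}

// Example source with two valid and two invalid items.
async function* items(): AsyncGenerator<Item> {
  yield* [
    { id: 1, isValid: true },
    { id: 2, isValid: false },
    { id: 3, isValid: false },
    { id: 4, isValid: true },
  ];
}

// Returns how many times the expensive transform ran for the chosen ordering.
async function countTransformCalls(filterFirst: boolean): Promise<number> {
  let calls = 0;
  const expensiveTransform = (item: Item): Item => {
    calls += 1; // Stand-in for costly work.
    return { ...item };
  };
  const kept: Item[] = [];
  for await (const item of items()) {
    if (filterFirst) {
      // Filter, then transform: invalid items never reach the transform.
      if (item.isValid) kept.push(expensiveTransform(item));
    } else {
      // Transform, then filter: every item pays the transform cost.
      const transformed = expensiveTransform(item);
      if (transformed.isValid) kept.push(transformed);
    }
  }
  return calls;
}
```

With this data, `countTransformCalls(true)` runs the transform twice and `countTransformCalls(false)` runs it four times. Reordering is only safe when the predicate does not depend on the transform's output.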

Common Pitfalls

Pitfall 1: Collecting Unnecessarily

// ❌ Bad: Collect then iterate
const items = await stream.collect();
items.forEach(process);
// ✅ Good: Stream through
await stream.forEach(process);

Pitfall 2: Side Effects in map()

// ❌ Bad: Side effects in map
await stream
  .map(item => {
    database.save(item);  // Side effect!
    return item;
  })
  .collect();
// ✅ Good: Use forEach for side effects
await stream
  .forEach(item => database.save(item));

Pitfall 3: Not Handling Async Errors

// ❌ Bad: No error handling
await stream
  .map(async item => await fetch(item.url))
  .collect();
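// ✅ Good: Handle errors
// fetch rejects only on network failure, so HTTP error statuses are checked explicitly
await stream
  .map(async item => {
    try {
      const response = await fetch(item.url);
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }
      return response;
    } catch (error) {
      console.error(`Failed to fetch ${item.url}:`, error);
      return null;  // Fallback: failed items are dropped by the filter below
    }
  })
  .filter(result => result !== null)
  .collect();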

Testing Best Practices

describe('Stream', () => {
  it('transforms items correctly', async () => {
    async function* source() {
      yield* [1, 2, 3, 4, 5];
    }
    
    const result = await new BaseStream(source())
      .map(x => x * 2)
      .filter(x => x > 5)
      .collect();
    
    expect(result).toEqual([6, 8, 10]);
  });
  
  it('handles empty streams', async () => {
    async function* source() {
      // Empty
    }
    
    const result = await new BaseStream(source())
      .map(x => x * 2)
      .collect();
    
    expect(result).toEqual([]);
  });
});

Next steps: See Usage Patterns for practical examples.