Stream Best Practices
Purpose: Guidelines for working with streams
Last Updated: 2024-11-26
Overview
This document provides guidelines and anti-patterns for working with streams.
Source Code Reference
For stream implementation details, see:
src/lib/stream/base-stream.ts- Base stream classsrc/lib/stream/transform.ts- Transform compositionsrc/lib/stream/collectors.ts- Collection strategiessrc/lib/stream/types.ts- Stream interfaces and types
Design Guidelines
✅ Do
Use streams for transformations
- Keep transformations in streams
- Compose operations functionally
- Leverage lazy evaluation
Keep transformations pure
- Avoid side effects in map/filter
- Use forEach for side effects
- Make transformations predictable
Use appropriate collectors
- Use reduce for aggregation
- Use forEach for side effects
- Use collect only when needed
Handle errors properly
- Wrap async operations in try/catch
- Propagate errors appropriately
- Provide meaningful error messages
❌ Don't
Mix pagination into streams
- Keep pagination in cursors
- Use cursors for position tracking
- Don't manage cursor state in streams
Create unnecessary intermediate arrays
- Use lazy evaluation
- Stream through data
- Only collect when needed
Hold large state in transformations
- Keep transformations stateless when possible
- Use external state carefully
- Consider memory implications
Skip error handling
- Always handle async errors
- Don't ignore failure cases
- Provide fallback behavior
Performance Considerations
Memory Efficiency
Good: Process one item at a time
await cursor
.stream()
.map(transform)
.forEach(process);
Bad: Load all into memory first
const all = await cursor.stream().collect();
all.forEach(process);
Transformation Order
Optimized: Filter early, expensive operations last
await stream
.filter(item => item.isValid) // Filter first
.map(expensiveTransform) // Transform only valid items
.collect();
Unoptimized: Expensive operations on all items
await stream
.map(expensiveTransform) // Transform everything
.filter(item => item.isValid) // Then filter
.collect();
Common Pitfalls
Pitfall 1: Collecting Unnecessarily
// ❌ Bad: Collect then iterate
const items = await stream.collect();
items.forEach(process);
// ✅ Good: Stream through
await stream.forEach(process);
Pitfall 2: Side Effects in map()
// ❌ Bad: Side effects in map
await stream
.map(item => {
database.save(item); // Side effect!
return item;
})
.collect();
// ✅ Good: Use forEach for side effects
await stream
.forEach(item => database.save(item));
Pitfall 3: Not Handling Async Errors
// ❌ Bad: No error handling
await stream
.map(async item => await fetch(item.url))
.collect();
// ✅ Good: Handle errors
await stream
.map(async item => {
try {
return await fetch(item.url);
} catch (error) {
console.error(`Failed to fetch ${item.url}:`, error);
return null;
}
})
.filter(result => result !== null)
.collect();
Testing Best Practices
describe('Stream', () => {
it('transforms items correctly', async () => {
async function* source() {
yield* [1, 2, 3, 4, 5];
}
const result = await new BaseStream(source())
.map(x => x * 2)
.filter(x => x > 5)
.collect();
expect(result).toEqual([6, 8, 10]);
});
it('handles empty streams', async () => {
async function* source() {
// Empty
}
const result = await new BaseStream(source())
.map(x => x * 2)
.collect();
expect(result).toEqual([]);
});
});
Related Documentation
- Usage Patterns - Common usage examples
- Transformations - Available operations
- Streams Architecture - Overview and concepts
- Source Code - Implementation details
Next steps: See Usage Patterns for practical examples.