Streams vs fp-ts
Purpose: Clarifying the relationship between streams and fp-ts
Last Updated: 2024-11-26
Overview
This project uses both streams (src/lib/stream/) and fp-ts. They serve different, complementary purposes and can be used together.
Different Concerns
Streams: Data Transformation
- Purpose: Transform and collect paginated data
- Pattern: Async generators with lazy evaluation
- Location: src/lib/stream/
- Use case: Processing cursor results
// Stream example
const results = await cursor
  .stream()
  .map(item => transform(item))
  .filter(item => item.isValid)
  .collect();
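The chain above relies on lazy evaluation: no item is pulled from the source until a terminal call such as collect() runs. A minimal sketch of how such a wrapper could be built on async generators follows. The `LazyStream` class and the `fromArray` helper are hypothetical names chosen for illustration, not the actual API in src/lib/stream/.

```typescript
// Hypothetical sketch: a lazy stream over any AsyncIterable.
// LazyStream and fromArray are illustrative names, not the project's API.
class LazyStream<T> {
  private readonly source: AsyncIterable<T>;

  constructor(source: AsyncIterable<T>) {
    this.source = source;
  }

  // Returns a new stream; fn does not run until the result is consumed.
  map<U>(fn: (item: T) => U): LazyStream<U> {
    const source = this.source;
    async function* mapped(): AsyncGenerator<U> {
      for await (const item of source) {
        yield fn(item);
      }
    }
    return new LazyStream(mapped());
  }

  // Returns a new stream that only yields items passing the predicate.
  filter(predicate: (item: T) => boolean): LazyStream<T> {
    const source = this.source;
    async function* filtered(): AsyncGenerator<T> {
      for await (const item of source) {
        if (predicate(item)) yield item;
      }
    }
    return new LazyStream(filtered());
  }

  // Terminal operation: drives the pipeline and gathers the results.
  async collect(): Promise<T[]> {
    const out: T[] = [];
    for await (const item of this.source) {
      out.push(item);
    }
    return out;
  }
}

// Helper for the example: exposes an array as an AsyncIterable.
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) {
    yield item;
  }
}

let mapCalls = 0;
const pipeline = new LazyStream(fromArray([1, 2, 3, 4]))
  .map((n) => {
    mapCalls += 1;
    return n * 10;
  })
  .filter((n) => n > 10);
// mapCalls is still 0 here: building the chain does no work.
```

Only `await pipeline.collect()` pulls the four items through `map` and `filter`, which is the property that keeps large cursor results from being materialized before they are needed.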
fp-ts: Error Handling & Effects
- Purpose: Type-safe error handling and effect management
- Pattern: TaskEither, ReaderTaskEither for functional effects
- Location: Used in src/lib/db/ for database operations
- Use case: Database operations with explicit error handling
// fp-ts example
import * as TE from 'fp-ts/TaskEither';

const result = await withClient(
  {},
  (client) => TE.tryCatch(
    () => client.query('SELECT * FROM users'),
    (error) => queryError('Failed', undefined, error)
  )
)();
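To make the mechanics concrete, here is a dependency-free sketch of what a TaskEither amounts to: a function that returns a promise of a tagged Left or Right value and never rejects. The types and the `tryCatch` helper below are simplified stand-ins written for this illustration; in project code the real `TE.tryCatch` from fp-ts plays this role. `QueryError`, `fakeQuery`, and `runQuery` are hypothetical.

```typescript
// Simplified stand-ins for fp-ts's Either and TaskEither (illustration only).
type Left<E> = { readonly _tag: 'Left'; readonly left: E };
type Right<A> = { readonly _tag: 'Right'; readonly right: A };
type Either<E, A> = Left<E> | Right<A>;
type TaskEither<E, A> = () => Promise<Either<E, A>>;

// Mirrors the idea behind TE.tryCatch: run a promise-returning function and
// convert a rejection into a Left instead of letting it propagate as a throw.
const tryCatch = <E, A>(
  run: () => Promise<A>,
  onRejected: (reason: unknown) => E
): TaskEither<E, A> => async () => {
  try {
    return { _tag: 'Right', right: await run() };
  } catch (reason) {
    return { _tag: 'Left', left: onRejected(reason) };
  }
};

// Hypothetical error type and query function for the example.
type QueryError = { readonly type: 'QueryError'; readonly message: string };

const fakeQuery = async (sql: string): Promise<string[]> => {
  if (!sql.startsWith('SELECT')) throw new Error('only SELECT is supported');
  return ['alice', 'bob'];
};

// Describes the query as a value; nothing executes until it is invoked.
const runQuery = (sql: string): TaskEither<QueryError, string[]> =>
  tryCatch(
    () => fakeQuery(sql),
    (reason): QueryError => ({ type: 'QueryError', message: String(reason) })
  );
```

Because a TaskEither is only a description of work, nothing runs until the returned function is called, which is why the fp-ts examples in this document end with a trailing `()`.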
Complementary Usage
These patterns work together naturally:
import * as E from 'fp-ts/Either';
import * as TE from 'fp-ts/TaskEither';

// fp-ts for database query (error handling)
const getUserCursor = (): TE.TaskEither<DbError, Cursor<User>> =>
  withClient(
    {},
    (client) => TE.tryCatch(
      // TE.tryCatch expects a function that returns a Promise
      async () => new QueryCursor(client, 'SELECT * FROM users'),
      (error) => queryError('Failed to create cursor', undefined, error)
    )
  );
// Stream for data transformation (lazy evaluation)
const processUsers = async () => {
  const cursorResult = await getUserCursor()();
  if (E.isLeft(cursorResult)) {
    console.error('Database error:', cursorResult.left);
    return;
  }
  const cursor = cursorResult.right;
  // Now use stream for transformation
  await cursor
    .stream()
    .filter(user => user.isActive)
    .map(user => ({ id: user.id, name: user.name }))
    .forEach(async (user) => {
      await notifyUser(user);
    });
};
When to Use What
Use Streams When
- ✅ Processing paginated cursor results
- ✅ Transforming data (map, filter, etc.)
- ✅ Need lazy evaluation
- ✅ Working with AsyncIterable data
- ✅ Want composable transformations
Use fp-ts When
- ✅ Performing database operations
- ✅ Need explicit error handling
- ✅ Managing resources (connections, pools)
- ✅ Composing effects
- ✅ Want type-safe error types
Use Both Together
- ✅ Database query returns cursor → fp-ts
- ✅ Transform cursor results → streams
- ✅ Save transformed results → fp-ts
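The three steps above can be sketched end to end. This is a self-contained illustration with hypothetical names (`openCursor`, `activeNames`, `saveNames`, `syncActiveUsers`, and this `DbError` shape) and a hand-written `Either` type standing in for fp-ts; it is not the project's actual database code.

```typescript
// Hand-written stand-in for fp-ts's Either (illustration only).
type Either<E, A> =
  | { readonly _tag: 'Left'; readonly left: E }
  | { readonly _tag: 'Right'; readonly right: A };
type TaskEither<E, A> = () => Promise<Either<E, A>>;

// Hypothetical error and row types for the example.
type DbError = { readonly type: 'QueryError'; readonly message: string };
type User = { id: number; name: string; isActive: boolean };

// Step 1 (effect): opening the cursor can fail, so it returns an Either.
const openCursor = (): TaskEither<DbError, AsyncIterable<User>> => async () => {
  async function* rows(): AsyncGenerator<User> {
    yield { id: 1, name: 'Ada', isActive: true };
    yield { id: 2, name: 'Bob', isActive: false };
    yield { id: 3, name: 'Cy', isActive: true };
  }
  return { _tag: 'Right', right: rows() };
};

// Step 2 (stream): lazily keep active users and project their names.
async function* activeNames(users: AsyncIterable<User>): AsyncGenerator<string> {
  for await (const user of users) {
    if (user.isActive) yield user.name;
  }
}

// Step 3 (effect): saving can fail too, so the result is an Either again.
// The `saved` array stands in for a real table.
const saved: string[] = [];
const saveNames = (names: AsyncIterable<string>): TaskEither<DbError, number> =>
  async () => {
    try {
      for await (const name of names) saved.push(name);
      return { _tag: 'Right', right: saved.length };
    } catch (reason) {
      return { _tag: 'Left', left: { type: 'QueryError', message: String(reason) } };
    }
  };

const syncActiveUsers = async (): Promise<Either<DbError, number>> => {
  const cursor = await openCursor()();
  if (cursor._tag === 'Left') return cursor; // propagate the typed error
  return saveNames(activeNames(cursor.right))();
};
```

The stream step stays a plain async generator, while both boundaries with the database report failure as a value, so the caller has a single `Either` to inspect at the end.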
Architecture Layers
┌─────────────────────────────────────────┐
│            Application Layer            │
│  - Business logic                       │
│  - Combines both patterns               │
└─────────────────────────────────────────┘
         ↓                      ↓
┌──────────────────┐   ┌──────────────────┐
│  Database Layer  │   │   Stream Layer   │
│     (fp-ts)      │   │   (generators)   │
│                  │   │                  │
│ - TaskEither     │   │ - map/filter     │
│ - Error handling │   │ - Lazy eval      │
│ - Resources      │   │ - Collect        │
└──────────────────┘   └──────────────────┘
         ↓                      ↓
┌──────────────────┐   ┌──────────────────┐
│    PostgreSQL    │   │     Cursors      │
│ - Queries        │   │ - Pagination     │
└──────────────────┘   └──────────────────┘
Design Rationale
Why Not Use fp-ts Streams?
The fp-ts core library does not ship a streaming type; stream-like abstractions live in companion libraries such as fp-ts-rxjs. We chose async generators instead because:
- Native JavaScript: Async generators are built into JavaScript
- Simpler: No additional fp-ts learning curve for streams
- AsyncIterable: Works with standard JavaScript iteration
- Focused: Optimized for our cursor pagination use case
- Composable: Still composes well with fp-ts effects
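As an illustration of the cursor pagination case, the sketch below wraps a page-by-page fetch in an async generator so callers can consume it with a plain `for await...of` loop. `Page`, `fetchPage`, `paginate`, and the in-memory `ROWS` array are hypothetical stand-ins; the real cursor classes are not shown in this document.

```typescript
// Hypothetical page shape: a batch of rows plus the offset of the next page.
type Page<T> = { rows: T[]; next: number | null };

const ROWS = ['a', 'b', 'c', 'd', 'e'];
let pagesFetched = 0;

// Stand-in for a database round trip that returns one page at a time.
const fetchPage = async (offset: number, size: number): Promise<Page<string>> => {
  pagesFetched += 1;
  const rows = ROWS.slice(offset, offset + size);
  const nextOffset = offset + size;
  return { rows, next: nextOffset < ROWS.length ? nextOffset : null };
};

// Yields rows one by one and fetches the next page only after the consumer
// has asked for more than the current page holds.
async function* paginate(size: number): AsyncGenerator<string> {
  let offset: number | null = 0;
  while (offset !== null) {
    const page: Page<string> = await fetchPage(offset, size);
    yield* page.rows;
    offset = page.next;
  }
}

// Standard iteration works, and breaking early skips the remaining pages.
const firstThree = async (): Promise<string[]> => {
  const out: string[] = [];
  for await (const row of paginate(2)) {
    out.push(row);
    if (out.length === 3) break;
  }
  return out;
};
```

With a page size of 2, `firstThree()` stops after the second page, so the third page is never requested. That early exit comes from the language's iteration protocol rather than from any library code.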
Why Not Use Generators for Error Handling?
We use fp-ts TaskEither for database operations because:
- Explicit errors: Tagged union types at compile time
- Composability: Chain operations with proper error propagation
- Resource safety: Bracket pattern for cleanup
- Type safety: Compiler catches error handling issues
- Established pattern: Either-based error handling is widely used in functional programming
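The points about explicit errors and type safety can be illustrated with a small sketch of a tagged-union error type and an exhaustive handler. The `DbError` variants and `describeDbError` below are hypothetical; the project's real error definitions are not shown in this document.

```typescript
// Hypothetical error variants, discriminated by the `type` tag.
type DbError =
  | { readonly type: 'ConnectionError'; readonly host: string }
  | { readonly type: 'QueryError'; readonly sql: string; readonly cause: unknown }
  | { readonly type: 'TimeoutError'; readonly afterMs: number };

// Exhaustive handler: if a new variant is added to DbError and not handled
// here, the assignment to `never` in the default branch fails to compile.
const describeDbError = (error: DbError): string => {
  switch (error.type) {
    case 'ConnectionError':
      return `cannot reach ${error.host}`;
    case 'QueryError':
      return `query failed: ${error.sql}`;
    case 'TimeoutError':
      return `timed out after ${error.afterMs} ms`;
    default: {
      const unreachable: never = error;
      return unreachable;
    }
  }
};
```

A thrown exception inside a generator carries no such information in its type, which is the main reason to keep failure-prone database calls in TaskEither.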
Migration Notes
When migrating database code:
Before (streams only):
// Old: Streams handled everything
const users = await cursor
  .stream()
  .map(user => user)
  .collect();
// Errors are thrown, not typed
After (fp-ts + streams):
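// New: fp-ts for query, streams for transformation
import { pipe } from 'fp-ts/function';
import * as E from 'fp-ts/Either';
import * as TE from 'fp-ts/TaskEither';

const result = await pipe(
  getUserCursor(),                        // fp-ts: type-safe query
  TE.chain(cursor =>
    TE.tryCatch(
      () =>
        cursor
          .stream()                       // streams: transformation
          .filter(user => user.isActive)
          .collect(),
      // Errors thrown while streaming become typed errors as well
      (error) => queryError('Failed to collect users', undefined, error)
    )
  )
)();
if (E.isLeft(result)) {
  // Handle typed error
  handleDbError(result.left);
} else {
  // Use results (already resolved, no second await needed)
  const users = result.right;
}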
Related Documentation
- Streams Architecture - Stream patterns
- Database fp-ts Migration - fp-ts usage
- Cursors Architecture - Cursor patterns
- Database Patterns - Combining both
Summary
| Aspect | Streams | fp-ts |
|---|---|---|
| Purpose | Data transformation | Error handling & effects |
| Pattern | Async generators | TaskEither, ReaderTaskEither |
| Location | src/lib/stream/ | src/lib/db/ (database layer) |
| When | Processing results | Database operations |
| Composition | Chainable transformations | Composable effects |
| Errors | Standard try/catch | Tagged unions |
Key insight: They're complementary! Use fp-ts for database operations with type-safe errors, then use streams to transform the cursor results with lazy evaluation.
Next steps: See Database fp-ts Migration for using fp-ts with database operations, or Usage Patterns for stream examples.