Cursors and fp-ts Integration
Purpose: How cursors work with fp-ts in database operations
Last Updated: 2024-11-26
Overview
This document explains how cursors integrate with fp-ts for type-safe database operations. While Streams explains the relationship between streams and fp-ts, this document focuses on how cursors are created and returned from fp-ts database operations.
The Complete Data Flow
fp-ts (Database Layer)
↓
Creates Cursor
↓
Returns Cursor in TaskEither
↓
Application extracts Cursor
↓
Cursor → Stream (Transformation)
↓
Results
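The flow above can be sketched end to end without a database. The sketch is dependency-free on purpose: `Either` and `TaskEither` are declared locally with the same shape fp-ts uses, and `InMemoryUserCursor`, `createCursor`, `adultEmails`, and this `DbError` are hypothetical stand-ins for `QueryCursor`, the database layer, and the project's error type.

```typescript
// Local stand-ins that mirror the shapes of fp-ts Either and TaskEither.
type Either<E, A> =
  | { readonly _tag: 'Left'; readonly left: E }
  | { readonly _tag: 'Right'; readonly right: A };
type TaskEither<E, A> = () => Promise<Either<E, A>>;

// Hypothetical error and row types, for illustration only.
interface DbError { readonly _tag: 'QueryError'; readonly message: string }
interface User { readonly email: string; readonly age: number }

// Hypothetical cursor: an AsyncIterable over rows that are already in memory.
class InMemoryUserCursor implements AsyncIterable<User> {
  private readonly rows: readonly User[];
  constructor(rows: readonly User[]) {
    this.rows = rows;
  }
  async *[Symbol.asyncIterator](): AsyncGenerator<User> {
    for (const row of this.rows) yield row;
  }
}

// Database layer: creates the cursor and returns it inside a TaskEither.
// Passing `null` simulates a failed cursor creation.
const createCursor = (
  rows: readonly User[] | null
): TaskEither<DbError, InMemoryUserCursor> =>
  async () =>
    rows === null
      ? { _tag: 'Left', left: { _tag: 'QueryError', message: 'Failed to create cursor' } }
      : { _tag: 'Right', right: new InMemoryUserCursor(rows) };

// Application: extracts the cursor, then filters and maps its rows.
async function adultEmails(rows: readonly User[] | null): Promise<string[] | DbError> {
  const result = await createCursor(rows)();
  if (result._tag === 'Left') return result.left;
  const emails: string[] = [];
  for await (const user of result.right) {
    if (user.age > 18) emails.push(user.email);
  }
  return emails;
}
```

In the real code base the transformation step would go through the stream API rather than a hand-written loop; the loop only keeps the sketch short.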
Creating Cursors with fp-ts
Pattern: TaskEither Returns Cursor
Database operations use fp-ts TaskEither for error handling and return cursors for pagination:
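import * as TE from 'fp-ts/TaskEither';
import { QueryCursor } from '@/lib/db/api/QueryCursor';
import { withClient } from '@/lib/db/connection';
// DbError, queryError, and User come from the project's own modules (imports omitted here)

// fp-ts returns the cursor wrapped in a TaskEither
const getUserCursor = (): TE.TaskEither<DbError, QueryCursor<User>> =>
  withClient(
    {},
    (client) => TE.tryCatch(
      // async wrapper: TE.tryCatch expects a function that returns a Promise,
      // so an exception thrown by the constructor becomes a rejection it can map
      async () => new QueryCursor(client, 'SELECT * FROM users WHERE active = true'),
      (error) => queryError('Failed to create cursor', undefined, error)
    )
  );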
Why This Pattern?
Type-safe error handling: Cursor creation can fail (connection issues, invalid query), so the result is wrapped in a TaskEither and failures surface as typed Left values.
Resource management: fp-ts manages the database connection using the bracket pattern (acquire, use, release).
Composability: Cursor creation composes with other database operations through combinators such as TE.chain.
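The bracket pattern named above can be illustrated with a simplified, dependency-free re-implementation. This is not fp-ts's own `bracket` and not the project's `withClient`: the local `Either`/`TaskEither` types only mirror the fp-ts shapes, and `FakeClient`, `withFakeClient`, `events`, and `demo` are hypothetical names used to show that release runs whether the operation succeeds or fails.

```typescript
// Local stand-ins that mirror the shapes of fp-ts Either and TaskEither.
type Either<E, A> =
  | { readonly _tag: 'Left'; readonly left: E }
  | { readonly _tag: 'Right'; readonly right: A };
type TaskEither<E, A> = () => Promise<Either<E, A>>;

const left = <E, A>(e: E): Either<E, A> => ({ _tag: 'Left', left: e });
const right = <E, A>(a: A): Either<E, A> => ({ _tag: 'Right', right: a });

// Simplified bracket: acquire a resource, use it, and always release it.
const bracket = <E, R, A>(
  acquire: TaskEither<E, R>,
  use: (resource: R) => TaskEither<E, A>,
  release: (resource: R) => Promise<void>
): TaskEither<E, A> =>
  async () => {
    const acquired = await acquire();
    if (acquired._tag === 'Left') return acquired; // nothing to release
    try {
      return await use(acquired.right)();
    } finally {
      await release(acquired.right); // runs on success, on Left, and on throw
    }
  };

// Hypothetical client plus an event log so the ordering can be observed.
interface FakeClient { readonly id: number }
const events: string[] = [];

const withFakeClient = <A>(
  use: (client: FakeClient) => TaskEither<string, A>
): TaskEither<string, A> =>
  bracket<string, FakeClient, A>(
    async () => {
      events.push('acquire');
      return right<string, FakeClient>({ id: 1 });
    },
    use,
    async () => {
      events.push('release');
    }
  );

// Runs one successful and one failing operation and returns the event log.
async function demo(): Promise<string[]> {
  events.length = 0;
  await withFakeClient<number>((client) => async () => right<string, number>(client.id))();
  await withFakeClient<number>(() => async () => left<string, number>('query failed'))();
  return [...events];
}
```

Both runs log an acquire followed by a release, which is the guarantee the pattern exists to provide.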
Usage Patterns
Pattern 1: Extract and Use Cursor
import * as E from 'fp-ts/Either';

// Get cursor from fp-ts operation
const result = await getUserCursor()();
if (E.isLeft(result)) {
  // Handle typed error
  console.error('Failed to create cursor:', result.left);
  return;
}

// Extract cursor and use it
const cursor = result.right;

// Now iterate with cursor's AsyncIterable
for await (const user of cursor) {
  console.log(user);
}
Pattern 2: Chain with Stream Transformation
import { pipe } from 'fp-ts/function';

// Combine fp-ts cursor creation with stream transformation
const result = await pipe(
  getUserCursor(), // TaskEither<DbError, QueryCursor<User>>
  TE.chain((cursor) => // Runs only if cursor creation succeeded
    TE.tryCatch(
      () =>
        cursor
          .stream() // Convert to stream
          .filter((user) => user.age > 18)
          .map((user) => user.email)
          .collect(), // Returns a Promise
      // Lift a rejected Promise into the Left channel
      (error) => queryError('Failed to collect emails', undefined, error)
    )
  )
)();

if (E.isRight(result)) {
  const emails = result.right;
  console.log('Emails:', emails);
}
Pattern 3: Cursor as Part of Larger Operation
interface UserWithPosts {
  user: User;
  posts: Post[];
}

const getUserWithPostsCursor = (
  userId: number
): TE.TaskEither<DbError, QueryCursor<UserWithPosts>> =>
  pipe(
    // First, look up the user; a failed query short-circuits the chain
    withClient(
      {},
      (client) => TE.tryCatch(
        () => client.query('SELECT * FROM users WHERE id = $1', [userId]),
        (error) => queryError('Failed to get user', undefined, error)
      )
    ),
    // Then create a cursor over the user's rows joined with their posts
    // (the lookup result itself is not needed here)
    TE.chain((_userResult) =>
      withClient(
        {},
        (client) => TE.tryCatch(
          // async wrapper: TE.tryCatch expects a function that returns a Promise
          async () => new QueryCursor(
            client,
            `SELECT u.*, p.* FROM users u
             LEFT JOIN posts p ON u.id = p.user_id
             WHERE u.id = $1`,
            [userId]
          ),
          (error) => queryError('Failed to create cursor', undefined, error)
        )
      )
    )
  );
Error Handling
Cursor Creation Errors
Cursor creation can fail for several reasons:
type CursorCreationError =
  | ConnectionError       // Can't connect to database
  | QueryError            // Invalid SQL query
  | PermissionError       // Insufficient permissions
  | PoolAcquisitionError; // Can't acquire connection from pool

// All these are captured in TaskEither
const result = await getUserCursor()();
if (E.isLeft(result)) {
  switch (result.left._tag) {
    case 'ConnectionError':
      console.error('Connection failed');
      break;
    case 'QueryError':
      console.error('Invalid query');
      break;
    // ... handle other cases
  }
}
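Because every variant carries a `_tag`, the switch above can be made exhaustive so the compiler flags any unhandled error type. The sketch below assumes hypothetical payload fields (`host`, `sql`, `role`, `waitedMs`) purely for illustration; the real error types may carry different data. `describeError` is likewise a hypothetical helper.

```typescript
// Hypothetical error shapes: only the `_tag` discriminant comes from the text above.
interface ConnectionError { readonly _tag: 'ConnectionError'; readonly host: string }
interface QueryError { readonly _tag: 'QueryError'; readonly sql: string }
interface PermissionError { readonly _tag: 'PermissionError'; readonly role: string }
interface PoolAcquisitionError { readonly _tag: 'PoolAcquisitionError'; readonly waitedMs: number }

type CursorCreationError =
  | ConnectionError
  | QueryError
  | PermissionError
  | PoolAcquisitionError;

// Maps each error variant to a message. The `never` assignment in the default
// branch stops compiling if a new variant is added to the union and not handled.
function describeError(error: CursorCreationError): string {
  switch (error._tag) {
    case 'ConnectionError':
      return `Cannot connect to ${error.host}`;
    case 'QueryError':
      return `Invalid query: ${error.sql}`;
    case 'PermissionError':
      return `Role ${error.role} lacks permission`;
    case 'PoolAcquisitionError':
      return `No connection available after ${error.waitedMs} ms`;
    default: {
      const unreachable: never = error;
      return unreachable;
    }
  }
}
```

Returning a value from each branch, instead of logging inside the switch, keeps the handler easy to test and reuse.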
Cursor Iteration Errors
Once the cursor is created, errors are no longer captured in the Either. The cursor throws them during iteration, so wrap the loop in try/catch:
const cursor = result.right;
try {
  for await (const item of cursor) {
    await processItem(item);
  }
} catch (error) {
  // Handle iteration errors
  console.error('Error during iteration:', error);
}
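A failure partway through iteration leaves the earlier items already processed, which matters for retries and idempotency. The following sketch uses a hypothetical `flakyCursor` generator in place of a real cursor, and a hypothetical `consume` helper, to show what the caller observes.

```typescript
// Hypothetical cursor that yields two rows and then fails.
async function* flakyCursor(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  throw new Error('connection lost');
}

interface IterationOutcome {
  readonly processed: number[];
  readonly error: string | null;
}

// Iterates the cursor and reports which items were processed before any failure.
async function consume(cursor: AsyncIterable<number>): Promise<IterationOutcome> {
  const processed: number[] = [];
  try {
    for await (const item of cursor) {
      processed.push(item);
    }
    return { processed, error: null };
  } catch (error) {
    // The error thrown inside the cursor surfaces here, at the for-await loop.
    const message = error instanceof Error ? error.message : String(error);
    return { processed, error: message };
  }
}
```

Calling `consume(flakyCursor())` resolves with both processed items and the error message, rather than rejecting.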
Best Practices
✅ Do
Use fp-ts for cursor creation
// Good: cursor creation returns a typed Either instead of throwing
const result = await createCursor()(); // an Either, not a bare cursor
Handle errors explicitly
// Good: Check for errors before using cursor
if (E.isLeft(result)) {
  handleError(result.left);
  return;
}
const cursor = result.right;
Combine with streams for transformation
// Good: fp-ts for creation, streams for transformation
const result = await getUserCursor()();
if (E.isRight(result)) {
  await result.right
    .stream()
    .map(transform)
    .collect();
}
❌ Don't
Don't ignore fp-ts wrapper
// Bad: Assumes success without checking
const cursor = (await getUserCursor()()).right; // `right` is undefined when the result is a Left
Don't mix error handling styles
// Bad: Using try/catch for fp-ts operations
try {
  const result = await getUserCursor()();
  // Failures are already captured in the Either, so result may be a Left here
} catch (error) {
  // Not reached for captured failures: they arrive as a Left, not as a thrown exception
}
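One way to avoid both pitfalls is to handle the two branches in a single expression, so the success value cannot be read without also saying what happens on failure. fp-ts provides `E.fold` for this; the sketch below re-implements the same idea locally to stay dependency-free. `summarize` and this `DbError` shape are hypothetical.

```typescript
// Local stand-in that mirrors the shape of fp-ts Either.
type Either<E, A> =
  | { readonly _tag: 'Left'; readonly left: E }
  | { readonly _tag: 'Right'; readonly right: A };

// Local equivalent of a fold: the caller must supply a handler for each branch.
const fold = <E, A, B>(onLeft: (e: E) => B, onRight: (a: A) => B) =>
  (ma: Either<E, A>): B =>
    ma._tag === 'Left' ? onLeft(ma.left) : onRight(ma.right);

// Hypothetical error type, for illustration only.
interface DbError { readonly _tag: 'QueryError'; readonly message: string }

// Turns the outcome of an email query into a status line.
const summarize = fold<DbError, string[], string>(
  (error) => `failed: ${error.message}`,
  (emails) => `loaded ${emails.length} emails`
);
```

With this shape there is no `.right` access to forget to guard, and no try/catch that silently never fires.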
Comparison with Streams
| Aspect | Cursors + fp-ts | Streams |
|---|---|---|
| Purpose | Create cursors with error handling | Transform cursor data |
| Pattern | TaskEither wrapping | Async generators |
| When | Database cursor creation | Data transformation |
| Errors | Tagged union in Either | Standard try/catch |
| Layer | Database access | Data processing |
Source Code References
- Abstract cursor: src/lib/cursor/abstract-cursor.ts
- Database cursor: src/lib/db/api/QueryCursor.ts
- fp-ts database layer: src/lib/db/connection.ts
- Error types: src/lib/db/errors.ts
Related Documentation
- Streams vs fp-ts - How streams and fp-ts work together
- Cursors Architecture - Abstract cursor pattern
- Database fp-ts Migration - fp-ts in database layer
- Database Cursor Patterns - PostgreSQL implementation
Key takeaway: fp-ts creates and returns cursors with type-safe error handling. Once you have the cursor, you can use it with streams for transformation. The pipeline: fp-ts (create cursor) → cursor (paginate) → stream (transform).