Savvi Studio

Cursors and fp-ts Integration

Purpose: How cursors work with fp-ts in database operations
Last Updated: 2024-11-26

Overview

This document explains how cursors integrate with fp-ts for type-safe database operations. While Streams explains the relationship between streams and fp-ts, this document focuses on how cursors are created and returned from fp-ts database operations.

The Complete Data Flow

fp-ts (Database Layer)
↓
Creates Cursor
↓
Returns Cursor in TaskEither
↓
Application extracts Cursor
↓
Cursor → Stream (Transformation)
↓
Results

Creating Cursors with fp-ts

Pattern: TaskEither Returns Cursor

Database operations use fp-ts TaskEither for error handling and return cursors for pagination:

import * as TE from 'fp-ts/TaskEither';
import { QueryCursor } from '@/lib/db/api/QueryCursor';
import { withClient } from '@/lib/db/connection';

// fp-ts returns cursor wrapped in TaskEither
const getUserCursor = (): TE.TaskEither<DbError, QueryCursor<User>> =>
  withClient(
    {},
    (client) => TE.tryCatch(
      () => new QueryCursor(client, 'SELECT * FROM users WHERE active = true'),
      (error) => queryError('Failed to create cursor', undefined, error)
    )
  );

Why This Pattern?

Type-safe error handling: Cursor creation might fail (connection issues, invalid query), so we wrap it in TaskEither.

Resource management: fp-ts manages the database connection using the bracket pattern.

Composability: Can compose cursor creation with other database operations.

Usage Patterns

Pattern 1: Extract and Use Cursor

// Get cursor from fp-ts operation
const result = await getUserCursor()();

if (E.isLeft(result)) {
  // Handle typed error
  console.error('Failed to create cursor:', result.left);
  return;
}

// Extract cursor and use it
const cursor = result.right;

// Now iterate with cursor's AsyncIterable
for await (const user of cursor) {
  console.log(user);
}

Pattern 2: Chain with Stream Transformation

// Combine fp-ts cursor creation with stream transformation
const processUsers = await pipe(
  getUserCursor(),  // TaskEither<DbError, Cursor>
  TE.map(cursor =>   // Extract cursor on success
    cursor
      .stream()       // Convert to stream
      .filter(user => user.age > 18)
      .map(user => user.email)
      .collect()      // Returns Promise
  ),
  TE.chain(TE.fromTask)  // Unwrap the Promise into TaskEither
)();

if (E.isRight(result)) {
  const emails = result.right;
  console.log('Emails:', emails);
}

Pattern 3: Cursor as Part of Larger Operation

interface UserWithPosts {
  user: User;
  posts: Post[];
}

const getUserWithPostsCursor = (
  userId: number
): TE.TaskEither<DbError, QueryCursor<UserWithPosts>> =>
  pipe(
    // First, verify user exists
    withClient(
      {},
      (client) => TE.tryCatch(
        () => client.query('SELECT * FROM users WHERE id = $1', [userId]),
        (error) => queryError('Failed to get user', undefined, error)
      )
    ),
    // Then create cursor for user's posts
    TE.chain((userResult) =>
      withClient(
        {},
        (client) => TE.tryCatch(
          () => new QueryCursor(
            client,
            `SELECT u.*, p.* FROM users u 
             LEFT JOIN posts p ON u.id = p.user_id 
             WHERE u.id = $1`,
            [userId]
          ),
          (error) => queryError('Failed to create cursor', undefined, error)
        )
      )
    )
  );

Error Handling

Cursor Creation Errors

Cursor creation can fail for several reasons:

type CursorCreationError =
  | ConnectionError      // Can't connect to database
  | QueryError           // Invalid SQL query
  | PermissionError      // Insufficient permissions
  | PoolAcquisitionError;// Can't acquire connection from pool

// All these are captured in TaskEither
const result = await getUserCursor()();

if (E.isLeft(result)) {
  switch (result.left._tag) {
    case 'ConnectionError':
      console.error('Connection failed');
      break;
    case 'QueryError':
      console.error('Invalid query');
      break;
    // ... handle other cases
  }
}

Cursor Iteration Errors

Once cursor is created, iteration errors are handled by cursor itself:

const cursor = result.right;

try {
  for await (const item of cursor) {
    await processItem(item);
  }
} catch (error) {
  // Handle iteration errors
  console.error('Error during iteration:', error);
}

Best Practices

✅ Do

Use fp-ts for cursor creation

// Good: Type-safe cursor creation
const cursor = await pipe(
  createCursor(),
  TE.map(cursor => cursor)
)();

Handle errors explicitly

// Good: Check for errors before using cursor
if (E.isLeft(result)) {
  handleError(result.left);
  return;
}
const cursor = result.right;

Combine with streams for transformation

// Good: fp-ts for creation, streams for transformation
const result = await getUserCursor()();
if (E.isRight(result)) {
  await result.right
    .stream()
    .map(transform)
    .collect();
}

❌ Don't

Don't ignore fp-ts wrapper

// Bad: Assumes success without checking
const cursor = (await getUserCursor()()).right;  // Might be undefined!

Don't mix error handling styles

// Bad: Using try/catch for fp-ts operations
try {
  const result = await getUserCursor()();
  // fp-ts already captured errors in Either
} catch (error) {
  // This won't catch fp-ts errors
}

Comparison with Streams

Aspect Cursors + fp-ts Streams
Purpose Create cursors with error handling Transform cursor data
Pattern TaskEither wrapping Async generators
When Database cursor creation Data transformation
Errors Tagged union in Either Standard try/catch
Layer Database access Data processing

Source Code References


Key takeaway: fp-ts creates and returns cursors with type-safe error handling. Once you have the cursor, you can use it with streams for transformation. The pipeline: fp-ts (create cursor) → cursor (paginate) → stream (transform).