Savvi Studio

Database Libraries

Purpose: Understand the fp-ts and promises database libraries
Last Updated: 2024-11-26

Overview

The database layer has two complementary libraries:

  • FP library (src/lib/db/fp/) - Functional foundation with fp-ts
  • Promises library (src/lib/db/promises/) - Convenience API wrapping FP

Architecture: Promises library builds on top of FP library for resource safety.

Why Two Libraries?

FP Library Rationale

Problem: Need reliable resource cleanup, pool management, and connection handling

Solution: Use fp-ts for:

  • Explicit error handling with TaskEither
  • Automatic resource cleanup with bracket pattern
  • Composable database operations
  • Type-safe error types

Promises Library Purpose

Problem: fp-ts has learning curve, existing code uses promises

Solution: Convenience wrapper that:

  • Maintains 100% backward compatibility
  • Unwraps TaskEither to throw errors
  • Same API as before migration
  • Gradual migration path

Architecture Layers

┌────────────────────────────────────┐
│ Promises Library (Convenience)     │
│ src/lib/db/promises/               │
│ - withClient                       │
│ - asUser, asAdmin, asWebhook       │
│ - Throws on errors                 │
└──────────────┬─────────────────────┘
               │ calls
┌──────────────▼─────────────────────┐
│ FP Library (Foundation)            │
│ src/lib/db/fp/                     │
│ - TaskEither, ReaderTaskEither     │
│ - Bracket pattern                  │
│ - Tagged union errors              │
└──────────────┬─────────────────────┘
               │ uses
┌──────────────▼─────────────────────┐
│ PostgreSQL Connection Pool         │
│ - pg.Pool                          │
│ - Connection management            │
└────────────────────────────────────┘

FP Library Details

Core Modules

src/lib/db/fp/errors.ts - Tagged union error types

type DbError =
  | PoolAcquisitionError
  | ConnectionError
  | QueryError
  | TransactionError
  | AuthenticationError
  | ReleaseError;

// Each error has _tag for discrimination
interface PoolAcquisitionError {
  _tag: 'PoolAcquisitionError';
  message: string;
  cause?: unknown;
}

src/lib/db/fp/pool.ts - Pool management with TaskEither

// Get pools
getDefaultPool(): TaskEither<DbError, Pool>
getMgmtPool(): TaskEither<DbError, Pool>
getPool(key: string, config: PoolConfig): TaskEither<DbError, Pool>

// Bracket pattern for cleanup
withPool<A>(
  poolTE: TaskEither<DbError, Pool>,
  use: (pool: Pool) => TaskEither<DbError, A>
): TaskEither<DbError, A>

src/lib/db/fp/connection.ts - Connection with ReaderTaskEither

// Core connection function
withConnection<A>(
  options: ConnectionOptions,
  use: (client: PoolClient) => TaskEither<DbError, A>
): ReaderTaskEither<Pool, DbError, A>

// Convenience wrappers
withClient<A>(
  options: ConnectionOptions,
  use: (client: PoolClient) => TaskEither<DbError, A>
): TaskEither<DbError, A>

src/lib/db/fp/context.ts - AsyncLocalStorage for connection reuse

// Automatic connection reuse
const connectionContext = new AsyncLocalStorage<PoolClient>();

// Get active connection
getActiveConnection(): PoolClient | undefined

src/lib/db/fp/ephemeral.ts - Test database utilities

createEphemeralDatabase(
  mgmtClient: PoolClient,
  prefix: string
): TaskEither<DbError, EphemeralDatabase>

destroyEphemeralDatabase(
  mgmtClient: PoolClient,
  db: EphemeralDatabase
): TaskEither<DbError, void>

Error Handling Pattern

Tagged unions enable exhaustive checking:

import * as E from 'fp-ts/Either';

const result = await myOperation()();

if (E.isLeft(result)) {
  const error = result.left;
  
  // Exhaustive pattern matching
  switch (error._tag) {
    case 'PoolAcquisitionError':
      console.error('Failed to get connection:', error.message);
      break;
    case 'QueryError':
      console.error('Query failed:', error.query, error.message);
      break;
    case 'TransactionError':
      console.error('Transaction failed:', error.message);
      break;
    // TypeScript ensures all cases handled
  }
}

Resource Safety with Bracket

Bracket pattern ensures cleanup:

import { withPool } from '@/lib/db/fp/pool';
import { getDefaultPool } from '@/lib/db/fp/pool';

const operation = withPool(
  getDefaultPool(),
  (pool) => {
    // Use pool
    // Pool is automatically managed
    return TE.right(result);
  }
);

// Even if operation fails, resources are cleaned up
await operation();

Composable Operations

ReaderTaskEither enables composition:

import * as RTE from 'fp-ts/ReaderTaskEither';
import { pipe } from 'fp-ts/function';

// Define operations
const getUser = (id: number): RTE.ReaderTaskEither<Pool, DbError, User> =>
  withConnection({}, (client) => {
    // Get user from database
  });

const getPosts = (userId: number): RTE.ReaderTaskEither<Pool, DbError, Post[]> =>
  withConnection({}, (client) => {
    // Get posts from database
  });

// Compose them
const getUserWithPosts = (id: number): RTE.ReaderTaskEither<Pool, DbError, UserWithPosts> =>
  pipe(
    getUser(id),
    RTE.chain(user =>
      pipe(
        getPosts(user.id),
        RTE.map(posts => ({ user, posts }))
      )
    )
  );

// Execute with pool
const result = await pipe(
  getDefaultPool(),
  TE.chain(pool => getUserWithPosts(123)(pool))
)();

Promises Library Details

Core Module

src/lib/db/promises/client.ts - Backward-compatible API

// Unwraps TaskEither, throws on error
async function withClient<A>(
  use: (client: PoolClient) => Promise<A>
): Promise<A>

async function withClient<A>(
  options: ConnectionOptions,
  use: (client: PoolClient) => Promise<A>
): Promise<A>

Usage Patterns

Simple queries:

import { withClient } from '@/lib/db';

const result = await withClient(async (client) => {
  return await client.query('SELECT 1');
});

With authentication:

import { asUser } from '@/lib/db';

const data = await asUser(async (client) => {
  // Runs as authenticated user
  return await client.query('SELECT * FROM my_data');
});

With admin role:

import { asAdmin } from '@/lib/db';

await asAdmin(async (client) => {
  // Runs with admin privileges
  await client.query('ALTER TABLE ...');
});

With webhook role:

import { asWebhook } from '@/lib/db';

await asWebhook(async (client) => {
  // Runs as webhook user
  await client.query('INSERT INTO events ...');
});

Connection Options

interface ConnectionOptions {
  // Reuse connection from AsyncLocalStorage
  reuseExisting?: boolean;
  
  // Setup callback (e.g., BEGIN transaction)
  setup?: (client: PoolClient) => Promise<void>;
  
  // Teardown callback (e.g., COMMIT/ROLLBACK)
  teardown?: (client: PoolClient, error?: unknown) => Promise<void>;
}

Transaction example:

await withClient(
  {
    setup: async (client) => {
      await client.query('BEGIN');
    },
    teardown: async (client, error) => {
      if (error) {
        await client.query('ROLLBACK');
      } else {
        await client.query('COMMIT');
      }
    }
  },
  async (client) => {
    await client.query('INSERT INTO ...');
    await client.query('UPDATE ...');
  }
);

Connection Management

Pool Configuration

Default pool (application queries):

{
  max: 10,                    // Max connections
  idleTimeoutMillis: 10000,   // Close idle after 10s
  connectionTimeoutMillis: 1000,
  application_name: 'studio-default-pool'
}

Management pool (admin operations):

{
  max: 5,                     // Fewer connections
  idleTimeoutMillis: 10000,
  connectionTimeoutMillis: 1000,
  application_name: 'studio-mgmt-pool'
}

Connection Reuse

AsyncLocalStorage enables automatic reuse:

await withClient(async (client1) => {
  // Outer connection
  
  await withClient(async (client2) => {
    // Automatically reuses client1
    console.log(client1 === client2); // true
  });
});

Disable reuse when needed:

await withClient(async (client1) => {
  await withClient(
    { reuseExisting: false },
    async (client2) => {
      // Forces new connection
      console.log(client1 === client2); // false
    }
  );
});

When to Use Which Pool

Default pool: Most application code

import { withClient } from '@/lib/db';
await withClient(async (client) => { /* ... */ });

Management pool: Database admin operations

import { withMgmtClient } from '@/lib/db';
await withMgmtClient(async (client) => {
  await client.query('CREATE DATABASE ...');
});

Custom pool: Special requirements

import { getPool } from '@/lib/db/fp/pool';

const customPool = await getPool('analytics', {
  max: 20,
  application_name: 'analytics-pool'
})();

// Use custom pool

When to Use FP vs Promises

Use FP Library When:

✅ Building new features with functional patterns
✅ Need explicit error handling
✅ Composing complex database operations
✅ Want maximum type safety
✅ Working on infrastructure code

Example:

import * as TE from 'fp-ts/TaskEither';
import { pipe } from 'fp-ts/function';
import { withClient } from '@/lib/db/fp/connection';

const operation = pipe(
  withClient({}, (client) => {
    // Operation with explicit error handling
    return TE.tryCatch(
      () => client.query('...'),
      (error) => queryError('Failed', undefined, error)
    );
  })
);

const result = await operation();
// result: Either<DbError, QueryResult>

Use Promises Library When:

✅ Working with existing code
✅ Simple one-off queries
✅ Rapid prototyping
✅ Team prefers promises
✅ Standard error handling (try/catch)

Example:

import { withClient } from '@/lib/db';

try {
  const result = await withClient(async (client) => {
    return await client.query('...');
  });
} catch (error) {
  console.error('Query failed:', error);
}

Migration Path

Gradual Migration Strategy

Phase 1: New code uses FP library

// New feature - use FP
import { withClient } from '@/lib/db/fp/connection';

Phase 2: Refactor as needed

// Refactor existing code when touching it
// Old: import { withClient } from '@/lib/db/client';
// New: import { withClient } from '@/lib/db/fp/connection';

Phase 3: Full migration (optional)

  • Only if team wants full fp-ts adoption
  • Can coexist indefinitely

Converting Existing Code

Before (promises):

import { withClient } from '@/lib/db';

const users = await withClient(async (client) => {
  return await client.query('SELECT * FROM users');
});

After (fp-ts):

import * as TE from 'fp-ts/TaskEither';
import { pipe } from 'fp-ts/function';
import { withClient } from '@/lib/db/fp/connection';

const getUsers = pipe(
  withClient({}, (client) => 
    TE.tryCatch(
      () => client.query('SELECT * FROM users'),
      (error) => queryError('Failed to get users', undefined, error)
    )
  )
);

const result = await getUsers();
// Handle Either<DbError, QueryResult>

Best Practices

Connection Lifecycle

✅ Use withClient for automatic cleanup:

await withClient(async (client) => {
  // Client automatically released
});

❌ Don't manually manage connections:

// Don't do this
const pool = await getPool(...);
const client = await pool.connect();
try {
  // ...
} finally {
  client.release(); // Easy to forget
}

Error Handling

✅ Handle errors appropriately:

import * as E from 'fp-ts/Either';

const result = await operation()();

if (E.isLeft(result)) {
  // Log and handle
  logger.error('Operation failed', result.left);
  throw new ApplicationError('Database operation failed');
}

❌ Don't swallow errors:

// Don't do this
const result = await operation()();
if (E.isLeft(result)) {
  // Silent failure - bad!
}

Connection Reuse

✅ Reuse connections for related operations:

await withClient(async (client) => {
  // All operations share same connection
  const user = await getUser(client, id);
  const posts = await getPosts(client, user.id);
  const comments = await getComments(client, posts);
});

❌ Don't create multiple connections unnecessarily:

// Inefficient - 3 separate connections
const user = await withClient(async (client) => 
  await getUser(client, id));
const posts = await withClient(async (client) => 
  await getPosts(client, user.id));
const comments = await withClient(async (client) => 
  await getComments(client, posts));

Troubleshooting

Connection Pool Exhausted

Symptom: "Unable to acquire connection" errors

Cause: Too many concurrent operations or connections not released

Solution:

// Check for leaks
SELECT count(*) FROM pg_stat_activity 
WHERE application_name LIKE 'studio%';

// Ensure using withClient (auto cleanup)
// Increase pool size if needed

Memory Leaks

Symptom: Memory grows over time

Cause: Holding references to clients or pools

Solution:

  • Always use withClient or withPool
  • Don't store PoolClient references
  • Let bracket pattern handle cleanup

Type Errors with fp-ts

Symptom: Complex type errors

Solution:

  • Use type annotations explicitly
  • Break down complex pipes
  • Refer to fp-ts documentation
  • Consider using promises library for simpler cases

Quick Reference

Task FP Library Promises Library
Simple query withClient from fp/connection withClient from promises/client
Error handling Either<DbError, A> try/catch
Return value TaskEither<DbError, A> Promise<A>
Composition pipe, chain, map async/await
Learning curve Steeper Minimal

Next steps: See Testing to learn how to test database code, or Best Practices for usage patterns.