Savvi Studio

Database Layer fp-ts Migration

Overview

The database layer has been completely rewritten to use fp-ts for functional error handling and resource management, while maintaining backward compatibility with the existing API.

What Changed

Removed Dependencies

  • src/lib/common/resource: The db layer no longer depends on the custom resource abstraction (the directory itself can be deleted once no other code uses it)
  • The acquire/release pattern is now handled by fp-ts TaskEither and ReaderTaskEither
  • No more custom Managed, AcquireFunction, run() utilities

New Architecture

1. Error Handling (src/lib/db/errors.ts)

  • Tagged union types for all database errors
  • Discriminated by _tag property for exhaustive pattern matching
  • Error constructors for each error type
  • Enables composable error handling with fp-ts

type DbError =
    | PoolAcquisitionError
    | ConnectionError
    | QueryError
    | TransactionError
    | AuthenticationError
    | ReleaseError;
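
The `_tag` discriminant is what lets the compiler check that every error case is handled. The following is a minimal sketch rather than the contents of src/lib/db/errors.ts: only the six `_tag` names come from the union above, while the `message` field, the `Tagged` helper type, and `describeDbError` are hypothetical names introduced for illustration.

```typescript
// Hypothetical shapes: only the _tag names come from the DbError union;
// the `message` field is an assumption made for illustration.
type Tagged<T extends string> = { readonly _tag: T; readonly message: string };

type DbError =
    | Tagged<'PoolAcquisitionError'>
    | Tagged<'ConnectionError'>
    | Tagged<'QueryError'>
    | Tagged<'TransactionError'>
    | Tagged<'AuthenticationError'>
    | Tagged<'ReleaseError'>;

// Hypothetical helper: maps each error to a log-friendly string.
const describeDbError = (error: DbError): string => {
    switch (error._tag) {
        case 'PoolAcquisitionError':
            return `pool: ${error.message}`;
        case 'ConnectionError':
            return `connection: ${error.message}`;
        case 'QueryError':
            return `query: ${error.message}`;
        case 'TransactionError':
            return `transaction: ${error.message}`;
        case 'AuthenticationError':
            return `auth: ${error.message}`;
        case 'ReleaseError':
            return `release: ${error.message}`;
        default: {
            // If a member is added to DbError without a case above,
            // `error` is no longer `never` here and this stops compiling.
            const unreachable: never = error;
            return unreachable;
        }
    }
};
```

The `default` branch is the design choice that matters: it turns a forgotten case into a compile error instead of a silent fallthrough.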

2. Pool Management (src/lib/db/pool.ts)

  • Uses TaskEither<DbError, Pool> for all pool operations
  • Functional bracket pattern with automatic cleanup
  • ReaderTaskEither for dependency injection (used by the connection layer, described in the next section)

Key Functions:

// Get pools
getDefaultPool(): TaskEither<DbError, Pool>
getMgmtPool(): TaskEither<DbError, Pool>
getPoolByKind(kind: PoolKind): TaskEither<DbError, Pool>

// Create custom pools
createPool(config: PoolConfig): TaskEither<DbError, Pool>
getPool(key: string, config: PoolConfig): TaskEither<DbError, Pool>

// Bracket pattern for automatic cleanup
withPool<A>(
    poolTE: TaskEither<DbError, Pool>,
    use: (pool: Pool) => TaskEither<DbError, A>
): TaskEither<DbError, A>

// Convenience wrappers
withDefaultPool<A>(use: (pool: Pool) => TaskEither<DbError, A>): TaskEither<DbError, A>
withMgmtPool<A>(use: (pool: Pool) => TaskEither<DbError, A>): TaskEither<DbError, A>
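
The bracket pattern behind `withPool` can be illustrated without the real implementation. The sketch below uses local stand-ins for the fp-ts `Either` and `TaskEither` types so that it runs on its own; `bracket`, `fakePool`, and `log` are hypothetical names, and nothing here is taken from src/lib/db/pool.ts.

```typescript
// Minimal stand-ins for the fp-ts Either and TaskEither types.
type Either<E, A> =
    | { readonly _tag: 'Left'; readonly left: E }
    | { readonly _tag: 'Right'; readonly right: A };
type TaskEither<E, A> = () => Promise<Either<E, A>>;

const left = <E, A = never>(e: E): Either<E, A> => ({ _tag: 'Left', left: e });
const right = <A, E = never>(a: A): Either<E, A> => ({ _tag: 'Right', right: a });

// Acquire a resource, use it, and run `release` whether `use` succeeded or failed.
const bracket = <E, R, A>(
    acquire: TaskEither<E, R>,
    use: (resource: R) => TaskEither<E, A>,
    release: (resource: R, outcome: Either<E, A>) => TaskEither<E, void>
): TaskEither<E, A> => async () => {
    const acquired = await acquire();
    // Nothing was acquired, so there is nothing to release.
    if (acquired._tag === 'Left') return acquired;
    const outcome = await use(acquired.right)();
    const released = await release(acquired.right, outcome)();
    // In this sketch a failed release takes precedence over the outcome of `use`.
    return released._tag === 'Left' ? released : outcome;
};

// Hypothetical in-memory "pool", used only to observe the order of calls.
const log: string[] = [];
const fakePool = { name: 'default' };

const failingUse = bracket<string, typeof fakePool, number>(
    async () => { log.push('acquire'); return right(fakePool); },
    () => async () => { log.push('use'); return left('query failed'); },
    () => async () => { log.push('release'); return right(undefined); }
);
```

Running `failingUse()` resolves to a Left carrying 'query failed', and `log` records acquire, use, release in that order: the release step ran even though the use step failed.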

3. Connection Management (src/lib/db/connection.ts)

  • Uses ReaderTaskEither<Pool, DbError, PoolClient> for elegant dependency injection
  • Automatic resource cleanup with bracket pattern
  • Connection context using AsyncLocalStorage for reuse

Key Concepts:

// Core connection function using ReaderTaskEither
withConnection<A>(
    options: ConnectionOptions,
    use: (client: PoolClient) => TaskEither<DbError, A>
): ReaderTaskEither<Pool, DbError, A>

// Convenience functions that resolve the pool
withClient<A>(
    options: ConnectionOptions,
    use: (client: PoolClient) => TaskEither<DbError, A>
): TaskEither<DbError, A>

withMgmtClient<A>(
    options: ConnectionOptions,
    use: (client: PoolClient) => TaskEither<DbError, A>
): TaskEither<DbError, A>

// Composable operations
liftDbOperation<A>(
    operation: (client: PoolClient) => TaskEither<DbError, A>
): ReaderTaskEither<Pool, DbError, A>

runDbOperation<A>(
    operation: (client: PoolClient) => TaskEither<DbError, A>
): TaskEither<DbError, A>
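
A ReaderTaskEither is a function from an environment (here, a Pool) to a TaskEither, which is why operations can be composed first and given a pool only once, at the edge. The sketch below shows that idea with local stand-in types instead of fp-ts imports; `FakePool`, `chain`, `poolName`, and `describePool` are hypothetical and only illustrate the shape of the pattern.

```typescript
// Minimal stand-ins for the fp-ts types.
type Either<E, A> =
    | { readonly _tag: 'Left'; readonly left: E }
    | { readonly _tag: 'Right'; readonly right: A };
type TaskEither<E, A> = () => Promise<Either<E, A>>;
// A ReaderTaskEither is a function from an environment R to a TaskEither.
type ReaderTaskEither<R, E, A> = (env: R) => TaskEither<E, A>;

// Hypothetical environment standing in for a Pool.
interface FakePool { readonly name: string }

// Sequence two operations; both receive the same environment.
const chain = <R, E, A, B>(
    first: ReaderTaskEither<R, E, A>,
    next: (a: A) => ReaderTaskEither<R, E, B>
): ReaderTaskEither<R, E, B> => (env) => async () => {
    const result = await first(env)();
    // Short-circuit on the first failure.
    if (result._tag === 'Left') return result;
    return next(result.right)(env)();
};

// Two operations that each read the pool from the environment.
const poolName: ReaderTaskEither<FakePool, string, string> =
    (pool) => async () => ({ _tag: 'Right', right: pool.name });

const describePool = (name: string): ReaderTaskEither<FakePool, string, string> =>
    (pool) => async () => ({
        _tag: 'Right',
        right: `${name} (same pool: ${pool.name === name})`,
    });

// No pool is needed to build the program, only to run it.
const program = chain(poolName, describePool);
```

Calling `program({ name: 'default' })()` supplies the environment once, and both steps see the same value.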

ConnectionOptions:

interface ConnectionOptions {
    reuseExisting?: boolean;  // Reuse from AsyncLocalStorage
    setup?: (client: PoolClient) => TaskEither<DbError, void>;
    teardown?: (client: PoolClient, error?: DbError) => TaskEither<DbError, void>;
}
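
The `reuseExisting` option relies on AsyncLocalStorage to find a client already bound to the current async call chain. The following sketch shows the mechanism with Node's `AsyncLocalStorage`; it is not the code in src/lib/db/context.ts, and `FakeClient`, `withFakeClient`, and `nestedIds` are hypothetical names using plain promises instead of TaskEither.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

// Hypothetical client shape standing in for PoolClient.
interface FakeClient { readonly id: number }

const currentClient = new AsyncLocalStorage<FakeClient>();
let nextId = 1;

// Hypothetical helper: reuse the client bound to the current async context when
// asked to, otherwise bind a fresh one for the duration of `use`.
const withFakeClient = <A>(
    options: { reuseExisting?: boolean },
    use: (client: FakeClient) => Promise<A>
): Promise<A> => {
    const existing = options.reuseExisting ? currentClient.getStore() : undefined;
    if (existing) return use(existing);
    const fresh: FakeClient = { id: nextId++ };
    // Everything started inside `run` (including awaited work) sees `fresh`.
    return currentClient.run(fresh, () => use(fresh));
};

// Nested calls: the first inner call reuses the outer client, the second does not.
const nestedIds = () =>
    withFakeClient({}, async (outer) => {
        const reused = await withFakeClient({ reuseExisting: true }, async (c) => c.id);
        const separate = await withFakeClient({}, async (c) => c.id);
        return { outer: outer.id, reused, separate };
    });
```

In `nestedIds`, `reused` equals `outer` because the inner call found the client in the store, while `separate` gets a new id.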

4. Client Wrapper (src/lib/db/client.ts)

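  • Keeps the existing public API, so callers do not need to change
  • Runs the underlying TaskEither and throws on failure, as existing code expects
  • Same API: withClient, asUser, asAdmin, asWebhook, etc. (this callback-only withClient is distinct from the fp-ts withClient in connection.ts, which also takes ConnectionOptions)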

Backward Compatible API:

// These work exactly as before
await withClient(async (client) => {
    return await client.query('SELECT 1');
});

await asUser(async (client) => {
    // Authenticated session with automatic transaction
});

await asAdmin(async (client) => {
    // Execute with admin role
});
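
The step that makes this compatibility possible is converting a TaskEither back into a promise that rejects on failure. The document names runTE and unwrapTE for this but does not show them, so the sketch below is an assumption about the general shape, not their implementation; `unwrapOrThrow`, the `message` field, and the local stand-in types are hypothetical.

```typescript
// Minimal stand-ins for the fp-ts types.
type Either<E, A> =
    | { readonly _tag: 'Left'; readonly left: E }
    | { readonly _tag: 'Right'; readonly right: A };
type TaskEither<E, A> = () => Promise<Either<E, A>>;

// Hypothetical error shape: only the _tag convention comes from the document.
type DbError = { readonly _tag: 'QueryError'; readonly message: string };

// Hypothetical helper: run the TaskEither and turn a Left into a rejected promise,
// which is what callers written against a throwing API expect.
async function unwrapOrThrow<A>(task: TaskEither<DbError, A>): Promise<A> {
    const result = await task();
    if (result._tag === 'Left') {
        // Keep the typed error reachable for callers that want to inspect it.
        throw Object.assign(new Error(result.left.message), { dbError: result.left });
    }
    return result.right;
}

// Example inputs: one success, one failure.
const ok: TaskEither<DbError, number> =
    async () => ({ _tag: 'Right', right: 1 });
const failed: TaskEither<DbError, number> =
    async () => ({ _tag: 'Left', left: { _tag: 'QueryError', message: 'boom' } });
```

`await unwrapOrThrow(ok)` yields the plain value, while `await unwrapOrThrow(failed)` rejects with an Error whose message is 'boom', so existing try/catch blocks keep working.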

5. Ephemeral Databases (src/lib/db/ephemeral.ts)

  • Rewritten using TaskEither
  • Broken into small, well-typed helper functions
  • Easier to debug and understand

Helper Functions:

createDatabase(mgmtClient, dbName): TaskEither<DbError, void>
createEphemeralPool(mgmtClient, dbName): TaskEither<DbError, Pool>
releaseClientSafe(client): TaskEither<DbError, void>
runAndValidateMigrations(client, dbName, pool): TaskEither<DbError, EphemeralDatabase>
connectAndMigrate(pool, dbName): TaskEither<DbError, EphemeralDatabase>
terminateConnectionsTE(mgmtClient, dbName): TaskEither<DbError, void>
dropDatabase(mgmtClient, dbName): TaskEither<DbError, void>
executeWithCleanup(mgmtClient, db, callback): TaskEither<DbError, T>
executeWithErrorCleanup(mgmtClient, db, error): TaskEither<DbError, T>

Benefits

1. Type Safety

  • Explicit error types at compile time
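  • Failures are returned as typed DbError values instead of being thrown (the backward-compatible wrapper in client.ts still throws)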
  • Exhaustive pattern matching on errors

2. Composability

  • Chain operations with pipe and chain
  • ReaderTaskEither for dependency injection
  • Easy to compose database operations

3. Testability

  • Pure functions are easier to test
  • Mock TaskEither for testing
  • Clear separation of concerns

4. Resource Safety

  • Bracket pattern runs cleanup whether the operation succeeds or fails
  • Clients are released without hand-written try/finally blocks
  • Automatic error recovery

5. Backward Compatibility

  • Existing code continues to work
  • Gradual migration possible
  • No breaking changes to the client API (direct imports of the old types may need updating)

Usage Examples

Basic Query (fp-ts style)

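import * as E from 'fp-ts/Either';
import * as TE from 'fp-ts/TaskEither';
import { pipe } from 'fp-ts/function';
import { withClient } from '@/lib/db/connection';
import { queryError, type DbError } from '@/lib/db/errors';

const getUsers = (): TE.TaskEither<DbError, User[]> =>
    withClient(
        {},
        (client) => pipe(
            TE.tryCatch(
                () => client.query('SELECT * FROM users'),
                (error) => queryError('Failed to fetch users', undefined, error)
            ),
            // client.query resolves to a result object; the users are its rows
            TE.map((res) => res.rows)
        )
    );

// Execute: calling the TaskEither runs it and resolves to an Either
const result = await getUsers()();
if (E.isLeft(result)) {
    console.error('Error:', result.left);
} else {
    console.log('Users:', result.right);
}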

Transaction (fp-ts style)

const updateUser = (id: number, name: string): TE.TaskEither<DbError, void> =>
    withClient(
        {
            // Open the transaction before the operation runs
            setup: (client) => TE.tryCatch(
                async () => { await client.query('BEGIN'); },
                (error) => transactionError('Failed to begin', error)
            ),
            // Roll back if the operation failed, otherwise commit
            teardown: (client, error) => TE.tryCatch(
                async () => { await client.query(error ? 'ROLLBACK' : 'COMMIT'); },
                (err) => transactionError('Failed to commit/rollback', err)
            ),
        },
        // Discard the query result so the operation resolves to void, as declared
        (client) => TE.tryCatch(
            async () => { await client.query('UPDATE users SET name = $1 WHERE id = $2', [name, id]); },
            (error) => queryError('Failed to update user', undefined, error)
        )
    );

Composing Operations

import * as RTE from 'fp-ts/ReaderTaskEither';
import * as TE from 'fp-ts/TaskEither';
import { pipe } from 'fp-ts/function';

// Define composable operations
const getUser = (id: number): RTE.ReaderTaskEither<Pool, DbError, User> =>
    withConnection(
        {},
        (client) => TE.tryCatch(
            // Unwrap the first row; a missing row is not handled in this example
            async () => (await client.query('SELECT * FROM users WHERE id = $1', [id])).rows[0],
            (error) => queryError('Failed to get user', undefined, error)
        )
    );

const getPosts = (userId: number): RTE.ReaderTaskEither<Pool, DbError, Post[]> =>
    withConnection(
        {},
        (client) => TE.tryCatch(
            async () => (await client.query('SELECT * FROM posts WHERE user_id = $1', [userId])).rows,
            (error) => queryError('Failed to get posts', undefined, error)
        )
    );

// Compose them: both operations read the same Pool from the environment
const getUserWithPosts = (userId: number): RTE.ReaderTaskEither<Pool, DbError, UserWithPosts> =>
    pipe(
        getUser(userId),
        RTE.chain(user =>
            pipe(
                getPosts(user.id),
                RTE.map(posts => ({ user, posts }))
            )
        )
    );

// Execute with default pool
const result = await pipe(
    getDefaultPool(),
    TE.chain(pool => getUserWithPosts(123)(pool))
)();

Backward Compatible (existing code)

// This still works!
const users = await withClient(async (client) => {
    return await client.query('SELECT * FROM users');
});

// This still works!
await asUser(async (client) => {
    await client.query('INSERT INTO posts ...');
});

Migration Path

For New Code

Use fp-ts style with TaskEither and ReaderTaskEither:

import * as TE from 'fp-ts/TaskEither';
import { pipe } from 'fp-ts/function';
import { withClient } from '@/lib/db/connection';

For Existing Code

No changes needed! The backward-compatible API works as before:

import { withClient, asUser } from '@/lib/db/client';

Gradual Migration

You can mix both styles:

// Old style: throws on failure (asUser from '@/lib/db/client')
const users = await asUser(async (client) => {
    return await client.query('SELECT * FROM users');
});

// New style: resolves to an Either (withClient from '@/lib/db/connection')
const postsResult = await withClient(
    {},
    (client) => TE.tryCatch(
        () => client.query('SELECT * FROM posts'),
        (error) => queryError('Failed', undefined, error)
    )
)();

Testing

Testing TaskEither Operations

import * as E from 'fp-ts/Either';
import { describe, expect, it, vi } from 'vitest';

// Build a mock pool that hands out a client with the given query mock.
// Assumes withConnection acquires via pool.connect() and releases via client.release().
const mockPoolWith = (query: ReturnType<typeof vi.fn>) =>
    ({ connect: vi.fn().mockResolvedValue({ query, release: vi.fn() }) }) as unknown as Pool;

describe('getUser', () => {
    it('should return user', async () => {
        const mockPool = mockPoolWith(
            vi.fn().mockResolvedValue({ rows: [{ id: 1, name: 'John' }] })
        );

        const result = await getUser(1)(mockPool)();

        expect(E.isRight(result)).toBe(true);
        if (E.isRight(result)) {
            expect(result.right).toEqual({ id: 1, name: 'John' });
        }
    });

    it('should handle errors', async () => {
        const mockPool = mockPoolWith(
            vi.fn().mockRejectedValue(new Error('Connection failed'))
        );

        const result = await getUser(1)(mockPool)();

        expect(E.isLeft(result)).toBe(true);
        if (E.isLeft(result)) {
            expect(result.left._tag).toBe('QueryError');
        }
    });
});

Key Takeaways

  1. The old API still works - No breaking changes
  2. Use fp-ts for new code - Better type safety and composability
  3. ReaderTaskEither is powerful - Elegant dependency injection
  4. Error handling is explicit - No hidden exceptions
  5. Resources are managed - Automatic cleanup with bracket pattern

Files Changed

New Files

  • src/lib/db/errors.ts - Tagged union error types with constructors
  • src/lib/db/fp.ts - fp-ts utilities (runTE, unwrapTE)
  • src/lib/db/context.ts - AsyncLocalStorage for connection reuse

Rewritten Files

  • src/lib/db/pool.ts - Rewritten with TaskEither
  • src/lib/db/connection.ts - Rewritten with ReaderTaskEither
  • src/lib/db/client.ts - Backward-compatible wrapper using runTE
  • src/lib/db/ephemeral.ts - Rewritten with TaskEither + runTE
  • src/lib/db/types.ts - Simplified types
  • src/lib/db/index.ts - Comprehensive exports

Removed Files

  • src/lib/db/utils.ts - Empty file, deleted

Unchanged Files

  • ⚠️ src/lib/db/error.ts - Legacy error handler (marked deprecated)
  • ⚠️ src/lib/db/models.ts - Re-exports for backward compatibility

Can Be Removed (Outside db layer)

  • src/lib/common/resource/* - No longer used by db layer

Next Steps

  1. Test the new implementation thoroughly
  2. Update any direct imports of the old types
  3. Consider migrating other parts of the codebase to use fp-ts
  4. Remove src/lib/common/resource if no other code depends on it