Database Layer fp-ts Migration
Overview
The database layer has been completely rewritten to use fp-ts for functional error handling and resource management, while maintaining backward compatibility with the existing API.
What Changed
Removed Dependencies
- src/lib/common/resource: The entire custom resource abstraction has been removed
- The acquire/release pattern is now handled by fp-ts TaskEither and ReaderTaskEither
- No more custom
Managed,AcquireFunction,run()utilities
New Architecture
1. Error Handling (src/lib/db/errors.ts)
- Tagged union types for all database errors
- Discriminated by
_tagproperty for exhaustive pattern matching - Error constructors for each error type
- Enables composable error handling with fp-ts
type DbError =
| PoolAcquisitionError
| ConnectionError
| QueryError
| TransactionError
| AuthenticationError
| ReleaseError;
2. Pool Management (src/lib/db/pool.ts)
- Uses
TaskEither<DbError, Pool>for all pool operations - Functional bracket pattern with automatic cleanup
- ReaderTaskEither for dependency injection (coming in connection layer)
Key Functions:
// Get pools
getDefaultPool(): TaskEither<DbError, Pool>
getMgmtPool(): TaskEither<DbError, Pool>
getPoolByKind(kind: PoolKind): TaskEither<DbError, Pool>
// Create custom pools
createPool(config: PoolConfig): TaskEither<DbError, Pool>
getPool(key: string, config: PoolConfig): TaskEither<DbError, Pool>
// Bracket pattern for automatic cleanup
withPool<A>(
poolTE: TaskEither<DbError, Pool>,
use: (pool: Pool) => TaskEither<DbError, A>
): TaskEither<DbError, A>
// Convenience wrappers
withDefaultPool<A>(use: (pool: Pool) => TaskEither<DbError, A>): TaskEither<DbError, A>
withMgmtPool<A>(use: (pool: Pool) => TaskEither<DbError, A>): TaskEither<DbError, A>
3. Connection Management (src/lib/db/connection.ts)
- Uses
ReaderTaskEither<Pool, DbError, PoolClient>for elegant dependency injection - Automatic resource cleanup with bracket pattern
- Connection context using AsyncLocalStorage for reuse
Key Concepts:
// Core connection function using ReaderTaskEither
withConnection<A>(
options: ConnectionOptions,
use: (client: PoolClient) => TaskEither<DbError, A>
): ReaderTaskEither<Pool, DbError, A>
// Convenience functions that resolve the pool
withClient<A>(
options: ConnectionOptions,
use: (client: PoolClient) => TaskEither<DbError, A>
): TaskEither<DbError, A>
withMgmtClient<A>(
options: ConnectionOptions,
use: (client: PoolClient) => TaskEither<DbError, A>
): TaskEither<DbError, A>
// Composable operations
liftDbOperation<A>(
operation: (client: PoolClient) => TaskEither<DbError, A>
): ReaderTaskEither<Pool, DbError, A>
runDbOperation<A>(
operation: (client: PoolClient) => TaskEither<DbError, A>
): TaskEither<DbError, A>
ConnectionOptions:
interface ConnectionOptions {
reuseExisting?: boolean; // Reuse from AsyncLocalStorage
setup?: (client: PoolClient) => TaskEither<DbError, void>;
teardown?: (client: PoolClient, error?: DbError) => TaskEither<DbError, void>;
}
4. Client Wrapper (src/lib/db/client.ts)
- Maintains 100% backward compatibility
- Unwraps TaskEither to throw errors (for existing code)
- Same API:
withClient,asUser,asAdmin,asWebhook, etc.
Backward Compatible API:
// These work exactly as before
await withClient(async (client) => {
return await client.query('SELECT 1');
});
await asUser(async (client) => {
// Authenticated session with automatic transaction
});
await asAdmin(async (client) => {
// Execute with admin role
});
5. Ephemeral Databases (src/lib/db/ephemeral.ts)
- Rewritten using TaskEither
- Broken into small, well-typed helper functions
- Easier to debug and understand
Helper Functions:
createDatabase(mgmtClient, dbName): TaskEither<DbError, void>
createEphemeralPool(mgmtClient, dbName): TaskEither<DbError, Pool>
releaseClientSafe(client): TaskEither<DbError, void>
runAndValidateMigrations(client, dbName, pool): TaskEither<DbError, EphemeralDatabase>
connectAndMigrate(pool, dbName): TaskEither<DbError, EphemeralDatabase>
terminateConnectionsTE(mgmtClient, dbName): TaskEither<DbError, void>
dropDatabase(mgmtClient, dbName): TaskEither<DbError, void>
executeWithCleanup(mgmtClient, db, callback): TaskEither<DbError, T>
executeWithErrorCleanup(mgmtClient, db, error): TaskEither<DbError, T>
Benefits
1. Type Safety
- Explicit error types at compile time
- No more untyped thrown errors
- Exhaustive pattern matching on errors
2. Composability
- Chain operations with
pipeandchain - ReaderTaskEither for dependency injection
- Easy to compose database operations
3. Testability
- Pure functions are easier to test
- Mock TaskEither for testing
- Clear separation of concerns
4. Resource Safety
- Bracket pattern ensures cleanup
- No resource leaks
- Automatic error recovery
5. Backward Compatibility
- Existing code continues to work
- Gradual migration possible
- No breaking changes
Usage Examples
Basic Query (fp-ts style)
import * as TE from 'fp-ts/TaskEither';
import { pipe } from 'fp-ts/function';
import { withClient } from '@/lib/db/connection';
const getUsers = (): TE.TaskEither<DbError, User[]> =>
withClient(
{},
(client) => TE.tryCatch(
() => client.query('SELECT * FROM users'),
(error) => queryError('Failed to fetch users', undefined, error)
)
);
// Execute
const result = await getUsers()();
if (E.isLeft(result)) {
console.error('Error:', result.left);
} else {
console.log('Users:', result.right);
}
Transaction (fp-ts style)
const updateUser = (id: number, name: string): TE.TaskEither<DbError, void> =>
withClient(
{
setup: (client) => TE.tryCatch(
() => client.query('BEGIN'),
(error) => transactionError('Failed to begin', error)
),
teardown: (client, error) => TE.tryCatch(
() => client.query(error ? 'ROLLBACK' : 'COMMIT'),
(err) => transactionError('Failed to commit/rollback', err)
),
},
(client) => TE.tryCatch(
() => client.query('UPDATE users SET name = $1 WHERE id = $2', [name, id]),
(error) => queryError('Failed to update user', undefined, error)
)
);
Composing Operations
import * as RTE from 'fp-ts/ReaderTaskEither';
// Define composable operations
const getUser = (id: number): RTE.ReaderTaskEither<Pool, DbError, User> =>
withConnection(
{},
(client) => TE.tryCatch(
() => client.query('SELECT * FROM users WHERE id = $1', [id]),
(error) => queryError('Failed to get user', undefined, error)
)
);
const getPosts = (userId: number): RTE.ReaderTaskEither<Pool, DbError, Post[]> =>
withConnection(
{},
(client) => TE.tryCatch(
() => client.query('SELECT * FROM posts WHERE user_id = $1', [userId]),
(error) => queryError('Failed to get posts', undefined, error)
)
);
// Compose them
const getUserWithPosts = (userId: number): RTE.ReaderTaskEither<Pool, DbError, UserWithPosts> =>
pipe(
getUser(userId),
RTE.chain(user =>
pipe(
getPosts(user.id),
RTE.map(posts => ({ user, posts }))
)
)
);
// Execute with default pool
const result = await pipe(
getDefaultPool(),
TE.chain(pool => getUserWithPosts(123)(pool))
)();
Backward Compatible (existing code)
// This still works!
const users = await withClient(async (client) => {
return await client.query('SELECT * FROM users');
});
// This still works!
await asUser(async (client) => {
await client.query('INSERT INTO posts ...');
});
Migration Path
For New Code
Use fp-ts style with TaskEither and ReaderTaskEither:
import * as TE from 'fp-ts/TaskEither';
import { pipe } from 'fp-ts/function';
import { withClient } from '@/lib/db/connection';
For Existing Code
No changes needed! The backward-compatible API works as before:
import { withClient, asUser } from '@/lib/db/client';
Gradual Migration
You can mix both styles:
// Old style
const users = await asUser(async (client) => {
return await client.query('SELECT * FROM users');
});
// New style
const posts = await pipe(
withClient(
{},
(client) => TE.tryCatch(
() => client.query('SELECT * FROM posts'),
(error) => queryError('Failed', undefined, error)
)
)
)();
Testing
Testing TaskEither Operations
import * as TE from 'fp-ts/TaskEither';
import * as E from 'fp-ts/Either';
describe('getUser', () => {
it('should return user', async () => {
const mockClient = {
query: vi.fn().mockResolvedValue({ rows: [{ id: 1, name: 'John' }] })
};
const result = await getUser(1)(mockPool)();
expect(E.isRight(result)).toBe(true);
if (E.isRight(result)) {
expect(result.right).toEqual({ id: 1, name: 'John' });
}
});
it('should handle errors', async () => {
const mockClient = {
query: vi.fn().mockRejectedValue(new Error('Connection failed'))
};
const result = await getUser(1)(mockPool)();
expect(E.isLeft(result)).toBe(true);
if (E.isLeft(result)) {
expect(result.left._tag).toBe('QueryError');
}
});
});
Key Takeaways
- The old API still works - No breaking changes
- Use fp-ts for new code - Better type safety and composability
- ReaderTaskEither is powerful - Elegant dependency injection
- Error handling is explicit - No hidden exceptions
- Resources are managed - Automatic cleanup with bracket pattern
Files Changed
New Files
- ✅
src/lib/db/errors.ts- Tagged union error types with constructors - ✅
src/lib/db/fp.ts- fp-ts utilities (runTE,unwrapTE) - ✅
src/lib/db/context.ts- AsyncLocalStorage for connection reuse
Rewritten Files
- ✅
src/lib/db/pool.ts- Rewritten with TaskEither - ✅
src/lib/db/connection.ts- Rewritten with ReaderTaskEither - ✅
src/lib/db/client.ts- Backward-compatible wrapper usingrunTE - ✅
src/lib/db/ephemeral.ts- Rewritten with TaskEither +runTE - ✅
src/lib/db/types.ts- Simplified types - ✅
src/lib/db/index.ts- Comprehensive exports
Removed Files
- ❌
src/lib/db/utils.ts- Empty file, deleted
Unchanged Files
- ⚠️
src/lib/db/error.ts- Legacy error handler (marked deprecated) - ⚠️
src/lib/db/models.ts- Re-exports for backward compatibility
Can Be Removed (Outside db layer)
- ❌
src/lib/common/resource/*- No longer used by db layer
Next Steps
- Test the new implementation thoroughly
- Update any direct imports of the old types
- Consider migrating other parts of the codebase to use fp-ts
- Remove
src/lib/common/resourceif no other code depends on it