Cursor, Stream, and Query Architecture
Executive Summary
This document proposes a clean separation of concerns between pagination (Cursor) and transformation (Stream), with a schema rename from cursors to queries for better clarity and extensibility.
Status: 🟢 Approved - Ready for Implementation
Created: 2025-11-16
Updated: 2025-11-16
Authors: Platform Engineering Team
Table of Contents
- Problem Statement
- Current State Analysis
- Proposed Architecture
- Schema Rename: cursors → queries
- Implementation Plan
Problem Statement
The Conflation
We currently mix two orthogonal concerns in our cursor implementation:
Concern 1: Pagination (Original SQL cursor system)
- Token-based resumption
- Incremental page loading
- Opaque encrypted tokens
- Core responsibility: "Give me the next chunk of data"
- Lives in: SQL (cursors schema) + TypeScript wrapper
Concern 2: Transformation (Added in recent refactor)
- map, filter, collect operations
- Java Streams-style collectors
- Lazy evaluation
- Core responsibility: "Transform data as it flows"
- Lives in: TypeScript only (application layer)
Why This Matters
- Conflates responsibilities: Cursor does too much
- Naming confusion: "Cursor" implies pagination, but also does stream processing
- API clarity: Users should explicitly convert to stream for transformations
Current State Analysis
SQL Layer (db/001_cursors.sql)
Pure pagination system - well-designed for its purpose:
-- Cursor registry: declarative cursor type definitions
CREATE TABLE cursors.cursor_types (
cursor_type text PRIMARY KEY,
source_relation regclass NOT NULL,
sort_columns text[] NOT NULL,
query_function_name text NOT NULL,
owner_role text NOT NULL
);
-- Core operations: pure pagination
cursors.init(cursor_type, filters) -> token
cursors.next(token, limit) -> page
cursors.fetch(cursor_type, filters, limit) -> page
Key characteristics:
- ✅ Opaque encrypted tokens (security)
- ✅ Token encodes: cursor_type, filters, sort_keys
- ✅ Generates query functions at registration
- ✅ RLS enforcement via function ownership
- ✅ Simple equality filters only
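The init/next/fetch contract above can be modeled in memory to make the token's role concrete. This is a minimal sketch, not the SQL implementation: Row, Page, TokenState, fetchPage, and nextPage are hypothetical names, the token carries only the filters and the last sort key (the real token also encodes cursor_type), and it is plain URL-encoded JSON where the real tokens are opaque and encrypted.

```typescript
// Hypothetical in-memory model of token-based keyset pagination.
// None of these names come from the real SQL API.

type Row = { id: number; nodeType: string };
type Filters = Partial<Record<keyof Row, string | number>>;

interface Page {
  items: Row[];
  nextToken: string | null;
}

// Token state: the equality filters plus the last sort key already returned.
interface TokenState {
  filters: Filters;
  lastId: number;
}

// ASSUMPTION: readable JSON for illustration only. Real tokens are encrypted.
const encodeToken = (state: TokenState): string =>
  encodeURIComponent(JSON.stringify(state));
const decodeToken = (token: string): TokenState =>
  JSON.parse(decodeURIComponent(token)) as TokenState;

// Simple equality filters only, mirroring the SQL layer's restriction.
function matches(row: Row, filters: Filters): boolean {
  return (Object.keys(filters) as (keyof Row)[]).every(
    (key) => row[key] === filters[key]
  );
}

// Shared core: rows matching the filters, strictly after lastId, in id order.
function pageAfter(rows: Row[], filters: Filters, lastId: number, limit: number): Page {
  const remaining = rows
    .filter((row) => matches(row, filters) && row.id > lastId)
    .sort((a, b) => a.id - b.id);
  const items = remaining.slice(0, limit);
  const last = items[items.length - 1];
  const hasMore = remaining.length > limit;
  return {
    items,
    nextToken: hasMore && last ? encodeToken({ filters, lastId: last.id }) : null,
  };
}

// Analogue of cursors.fetch: first page from filters.
function fetchPage(rows: Row[], filters: Filters, limit: number): Page {
  return pageAfter(rows, filters, Number.NEGATIVE_INFINITY, limit);
}

// Analogue of cursors.next: resume from the token alone.
function nextPage(rows: Row[], token: string, limit: number): Page {
  const { filters, lastId } = decodeToken(token);
  return pageAfter(rows, filters, lastId, limit);
}
```

Because the token carries the filters and the resume position, the caller of nextPage never re-sends query parameters. This is the property that keeps the pagination layer free of application state.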
TypeScript Layer (src/lib/db/api/QueryCursor.ts)
Pagination + Transformation - mixed concerns:
class QueryCursor<TEntity> extends AbstractCursor<TEntity> {
// Pagination (original)
async next(): Promise<CursorPage<TEntity>>
async count(): Promise<number>
// Transformation (new) - mixed with pagination!
map<U>(fn: (item: TEntity) => U): Cursor<U>
filter(pred: (item: TEntity) => boolean): Cursor<TEntity>
// Collectors (new)
toArray(), toSet(), groupingBy(), etc.
}
Issues:
- ❌ Cursor does both pagination AND streaming
- ❌ Not clear where pagination ends and transformation begins
Storage Layer (tools/forge/storage/)
File system cursors - same mixed concerns:
class FileSystemCursor<T> extends AbstractCursor<T> {
// Pagination over file listings
// Transformation via inherited map/filter
}
Proposed Architecture
Core Separation of Concerns
┌──────────────────────────────────────────────────────┐
│ Application Layer │
├──────────────────────────────────────────────────────┤
│ Stream<T>: Transformation + Collection │
│ - map(fn), filter(pred), flatMap(fn) │
│ - collect(collector), toArray(), groupingBy() │
│ - In-memory transformations │
└────────────────┬─────────────────────────────────────┘
│ uses
┌────────────────▼─────────────────────────────────────┐
│ Cursor<T>: Pure Pagination │
│ - next() -> CursorPage<T> │
│ - pages() -> AsyncIterable<CursorPage<T>> │
│ - count() -> Promise<number> │
│ - stream() -> Stream<T> (conversion) │
└────────────────┬─────────────────────────────────────┘
│ backed by
┌────────────────▼─────────────────────────────────────┐
│ Query Layer (SQL) │
│ - queries.fetch(cursor_type, filters) -> page │
│ - queries.next(token, limit) -> page │
│ - Token-based pagination │
│ - RLS enforcement │
└──────────────────────────────────────────────────────┘
1. Cursor Interface (Pure Pagination)
/**
* Cursor - Pure pagination interface
*
* Responsible ONLY for:
* - Token-based resumption
* - Incremental page loading
* - Page metadata (hasMore, totalCount)
*/
interface Cursor<T> {
// Pagination primitives
next(): Promise<CursorPage<T>>;
count(): Promise<number>;
// Async iteration over PAGES (not items!)
pages(): AsyncIterable<CursorPage<T>>;
// Convert to stream for transformations
stream(): Stream<T>;
// NO map/filter/collect here!
}
interface CursorPage<T> {
items: T[];
nextToken: string | null;
hasMore: boolean;
totalCount: number;
}
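To show how pages() relates to next(), here is a minimal in-memory sketch of the pagination half of the interface. ArrayCursor and PageCursor are hypothetical names introduced for illustration; stream() is left out so the example stays self-contained, and the string offset stands in for a real opaque token.

```typescript
// Hypothetical in-memory cursor; not part of the proposed codebase.

interface CursorPage<T> {
  items: T[];
  nextToken: string | null;
  hasMore: boolean;
  totalCount: number;
}

// Pagination-only subset of Cursor<T> (stream() omitted in this sketch).
interface PageCursor<T> {
  next(): Promise<CursorPage<T>>;
  count(): Promise<number>;
  pages(): AsyncIterable<CursorPage<T>>;
}

class ArrayCursor<T> implements PageCursor<T> {
  private offset = 0;

  constructor(
    private readonly data: readonly T[],
    private readonly pageSize: number
  ) {
    // A page size below 1 would never advance the offset.
    if (!Number.isInteger(pageSize) || pageSize < 1) {
      throw new RangeError("pageSize must be a positive integer");
    }
  }

  async next(): Promise<CursorPage<T>> {
    const items = this.data.slice(this.offset, this.offset + this.pageSize);
    this.offset += items.length;
    const hasMore = this.offset < this.data.length;
    return {
      items,
      // ASSUMPTION: the offset stands in for an opaque resume token.
      nextToken: hasMore ? String(this.offset) : null,
      hasMore,
      totalCount: this.data.length,
    };
  }

  async count(): Promise<number> {
    return this.data.length;
  }

  // pages() is next() repeated until hasMore is false.
  async *pages(): AsyncGenerator<CursorPage<T>> {
    while (true) {
      const page = await this.next();
      if (page.items.length > 0) yield page;
      if (!page.hasMore) return;
    }
  }
}
```

Writing pages() in terms of next() means a concrete cursor only has to implement one fetch primitive. Iteration yields whole pages, so item-level concerns stay out of the cursor.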
2. Stream Interface (Transformation)
/**
* Stream - Transformation and collection
*
* Responsible for:
* - Lazy transformations (map, filter, flatMap)
* - Collection (toArray, groupingBy, etc.)
*/
interface Stream<T> extends AsyncIterable<T> {
// Transformations (return new Stream)
map<U>(fn: (item: T) => U | Promise<U>): Stream<U>;
filter(predicate: (item: T) => boolean | Promise<boolean>): Stream<T>;
flatMap<U>(fn: (item: T) => Stream<U>): Stream<U>;
take(n: number): Stream<T>;
// Terminal operations (materialize)
collect<R>(collector: Collector<T, R>): Promise<R>;
toArray(): Promise<T[]>;
toSet(): Promise<Set<T>>;
groupingBy<K>(classifier: (item: T) => K): Promise<Map<K, T[]>>;
// Async iteration over ITEMS
[Symbol.asyncIterator](): AsyncIterator<T>;
}
/**
* Implementation: Functional composition with generators
*
* Key insights from FP/type theory:
* 1. Transform = async generator transformer function
* 2. Composition = function composition (elegant!)
* 3. No need for separate TransformedStream class
* 4. Single transform pipeline per stream
*/
// Transform type: maps one async iterable to another
type Transform<A, B> = (source: AsyncIterable<A>) => AsyncIterable<B>;
// Identity transform (no-op)
const identity: Transform<any, any> = (source) => source;
// Compose two transforms (left-to-right: f runs first, then g)
function compose<A, B, C>(
f: Transform<A, B>,
g: Transform<B, C>
): Transform<A, C> {
return (source) => g(f(source));
}
// Base class with functional composition.
// Concrete rather than abstract, because map/filter/flatMap/take instantiate it directly.
class BaseStream<T> implements Stream<T> {
constructor(
protected readonly source: AsyncIterable<any>,
private readonly transform: Transform<any, T> = identity
) {}
// Apply transform pipeline
async *[Symbol.asyncIterator](): AsyncIterator<T> {
yield* this.transform(this.source);
}
// Transformations compose new pipeline
map<U>(fn: (item: T) => U | Promise<U>): Stream<U> {
const mapTransform: Transform<T, U> = async function* (source) {
for await (const item of source) {
yield await fn(item);
}
};
return new BaseStream(
this.source,
compose(this.transform, mapTransform)
);
}
filter(predicate: (item: T) => boolean | Promise<boolean>): Stream<T> {
const filterTransform: Transform<T, T> = async function* (source) {
for await (const item of source) {
if (await predicate(item)) {
yield item;
}
}
};
return new BaseStream(
this.source,
compose(this.transform, filterTransform)
);
}
flatMap<U>(fn: (item: T) => Stream<U>): Stream<U> {
const flatMapTransform: Transform<T, U> = async function* (source) {
for await (const item of source) {
yield* fn(item);
}
};
return new BaseStream(
this.source,
compose(this.transform, flatMapTransform)
);
}
take(n: number): Stream<T> {
const takeTransform: Transform<T, T> = async function* (source) {
if (n <= 0) return;
let count = 0;
for await (const item of source) {
yield item;
// Stop right after the n-th item so no extra item (or page) is pulled
if (++count >= n) return;
}
};
return new BaseStream(
this.source,
compose(this.transform, takeTransform)
);
}
// Collectors (shared by all streams)
async toArray(): Promise<T[]> {
const result: T[] = [];
for await (const item of this) {
result.push(item);
}
return result;
}
async toSet(): Promise<Set<T>> {
const result = new Set<T>();
for await (const item of this) {
result.add(item);
}
return result;
}
async groupingBy<K>(classifier: (item: T) => K): Promise<Map<K, T[]>> {
const result = new Map<K, T[]>();
for await (const item of this) {
const key = classifier(item);
if (!result.has(key)) {
result.set(key, []);
}
result.get(key)!.push(item);
}
return result;
}
// ... other collectors
}
// Wraps cursor, flattens pages to items
class CursorStream<T> extends BaseStream<T> {
constructor(cursor: Cursor<T>) {
// Source is pages, initial transform flattens to items
super(
cursor.pages(),
async function* (pages) {
for await (const page of pages) {
yield* page.items;
}
}
);
}
}
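The composition idea can be exercised without the class hierarchy. The sketch below is an illustration under assumptions: mapT, filterT, takeT, flattenPages, countingPages, and firstEvenSquares are hypothetical helper names, and the page source is a fake that only counts how many pages were requested. It shows that a composed pipeline is lazy: once take has its items, upstream pages are no longer fetched.

```typescript
// Sketch of the transform pipeline in isolation; helper names are illustrative.

type Transform<A, B> = (source: AsyncIterable<A>) => AsyncIterable<B>;

// f runs first, then g.
const compose = <A, B, C>(f: Transform<A, B>, g: Transform<B, C>): Transform<A, C> =>
  (source) => g(f(source));

const mapT = <A, B>(fn: (item: A) => B): Transform<A, B> =>
  async function* (source) {
    for await (const item of source) yield fn(item);
  };

const filterT = <A>(pred: (item: A) => boolean): Transform<A, A> =>
  async function* (source) {
    for await (const item of source) if (pred(item)) yield item;
  };

const takeT = <A>(n: number): Transform<A, A> =>
  async function* (source) {
    if (n <= 0) return;
    let seen = 0;
    for await (const item of source) {
      yield item;
      if (++seen >= n) return; // returning closes the upstream iterators
    }
  };

// Same role as the initial transform in CursorStream: pages -> items.
const flattenPages = <A>(): Transform<{ items: A[] }, A> =>
  async function* (pages) {
    for await (const page of pages) yield* page.items;
  };

// Fake page source that records how many pages were requested.
function countingPages(
  all: number[][],
  log: { fetched: number }
): AsyncIterable<{ items: number[] }> {
  return (async function* () {
    for (const items of all) {
      log.fetched++;
      yield { items };
    }
  })();
}

// Pipeline: flatten pages, keep even numbers, square them, take `limit`.
async function firstEvenSquares(limit: number, log: { fetched: number }): Promise<number[]> {
  const pipeline = compose(
    compose(flattenPages<number>(), filterT<number>((n) => n % 2 === 0)),
    compose(mapT((n: number) => n * n), takeT<number>(limit))
  );
  const out: number[] = [];
  for await (const n of pipeline(countingPages([[1, 2], [3, 4], [5, 6]], log))) {
    out.push(n);
  }
  return out;
}
```

With a limit of 2, the second even number arrives on the second page, so the third page is never requested. For a database-backed cursor, each skipped page is one round trip saved.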
3. Usage Examples
Basic Pagination (No Transformation)
import { QueryCursor } from '@/lib/db/api/QueryCursor';
// Get cursor from query layer
const cursor = await QueryCursor.create(client, {
query: { cursorType: 'nodes', filters: { nodeType: ['document'] } },
pageSize: 100
});
// Iterate over pages
for await (const page of cursor.pages()) {
console.log(`Got ${page.items.length} items`);
// Process page...
}
// Or get next page manually
const firstPage = await cursor.next();
Transformation with Stream
// Convert cursor to stream for transformations
const nodes = await cursor
.stream() // ← Explicit conversion: Cursor → Stream
.filter(n => n.metadata.status === 'active')
.map(n => ({ id: n.id, name: n.name }))
.toArray();
// Advanced: grouping
const byType = await cursor
.stream()
.groupingBy(n => n.type);
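The Stream interface also declares collect(collector), but the Collector type is not shown in this document. One possible shape, loosely modeled on java.util.stream.Collector, is sketched below. Everything here is an assumption for illustration: the supplier/accumulator/finisher fields, the third type parameter, and the standalone collect, counting, groupingBy, and fromArray helpers may differ from what src/lib/cursor/collectors.ts actually defines.

```typescript
// Hypothetical Collector shape; the real collectors module may differ.

interface Collector<T, R, A = unknown> {
  supplier: () => A;                      // create the mutable accumulator
  accumulator: (acc: A, item: T) => void; // fold one item into it
  finisher: (acc: A) => R;                // produce the final result
}

// Terminal operation: drain the source through the collector.
async function collect<T, R, A>(
  source: AsyncIterable<T>,
  collector: Collector<T, R, A>
): Promise<R> {
  const acc = collector.supplier();
  for await (const item of source) collector.accumulator(acc, item);
  return collector.finisher(acc);
}

// Counts items without retaining them.
const counting = <T>(): Collector<T, number, { n: number }> => ({
  supplier: () => ({ n: 0 }),
  accumulator: (acc) => {
    acc.n++;
  },
  finisher: (acc) => acc.n,
});

// Groups items into a Map keyed by the classifier's result.
const groupingBy = <T, K>(
  classifier: (item: T) => K
): Collector<T, Map<K, T[]>, Map<K, T[]>> => ({
  supplier: () => new Map<K, T[]>(),
  accumulator: (acc, item) => {
    const key = classifier(item);
    const bucket = acc.get(key);
    if (bucket) bucket.push(item);
    else acc.set(key, [item]);
  },
  finisher: (acc) => acc,
});

// Small helper to turn an array into an async source for the example.
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  yield* items;
}
```

Under this shape, toArray, toSet, and groupingBy on the stream could each be a one-line call to collect with the matching collector, which keeps the terminal operations in one place.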
Schema Rename: cursors → queries
Rationale
The current cursors schema is well-designed but narrowly named. Renaming to queries provides:
- Accurate naming: "queries" better represents the pagination mechanism
- Extensibility: Room for future query features (templates, DSL, etc.)
- Clarity: Cursors are a pagination tool within the queries system
- Future-proof: Graph queries and custom queries fit naturally
Migration Approach
Simple find/replace in SQL and TypeScript (no production code exists):
- SQL files: Replace cursors. with queries. in:
  - db/001_cursors.sql → db/001_queries.sql
  - db/315_graph_cursors.sql → db/315_graph_queries.sql
  - All function/table/type names
- TypeScript: Replace imports and references:
  - @db/cursors → @db/queries
  - Generated function names stay descriptive
- Generated code: Re-run codegen after SQL changes:
  pnpm db:codegen
No migration script needed - just rename and update references.
Implementation Plan
Phase 1: Separate Cursor and Stream ✅
Task 1.1: Update Cursor Interface
// Remove from Cursor<T>:
- map<U>(fn): Cursor<U>
- filter(pred): Cursor<T>
- collect(), toArray(), toSet(), groupingBy()
// Add to Cursor<T>:
+ stream(): Stream<T>
Task 1.2: Create Stream Class
// src/lib/cursor/stream.ts
export class CursorStream<T> extends BaseStream<T> {
// Constructor passes cursor.pages() and a page-flattening transform to BaseStream
// map, filter, collect, etc. live in BaseStream
// (moved from AbstractCursor)
}
Task 1.3: Update QueryCursor
export class QueryCursor<TEntity> implements Cursor<TEntity> {
// Keep: next(), count(), pages()
// Add: stream() { return new CursorStream(this); }
// Remove: map(), filter(), collect(), toArray(), etc.
}
Task 1.4: Update Documentation
- Update examples to use .stream()
- Add migration guide for existing code
Impact:
- ✅ Clear separation of concerns
- ✅ Explicit conversion with .stream()
- ⚠️ Existing code needs .stream() before .map()
Migration Example:
// Before
const names = await cursor.map(u => u.name).toArray();
// After
const names = await cursor.stream().map(u => u.name).toArray();
Phase 2: Schema Rename cursors → queries
Task 2.1: Rename SQL Files
- db/001_cursors.sql → db/001_queries.sql
- db/315_graph_cursors.sql → db/315_graph_queries.sql
Task 2.2: Find/Replace in SQL
# In all SQL files:
cursors.cursor_types → queries.cursor_types
cursors.cursor_columns → queries.cursor_columns
cursors.page → queries.cursor_page
cursors.cursor_info → queries.cursor_info
cursors.encode_cursor → queries.encode_cursor
cursors.decode_cursor → queries.decode_cursor
cursors.fetch → queries.fetch
cursors.next → queries.next
# etc.
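If the find/replace is scripted instead of done by hand, a word-boundary regex keeps it from touching identifiers that merely contain "cursors". The helper below is a hypothetical sketch, not an existing tool in the repository: renameSchema and SPECIAL_CASES are illustrative names, and the single special case is taken from the list above.

```typescript
// Hypothetical helper: rewrite schema-qualified references in SQL text.

// Names that change beyond the schema prefix (from the list above).
const SPECIAL_CASES: Record<string, string> = {
  "cursors.page": "queries.cursor_page",
};

function renameSchema(sql: string): string {
  // \b requires a word boundary before "cursors", so graph_cursors.foo is
  // left alone: "_" is a word character and no boundary exists there.
  return sql.replace(
    /\bcursors\.(\w+)/g,
    (match: string, name: string) => SPECIAL_CASES[match] ?? `queries.${name}`
  );
}
```

This is a plain text substitution, so it also rewrites matches inside SQL comments and string literals. Reviewing the resulting diff is still advisable.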
Task 2.3: Update TypeScript Imports
// In generated/index.ts and usage:
import { cursors } from '@/lib/db/generated';
// becomes:
import { queries } from '@/lib/db/generated';
// Usage:
cursors.fetch(...) → queries.fetch(...)
cursors.next(...) → queries.next(...)
Task 2.4: Regenerate Code
pnpm db:codegen
Task 2.5: Update Documentation
- Update all references to cursors schema
- Update examples to use queries
Impact:
- ✅ Better naming
- ✅ Extensible for future features
- ⚠️ Simple find/replace (no migration needed)
Phase 3: Verification & Testing
Task 3.1: Unit Tests
- Test Cursor interface (pagination only)
- Test Stream interface (transformations)
- Test cursor.stream() conversion
Task 3.2: Integration Tests
- Test with real database queries
- Verify pagination still works
- Verify transformations work via stream
Task 3.3: Performance
- Benchmark pagination performance
- Verify no regression from separation
Summary
Core Decisions ✅
- ✅ Separate Cursor and Stream
  - Cursor = Pagination only
  - Stream = Transformation only
  - Explicit .stream() conversion
- ❌ No DAG/Algebra
  - Too complex for current needs
  - Can add later if needed
- ✅ Schema Rename: cursors → queries
  - Simple find/replace approach
  - No migration needed (no production code)
  - Better naming and extensibility
Implementation Steps
- Phase 1: Separate Cursor/Stream interfaces
  - Remove map/filter from Cursor
  - Add stream() method
  - Create CursorStream class
- Phase 2: Rename schema
  - Find/replace cursors → queries in SQL
  - Update TypeScript imports
  - Regenerate code
- Phase 3: Test and verify
  - Unit tests for both interfaces
  - Integration tests
  - Performance validation
Benefits
- ✅ Clear separation of concerns
- ✅ Better API semantics
- ✅ Extensible for future features
- ✅ Simple migration path
- ✅ No production code to migrate
References
- Database Patterns: .cursorrules/database-patterns.md
- SQL Cursors: db/001_cursors.sql
- Graph Cursors: db/315_graph_cursors.sql
- Current Implementation: src/lib/db/api/QueryCursor.ts
- Collectors: src/lib/cursor/collectors.ts
Document Version: 2.0