Savvi Studio

Cursor, Stream, and Query Architecture

Executive Summary

This document proposes a clean separation of concerns between pagination (Cursor) and transformation (Stream), with a schema rename from cursors to queries for better clarity and extensibility.

Status: 🟢 Approved - Ready for Implementation
Created: 2025-11-16
Updated: 2025-11-16
Authors: Platform Engineering Team


Table of Contents

  1. Problem Statement
  2. Current State Analysis
  3. Proposed Architecture
  4. Schema Rename: cursors → queries
  5. Implementation Plan

Problem Statement

The Conflation

We currently mix two orthogonal concerns in our cursor implementation:

Concern 1: Pagination (Original SQL cursor system)

  • Token-based resumption
  • Incremental page loading
  • Opaque encrypted tokens
  • Core responsibility: "Give me the next chunk of data"
  • Lives in: SQL (cursors schema) + TypeScript wrapper

Concern 2: Transformation (Added in recent refactor)

  • map, filter, collect operations
  • Java Streams-style collectors
  • Lazy evaluation
  • Core responsibility: "Transform data as it flows"
  • Lives in: TypeScript only (application layer)

Why This Matters

  1. Conflates responsibilities: Cursor does too much
  2. Naming confusion: "Cursor" implies pagination, but also does stream processing
  3. API clarity: Users should explicitly convert to stream for transformations

Current State Analysis

SQL Layer (db/001_cursors.sql)

Pure pagination system - well-designed for its purpose:

-- Cursor registry: declarative cursor type definitions
CREATE TABLE cursors.cursor_types (
    cursor_type text PRIMARY KEY,
    source_relation regclass NOT NULL,
    sort_columns text[] NOT NULL,
    query_function_name text NOT NULL,
    owner_role text NOT NULL
);

-- Core operations: pure pagination
cursors.init(cursor_type, filters) -> token
cursors.next(token, limit) -> page
cursors.fetch(cursor_type, filters, limit) -> page

Key characteristics:

  • ✅ Opaque encrypted tokens (security)
  • ✅ Token encodes: cursor_type, filters, sort_keys
  • ✅ Generates query functions at registration
  • ✅ RLS enforcement via function ownership
  • ✅ Simple equality filters only

TypeScript Layer (src/lib/db/api/QueryCursor.ts)

Pagination + Transformation - mixed concerns:

class QueryCursor<TEntity> extends AbstractCursor<TEntity> {
    // Pagination (original)
    async next(): Promise<CursorPage<TEntity>>
    async count(): Promise<number>
    
    // Transformation (new) - mixed with pagination!
    map<U>(fn: (item: T) => U): Cursor<U>
    filter(pred: (item: T) => boolean): Cursor<T>
    
    // Collectors (new)
    toArray(), toSet(), groupingBy(), etc.
}

Issues:

  • ❌ Cursor does both pagination AND streaming
  • ❌ Not clear where pagination ends and transformation begins

Storage Layer (tools/forge/storage/)

File system cursors - same mixed concerns:

class FileSystemCursor<T> extends AbstractCursor<T> {
    // Pagination over file listings
    // Transformation via inherited map/filter
}

Proposed Architecture

Core Separation of Concerns

┌──────────────────────────────────────────────────────┐
│ Application Layer                                     │
├──────────────────────────────────────────────────────┤
│ Stream<T>: Transformation + Collection               │
│  - map(fn), filter(pred), flatMap(fn)                │
│  - collect(collector), toArray(), groupingBy()       │
│  - In-memory transformations                         │
└────────────────┬─────────────────────────────────────┘
                 │ uses
┌────────────────▼─────────────────────────────────────┐
│ Cursor<T>: Pure Pagination                           │
│  - next() -> CursorPage<T>                           │
│  - pages() -> AsyncIterable<CursorPage<T>>           │
│  - count() -> Promise<number>                        │
│  - stream() -> Stream<T> (conversion)                │
└────────────────┬─────────────────────────────────────┘
                 │ backed by
┌────────────────▼─────────────────────────────────────┐
│ Query Layer (SQL)                                    │
│  - queries.fetch(cursor_type, filters) -> page       │
│  - queries.next(token, limit) -> page                │
│  - Token-based pagination                            │
│  - RLS enforcement                                   │
└──────────────────────────────────────────────────────┘

1. Cursor Interface (Pure Pagination)

/**
 * Cursor - Pure pagination interface
 * 
 * Responsible ONLY for:
 * - Token-based resumption
 * - Incremental page loading
 * - Page metadata (hasMore, totalCount)
 */
interface Cursor<T> {
    // Pagination primitives
    next(): Promise<CursorPage<T>>;
    count(): Promise<number>;
    
    // Async iteration over PAGES (not items!)
    pages(): AsyncIterable<CursorPage<T>>;
    
    // Convert to stream for transformations
    stream(): Stream<T>;
    
    // NO map/filter/collect here!
}

interface CursorPage<T> {
    items: T[];
    nextToken: string | null;
    hasMore: boolean;
    totalCount: number;
}

2. Stream Interface (Transformation)

/**
 * Stream - Transformation and collection
 * 
 * Responsible for:
 * - Lazy transformations (map, filter, flatMap)
 * - Collection (toArray, groupingBy, etc.)
 */
interface Stream<T> extends AsyncIterable<T> {
    // Transformations (return new Stream)
    map<U>(fn: (item: T) => U | Promise<U>): Stream<U>;
    filter(predicate: (item: T) => boolean | Promise<boolean>): Stream<T>;
    flatMap<U>(fn: (item: T) => Stream<U>): Stream<U>;
    take(n: number): Stream<T>;
    
    // Terminal operations (materialize)
    collect<R>(collector: Collector<T, R>): Promise<R>;
    toArray(): Promise<T[]>;
    toSet(): Promise<Set<T>>;
    groupingBy<K>(classifier: (item: T) => K): Promise<Map<K, T[]>>;
    
    // Async iteration over ITEMS
    [Symbol.asyncIterator](): AsyncIterator<T>;
}

/**
 * Implementation: Functional composition with generators
 * 
 * Key insights from FP/type theory:
 * 1. Transform = async generator transformer function
 * 2. Composition = function composition (elegant!)
 * 3. No need for separate TransformedStream class
 * 4. Single transform pipeline per stream
 */

// Transform type: maps one async iterable to another
type Transform<A, B> = (source: AsyncIterable<A>) => AsyncIterable<B>;

// Identity transform (no-op)
const identity: Transform<any, any> = (source) => source;

// Compose two transforms (right-to-left)
function compose<A, B, C>(
    f: Transform<A, B>,
    g: Transform<B, C>
): Transform<A, C> {
    return (source) => g(f(source));
}

// Base class with functional composition
abstract class BaseStream<T> implements Stream<T> {
    constructor(
        protected readonly source: AsyncIterable<any>,
        private readonly transform: Transform<any, T> = identity
    ) {}
    
    // Apply transform pipeline
    async *[Symbol.asyncIterator](): AsyncIterator<T> {
        yield* this.transform(this.source);
    }
    
    // Transformations compose new pipeline
    map<U>(fn: (item: T) => U | Promise<U>): Stream<U> {
        const mapTransform: Transform<T, U> = async function* (source) {
            for await (const item of source) {
                yield await fn(item);
            }
        };
        return new BaseStream(
            this.source,
            compose(this.transform, mapTransform)
        );
    }
    
    filter(predicate: (item: T) => boolean | Promise<boolean>): Stream<T> {
        const filterTransform: Transform<T, T> = async function* (source) {
            for await (const item of source) {
                if (await predicate(item)) {
                    yield item;
                }
            }
        };
        return new BaseStream(
            this.source,
            compose(this.transform, filterTransform)
        );
    }
    
    flatMap<U>(fn: (item: T) => Stream<U>): Stream<U> {
        const flatMapTransform: Transform<T, U> = async function* (source) {
            for await (const item of source) {
                yield* fn(item);
            }
        };
        return new BaseStream(
            this.source,
            compose(this.transform, flatMapTransform)
        );
    }
    
    take(n: number): Stream<T> {
        const takeTransform: Transform<T, T> = async function* (source) {
            let count = 0;
            for await (const item of source) {
                if (count++ >= n) break;
                yield item;
            }
        };
        return new BaseStream(
            this.source,
            compose(this.transform, takeTransform)
        );
    }
    
    // Collectors (shared by all streams)
    async toArray(): Promise<T[]> {
        const result: T[] = [];
        for await (const item of this) {
            result.push(item);
        }
        return result;
    }
    
    async toSet(): Promise<Set<T>> {
        const result = new Set<T>();
        for await (const item of this) {
            result.add(item);
        }
        return result;
    }
    
    async groupingBy<K>(classifier: (item: T) => K): Promise<Map<K, T[]>> {
        const result = new Map<K, T[]>();
        for await (const item of this) {
            const key = classifier(item);
            if (!result.has(key)) {
                result.set(key, []);
            }
            result.get(key)!.push(item);
        }
        return result;
    }
    
    // ... other collectors
}

// Wraps cursor, flattens pages to items
class CursorStream<T> extends BaseStream<T> {
    constructor(cursor: Cursor<T>) {
        // Source is pages, initial transform flattens to items
        super(
            cursor.pages(),
            async function* (pages) {
                for await (const page of pages) {
                    yield* page.items;
                }
            }
        );
    }
}

3. Usage Examples

Basic Pagination (No Transformation)

import { QueryCursor } from '@/lib/db/api/QueryCursor';

// Get cursor from query layer
const cursor = await QueryCursor.create(client, {
    query: { cursorType: 'nodes', filters: { nodeType: ['document'] } },
    pageSize: 100
});

// Iterate over pages
for await (const page of cursor.pages()) {
    console.log(`Got ${page.items.length} items`);
    // Process page...
}

// Or get next page manually
const firstPage = await cursor.next();

Transformation with Stream

// Convert cursor to stream for transformations
const nodes = await cursor
    .stream()  // ← Explicit conversion: Cursor → Stream
    .filter(n => n.metadata.status === 'active')
    .map(n => ({ id: n.id, name: n.name }))
    .toArray();

// Advanced: grouping
const byType = await cursor
    .stream()
    .groupingBy(n => n.type);

Schema Rename: cursors → queries

Rationale

The current cursors schema is well-designed but narrowly named. Renaming to queries provides:

  1. Accurate naming: "queries" better represents the pagination mechanism
  2. Extensibility: Room for future query features (templates, DSL, etc.)
  3. Clarity: Cursors are a pagination tool within the queries system
  4. Future-proof: Graph queries and custom queries fit naturally

Migration Approach

Simple find/replace in SQL and TypeScript (no production code exists):

  1. SQL files: Replace cursors. with queries. in:

    • db/001_cursors.sqldb/001_queries.sql
    • db/315_graph_cursors.sqldb/315_graph_queries.sql
    • All function/table/type names
  2. TypeScript: Replace imports and references:

    • @db/cursors@db/queries
    • Generated function names stay descriptive
  3. Generated code: Re-run codegen after SQL changes:

    pnpm db:codegen
    

No migration script needed - just rename and update references.


Implementation Plan

Phase 1: Separate Cursor and Stream ✅

Task 1.1: Update Cursor Interface

// Remove from Cursor<T>:
- map<U>(fn): Cursor<U>
- filter(pred): Cursor<T>
- collect(), toArray(), toSet(), groupingBy()

// Add to Cursor<T>:
+ stream(): Stream<T>

Task 1.2: Create Stream Class

// src/lib/cursor/stream.ts
export class CursorStream<T> implements Stream<T> {
    constructor(private cursor: Cursor<T>) {}
    
    // Implement map, filter, collect, etc.
    // (Move from AbstractCursor)
}

Task 1.3: Update QueryCursor

export class QueryCursor<TEntity> implements Cursor<TEntity> {
    // Keep: next(), count(), pages()
    // Add: stream() { return new CursorStream(this); }
    // Remove: map(), filter(), collect(), toArray(), etc.
}

Task 1.4: Update Documentation

  • Update examples to use .stream()
  • Add migration guide for existing code

Impact:

  • ✅ Clear separation of concerns
  • ✅ Explicit conversion with .stream()
  • ⚠️ Existing code needs .stream() before .map()

Migration Example:

// Before
const names = await cursor.map(u => u.name).toArray();

// After
const names = await cursor.stream().map(u => u.name).toArray();

Phase 2: Schema Rename cursors → queries

Task 2.1: Rename SQL Files

  • db/001_cursors.sqldb/001_queries.sql
  • db/315_graph_cursors.sqldb/315_graph_queries.sql

Task 2.2: Find/Replace in SQL

# In all SQL files:
cursors.cursor_types     → queries.cursor_types
cursors.cursor_columns   → queries.cursor_columns
cursors.page            → queries.cursor_page
cursors.cursor_info     → queries.cursor_info
cursors.encode_cursor   → queries.encode_cursor
cursors.decode_cursor   → queries.decode_cursor
cursors.fetch           → queries.fetch
cursors.next            → queries.next
# etc.

Task 2.3: Update TypeScript Imports

// In generated/index.ts and usage:
import { cursors } from '@/lib/db/generated';
// becomes:
import { queries } from '@/lib/db/generated';

// Usage:
cursors.fetch(...) → queries.fetch(...)
cursors.next(...)  → queries.next(...)

Task 2.4: Regenerate Code

pnpm db:codegen

Task 2.5: Update Documentation

  • Update all references to cursors schema
  • Update examples to use queries

Impact:

  • ✅ Better naming
  • ✅ Extensible for future features
  • ⚠️ Simple find/replace (no migration needed)

Phase 3: Verification & Testing

Task 3.1: Unit Tests

  • Test Cursor interface (pagination only)
  • Test Stream interface (transformations)
  • Test cursor.stream() conversion

Task 3.2: Integration Tests

  • Test with real database queries
  • Verify pagination still works
  • Verify transformations work via stream

Task 3.3: Performance

  • Benchmark pagination performance
  • Verify no regression from separation

Summary

Core Decisions ✅

  1. ✅ Separate Cursor and Stream

    • Cursor = Pagination only
    • Stream = Transformation only
    • Explicit .stream() conversion
  2. ❌ No DAG/Algebra

    • Too complex for current needs
    • Can add later if needed
  3. ✅ Schema Rename: cursors → queries

    • Simple find/replace approach
    • No migration needed (no production code)
    • Better naming and extensibility

Implementation Steps

  1. Phase 1: Separate Cursor/Stream interfaces

    • Remove map/filter from Cursor
    • Add stream() method
    • Create CursorStream class
  2. Phase 2: Rename schema

    • Find/replace cursors → queries in SQL
    • Update TypeScript imports
    • Regenerate code
  3. Phase 3: Test and verify

    • Unit tests for both interfaces
    • Integration tests
    • Performance validation

Benefits

  • ✅ Clear separation of concerns
  • ✅ Better API semantics
  • ✅ Extensible for future features
  • ✅ Simple migration path
  • ✅ No production code to migrate

References

  • Database Patterns: .cursorrules/database-patterns.md
  • SQL Cursors: db/001_cursors.sql
  • Graph Cursors: db/315_graph_cursors.sql
  • Current Implementation: src/lib/db/api/QueryCursor.ts
  • Collectors: src/lib/cursor/collectors.ts

Document Version: 2.0
Last Updated: 2025-11-16
Status: 🟢 Approved - Ready for Implementation