Savvi Studio

Cursor Pagination in Graph API

⚠️ STATUS: ✅ CURRENT IMPLEMENTATION
Last Updated: January 10, 2025
Applies to: Both current and planned implementations

This guide explains cursor pagination patterns specifically for graph operations, including querying resources, statements, and traversing relationships.

Overview

All graph API operations that return multiple results use cursor-based pagination through the query() method. This provides:

  • Efficient iteration - Stream results page by page without loading the full result set into memory
  • Type-safe - Cursor<T> and CursorPage<T> are generic, so items keep their TypeScript types
  • Stateless - Continuation tokens can be stored and used to resume a query later
  • Consistent - Same pagination pattern across all graph operations

Quick Start

import { GraphOperations } from '@/lib/graph/api/GraphOperations';
import { asUser } from '@/lib/db';

// All query methods return a Cursor
const cursor = await asUser(async (client) => {
  const graph = new GraphOperations(client);
  
  return await graph.resources().query({
    typeNamespace: 'auth.user',
    dataContains: { status: 'active' }
  });
});

// Iterate over results
for await (const resource of cursor) {
  console.log('Resource:', resource.id);
}

Core Concepts

The Cursor Class

All query() methods return a Cursor<T> object that provides async iteration:

// Type-level summary; method bodies omitted
declare class Cursor<T> {
  // Fetch the next page and advance the cursor
  next(): Promise<CursorPage<T>>;
  
  // Get the total number of items matching the query
  count(): Promise<number>;
  
  // Fetch all remaining results (use with caution)
  fetchAll(): Promise<T[]>;
  
  // Async generator over pages
  pages(): AsyncIterable<CursorPage<T>>;
  
  // Async iterator over individual items
  [Symbol.asyncIterator](): AsyncIterator<T>;
}

CursorPage Structure

Each page contains items and continuation information:

interface CursorPage<T> {
  items: T[];              // Items in this page
  nextToken: string | null;  // Token for next page
  hasMore: boolean;        // Whether more pages exist
  totalCount: number;      // Total items matching query
}

CursorOptions

Control pagination behavior:

interface CursorOptions {
  pageSize?: number;       // Items per page (default: 100)
  maxPages?: number;       // Safety limit (default: 1000)
}
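
To make the relationship between Cursor, CursorPage, and CursorOptions concrete, here is a minimal in-memory sketch. It is not the library's implementation: ArrayCursor is a hypothetical stand-in, its tokens are just stringified offsets, and throwing once maxPages is exceeded is an assumption (the message mentions 'maxPages' only because the error-handling examples in this guide check for that string).

```typescript
// Hypothetical in-memory stand-in for Cursor<T>; not the real implementation.
interface CursorPage<T> {
  items: T[];
  nextToken: string | null;
  hasMore: boolean;
  totalCount: number;
}

interface CursorOptions {
  pageSize?: number;
  maxPages?: number;
}

class ArrayCursor<T> {
  private readonly source: readonly T[];
  private readonly pageSize: number;
  private readonly maxPages: number;
  private offset = 0;
  private pagesFetched = 0;

  constructor(source: readonly T[], options: CursorOptions = {}) {
    this.source = source;
    this.pageSize = options.pageSize ?? 100;   // documented default
    this.maxPages = options.maxPages ?? 1000;  // documented default
  }

  // Fetch the next page and advance the internal offset.
  async next(): Promise<CursorPage<T>> {
    if (this.pagesFetched >= this.maxPages) {
      // Assumed behavior: exceeding the safety limit throws.
      throw new Error(`maxPages limit (${this.maxPages}) exceeded`);
    }
    const items = this.source.slice(this.offset, this.offset + this.pageSize);
    this.offset += items.length;
    this.pagesFetched += 1;
    const hasMore = this.offset < this.source.length;
    return {
      items,
      // Assumption: the token is simply the next offset as a string.
      nextToken: hasMore ? String(this.offset) : null,
      hasMore,
      totalCount: this.source.length,
    };
  }

  // Yield pages until the source is exhausted.
  async *pages(): AsyncGenerator<CursorPage<T>> {
    let page: CursorPage<T>;
    do {
      page = await this.next();
      yield page;
    } while (page.hasMore);
  }

  // Yield individual items by flattening pages.
  async *[Symbol.asyncIterator](): AsyncGenerator<T> {
    for await (const page of this.pages()) {
      yield* page.items;
    }
  }
}
```

In this sketch, next(), pages(), and item iteration all share one offset, so mixing them (one manual next() followed by a for await loop) continues where the previous call stopped.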

Resource Queries

Basic Query

const cursor = await graph.resources().query({
  typeNamespace: 'auth.user'
}, { pageSize: 50 });

for await (const resource of cursor) {
  console.log('User:', resource.data.name);
}

Filtered Query

const cursor = await graph.resources().query({
  typeNamespace: 'doc.document',
  dataContains: { 
    status: 'published',
    category: 'tech'
  }
});

for await (const doc of cursor) {
  console.log('Document:', doc.data.title);
}

Manual Pagination

const cursor = await graph.resources().query({
  typeNamespace: 'auth.user'
});

// First page
const firstPage = await cursor.next();
console.log(`Got ${firstPage.items.length} items`);
console.log(`Total: ${firstPage.totalCount}`);

// Second page (if available)
if (firstPage.hasMore) {
  const secondPage = await cursor.next();
  console.log(`Got ${secondPage.items.length} more items`);
}
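
The two-page example generalizes to a loop that keeps calling next() until hasMore is false. The sketch below assumes only the next() method and the CursorPage fields shown earlier; PageSource and drain are hypothetical names introduced for illustration.

```typescript
interface CursorPage<T> {
  items: T[];
  nextToken: string | null;
  hasMore: boolean;
  totalCount: number;
}

// Hypothetical minimal view of a cursor: anything with next().
interface PageSource<T> {
  next(): Promise<CursorPage<T>>;
}

// Call next() until the cursor reports no more pages.
// onPage receives the page items, the running item count, and totalCount.
async function drain<T>(
  cursor: PageSource<T>,
  onPage: (items: T[], seen: number, total: number) => void | Promise<void>,
): Promise<number> {
  let seen = 0;
  let page: CursorPage<T>;
  do {
    page = await cursor.next();
    seen += page.items.length;
    await onPage(page.items, seen, page.totalCount);
  } while (page.hasMore);
  return seen;
}
```

A progress indicator could be driven with something like drain(cursor, (_items, seen, total) => console.log(`${seen}/${total}`)).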

Statement Queries

Query by Subject

// Get all statements where user is the subject
const cursor = await graph.statements().query({
  subjectId: userId
});

for await (const stmt of cursor) {
  console.log(`User relates to: ${stmt.objectId}`);
}

Query by Predicate Pattern

// Get all statements whose predicate matches 'org.*' (e.g. org.member, org.owns)
const cursor = await graph.statements().query({
  predicate: 'org.*'
});

for await (const stmt of cursor) {
  console.log(`${stmt.subjectId} -> ${stmt.objectId}`);
}

Query by Object

// Get all statements pointing to a resource
const cursor = await graph.statements().query({
  objectId: resourceId,
  predicate: 'auth.read'
});

for await (const stmt of cursor) {
  console.log(`${stmt.subjectId} can read resource`);
}

Traversal Queries

Basic Traversal

const cursor = await graph.traverse().query(startNodeId, {
  maxDepth: 5,
  direction: 'outgoing'
});

for await (const result of cursor) {
  console.log(`Found at depth ${result.depth}: ${result.resourceId}`);
}

Filtered Traversal

const cursor = await graph.traverse().query(userId, {
  predicates: ['org.member', 'org.owns'],
  maxDepth: 10,
  includeResources: true
});

for await (const result of cursor) {
  console.log(`Path: ${result.path.join(' -> ')}`);
  if (result.resource) {
    console.log(`  Resource: ${result.resource.data.name}`);
  }
}
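
Traversal results arrive as a flat stream, so grouping them by depth is a common follow-up step. The sketch below assumes each result exposes depth and resourceId as in the examples above; the TraversalHit type, the string type of resourceId, and the groupByDepth helper are hypothetical.

```typescript
// Assumed result shape, based on the fields used in the examples above.
interface TraversalHit {
  depth: number;
  resourceId: string;
}

// Collect resource IDs into buckets keyed by traversal depth.
async function groupByDepth(
  results: AsyncIterable<TraversalHit>,
): Promise<Map<number, string[]>> {
  const byDepth = new Map<number, string[]>();
  for await (const hit of results) {
    const bucket = byDepth.get(hit.depth) ?? [];
    bucket.push(hit.resourceId);
    byDepth.set(hit.depth, bucket);
  }
  return byDepth;
}
```

Because the helper accepts any AsyncIterable, a traversal cursor can be passed to it directly, provided its items match the assumed shape.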

Iteration Patterns

Item-by-Item Processing

Best for operations that process each item independently:

const cursor = await graph.resources().query({ typeNamespace: 'auth.user' });

for await (const user of cursor) {
  await processUser(user);
}

Page-by-Page Processing

Best for batch operations:

const cursor = await graph.resources().query({ typeNamespace: 'auth.user' });

for await (const page of cursor.pages()) {
  console.log(`Processing batch of ${page.items.length} users`);
  await Promise.all(page.items.map(user => processUser(user)));
  
  if (page.hasMore) {
    console.log(`Next token: ${page.nextToken}`);
  }
}

Collecting Results

Use with caution - only for small result sets:

const cursor = await graph.resources().query({
  typeNamespace: 'auth.user',
  dataContains: { role: 'admin' }
});

// Collect all admins (assuming small set)
const admins = await cursor.fetchAll();
console.log(`Found ${admins.length} admins`);

API Endpoint Patterns

Paginated REST Endpoint

app.get('/api/resources', async (req, res) => {
  const { continuationToken, pageSize = 50, typeNamespace } = req.query;
  
  await asUser(async (client) => {
    const graph = new GraphOperations(client);
    
    // Create cursor from token or new query
    const cursor = continuationToken
      ? await Cursor.fromToken(client, continuationToken)
      : await graph.resources().query(
          { typeNamespace: typeNamespace as string },
          { pageSize: Number(pageSize) }
        );
    
    const page = await cursor.next();
    
    res.json({
      items: page.items,
      pagination: {
        nextToken: page.nextToken,
        hasMore: page.hasMore,
        totalCount: page.totalCount,
        pageSize: Number(pageSize)
      }
    });
  });
});
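
The endpoint above passes Number(pageSize) straight through, so a request such as ?pageSize=abc would yield NaN. One way to guard against that is to validate and clamp the value first. parsePageSize is a hypothetical helper, and the upper bound of 500 is an arbitrary choice for this sketch, not a documented limit.

```typescript
// Parse a raw query-string value into a safe page size.
// Falls back to `fallback` for missing, non-numeric, or non-positive input,
// and clamps large values to `max` (an arbitrary cap chosen for this sketch).
function parsePageSize(raw: unknown, fallback = 50, max = 500): number {
  // Express may hand back arrays or nested objects; accept digit-only strings.
  if (typeof raw !== 'string' || !/^\d+$/.test(raw)) {
    return fallback;
  }
  const n = Number(raw);
  if (n < 1) {
    return fallback;
  }
  return Math.min(n, max);
}
```

In the handler, `{ pageSize: parsePageSize(req.query.pageSize) }` would then replace `{ pageSize: Number(pageSize) }`.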

GraphQL Cursor Connection

const resolvers = {
  Query: {
    resources: async (_, { first, after, typeNamespace }, context) => {
      const cursor = after
        ? await Cursor.fromToken(context.client, after)
        : await context.graph.resources().query(
            { typeNamespace },
            { pageSize: first }
          );
      
      const page = await cursor.next();
      
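      return {
        // CursorPage exposes only a page-level token, so every edge carries
        // the same cursor; `after` can resume only at a page boundary.
        edges: page.items.map(item => ({
          node: item,
          cursor: page.nextToken
        })),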
        pageInfo: {
          hasNextPage: page.hasMore,
          endCursor: page.nextToken
        }
      };
    }
  }
};

Performance Optimization

Choose Appropriate Page Sizes

// Small pages for UI (quick first response)
const uiCursor = await graph.resources().query(filter, { pageSize: 25 });

// Medium pages for general use (balanced)
const defaultCursor = await graph.resources().query(filter, { pageSize: 100 });

// Large pages for batch processing (efficient)
const batchCursor = await graph.resources().query(filter, { pageSize: 500 });

Set Safety Limits

// Prevent runaway queries
const cursor = await graph.resources().query(filter, {
  pageSize: 100,
  maxPages: 50  // Max 5,000 items
});

try {
  for await (const resource of cursor) {
    await processResource(resource);
  }
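} catch (error) {
  // `error` is `unknown` under strict TypeScript, so narrow before reading .message
  if (error instanceof Error && error.message.includes('maxPages')) {
    console.error('Result set too large, refine query');
  } else {
    throw error;
  }
}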
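
Whether a result set fits under a given limit is simple arithmetic: a query needs ceil(totalCount / pageSize) pages. The helpers below are a hypothetical sketch that assumes the cursor fails once it would have to fetch more than maxPages pages; the defaults mirror the documented CursorOptions defaults.

```typescript
// Number of pages needed to cover totalCount items at a given page size.
function pagesNeeded(totalCount: number, pageSize: number): number {
  if (!Number.isInteger(pageSize) || pageSize < 1) {
    throw new RangeError('pageSize must be a positive integer');
  }
  return Math.ceil(totalCount / pageSize);
}

// True when the whole result set can be read without exceeding maxPages.
// Assumption: the limit trips only when more than maxPages pages are needed.
function fitsWithinLimit(totalCount: number, pageSize = 100, maxPages = 1000): boolean {
  return pagesNeeded(totalCount, pageSize) <= maxPages;
}
```

With pageSize 100 and maxPages 50, 5,000 items fit exactly and 5,001 do not. The totalCount of the first page (or cursor.count()) could feed this check before committing to a full iteration.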

Early Exit

const cursor = await graph.resources().query(filter);

for await (const resource of cursor) {
  if (foundWhatINeeded(resource)) {
    break;  // Cursor automatically cleaned up
  }
}
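
Early exit works because leaving a for await loop with break calls the iterator's return() method, which gives an async generator the chance to run its cleanup. The take helper below is a hypothetical sketch that relies on this standard language behavior; it makes no claim about what the real Cursor does during cleanup.

```typescript
// Collect at most n items, then stop iterating.
async function take<T>(source: AsyncIterable<T>, n: number): Promise<T[]> {
  const out: T[] = [];
  if (n <= 0) {
    return out;
  }
  for await (const item of source) {
    out.push(item);
    if (out.length >= n) {
      break; // triggers the iterator's return(), so the source can clean up
    }
  }
  return out;
}
```

For example, take(cursor, 10) would read only as many pages as are needed to produce ten items.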

Best Practices

1. Use Async Iteration

// ✅ Good: Streams results efficiently
for await (const item of cursor) {
  await processItem(item);
}

// ❌ Bad: Loads everything into memory
const allItems = await cursor.fetchAll();
for (const item of allItems) {
  await processItem(item);
}

2. Filter Early

// ✅ Good: Filter at query time
const cursor = await graph.resources().query({
  typeNamespace: 'auth.user',
  dataContains: { status: 'active' }
});

// ❌ Bad: Fetch everything, then filter in application code
const allUsers = await graph.resources().query({ typeNamespace: 'auth.user' });
for await (const user of allUsers) {
  if (user.data.status === 'active') {
    await processUser(user);
  }
}

3. Handle Continuation Tokens

// Store token for resuming later
const page = await cursor.next();
if (page.hasMore && page.nextToken) {
  await saveToken(userId, page.nextToken);
}

// Resume from token
const savedToken = await getToken(userId);
if (savedToken) {
  const cursor = await Cursor.fromToken(client, savedToken);
  const nextPage = await cursor.next();
}

4. Process in Batches

// ✅ Good: Process each page concurrently when items are independent
for await (const page of cursor.pages()) {
  await Promise.all(page.items.map(item => processItem(item)));
}

// ❌ Slower: One item at a time, even though items are independent
for await (const item of cursor) {
  await processItem(item);  // Waits for each item before starting the next
}
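
Promise.all over a whole page starts every item at once, which with large pages may put more load on downstream services than intended. A bounded variant is sketched below; mapWithLimit is a hypothetical helper, not part of the graph API.

```typescript
// Run fn over items with at most `limit` calls in flight at a time.
// Results keep the order of the input array.
async function mapWithLimit<T, R>(
  items: readonly T[],
  limit: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let nextIndex = 0;

  // Each worker repeatedly claims the next unprocessed index.
  async function worker(): Promise<void> {
    while (nextIndex < items.length) {
      const i = nextIndex++;
      results[i] = await fn(items[i] as T);
    }
  }

  const workerCount = Math.min(Math.max(1, limit), items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Inside the page loop, `await mapWithLimit(page.items, 10, processItem)` would cap concurrency at ten while still processing page by page.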

Error Handling

Invalid Tokens

// Declare the cursor outside try/catch so it stays in scope afterwards
let cursor;
try {
  cursor = await Cursor.fromToken(client, invalidToken);
} catch (error) {
  console.error('Invalid or expired token');
  // Start a fresh query instead
  cursor = await graph.resources().query(filter);
}
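
The resume-or-restart fallback can be wrapped in a small helper so callers also learn which path was taken. resumeOrRestart is a hypothetical sketch; it treats any failure to resume as an unusable token, which is an assumption about how resumption fails.

```typescript
// Try to resume from a saved token; fall back to a fresh query on failure.
// `resumed` tells the caller whether previous progress was kept.
async function resumeOrRestart<C>(
  token: string | null | undefined,
  resume: (token: string) => Promise<C>,
  restart: () => Promise<C>,
): Promise<{ cursor: C; resumed: boolean }> {
  if (token) {
    try {
      return { cursor: await resume(token), resumed: true };
    } catch {
      // Assumption: any error here means the token is invalid or expired.
    }
  }
  return { cursor: await restart(), resumed: false };
}
```

A call might look like resumeOrRestart(savedToken, t => Cursor.fromToken(client, t), () => graph.resources().query(filter)). When resumed is false, a UI would typically reset its list before rendering the first page.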

Max Pages Exceeded

try {
  const cursor = await graph.resources().query(filter, { maxPages: 10 });
  await cursor.fetchAll();
} catch (error) {
  if (error instanceof Error && error.message.includes('maxPages')) {
    // Result set too large, use pagination instead
    console.error('Too many results, process incrementally');
  } else {
    throw error;
  }
}

Database Errors

try {
  for await (const item of cursor) {
    await processItem(item);
  }
} catch (error: any) {
  if (error.code === 'CURSOR_NOT_FOUND') {
    console.error('Cursor expired, restart query');
  } else {
    throw error;
  }
}

Common Patterns

Infinite Scroll

const cursor = await graph.resources().query(filter);
let hasMore = true;

async function loadMore() {
  if (!hasMore) return [];
  
  const page = await cursor.next();
  hasMore = page.hasMore;
  
  return page.items;
}

// In UI
const items = await loadMore();
appendToList(items);
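
Scroll handlers often fire several times before the first request finishes, and the pattern above would then call cursor.next() more than once and skip ahead. The sketch below guards against that by sharing a single in-flight request. createLoader and PageSource are hypothetical names; only next() and the CursorPage fields are taken from this guide.

```typescript
interface CursorPage<T> {
  items: T[];
  nextToken: string | null;
  hasMore: boolean;
  totalCount: number;
}

// Hypothetical minimal view of a cursor: anything with next().
interface PageSource<T> {
  next(): Promise<CursorPage<T>>;
}

// Build a loadMore() that never runs two page fetches at the same time.
function createLoader<T>(cursor: PageSource<T>): () => Promise<T[]> {
  let hasMore = true;
  let inFlight: Promise<T[]> | null = null;

  return function loadMore(): Promise<T[]> {
    if (inFlight) {
      return inFlight; // reuse the pending request instead of fetching again
    }
    if (!hasMore) {
      return Promise.resolve([]);
    }
    const request = cursor
      .next()
      .then((page) => {
        hasMore = page.hasMore;
        return page.items;
      })
      .finally(() => {
        inFlight = null; // allow the next fetch once this one settles
      });
    inFlight = request;
    return request;
  };
}
```

Concurrent callers receive the same page, so the UI should append it only once, for example by letting a single scroll handler own the call.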

Export to File

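import { createWriteStream } from 'fs';
import { once } from 'events';

const stream = createWriteStream('export.jsonl');
const cursor = await graph.resources().query(filter, { pageSize: 1000 });

for await (const resource of cursor) {
  // Respect backpressure: wait for 'drain' when the internal buffer is full
  if (!stream.write(JSON.stringify(resource) + '\n')) {
    await once(stream, 'drain');
  }
}

// Wait until all buffered data has been flushed to disk
stream.end();
await once(stream, 'finish');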

Aggregate Statistics

const stats = {
  total: 0,
  byType: new Map<string, number>()
};

const cursor = await graph.resources().query({});

for await (const resource of cursor) {
  stats.total++;
  const count = stats.byType.get(resource.typeNamespace) || 0;
  stats.byType.set(resource.typeNamespace, count + 1);
}

console.log('Statistics:', stats);

See Also