@cbnsndwch/zero-sources

Build Your Own

Learn how to build your own custom change source for databases other than MongoDB

Overview

While Zero Sources provides a MongoDB implementation, you can create change sources for any database that supports change detection (Postgres, MySQL, etc.).

Requirements

A change source must:

  1. Detect Changes: Watch the database for inserts, updates, and deletes
  2. Transform Events: Convert database events to the Zero protocol
  3. Stream Changes: Push changes to clients via WebSocket
  4. Track Progress: Maintain watermarks for each client
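
The four requirements above can be sketched as a single loop. This is a minimal sketch, not the Zero Sources API: `RawEvent`, `OutgoingChange`, `ChangeSourceParts`, and `runChangeSource` are hypothetical names introduced only for illustration, and the sketch assumes each database event carries a `position` string identifying its place in the database log.

```typescript
// All names here are hypothetical; they are not part of the Zero Sources API.
interface RawEvent {
    op: 'insert' | 'update' | 'delete';
    table: string;
    row: Record<string, unknown>;
    position: string; // assumed: position of this event in the database log
}

interface OutgoingChange {
    type: RawEvent['op'];
    table: string;
    value: Record<string, unknown>;
}

interface ChangeSourceParts {
    detect(): AsyncIterable<RawEvent>; // 1. Detect changes
    transform(event: RawEvent): OutgoingChange | null; // 2. Transform (null = skip)
    send(change: OutgoingChange): Promise<void>; // 3. Stream to the client
    saveWatermark(position: string): Promise<void>; // 4. Track progress
}

// Drives one client's stream and returns the number of changes sent.
async function runChangeSource(parts: ChangeSourceParts): Promise<number> {
    let sent = 0;
    for await (const event of parts.detect()) {
        const change = parts.transform(event);
        if (change !== null) {
            await parts.send(change);
            sent++;
        }
        // Advance the watermark only after the send has completed, so a
        // crash before this line replays the event instead of losing it.
        await parts.saveWatermark(event.position);
    }
    return sent;
}
```

Saving the watermark after sending gives at-least-once delivery: a client may see a change twice after a restart, but it should not miss one.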

Implementation Steps

1. Define Your Schema

import { createTableSchema } from '@rocicorp/zero';

export const schema = {
    message: createTableSchema({
        tableName: 'message',
        columns: {
            id: { type: 'string' },
            content: { type: 'string' }
        },
        primaryKey: ['id']
    })
};

2. Create Change Detector

interface ChangeEvent {
    type: 'insert' | 'update' | 'delete';
    table: string;
    key: Record<string, any>;
    value?: Record<string, any>;
    timestamp: number;
}

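import { Injectable } from '@nestjs/common';

// PGClient and logicalReplication() are placeholders for your Postgres
// replication client; they are not a real library API.
@Injectable()
class PostgresChangeDetector {
    async *watchChanges(): AsyncIterable<ChangeEvent> {
        // Use logical replication or polling
        const client = new PGClient();

        for await (const change of client.logicalReplication()) {
            yield this.transformChange(change);
        }
    }

    private transformChange(pgChange: any): ChangeEvent {
        return {
            // Must already be 'insert' | 'update' | 'delete'; normalize it
            // here if your client reports operations differently
            type: pgChange.operation,
            table: pgChange.table,
            // Assumes a single-column primary key named "id"
            key: { id: pgChange.id },
            // Row state after the change; typically absent for deletes
            value: pgChange.after,
            // Time the event was processed, not the commit time
            timestamp: Date.now()
        };
    }
}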
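
The detector above passes the operation name through unchanged and hard-codes an `id` key. A minimal sketch of two helpers that make those steps explicit follows. `normalizeOperation` and `extractKey` are hypothetical names, and the accepted spellings (full verbs or single letters, in any case) are an assumption; check what your replication client actually reports.

```typescript
type ChangeType = 'insert' | 'update' | 'delete';

// Assumed input spellings; adjust to what your replication client emits.
const OPERATION_ALIASES = new Map<string, ChangeType>([
    ['insert', 'insert'],
    ['i', 'insert'],
    ['update', 'update'],
    ['u', 'update'],
    ['delete', 'delete'],
    ['d', 'delete']
]);

// Maps a database-specific operation name to a ChangeEvent type.
function normalizeOperation(operation: string): ChangeType {
    const type = OPERATION_ALIASES.get(operation.toLowerCase());
    if (type === undefined) {
        throw new Error(`Unsupported operation: ${operation}`);
    }
    return type;
}

// Builds the change key from the primary key columns declared in the schema,
// so composite keys work the same way as a single "id" column.
function extractKey(
    row: Record<string, unknown>,
    primaryKey: readonly string[]
): Record<string, unknown> {
    const key: Record<string, unknown> = {};
    for (const column of primaryKey) {
        if (!(column in row)) {
            throw new Error(`Row is missing primary key column "${column}"`);
        }
        key[column] = row[column];
    }
    return key;
}
```

For the `message` table from step 1, `extractKey(row, ['id'])` yields the same `{ id: ... }` key as the hard-coded version.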

3. Implement WebSocket Gateway

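import { WebSocketGateway, SubscribeMessage } from '@nestjs/websockets';
import { Socket } from 'socket.io';

// Payload the client sends to start a stream
interface StreamRequest {
    tables: string[];
    watermark?: string;
}

@WebSocketGateway()
export class CustomChangeSourceGateway {
    constructor(private detector: PostgresChangeDetector) {}

    @SubscribeMessage('changes/v0/stream')
    async handleStream(client: Socket, request: StreamRequest) {
        // watermark is unused here; a full implementation resumes from it (see step 4)
        const { tables, watermark } = request;

        // Send changes to client
        for await (const change of this.detector.watchChanges()) {
            // Stop once the client has disconnected (checked when the next change arrives)
            if (!client.connected) break;
            if (tables.includes(change.table)) {
                client.emit('change', change);
            }
        }
    }
}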
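
The gateway receives a `watermark` in the request but filters by table only. One way to also skip changes the client has already seen is sketched below. It assumes each change carries a `position` string that is fixed-width and zero-padded, so plain string comparison matches log order; `PositionedChange` and `shouldSend` are hypothetical names, and `StreamRequest` is redeclared here only to keep the sketch self-contained.

```typescript
interface StreamRequest {
    tables: string[];
    watermark?: string;
}

// Assumed: position is a fixed-width, zero-padded string, for example a
// padded sequence number, so that string order equals log order.
interface PositionedChange {
    table: string;
    position: string;
}

// Decides whether a change belongs in this client's stream.
function shouldSend(change: PositionedChange, request: StreamRequest): boolean {
    if (!request.tables.includes(change.table)) {
        return false;
    }
    // No watermark means the client has not received anything yet
    if (request.watermark === undefined) {
        return true;
    }
    // Only changes strictly after the client's watermark are new to it
    return change.position > request.watermark;
}
```

If your database's log positions are not fixed-width strings, replace the comparison with one that understands their format.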

4. Add Watermark Support

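import { Injectable } from '@nestjs/common';

// WatermarkStorage is your persistence layer (for example a table or key-value store)
@Injectable()
class WatermarkManager {
    constructor(private storage: WatermarkStorage) {}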

    async getWatermark(clientId: string, table: string): Promise<string> {
        return this.storage.getWatermark(clientId, table);
    }

    async setWatermark(
        clientId: string,
        table: string,
        watermark: string
    ): Promise<void> {
        return this.storage.setWatermark(clientId, table, watermark);
    }
}
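
`WatermarkStorage` is not defined on this page. The sketch below infers its shape from how `WatermarkManager` calls it and adds a hypothetical in-memory implementation, `InMemoryWatermarkStorage`, which may be useful in tests. It loses all watermarks on restart, so it is not a substitute for persistent storage.

```typescript
// Assumed shape, inferred from the calls WatermarkManager makes
interface WatermarkStorage {
    getWatermark(clientId: string, table: string): Promise<string>;
    setWatermark(
        clientId: string,
        table: string,
        watermark: string
    ): Promise<void>;
}

// Hypothetical in-memory implementation, intended for tests only
class InMemoryWatermarkStorage implements WatermarkStorage {
    private readonly watermarks = new Map<string, string>();

    // initialWatermark is returned for clients that have never stored one
    constructor(private readonly initialWatermark = '') {}

    async getWatermark(clientId: string, table: string): Promise<string> {
        const stored = this.watermarks.get(this.keyFor(clientId, table));
        return stored ?? this.initialWatermark;
    }

    async setWatermark(
        clientId: string,
        table: string,
        watermark: string
    ): Promise<void> {
        this.watermarks.set(this.keyFor(clientId, table), watermark);
    }

    // JSON-encoding the pair avoids collisions such as ("a:b", "c") vs ("a", "b:c")
    private keyFor(clientId: string, table: string): string {
        return JSON.stringify([clientId, table]);
    }
}
```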

5. Create NestJS Module

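import { Module } from '@nestjs/common';

@Module({
    providers: [
        PostgresChangeDetector,
        CustomChangeSourceGateway,
        // WatermarkManager injects WatermarkStorage, so a provider for it
        // must be registered here as well
        WatermarkManager
    ]
})
export class CustomChangeSourceModule {}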

Database-Specific Implementations

PostgreSQL

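Use logical replication or LISTEN/NOTIFY. With LISTEN/NOTIFY, a trigger on each watched table must publish the change, for example by calling pg_notify with a JSON payload:

import { Client } from 'pg';

// Connection settings are read from the standard PG* environment variables
const client = new Client();
await client.connect();

// Subscribe to notifications
await client.query('LISTEN table_changes');

client.on('notification', msg => {
    // payload is undefined when NOTIFY is sent without one
    if (!msg.payload) return;
    const change = JSON.parse(msg.payload);
    // Process change
});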

MySQL

Use binlog replication:

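// zongji is a CommonJS module that exports the class itself
// (the default import requires esModuleInterop)
import ZongJi from 'zongji';

const zongji = new ZongJi({
    host: 'localhost',
    user: 'root',
    password: 'password'
});

zongji.on('binlog', event => {
    // Process binlog event
});

// Limit the stream to row events; tablemap is needed to decode them
zongji.start({
    includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows']
});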

DynamoDB Streams

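Read records from a stream shard with getRecords:

import { DynamoDBStreams } from 'aws-sdk';

const streams = new DynamoDBStreams();

// shardIterator comes from an earlier getShardIterator call
const { Records } = await streams
    .getRecords({
        ShardIterator: shardIterator
    })
    .promise();

// Records is optional in the response type
for (const record of Records ?? []) {
    // Process stream record
}

// Pass the response's NextShardIterator to the next getRecords call to keep reading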
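
Stream records report the operation in `eventName` as `INSERT`, `MODIFY`, or `REMOVE`. A minimal sketch of mapping a record to the insert/update/delete vocabulary used earlier follows. `StreamRecordLike`, `DynamoChange`, and `fromStreamRecord` are hypothetical names, and the record type is a hand-written subset so the sketch needs no SDK import. Keys and images are left in DynamoDB's attribute-value format (for example `{ S: 'm1' }`), and `NewImage` is only present when the stream's view type includes new images.

```typescript
type ChangeType = 'insert' | 'update' | 'delete';

// Hand-written subset of a DynamoDB Streams record
interface StreamRecordLike {
    eventName?: string; // 'INSERT' | 'MODIFY' | 'REMOVE'
    dynamodb?: {
        Keys?: Record<string, unknown>;
        NewImage?: Record<string, unknown>;
    };
}

// Hypothetical output shape; values stay in attribute-value format
interface DynamoChange {
    type: ChangeType;
    key: Record<string, unknown>;
    value?: Record<string, unknown>;
}

const EVENT_TYPES = new Map<string, ChangeType>([
    ['INSERT', 'insert'],
    ['MODIFY', 'update'],
    ['REMOVE', 'delete']
]);

// Returns null for records that cannot be mapped (unknown event or no keys)
function fromStreamRecord(record: StreamRecordLike): DynamoChange | null {
    const type = EVENT_TYPES.get(record.eventName ?? '');
    const key = record.dynamodb?.Keys;
    if (type === undefined || key === undefined) {
        return null;
    }
    return {
        type,
        key,
        // A removed item has no new image to report
        value: type === 'delete' ? undefined : record.dynamodb?.NewImage
    };
}
```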

Best Practices

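  1. Batch Changes: Send changes in batches rather than one message per row to reduce per-message overhead
  2. Error Handling: Retry transient failures (dropped connections, timeouts) with backoff, and resume from the last stored watermark
  3. Backpressure: Bound per-client buffers so a slow client cannot exhaust server memory
  4. Monitoring: Log and export metrics such as replication lag and connected client count
  5. Testing: Cover inserts, updates, deletes, reconnects, and resuming from a watermark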
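
As an illustration of the batching practice, here is a minimal sketch of a generator that groups changes into fixed-size batches. `inBatches` is a hypothetical helper, not part of Zero Sources.

```typescript
// Groups items into arrays of at most batchSize elements, preserving order.
function* inBatches<T>(items: Iterable<T>, batchSize: number): Generator<T[]> {
    if (!Number.isInteger(batchSize) || batchSize < 1) {
        throw new RangeError('batchSize must be a positive integer');
    }
    let batch: T[] = [];
    for (const item of items) {
        batch.push(item);
        if (batch.length === batchSize) {
            yield batch;
            batch = [];
        }
    }
    // Flush whatever is left over as a final, smaller batch
    if (batch.length > 0) {
        yield batch;
    }
}
```

With this helper, a gateway could emit one message per batch instead of one per change. For a live stream, size-based batching alone is probably not enough: a time-based flush is usually also needed so that a partial batch does not wait indefinitely for more changes.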

Example: Complete Implementation

See the MongoDB change source implementation as a reference:

  • apps/source-mongodb-server/src/features/change-source/
