Build Your Own
Learn how to build your own custom change source for databases other than MongoDB
Overview
While Zero Sources provides a MongoDB implementation, you can create change sources for any database that supports change detection (Postgres, MySQL, etc.).
Requirements
A change source must:
- Detect Changes: Watch the database for inserts, updates, and deletes
- Transform Events: Convert database events to the Zero protocol
- Stream Changes: Push changes to clients via WebSocket
- Track Progress: Maintain watermarks for each client
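As a rough illustration of how these four responsibilities fit together, the sketch below runs them over an in-memory list of events. Every name in it (`RawEvent`, `Change`, `ClientSession`, `transform`, the `lsn` field) is hypothetical and chosen for illustration. It does not show the actual Zero protocol message shape, which this page does not specify.

```typescript
// Hypothetical raw event as a database driver might report it.
interface RawEvent {
  op: 'I' | 'U' | 'D';
  table: string;
  row: Record<string, unknown>;
  lsn: number; // monotonically increasing position in the change log
}

// Normalized change pushed to clients (illustrative shape only).
interface Change {
  type: 'insert' | 'update' | 'delete';
  table: string;
  key: Record<string, unknown>;
  value?: Record<string, unknown>;
  watermark: string;
}

const OPS = { I: 'insert', U: 'update', D: 'delete' } as const;

// Transform: convert a raw event into the normalized shape.
function transform(raw: RawEvent): Change {
  const type = OPS[raw.op];
  return {
    type,
    table: raw.table,
    key: { id: raw.row.id },
    // Deletes carry only the key, not a row image.
    ...(type === 'delete' ? {} : { value: raw.row }),
    // Fixed-width string so watermarks sort lexicographically.
    watermark: String(raw.lsn).padStart(16, '0'),
  };
}

// Stream + track: one session per client remembers the last change it sent.
class ClientSession {
  lastWatermark = '';
  constructor(private send: (change: Change) => void) {}

  deliver(change: Change): void {
    if (change.watermark <= this.lastWatermark) return; // already delivered
    this.send(change);
    this.lastWatermark = change.watermark;
  }
}

// Detect: stands in for a real replication stream or poller.
const detected: RawEvent[] = [
  { op: 'I', table: 'message', row: { id: 'm1', content: 'hi' }, lsn: 1 },
  { op: 'U', table: 'message', row: { id: 'm1', content: 'hello' }, lsn: 2 },
  { op: 'D', table: 'message', row: { id: 'm1' }, lsn: 3 },
];

const sent: Change[] = [];
const session = new ClientSession(change => sent.push(change));
for (const raw of detected) session.deliver(transform(raw));
```

In a real change source the `send` callback would write to the client's WebSocket, and `lastWatermark` would be persisted so that it survives a reconnect.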
Implementation Steps
1. Define Your Schema
import { createTableSchema } from '@rocicorp/zero';
export const schema = {
  message: createTableSchema({
    tableName: 'message',
    columns: {
      id: { type: 'string' },
      content: { type: 'string' }
    },
    primaryKey: ['id']
  })
};
2. Create Change Detector
interface ChangeEvent {
  type: 'insert' | 'update' | 'delete';
  table: string;
  key: Record<string, any>;
  value?: Record<string, any>; // omitted for deletes
  timestamp: number;
}
class PostgresChangeDetector {
  async *watchChanges(): AsyncIterable<ChangeEvent> {
    // Use logical replication or polling.
    // PGClient and logicalReplication() stand in for your own Postgres
    // client wrapper (not defined here).
    const client = new PGClient();
    for await (const change of client.logicalReplication()) {
      yield this.transformChange(change);
    }
  }
  // Map a raw Postgres change to the shared ChangeEvent shape.
  private transformChange(pgChange: any): ChangeEvent {
    return {
      type: pgChange.operation,
      table: pgChange.table,
      key: { id: pgChange.id },
      // A delete has no new row image, so leave value unset.
      value: pgChange.operation === 'delete' ? undefined : pgChange.after,
      timestamp: Date.now()
    };
  }
}
3. Implement WebSocket Gateway
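import { WebSocketGateway, SubscribeMessage } from '@nestjs/websockets';
import { Socket } from 'socket.io';
// Shape of the subscription request, inferred from the fields read below.
interface StreamRequest {
  tables: string[];
  watermark?: string;
}
@WebSocketGateway()
export class CustomChangeSourceGateway {
  constructor(private detector: PostgresChangeDetector) {}
  @SubscribeMessage('changes/v0/stream')
  async handleStream(client: Socket, request: StreamRequest) {
    // watermark is read here but not yet applied; see step 4.
    const { tables, watermark } = request;
    // Send changes to client
    for await (const change of this.detector.watchChanges()) {
      // Stop streaming once the client has disconnected.
      if (!client.connected) break;
      if (tables.includes(change.table)) {
        client.emit('change', change);
      }
    }
  }
}
4. Add Watermark Support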
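The gateway in step 3 destructures `watermark` from the request but does not use it. One plausible use, sketched here under assumptions, is to replay only the changes a reconnecting client has not yet seen. The `LoggedChange` shape, the `toWatermark` and `changesAfter` helpers, and the fixed-width string format are all hypothetical; the watermark format Zero actually expects is not specified on this page.

```typescript
// Hypothetical log entry: a change tagged with its position in the log.
interface LoggedChange {
  watermark: string; // fixed-width, so string comparison matches log order
  table: string;
  type: 'insert' | 'update' | 'delete';
  key: Record<string, unknown>;
}

// Encode a numeric log position as a sortable fixed-width string.
function toWatermark(position: number): string {
  return String(position).padStart(16, '0');
}

// Return the changes for the requested tables that come strictly after
// the client's watermark. An undefined watermark means "from the start".
function changesAfter(
  log: readonly LoggedChange[],
  tables: readonly string[],
  watermark?: string,
): LoggedChange[] {
  return log.filter(
    entry =>
      tables.includes(entry.table) &&
      (watermark === undefined || entry.watermark > watermark),
  );
}

const log: LoggedChange[] = [
  { watermark: toWatermark(1), table: 'message', type: 'insert', key: { id: 'a' } },
  { watermark: toWatermark(2), table: 'user', type: 'insert', key: { id: 'u' } },
  { watermark: toWatermark(10), table: 'message', type: 'update', key: { id: 'a' } },
];

// A client that last saw position 1 and subscribes to 'message' only
// receives the update at position 10.
const missed = changesAfter(log, ['message'], toWatermark(1));
```

The padding matters because watermarks are compared as strings: without it, `'10'` would sort before `'2'` and the client would silently miss changes.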
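// Persistence contract assumed by WatermarkManager; back it with any store.
// To inject it with NestJS, register an implementation under a provider
// token and reference that token with @Inject() in the constructor.
interface WatermarkStorage {
  getWatermark(clientId: string, table: string): Promise<string>;
  setWatermark(clientId: string, table: string, watermark: string): Promise<void>;
}
class WatermarkManager {
  constructor(private storage: WatermarkStorage) {}
  async getWatermark(clientId: string, table: string): Promise<string> {
    return this.storage.getWatermark(clientId, table);
  }
  async setWatermark(
    clientId: string,
    table: string,
    watermark: string
  ): Promise<void> {
    return this.storage.setWatermark(clientId, table, watermark);
  }
}
5. Create NestJS Module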
import { Module } from '@nestjs/common';
@Module({
  providers: [
    PostgresChangeDetector,
    CustomChangeSourceGateway,
    WatermarkManager
  ]
})
export class CustomChangeSourceModule {}
Database-Specific Implementations
PostgreSQL
Use logical replication or LISTEN/NOTIFY:
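import { Client } from 'pg';
const client = new Client(); // reads connection settings from the PG* environment variables
await client.connect();
// Register the handler before subscribing so no notification is missed
client.on('notification', msg => {
  if (!msg.payload) return; // payload is optional
  const change = JSON.parse(msg.payload);
  // Process change
});
// Subscribe to notifications
await client.query('LISTEN table_changes');
MySQL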
Use binlog replication:
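import ZongJi from 'zongji'; // CommonJS default export; needs esModuleInterop
const zongji = new ZongJi({
  host: 'localhost',
  user: 'root',
  password: 'password'
});
zongji.on('binlog', event => {
  // Process binlog event
});
// Limit the stream to row-level events and the table maps they depend on
zongji.start({
  includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows']
});
DynamoDB Streams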
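import { DynamoDBStreams } from 'aws-sdk';
const streams = new DynamoDBStreams();
// shardIterator comes from an earlier getShardIterator call (not shown)
const { Records } = await streams
  .getRecords({
    ShardIterator: shardIterator
  })
  .promise();
// Records is optional on the response type
for (const record of Records ?? []) {
  // record.eventName is 'INSERT', 'MODIFY', or 'REMOVE'
  // Process stream record
}
Best Practices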
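- Batch Changes: Group changes into batches before sending to reduce per-message overhead
- Error Handling: Retry transient failures such as dropped connections and timeouts, and resume from the last watermark
- Backpressure: Bound per-client buffering so a slow client cannot exhaust server memory
- Monitoring: Add metrics and logging for the detector and for each client stream
- Testing: Cover the change detector, the event transform, and watermark handling with tests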
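Two of these practices lend themselves to a short sketch: size-based batching and exponential backoff between retries. `ChangeBatcher` and `backoffDelay` are hypothetical helpers written for illustration, not part of Zero or of the reference implementation, and the limits used are arbitrary.

```typescript
// Collects items and releases them in fixed-size batches.
class ChangeBatcher<T> {
  private pending: T[] = [];
  constructor(private readonly maxSize: number) {}

  // Add one item; returns a full batch when the size limit is reached,
  // otherwise undefined.
  push(item: T): T[] | undefined {
    this.pending.push(item);
    return this.pending.length >= this.maxSize ? this.flush() : undefined;
  }

  // Release whatever is pending, e.g. on a timer or at shutdown.
  flush(): T[] {
    const batch = this.pending;
    this.pending = [];
    return batch;
  }
}

// Delay before retry number `attempt` (0-based): doubles each time, capped.
function backoffDelay(attempt: number, baseMs = 100, maxMs = 5000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

const batcher = new ChangeBatcher<number>(3);
const batches: number[][] = [];
for (const n of [1, 2, 3, 4, 5]) {
  const full = batcher.push(n);
  if (full) batches.push(full);
}
batches.push(batcher.flush()); // drain the remainder
```

In a gateway, each released batch would be emitted as a single message. A production retry loop would typically also add random jitter to the delay so that many clients do not reconnect at the same instant.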
Example: Complete Implementation
See the MongoDB change source implementation as a reference:
apps/source-mongodb-server/src/features/change-source/