@cbnsndwch/zero-sources

Change Sources

Learn about change source implementations

Change sources are the core components that watch databases for changes and stream them to Zero clients in real-time.

Overview

A change source:

  1. Watches a database for changes (inserts, updates, deletes)
  2. Transforms database changes to Zero protocol format
  3. Streams changes to connected clients via WebSocket
  4. Tracks synchronization progress with watermarks

Available Implementations

Architecture

┌─────────────────┐
│   MongoDB       │
│   Database      │
└────────┬────────┘
         │ Change Stream

┌─────────────────┐
│  Change Source  │
│   - Watch       │
│   - Transform   │
│   - Stream      │
└────────┬────────┘
         │ WebSocket

┌─────────────────┐
│  Zero Cache     │
└────────┬────────┘


┌─────────────────┐
│   Clients       │
└─────────────────┘

Key Concepts

Change Streams

MongoDB change streams provide real-time notifications of data changes:

const changeStream = collection.watch();

for await (const change of changeStream) {
    console.log('Change:', change.operationType);
}

Event Transformation

Changes are transformed from MongoDB format to Zero protocol:

// MongoDB change event
{
  operationType: 'insert',
  fullDocument: { _id: '123', content: 'Hello' }
}

// Zero protocol event
{
  type: 'insert',
  table: 'message',
  key: { id: '123' },
  value: { id: '123', content: 'Hello' }
}

WebSocket Streaming

Changes are streamed to clients via WebSocket:

@WebSocketGateway()
export class ChangeSourceGateway {
    @SubscribeMessage('changes/v0/stream')
    async handleStream(client: Socket, request: StreamRequest) {
        // Stream changes to client
    }
}

Configuration

Schema Configuration

Define your Zero schemas:

export const messageSchema = createTableSchema({
    tableName: 'message',
    columns: {
        id: { type: 'string' },
        content: { type: 'string' }
    },
    primaryKey: ['id']
});

Table Mappings

Map MongoDB collections to Zero tables:

{
    "message": {
        "collection": "messages",
        "fields": {
            "id": "_id",
            "content": "content"
        }
    }
}

Deployment

Docker Deployment

docker run -d \
  -e MONGODB_URI=mongodb://mongo:27017 \
  -e SCHEMA_URL=http://app/schema \
  -p 3000:3000 \
  zero-source-mongodb

Kubernetes

apiVersion: apps/v1
kind: Deployment
metadata:
    name: change-source
spec:
    replicas: 3
    template:
        spec:
            containers:
                - name: change-source
                  image: zero-source-mongodb:latest
                  env:
                      - name: MONGODB_URI
                        valueFrom:
                            secretKeyRef:
                                name: mongo-secret
                                key: uri

Best Practices

  1. Enable Replica Set: MongoDB must run in replica set mode
  2. Index Fields: Index fields used in filters and sorts
  3. Monitor Performance: Track change stream lag
  4. Handle Errors: Implement retry logic for transient failures
  5. Scale Horizontally: Run multiple instances for high availability

How was this page?