Change Sources
Learn about change source implementations
Change sources are the core components that watch databases for changes and stream them to Zero clients in real time.
Overview
A change source:
- Watches a database for changes (inserts, updates, deletes)
- Transforms database changes to Zero protocol format
- Streams changes to connected clients via WebSocket
- Tracks synchronization progress with watermarks
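The four responsibilities above can be sketched as one small loop. This is a minimal illustration and not the actual server code: the type names (`SourceEvent`, `ZeroChange`), the helpers (`fakeChangeFeed`, `nextWatermark`, `transform`, `runChangeSource`), and the `tableByCollection` lookup are all hypothetical. It also assumes a watermark is a string that sorts in the order changes were observed; the real watermark format may differ.

```typescript
// Hypothetical event shapes, modeled on the examples on this page.
type SourceEvent = {
  operationType: 'insert' | 'update' | 'delete';
  ns: { coll: string };
  documentKey: { _id: string };
  fullDocument?: Record<string, unknown>; // absent on deletes
};

type ZeroChange = {
  type: 'insert' | 'update' | 'delete';
  table: string;
  key: { id: string };
  value?: Record<string, unknown>;
};

// Assumed collection-to-table lookup (illustrative only).
const tableByCollection: Record<string, string> = { messages: 'message' };

// Stand-in for a real change stream so the sketch runs without a database.
async function* fakeChangeFeed(): AsyncGenerator<SourceEvent> {
  yield {
    operationType: 'insert',
    ns: { coll: 'messages' },
    documentKey: { _id: '123' },
    fullDocument: { _id: '123', content: 'Hello' },
  };
  yield { operationType: 'delete', ns: { coll: 'messages' }, documentKey: { _id: '123' } };
}

// Assumption: watermarks are strings that sort in the order changes were seen.
// Zero-padding keeps string order equal to numeric order.
function nextWatermark(counter: number): string {
  return counter.toString(36).padStart(8, '0');
}

// Transform step: rename `_id` to `id` and omit the value for deletes.
function transform(event: SourceEvent): ZeroChange {
  const table = tableByCollection[event.ns.coll] ?? event.ns.coll;
  const key = { id: event.documentKey._id };
  if (event.operationType === 'delete') {
    return { type: 'delete', table, key };
  }
  const value: Record<string, unknown> = { id: key.id };
  for (const [field, fieldValue] of Object.entries(event.fullDocument ?? {})) {
    if (field !== '_id') value[field] = fieldValue;
  }
  return { type: event.operationType, table, key, value };
}

// Watch -> transform -> stream loop; resolves with the last watermark sent.
async function runChangeSource(
  feed: AsyncIterable<SourceEvent>,
  send: (change: ZeroChange, watermark: string) => void,
): Promise<string> {
  let counter = 0;
  let lastWatermark = nextWatermark(counter);
  for await (const event of feed) {
    counter += 1;
    lastWatermark = nextWatermark(counter);
    send(transform(event), lastWatermark);
  }
  return lastWatermark;
}
```

Calling `runChangeSource(fakeChangeFeed(), (change, watermark) => console.log(watermark, change))` runs the loop against the fake feed. In a real source, `send` would write to the WebSocket and the returned watermark would be persisted so streaming can resume from that point.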
Available Implementations
MongoDB Server
Production-ready MongoDB change source server
Build Your Own Change Source
Build your own change source
Architecture
┌─────────────────┐
│     MongoDB     │
│    Database     │
└────────┬────────┘
         │ Change Stream
         ▼
┌─────────────────┐
│  Change Source  │
│  - Watch        │
│  - Transform    │
│  - Stream       │
└────────┬────────┘
         │ WebSocket
         ▼
┌─────────────────┐
│   Zero Cache    │
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│     Clients     │
└─────────────────┘
Key Concepts
Change Streams
MongoDB change streams provide real-time notifications of data changes:
const changeStream = collection.watch();
for await (const change of changeStream) {
  console.log('Change:', change.operationType);
}
Event Transformation
Changes are transformed from MongoDB format to Zero protocol:
// MongoDB change event
{
  operationType: 'insert',
  fullDocument: { _id: '123', content: 'Hello' }
}

// Zero protocol event
{
  type: 'insert',
  table: 'message',
  key: { id: '123' },
  value: { id: '123', content: 'Hello' }
}
WebSocket Streaming
Changes are streamed to clients via WebSocket:
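import { SubscribeMessage, WebSocketGateway } from '@nestjs/websockets';

// Socket and StreamRequest are types defined elsewhere in the server.
@WebSocketGateway()
export class ChangeSourceGateway {
  @SubscribeMessage('changes/v0/stream')
  async handleStream(client: Socket, request: StreamRequest) {
    // Stream changes to client
  }
}
Configuration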
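The schema and table mapping described in this section together decide how a MongoDB document becomes a Zero row. As a rough sketch of that idea, the snippet below applies a mapping of the shape shown in the Table Mappings example. It assumes each entry in `fields` maps a Zero column name to a MongoDB field name; the `TableMapping` type and the `toRow` and `tableForCollection` helpers are hypothetical names used only for illustration.

```typescript
// Mapping shape taken from the Table Mappings example: Zero column -> MongoDB field.
type TableMapping = { collection: string; fields: Record<string, string> };

const mappings: Record<string, TableMapping> = {
  message: { collection: 'messages', fields: { id: '_id', content: 'content' } },
};

// Hypothetical helper: build a Zero row from a MongoDB document.
// Only mapped fields are copied; anything else in the document is ignored.
function toRow(table: string, doc: Record<string, unknown>): Record<string, unknown> {
  const mapping = mappings[table];
  if (!mapping) throw new Error(`No mapping configured for table "${table}"`);
  const row: Record<string, unknown> = {};
  for (const [column, field] of Object.entries(mapping.fields)) {
    row[column] = doc[field];
  }
  return row;
}

// Hypothetical helper: find the Zero table fed by a MongoDB collection.
function tableForCollection(collection: string): string | undefined {
  return Object.keys(mappings).find((table) => mappings[table]?.collection === collection);
}
```

Keeping the mapping as plain data means a new collection can be exposed by editing configuration alone, without touching the transformation code.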
Schema Configuration
Define your Zero schemas:
export const messageSchema = createTableSchema({
  tableName: 'message',
  columns: {
    id: { type: 'string' },
    content: { type: 'string' }
  },
  primaryKey: ['id']
});
Table Mappings
Map MongoDB collections to Zero tables:
{
  "message": {
    "collection": "messages",
    "fields": {
      "id": "_id",
      "content": "content"
    }
  }
}
Deployment
Docker Deployment
docker run -d \
  -e MONGODB_URI=mongodb://mongo:27017 \
  -e SCHEMA_URL=http://app/schema \
  -p 3000:3000 \
  zero-source-mongodb
Kubernetes
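apiVersion: apps/v1
kind: Deployment
metadata:
  name: change-source
spec:
  replicas: 3
  # apps/v1 requires a selector that matches the pod template labels
  selector:
    matchLabels:
      app: change-source
  template:
    metadata:
      labels:
        app: change-source
    spec:
      containers:
        - name: change-source
          image: zero-source-mongodb:latest
          env:
            - name: MONGODB_URI
              valueFrom:
                secretKeyRef:
                  name: mongo-secret
                  key: uri
Best Practices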
- Enable Replica Set: MongoDB change streams require a replica set or sharded cluster; a standalone server does not support them
- Index Fields: Index fields used in filters and sorts
- Monitor Performance: Track change stream lag
- Handle Errors: Implement retry logic for transient failures
- Scale Horizontally: Run multiple instances for high availability
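As an illustration of the "Handle Errors" item, here is a minimal retry helper with capped exponential backoff. It is a sketch under stated assumptions, not the server's actual error handling: `withRetry`, `backoffDelay`, and `RetryOptions` are hypothetical names, and the caller decides which errors count as transient through the `isTransient` callback.

```typescript
type RetryOptions = { maxAttempts: number; baseDelayMs: number; maxDelayMs: number };

// Delay doubles with each failed attempt and is capped at maxDelayMs.
function backoffDelay(attempt: number, opts: RetryOptions): number {
  return Math.min(opts.maxDelayMs, opts.baseDelayMs * 2 ** attempt);
}

// Runs `operation`, retrying transient failures up to maxAttempts times in total.
// `sleep` is injectable so the helper can be exercised without real waiting.
async function withRetry<T>(
  operation: () => Promise<T>,
  isTransient: (error: unknown) => boolean,
  opts: RetryOptions,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  for (let attempt = 0; ; attempt += 1) {
    try {
      return await operation();
    } catch (error) {
      const lastAttempt = attempt + 1 >= opts.maxAttempts;
      // Give up on permanent errors or once the attempt budget is spent.
      if (lastAttempt || !isTransient(error)) throw error;
      await sleep(backoffDelay(attempt, opts));
    }
  }
}
```

For a MongoDB change stream, the retried operation would typically reopen the stream from the last resume token (the `resumeAfter` option of `watch`), so that no changes are skipped between attempts.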