EdgeStream
Custom Transport
Implement ITransport to integrate EdgeStream with any messaging protocol — MQTT, AMQP, gRPC streaming, WebRTC data channels, or proprietary protocols. The BaseTransport abstract class provides status management, error tracking, and handler registration so you can focus on the protocol layer.
ITransport Interface
interface ITransport {
readonly id: string;
readonly type: 'signalr' | 'websocket' | 'sse' | 'http-polling';
readonly status: TransportStatus;
readonly lastError?: Error;
// Lifecycle
connect(): Promise<void>;
disconnect(): Promise<void>;
// Messaging
send(data: string | ArrayBuffer): Promise<void>;
// Event registration (returns unsubscribe function)
onMessage(handler: (raw: string | ArrayBuffer) => void): () => void;
onStatusChange(handler: (status: TransportStatus) => void): () => void;
onError(handler: (error: Error) => void): () => void;
}
Extending BaseTransport (Recommended)
Extend BaseTransport to get status management, error tracking, and handler plumbing for free. You only need to implement connect(), disconnect(), and send():
import { BaseTransport } from 'edge-stream-js';
import type { TransportConfig } from 'edge-stream-js';
import mqtt from 'mqtt';
export class MqttTransport extends BaseTransport {
private client: mqtt.MqttClient | null = null;
constructor(id: string, config: TransportConfig) {
super(id, 'websocket', config); // use 'websocket' as closest type
}
async connect(): Promise<void> {
if (this.status === 'connected') return;
this.setStatus('connecting'); // inherited from BaseTransport
return new Promise((resolve, reject) => {
this.client = mqtt.connect(this.config.url, {
clientId: `edge-stream-${this.id}`,
username: this.config.accessToken,
reconnectPeriod: 1000,
});
this.client.on('connect', () => {
this.setStatus('connected'); // notifies all onStatusChange handlers
resolve();
});
this.client.on('message', (_topic, payload) => {
this.emitMessage(payload.toString()); // feeds EdgeStream pipeline
});
this.client.on('error', (error) => {
this.setStatus('error');
this.emitError(error); // records lastError, notifies onError handlers
reject(error);
});
this.client.on('offline', () => {
this.setStatus('reconnecting');
});
});
}
async disconnect(): Promise<void> {
if (this.client) {
await new Promise<void>((resolve) => {
this.client!.end(false, {}, () => resolve());
});
this.client = null;
}
this.setStatus('disconnected');
this.cleanup(); // clears all handler sets
}
async send(data: string | ArrayBuffer): Promise<void> {
if (!this.client || this.status !== 'connected') {
throw new Error('MQTT transport not connected');
}
const payload = typeof data === 'string' ? data : Buffer.from(data);
await new Promise<void>((resolve, reject) => {
this.client!.publish('edge-stream/outbound', payload, (err) => {
if (err) {
this.emitError(err);
reject(err);
} else {
resolve();
}
});
});
}
}
Registering a Custom Transport
To use a custom transport, create it manually and attach it to the server via server.setTransport() after registration:
const stream = createEdgeStream({ logLevel: 'debug' });
stream.registerServer({
id: 'mqtt',
type: 'bas',
url: 'wss://mqtt.bizfirst.com:8883',
// transportConfig not needed — we attach the transport manually below
transportConfig: { type: 'websocket', url: 'wss://mqtt.bizfirst.com:8883' }, // placeholder
});
// Replace the auto-created transport with our custom one
const server = stream.server('mqtt')!;
const mqttTransport = new MqttTransport('mqtt-transport', {
type: 'websocket',
url: 'wss://mqtt.bizfirst.com:8883',
accessToken: authStore.getToken(),
});
server.setTransport(mqttTransport);
// Pipeline and subscriptions work as normal
server.incomingPipeline.addHook(new NormalizationHook());
stream.subscribe('mqtt', 'sensor.*', sensorHandler);
await stream.start();
BaseTransport Protected API
| Method | Use In | Effect |
|---|---|---|
setStatus(status) | connect, disconnect, reconnect | Updates status, fires onStatusChange handlers (deduplicated) |
emitMessage(raw) | Message received handler | Routes raw bytes into the EdgeStream pipeline |
emitError(error) | Error handlers | Records lastError, fires onError handlers |
cleanup() | disconnect | Clears all handler Sets — call at end of disconnect() |
Call cleanup() on Disconnect
Always call
this.cleanup() at the end of your disconnect() implementation. This prevents memory leaks by clearing the internal Sets of message, status, and error handlers.