Portal Community

ITransport Interface

interface ITransport {
  readonly id: string;
  readonly type: 'signalr' | 'websocket' | 'sse' | 'http-polling';
  readonly status: TransportStatus;
  readonly lastError?: Error;

  // Lifecycle
  connect(): Promise<void>;
  disconnect(): Promise<void>;

  // Messaging
  send(data: string | ArrayBuffer): Promise<void>;

  // Event registration (returns unsubscribe function)
  onMessage(handler: (raw: string | ArrayBuffer) => void): () => void;
  onStatusChange(handler: (status: TransportStatus) => void): () => void;
  onError(handler: (error: Error) => void): () => void;
}

Extending BaseTransport (Recommended)

Extend BaseTransport to get status management, error tracking, and handler plumbing for free. You only need to implement connect(), disconnect(), and send():

import { BaseTransport } from 'edge-stream-js';
import type { TransportConfig } from 'edge-stream-js';
import mqtt from 'mqtt';

export class MqttTransport extends BaseTransport {
  private client: mqtt.MqttClient | null = null;

  constructor(id: string, config: TransportConfig) {
    super(id, 'websocket', config); // use 'websocket' as closest type
  }

  async connect(): Promise<void> {
    if (this.status === 'connected') return;

    this.setStatus('connecting'); // inherited from BaseTransport

    return new Promise((resolve, reject) => {
      this.client = mqtt.connect(this.config.url, {
        clientId: `edge-stream-${this.id}`,
        username: this.config.accessToken,
        reconnectPeriod: 1000,
      });

      this.client.on('connect', () => {
        this.setStatus('connected'); // notifies all onStatusChange handlers
        resolve();
      });

      this.client.on('message', (_topic, payload) => {
        this.emitMessage(payload.toString()); // feeds EdgeStream pipeline
      });

      this.client.on('error', (error) => {
        this.setStatus('error');
        this.emitError(error); // records lastError, notifies onError handlers
        reject(error);
      });

      this.client.on('offline', () => {
        this.setStatus('reconnecting');
      });
    });
  }

  async disconnect(): Promise<void> {
    if (this.client) {
      await new Promise<void>((resolve) => {
        this.client!.end(false, {}, () => resolve());
      });
      this.client = null;
    }
    this.setStatus('disconnected');
    this.cleanup(); // clears all handler sets
  }

  async send(data: string | ArrayBuffer): Promise<void> {
    if (!this.client || this.status !== 'connected') {
      throw new Error('MQTT transport not connected');
    }

    const payload = typeof data === 'string' ? data : Buffer.from(data);
    await new Promise<void>((resolve, reject) => {
      this.client!.publish('edge-stream/outbound', payload, (err) => {
        if (err) {
          this.emitError(err);
          reject(err);
        } else {
          resolve();
        }
      });
    });
  }
}

Registering a Custom Transport

To use a custom transport, create it manually and attach it to the server via server.setTransport() after registration:

const stream = createEdgeStream({ logLevel: 'debug' });

stream.registerServer({
  id: 'mqtt',
  type: 'bas',
  url: 'wss://mqtt.bizfirst.com:8883',
  // transportConfig not needed — we attach the transport manually below
  transportConfig: { type: 'websocket', url: 'wss://mqtt.bizfirst.com:8883' }, // placeholder
});

// Replace the auto-created transport with our custom one
const server = stream.server('mqtt')!;
const mqttTransport = new MqttTransport('mqtt-transport', {
  type: 'websocket',
  url: 'wss://mqtt.bizfirst.com:8883',
  accessToken: authStore.getToken(),
});

server.setTransport(mqttTransport);

// Pipeline and subscriptions work as normal
server.incomingPipeline.addHook(new NormalizationHook());
stream.subscribe('mqtt', 'sensor.*', sensorHandler);
await stream.start();

BaseTransport Protected API

MethodUse InEffect
setStatus(status)connect, disconnect, reconnectUpdates status, fires onStatusChange handlers (deduplicated)
emitMessage(raw)Message received handlerRoutes raw bytes into the EdgeStream pipeline
emitError(error)Error handlersRecords lastError, fires onError handlers
cleanup()disconnectClears all handler Sets — call at end of disconnect()
Call cleanup() on Disconnect Always call this.cleanup() at the end of your disconnect() implementation. This prevents memory leaks by clearing the internal Sets of message, status, and error handlers.