Creating Operators

An operator is a function that transforms a Stream into another Stream.

type Operator<T extends Signal, R extends Signal> = (source: Stream<T>) => Stream<R>;

Basic Structure

Every operator follows this pattern:

import { createStream } from "cereb";
import type { Signal, Operator } from "cereb";
function myOperator<T extends Signal>(): Operator<T, T> {
return (source) =>
createStream((observer) => {
return source.on({
next(signal) {
// Transform and forward
observer.next(signal);
},
error: observer.error?.bind(observer),
complete: observer.complete?.bind(observer),
});
});
}

Side-Effect Operators

Pass signals through unchanged while running side-effects. Like spy.

function log<T extends Signal>(label: string): Operator<T, T> {
return (source) =>
createStream((observer) => {
return source.on({
next(signal) {
console.log(label, signal.value); // side-effect
observer.next(signal); // pass through unchanged
},
error: observer.error?.bind(observer),
complete: observer.complete?.bind(observer),
});
});
}
// Usage
singlePointer(element)
.pipe(log("pointer"))
.on(handle);

Value-Extending Operators

Add computed properties to signal values. Like extend, offset.

function addTimestamp<T extends Signal>(): Operator<T, T> {
return (source) =>
createStream((observer) => {
return source.on({
next(signal) {
// Extend the value object
(signal.value as any).timestamp = Date.now();
observer.next(signal);
},
error: observer.error?.bind(observer),
complete: observer.complete?.bind(observer),
});
});
}
// Usage
singlePointer(element)
.pipe(addTimestamp())
.on((s) => console.log(s.value.timestamp));

Flow-Control Operators

Control which signals pass through based on conditions or other streams.

Gating (like when)

function gate<T extends Signal>(
controller: Stream<Signal<string, { active: boolean }>>
): Operator<T, T> {
return (source) =>
createStream((observer) => {
let isActive = false;
// Subscribe to controller
const controllerUnsub = controller.on({
next(signal) {
isActive = signal.value.active;
},
});
// Forward source signals only when active
const sourceUnsub = source.on({
next(signal) {
if (isActive) {
observer.next(signal);
}
},
error: observer.error?.bind(observer),
complete: observer.complete?.bind(observer),
});
// Cleanup both subscriptions
return () => {
controllerUnsub();
sourceUnsub();
};
});
}

Combining (like merge)

function combine<T extends Signal>(other: Stream<T>): Operator<T, T> {
return (source) =>
createStream((observer) => {
const unsub1 = source.on({
next: (s) => observer.next(s),
error: observer.error?.bind(observer),
});
const unsub2 = other.on({
next: (s) => observer.next(s),
error: observer.error?.bind(observer),
});
return () => {
unsub1();
unsub2();
};
});
}

Key Points

  • Always return the unsubscribe function from source.on()
  • Forward error and complete to maintain stream lifecycle
  • Wrap logic in try/catch and call observer.error?.() on exceptions
  • For multi-stream operators, cleanup all subscriptions