Creating Operators
An operator is a function that transforms a Stream into another Stream.
type Operator<T extends Signal, R extends Signal> = (source: Stream<T>) => Stream<R>;

Basic Structure
Every operator follows this pattern:
import { createStream } from "cereb";import type { Signal, Operator } from "cereb";
/**
 * Skeleton for a custom operator.
 *
 * The returned operator subscribes to `source` and re-emits every signal it
 * receives. The downstream observer's `error` and `complete` handlers are
 * forwarded when they are present.
 *
 * @returns An operator whose output stream emits the same signals as its source.
 */
function myOperator<T extends Signal>(): Operator<T, T> {
  return function applyOperator(source) {
    return createStream((downstream) => {
      const unsubscribe = source.on({
        next: (signal) => {
          // Transform and forward
          downstream.next(signal);
        },
        error: downstream.error?.bind(downstream),
        complete: downstream.complete?.bind(downstream),
      });

      // Hand the source's unsubscribe function back so the subscription can be torn down.
      return unsubscribe;
    });
  };
}

// Side-Effect Operators
Pass signals through unchanged while running side-effects. Like spy.
/**
 * Side-effect operator: prints each signal's value, then forwards the signal
 * without modifying it.
 *
 * @param label - Passed as the first argument to `console.log` for every signal.
 * @returns An operator whose output stream emits the same signals as its source.
 */
function log<T extends Signal>(label: string): Operator<T, T> {
  return (source) =>
    createStream((downstream) => {
      // Lifecycle handlers are optional on the downstream observer.
      const onError = downstream.error?.bind(downstream);
      const onComplete = downstream.complete?.bind(downstream);

      return source.on({
        next: (signal) => {
          console.log(label, signal.value); // side-effect
          downstream.next(signal); // pass through unchanged
        },
        error: onError,
        complete: onComplete,
      });
    });
}
// Usage
singlePointer(element)
  .pipe(log("pointer"))
  .on(handle);

Value-Extending Operators
Add computed properties to signal values. Like extend, offset.
/**
 * Value-extending operator: writes a `timestamp` property (from `Date.now()`)
 * onto each signal's value, then forwards the signal.
 *
 * The value object is mutated in place; no copy is made. The declared type is
 * still `Operator<T, T>`, so the added property is not visible in the static
 * type of the output stream.
 *
 * @returns An operator that emits the source's signals after stamping them.
 */
function addTimestamp<T extends Signal>(): Operator<T, T> {
  return function stamp(source) {
    return createStream((downstream) => {
      return source.on({
        next: (signal) => {
          // Extend the value object
          // The cast is needed because T does not declare a `timestamp` field.
          const extended = signal.value as any;
          extended.timestamp = Date.now();
          downstream.next(signal);
        },
        error: downstream.error?.bind(downstream),
        complete: downstream.complete?.bind(downstream),
      });
    });
  };
}
// Usage
singlePointer(element)
  .pipe(addTimestamp())
  .on((s) => console.log(s.value.timestamp));

Flow-Control Operators
Control which signals pass through based on conditions or other streams.
Gating (like when)
/**
 * Flow-control operator: forwards source signals only while the controller
 * stream's most recent signal had `value.active === true`.
 *
 * The gate starts closed, so source signals are dropped until the controller
 * first reports `active: true`.
 *
 * @param controller - Stream whose signals carry an `active` flag that opens or closes the gate.
 * @returns An operator that emits a subset of the source's signals.
 */
function gate<T extends Signal>(
  controller: Stream<Signal<string, { active: boolean }>>
): Operator<T, T> {
  return function gated(source) {
    return createStream((downstream) => {
      let open = false;

      // Track the controller's latest state.
      const stopController = controller.on({
        next: (signal) => {
          open = signal.value.active;
        },
      });

      // Drop source signals while the gate is closed.
      const stopSource = source.on({
        next: (signal) => {
          if (!open) {
            return;
          }
          downstream.next(signal);
        },
        error: downstream.error?.bind(downstream),
        complete: downstream.complete?.bind(downstream),
      });

      // Tear down both subscriptions, controller first.
      return () => {
        stopController();
        stopSource();
      };
    });
  };
}

// Combining (like merge)
/**
 * Combining operator: interleaves the signals of `source` and `other` into a
 * single output stream.
 *
 * Signals and errors from either input are forwarded as they arrive.
 * Completion is forwarded once BOTH inputs have completed, so the output
 * stream does not stay open forever after its inputs are done.
 * NOTE(review): "complete when all inputs complete" follows conventional merge
 * semantics; confirm it matches the library's own `merge` operator.
 *
 * @param other - Second stream to merge with the source.
 * @returns An operator that emits signals from both the source and `other`.
 */
function combine<T extends Signal>(other: Stream<T>): Operator<T, T> {
  return (source) =>
    createStream((observer) => {
      // Number of inputs that have not completed yet.
      let pending = 2;

      const onInputComplete = () => {
        pending -= 1;
        if (pending === 0) {
          observer.complete?.();
        }
      };

      const unsub1 = source.on({
        next: (s) => observer.next(s),
        error: observer.error?.bind(observer),
        complete: onInputComplete,
      });

      const unsub2 = other.on({
        next: (s) => observer.next(s),
        error: observer.error?.bind(observer),
        complete: onInputComplete,
      });

      // Cleanup both subscriptions
      return () => {
        unsub1();
        unsub2();
      };
    });
}

// Key Points
- Always return the unsubscribe function from `source.on()`
- Forward `error` and `complete` to maintain stream lifecycle
- Wrap logic in try/catch and call `observer.error?.()` on exceptions
- For multi-stream operators, clean up all subscriptions