Streams, Signals, Events

The dataflow core of q64. Three first-class temporal types (Signal<T, R>, Event<T>, Stream<T, R>), the @stage and graph declarations that compose them, the |> pipe operator, the pre() one-tick delay for feedback, the four canonical conversions, opt-in fusion via @fuse, and the dedicated SharedSignal<T, R> for low-latency cross-thread sharing.

The stream runtime is the task scheduler from concurrency.md; stages are tasks, the |> is a channel, and @realtime graphs pin to real-time-capable Wasm threads.

Design goals

  1. One dataflow vocabulary for the whole language. Audio, MIDI, AI tokens, video, IMU, UI clicks, file reads, network sockets — all instances of typed dataflow. No “AsyncIterator for LLMs, AudioBus for samples, EventEmitter for UI” trichotomy.
  2. Make invisible distinctions visible. Continuous vs. discrete vs. terminating gets three types. Sample rate at the type level. Cross-thread sharing has a distinct type. Wrong wiring is a compile error.
  3. Synchronous-time semantics. Within a logical tick, all simultaneous changes are coherent. Feedback cycles require explicit pre(). Inspired by Lustre / Lucid Synchrone; applicable to audio, control, simulation, UI.
  4. The compiler sees the graph. @stage-annotated functions and graph declarations are static enough for fusion analysis, topology inspection, latency bounds, resource analysis.
  5. One runtime, one scheduler. Stream graphs and async tasks share the same scheduler. No separate “reactive runtime” alongside the async runtime.

Vocabulary

| Word | Meaning |
| --- | --- |
| tick | One logical instant. Within a tick, simultaneous values are coherent. |
| rate | The frequency at which a Signal or Stream advances. A type parameter. |
| stage | An @stage-annotated function. Consumes dataflow types; produces dataflow types. |
| graph | A graph-declared topology. First-class value; .start() / .stop(). |
| fusion | Compile-time merging of adjacent @fuse stages into a single task. |
| feedback | A cycle in the graph; broken by pre() (one-tick delay). |

The three dataflow types

| Type | What it is | Has .current() | Terminates | Carries rate |
| --- | --- | --- | --- | --- |
| Signal<T, R> | Continuous: Time → T | yes | no | yes |
| Event<T> | Discrete: [(Time, T)] | no | no | no (sparse) |
| Stream<T, R> | Ordered sequence with completion | no | yes | yes |

Signal<T, R> — continuous

Always has a value at every tick. Examples: audio PCM, IMU readings, a UI element’s current position, a fixed clock.

let mic: Signal<PCM<f32>, 48.kHz>
let imu: Signal<Vec3<f32, m/s²>, 1.kHz>
let clock: Signal<Seconds, 60.Hz>

A Signal<T, R> always has a .current() -> T operation: it has a value right now, by construction. Subscribing to a signal mid-stream sees the current value immediately, not the next tick’s.

Event<T> — discrete

Exists at specific moments, not in between. Examples: MIDI messages, UI clicks, network packets, GPS fixes.

let midi: Event<MidiMessage>
let clicks: Event<Point>
let gps: Event<Vec2<f64, Degrees>>

Event<T> has no .current(). Subscribers see only events that arrive after subscription. Events do not carry a rate parameter because they are sparse — they happen when they happen.
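The subscription difference between the two types can be sketched in Python. This is a toy model of the semantics described above, not q64 code; SignalModel, EventModel, set, fire, and subscribe are hypothetical names chosen for illustration.

```python
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")


class SignalModel(Generic[T]):
    """Continuous value: there is always a current value to read."""

    def __init__(self, initial: T) -> None:
        self._value = initial
        self._subscribers: List[Callable[[T], None]] = []

    def current(self) -> T:
        return self._value

    def subscribe(self, callback: Callable[[T], None]) -> None:
        # A late subscriber is handed the current value immediately.
        self._subscribers.append(callback)
        callback(self._value)

    def set(self, value: T) -> None:
        self._value = value
        for callback in self._subscribers:
            callback(value)


class EventModel(Generic[T]):
    """Discrete occurrences: no current value and no replay of the past."""

    def __init__(self) -> None:
        self._subscribers: List[Callable[[T], None]] = []

    def subscribe(self, callback: Callable[[T], None]) -> None:
        self._subscribers.append(callback)

    def fire(self, value: T) -> None:
        for callback in self._subscribers:
            callback(value)


position = SignalModel(0)
position.set(5)
seen_positions = []
position.subscribe(seen_positions.append)  # receives 5 right away
position.set(7)

clicks = EventModel()
clicks.fire("early")                       # nobody is subscribed yet
seen_clicks = []
clicks.subscribe(seen_clicks.append)
clicks.fire("late")
```

The signal subscriber ends up with both the value that was current at subscription time and the later update; the event subscriber only ever sees what fired after it subscribed.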

Stream<T, R> — ordered with completion

A typed ordered sequence that eventually completes (either normally or with an error). Examples: HTTP response bodies, LLM token streams, file reads, WebSocket messages on a connection that may close.

let body: Stream<Bytes, 1.MB.per.s> // network rate
let tokens: Stream<Token<LlamaVocab>, 20.Hz> // tokens per second
let file: Stream<Bytes, ad_hoc> // file read rate is host-dependent

A Stream<T, R> carries a rate (often a coarse estimate or a host-imposed limit). Completion is a first-class value: stream.completion() -> Future<Result<(), Error>>. The compiler checks that downstream stages handle completion.

Why three types instead of one

Stages declare which they want. Mixing them is a compile error unless you cross via a conversion:

@stage
fn render(scene: Signal<Scene, 60.Hz>) -> Signal<Frame, 60.Hz> { … }
scope {
let clicks: Event<Point> = ui.clicks()
let g = graph my_app {
let f = render(clicks) // ← STR020: expected Signal, got Event
}
}

Conversions are named and visible (see §“Conversions”).

Sample rate as a type parameter

A Signal<T, R> or Stream<T, R> carries its rate as a type parameter (a const generic per generics.md). The compiler treats two different rates as different types:

let audio: Signal<PCM<f32>, 48.kHz> = mic.read()
let video: Signal<Frame, 60.Hz> = camera.read()
@stage
fn mix(a: Signal<f32, 48.kHz>, b: Signal<f32, 48.kHz>) -> Signal<f32, 48.kHz> { … }
let m = mix(audio, video) // ← STR021: rate mismatch (48.kHz vs 60.Hz)

Cross-rate boundaries require an explicit conversion stage:

let audio_60 = audio |> resample(target: 60.Hz) // Signal<PCM<f32>, 60.Hz>
let video_48 = video |> resample(target: 48.kHz) // expensive, but explicit
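As a rough illustration of why the rate check and the explicit resample belong together, here is a Python model that performs the same check at run time. q64 does this at compile time; RatedSignal, RateMismatch, and the zero-order-hold resampler are assumptions made for the sketch, and the document does not say which resampling algorithm q64 uses.

```python
from dataclasses import dataclass
from typing import List


class RateMismatch(TypeError):
    """Stands in for the compile-time STR021 diagnostic."""


@dataclass(frozen=True)
class RatedSignal:
    rate_hz: int
    samples: List[float]


def mix(a: RatedSignal, b: RatedSignal) -> RatedSignal:
    # Both inputs must advance at the same rate, as in the mix stage above.
    if a.rate_hz != b.rate_hz:
        raise RateMismatch(f"rate mismatch ({a.rate_hz} Hz vs {b.rate_hz} Hz)")
    return RatedSignal(a.rate_hz, [x + y for x, y in zip(a.samples, b.samples)])


def resample(sig: RatedSignal, target_hz: int) -> RatedSignal:
    # Zero-order hold (an assumption): each output sample copies the newest
    # input sample at or before its own time.
    n_out = len(sig.samples) * target_hz // sig.rate_hz
    out = [sig.samples[i * sig.rate_hz // target_hz] for i in range(n_out)]
    return RatedSignal(target_hz, out)


fast = RatedSignal(4, [1.0, 2.0, 3.0, 4.0])
slow = RatedSignal(2, [10.0, 20.0])

try:
    mix(fast, slow)
    mismatch_caught = False
except RateMismatch:
    mismatch_caught = True

mixed = mix(fast, resample(slow, 4))
```

Mixing the two signals directly fails; mixing after an explicit resample succeeds, and the cost of the conversion is visible at the call site.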

Rate-polymorphic stages

Stages can be rate-generic:

@stage
fn gain<const R: Hz>(input: Signal<f32, R>, k: f32) -> Signal<f32, R> {
input.map(|x| x * k)
}
let pcm = mic.read() // Signal<f32, 48.kHz>
let boosted = gain(pcm, 2.0) // Signal<f32, 48.kHz> — R inferred

The compiler enforces that all R-parameterized signals in a stage’s signature share the same R. Mixing requires explicit resampling.

Rates as units

Rates compose with the units-of-measure system from units.md:

  • 48.kHz, 60.Hz, 1.MHz — standard frequencies.
  • 20.Hz — discrete-step rates (LLM tokens, sensor polls).
  • 1.MB.per.s — bandwidth-style rates (network streams).
  • ad_hoc — host-determined; no static rate analysis.

The rate is a compile-time value; runtime rate adaptation (jitter, drift) is handled by the runtime, not encoded in the type.

@stage declaration

A stage is a function that consumes dataflow types and produces dataflow types. Declared with @stage on a fn:

@stage
fn animate(
clock: Signal<Seconds, 60.Hz>,
clip: AnimClip,
skel: Skeleton,
) -> Signal<[Mat4<f32>], 60.Hz> {
clock.map(|t| pose_skeleton(skel, clip, t))
}

The annotation triggers stage-specific checks:

  • At least one input must be a dataflow type (Signal<T, R> / Event<T> / Stream<T, R>).
  • The return type must be a dataflow type, or a tuple of dataflow types, or () for sink stages.
  • No direct recursion. A stage calling itself forms an unschedulable graph; STR030. Use pre() for explicit feedback.
  • All R-parameterized signals in the signature must share R (STR021).
  • Effects from effects.md propagate normally (@realtime, @no_alloc, @pure).

Why annotation and not a stage keyword

The annotation pattern matches @shared (per memory.md), @managed, @derive. Stages are functions with extra rules; the annotation triggers the rules without adding a new top-level keyword. The keyword would also force users to choose between fn and stage upfront, whereas annotations let stage-ness be a property of how the function is used.

Sink stages

A stage with -> () is a sink — it consumes a stream and does something terminal (write to a device, send over the network). Stage signatures follow the smallest-capability convention from env.md §“Passing convention”: take the sub-capability you use, not the whole Env.

@stage
fn play(a: Audio, audio: Signal<PCM<f32>, 48.kHz>) { // -> () implied
a.write(audio)
}

A sink may not appear upstream of another stage in a graph (STR031).

Source stages

A stage with no dataflow inputs but a dataflow output is a source:

@stage
fn mic_input(a: Audio) -> Signal<PCM<f32>, 48.kHz> {
a.input()
}

Sources read from the runtime (microphone, network, timer) into the dataflow world. The compiler treats them as graph roots.

graph declaration

A graph declares a topology of connected stages. The declaration produces a value of type Graph<Out>, where Out is the body’s return type (() for a sink-terminated graph, otherwise the type produced at the last |>):

graph voice_pipeline {
let pcm = mic_input() // source; reads env.audio
let denoised = pcm |> denoise(threshold: 0.1)
let resampled = denoised |> resample(target: 16.kHz)
let tokens = resampled |> whisper_asr // reads env.ai
let response = tokens |> llama_complete
let synthed = response |> tts
let _ = synthed |> play // sink; reads env.audio
}
// voice_pipeline : Graph<()>
scope {
let h: Handle<()> = voice_pipeline.start()
sleep(60.s)
h.cancel()
}

The Graph<Out> type

pub face Graph<Out> {
fn start (self) -> Handle<Out> // launches the graph as a task; returns its handle. Reads ambient env.
fn stop (self: ref Self) // cancels every stage in the graph
fn snapshot (self) -> GraphSnapshot // static topology, fused groups, rates, effects
fn output (self) -> Out where Out != () // queryable output side (only for non-sink graphs)
fn completion (self) -> Future<Result<(), Error>> // resolves when all sources are exhausted, or with the first error
}

The Graph<Out> face is auto-prelude (per modules.md §“Stream primitives”). A graph declaration synthesizes one fit of Graph<Out> per declaration; user code does not write fit X : Graph<…> by hand.

For graphs with Out = () (sink-terminated), output is absent (STR064). For graphs whose body returns a dataflow value (e.g. graph chat(prompt: str) -> Stream<str, 20.Hz>), output() returns that value’s runtime handle (a Stream, Signal, or Event reader) and completion() resolves when the underlying sources have drained.

Properties:

  • A graph body is a sequence of let bindings; each binding’s RHS is a single stage call or a |> pipeline.
  • The compiler analyzes the whole topology: rate consistency, fusion candidates, effect propagation, resource bounds.
  • g.start() returns a Handle<Out> from concurrency.md; reads the ambient env to wire host-facing stages. The graph runs until cancelled, panicked, or all sources are exhausted.
  • g.stop() cancels every stage in the graph.
  • g.snapshot() returns a static description (topology, fused groups, rates, effects) — used by q64 show graph.

Graphs nest:

graph subgraph<const R: Hz>(input: Signal<PCM<f32>, R>) -> Signal<PCM<f32>, R> {
input |> denoise |> normalize
}
graph outer {
let pcm = mic_input()
let clean = pcm |> subgraph
let _ = clean |> play
}

A graph with parameters and a return type acts like a parameterized sub-topology.

Grammar

GraphDecl := Visibility? "graph" Ident GenericParams?
("(" Params? ")")? ("->" TypeExpr)? Block
GraphExpr := "graph" Ident? Block // anonymous form for `let g = graph { … }`

GraphDecl joins the top-level Item list in modules.md §Grammar. GenericParams and TypeExpr follow the standard definitions from generics.md. The body is a sequence of let bindings whose RHS is a stage call or a |> pipeline; the compiler builds the topology by walking those bindings.
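The "walk the bindings to build the topology" step can be approximated in Python with the standard-library graphlib module. This is only a model of the idea: the (name, stage, inputs) triple format and the schedule helper are hypothetical, and the real compiler also checks rates, effects, and fusion groups.

```python
from graphlib import CycleError, TopologicalSorter


def schedule(bindings):
    """Return binding names in an order where every input is computed first.

    `bindings` is a list of (name, stage, inputs) triples, one per `let`.
    """
    deps = {name: set(inputs) for name, _stage, inputs in bindings}
    return list(TopologicalSorter(deps).static_order())


voice = [
    ("pcm", "mic_input", []),
    ("denoised", "denoise", ["pcm"]),
    ("resampled", "resample", ["denoised"]),
    ("tokens", "whisper_asr", ["resampled"]),
]
order = schedule(voice)

# A cycle with no delay on any edge cannot be ordered within a tick.
looped = [("a", "f", ["b"]), ("b", "g", ["a"])]
try:
    schedule(looped)
    cycle_rejected = False
except CycleError:
    cycle_rejected = True
```

A linear pipeline yields exactly one valid order. The looped example has no such order, which is the situation a one-tick delay on one of the edges is meant to resolve.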

The |> pipe operator

|> chains stages by passing the LHS as the first positional argument of the RHS:

x |> f(y) // ≡ f(x, y)
x |> f(y) |> g(z) // ≡ g(f(x, y), z)

F# / Elixir style. Reads top-to-bottom; the subject flows through the operations. Works on any function, not just stages — stages are the common case.

Within a graph body, |> is the canonical way to wire stages; the compiler walks the desugared call tree to build the topology. Outside a graph, |> is just syntactic sugar for nested calls.

A |> whose LHS is a dataflow type and whose RHS is not a stage is STR040 (“non-stage in pipeline”) — the compiler caught a likely typo (a regular fn rather than a @stage-annotated one).
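The desugaring rule is easy to check with a small Python model. The helpers step and pipe are hypothetical; they only mimic "pass the LHS as the first positional argument" and say nothing about how q64 builds graph topology.

```python
from functools import reduce


def step(fn, *args, **kwargs):
    """Package `f(y)` so the piped value can be supplied later as the first argument."""
    return lambda subject: fn(subject, *args, **kwargs)


def pipe(subject, *steps):
    """pipe(x, step(f, y), step(g, z)) models x |> f(y) |> g(z)."""
    return reduce(lambda acc, s: s(acc), steps, subject)


def scale(x, k):
    return x * k


def offset(x, d):
    return x + d


piped = pipe(3, step(scale, 2), step(offset, 1))  # 3 |> scale(2) |> offset(1)
nested = offset(scale(3, 2), 1)                   # the desugared nested call
```

Both forms evaluate the same calls in the same order; the piped form just reads left to right.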

pre() for feedback cycles

A graph with feedback (a stage’s output feeding its own input) needs a one-tick delay to break the cycle. pre() supplies that delay; it is a method on Signal<T, R> and Stream<T, R>:

@stage
fn iir_filter<const R: Hz>(input: Signal<f32, R>) -> Signal<f32, R> {
var state: Signal<f32, R> = 0.0
state = input + 0.95 * state.pre() // ← state.pre() = previous tick's state
state
}

Semantics:

  • signal.pre() returns the value the signal had on the previous tick.
  • On tick 0 (or whenever the signal is first sampled), pre() returns the signal’s declared initial value (0.0 above) or T.default() if no initial is declared.
  • pre() is the only way to form a feedback cycle in a graph. A cycle without pre() is STR050 (“unbroken feedback cycle”).
  • Event<T>.pre() does not exist — events are pointwise and have no “previous tick” notion (STR051).

Inspired by Lustre / Lucid Synchrone, expressed in q64 method syntax (consistent with .map, .filter, .fold).
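Unrolled over ticks, the feedback equation state = input + k * state.pre() is an ordinary recurrence. The Python sketch below evaluates it tick by tick, assuming the declared initial value is what pre() yields on tick 0; the function and parameter names are illustrative only.

```python
def iir_filter(inputs, feedback=0.95, initial=0.0):
    """Evaluate state[n] = input[n] + feedback * state[n - 1], one tick at a time."""
    outputs = []
    previous = initial            # what state.pre() yields on tick 0
    for x in inputs:
        state = x + feedback * previous
        outputs.append(state)
        previous = state          # becomes state.pre() on the next tick
    return outputs


# Impulse response with a feedback of 0.5, chosen so the values are exact.
impulse = iir_filter([1.0, 0.0, 0.0, 0.0], feedback=0.5)
```

Each tick reads only the previous tick's state, so no value depends on itself within a tick; that is what makes the cycle schedulable.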

Conversions between Signal / Event / Stream

Four canonical conversions in the language. Everything else (map, filter, combine_latest, merge, etc.) lives in q64.streams as library code.

| Conversion | Direction | Meaning |
| --- | --- | --- |
| signal.changes() | Signal<T, R> → Event<T> | Emit an event whenever the signal’s value changes. |
| events.hold(initial: T) | Event<T> → Signal<T, R> | Latch the last event value; rate inferred from context. |
| events.fold(seed: U, f) | Event<T> → Signal<U, R> | Accumulate event history with a folding function. |
| signal.sample(events) | Signal<T, R>, Event<U> → Event<T> | Sample the signal at the times the events fire. |

let mouse_pos: Signal<Point, 60.Hz> = ui.mouse()
let clicks: Event<()> = ui.click()
let click_pts: Event<Point> = mouse_pos.sample(clicks)
let trail: Signal<[Point], 60.Hz> = click_pts.fold([], |xs, p| xs.push(p))
let opened: Event<bool> = trail.changes().map(|xs| xs.len() > 5)

hold and fold introduce a rate (the rate of the signal they produce) that is inferred from the surrounding graph’s rate. Within a graph @rate(60.Hz) … context, an events.hold(0) is Signal<i64, 60.Hz>.
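A tick-indexed Python model makes the four conversions concrete. Here a signal is a list with one value per tick and an event is a list of (tick, value) pairs; this representation, the explicit n_ticks argument, and the choice that changes() emits nothing on tick 0 are all assumptions of the sketch, not statements about q64.

```python
def changes(signal):
    """Signal -> Event: emit (tick, value) when the value differs from the previous tick."""
    return [(t, v) for t, v in enumerate(signal) if t > 0 and v != signal[t - 1]]


def hold(events, initial, n_ticks):
    """Event -> Signal: latch the most recent event value."""
    by_tick = dict(events)
    out, latched = [], initial
    for t in range(n_ticks):
        latched = by_tick.get(t, latched)
        out.append(latched)
    return out


def fold(events, seed, f, n_ticks):
    """Event -> Signal: accumulate event history with a folding function."""
    pending = sorted(events, key=lambda e: e[0])
    out, acc, i = [], seed, 0
    for t in range(n_ticks):
        while i < len(pending) and pending[i][0] <= t:
            acc = f(acc, pending[i][1])
            i += 1
        out.append(acc)
    return out


def sample(signal, events):
    """(Signal, Event) -> Event: read the signal at each event's tick."""
    return [(t, signal[t]) for t, _ in events]


mouse_x = [0, 0, 3, 3, 8, 8]                  # one value per tick
clicks = [(2, None), (4, None)]               # unit events at ticks 2 and 4
click_pts = sample(mouse_x, clicks)
trail = fold(click_pts, [], lambda xs, p: xs + [p], n_ticks=6)
trail_events = changes(trail)
last_click = hold(click_pts, 0, n_ticks=6)
```

The n_ticks argument plays the role of the rate that hold and fold pick up from the surrounding graph: an event list alone does not say how often the resulting signal should advance.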

Why exactly four

These are the conversions that cross the temporal-type boundary in a non-trivial way (Signal ↔ Event, with rate introduction or sampling). Combinators that stay within one type (map, filter, merge on two Events of the same shape, combine_latest on two Signals at the same rate) compose trivially from these four plus ordinary functions, and live in q64.streams.

Fusion via @fuse

Adjacent stages can be merged into a single task to reduce context-switch overhead and improve SIMD usage. q64 fusion is opt-in: stages are not fused unless the user marks them @fuse.

@stage @fuse
fn denoise<const R: Hz>(input: Signal<f32, R>) -> Signal<f32, R> { … }
@stage @fuse
fn resample(input: Signal<f32, 48.kHz>) -> Signal<f32, 16.kHz> { … }
@stage // not @fuse — task boundary here
fn whisper_asr<const R: Hz>(input: Signal<f32, R>) -> Stream<Token<WhisperVocab>, 10.Hz> { … } // reads env.ai
graph voice {
let asr_in = mic_input() |> denoise |> resample // fused into one task
let tokens = asr_in |> whisper_asr // separate task
}

Effect of @fuse:

  • The compiler may merge adjacent @fuse stages into a single task body (one suspension point per group).
  • The compiler will refuse to fuse across an effect boundary (e.g., a @realtime stage and a @no_realtime stage cannot fuse), across a thread boundary, or across a stage with observable side effects between.
  • A non-@fuse stage always forms a task boundary. Use this for scheduling, measurement, or debugging.
  • A panic raised in one stage of a fused group unwinds the shared task body as a single panic — the enclosing scope sees one panic event, not one per stage in the group. Cancellation of siblings (per concurrency.md §“Panics across tasks”) proceeds the same way it would if the group were a single stage.
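The basic idea behind fusion can be shown with per-sample functions in Python: running two stages back to back with a hand-off buffer gives the same output as one loop that applies both. This is a generic illustration of stage fusion under the assumption that both stages are pure per-sample maps; it is not a description of q64's fusion pass, and fuse, run_fused, and run_unfused are hypothetical names.

```python
def denoise(x):
    # Toy noise gate: zero out samples below a fixed threshold.
    return 0.0 if abs(x) < 0.1 else x


def gain(x):
    return 2.0 * x


def run_unfused(samples):
    intermediate = [denoise(x) for x in samples]  # first task, buffer handed off
    return [gain(x) for x in intermediate]        # second task


def fuse(*stages):
    """Compose per-sample stages into one body, applied in order."""
    def fused(x):
        for stage in stages:
            x = stage(x)
        return x
    return fused


def run_fused(samples):
    body = fuse(denoise, gain)                    # one task, no hand-off
    return [body(x) for x in samples]


samples = [0.05, 0.5, -0.02, -1.0]
unfused_out = run_unfused(samples)
fused_out = run_fused(samples)
```

The fused version makes a single pass and never materializes the intermediate buffer, which is where the saving in hand-offs comes from.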

Why opt-in

Automatic fusion conflicts with q64’s “the compiler is your answer key” principle: the user should know which stages are tasks. @fuse makes the optimization explicit; q64 show graph shows fused groups visually.

Cross-thread signals: SharedSignal<T, R>

Signal<T, R> is by construction thread-local — it represents an in-memory continuous value within one Wasm instance. To share a signal across threads, q64 ships a distinct type backed by mem.shared (per memory.md):

@shared
struct AudioState {
level: SharedSignal<f32, 48.kHz>,
}
scope {
let state = AudioState.new()
// audio thread (writer):
spawn scope @realtime {
let pcm = mic_input(env.audio)
state.level = pcm |> rms_envelope // continuously updates
}
// UI thread (readers, possibly many):
spawn {
loop {
let v = state.level.current() // SAB-backed atomic load
draw_meter(v)
sleep(16.ms)
}
}
}

Properties:

  • SharedSignal<T, R> lives in mem.shared (per memory.md).
  • Single writer; multiple readers. The writer is the stage producing the signal; readers call .current() from any thread.
  • Latency: one tick + one atomic operation (~10 ns + tick period).
  • Cost: SAB allocation + atomic op per tick. Use only where cross-thread visibility justifies it.
  • Signal<T, R> does not implicitly convert to SharedSignal<T, R>; explicit .shareable() is required, and the result lives in mem.shared.
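The single-writer, many-reader contract can be modeled in Python with a lock-protected cell. The real SharedSignal is described above as SAB-backed with atomic loads; the lock here is a stand-in, and SharedSignalModel and publish are hypothetical names. The run-time writer check mimics what STR063 rejects at compile time.

```python
import threading


class SharedSignalModel:
    """One writer thread publishes; any thread may read the latest value."""

    def __init__(self, initial):
        self._lock = threading.Lock()
        self._value = initial
        self._writer = None           # thread id of the first publisher

    def publish(self, value):
        me = threading.get_ident()
        with self._lock:
            if self._writer is None:
                self._writer = me
            elif self._writer != me:
                raise RuntimeError("multiple writers (compare STR063)")
            self._value = value

    def current(self):
        with self._lock:
            return self._value


level = SharedSignalModel(0.0)


def audio_thread():
    for i in range(1, 101):
        level.publish(i / 100)


writer = threading.Thread(target=audio_thread)
writer.start()
writer.join()
latest = level.current()              # read from the main thread

try:
    level.publish(0.5)                # main thread is not the registered writer
    second_writer_rejected = False
except RuntimeError:
    second_writer_rejected = True
```

Readers never see a queue of past values, only the latest one, which matches the .current() reading loop in the example above.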

Why a distinct type

The alternative — making Signal<T, R> @send when T: @send — hides the cross-thread cost behind the type system. A distinct type makes “this signal pays SAB overhead” visible at the declaration. Matches q64’s “make invisible distinctions visible” principle. Also keeps the same-thread case zero-overhead.

Cross-thread events and streams

Event<T> and Stream<T, R> go through the regular channel mechanism from concurrency.md. An event stream sent across a thread boundary uses a channel<T>(…) with a policy; the receiving side wraps the receiver in an event stream.

Error propagation

A stage has no Result return type at the graph boundary — stages produce dataflow values, not Result-wrapped ones. A fallible operation inside a stage body that surfaces an Err panics with that Err value as the payload. The panic propagates per concurrency.md §“Panics across tasks”: sibling stages in the same scope are cancelled; the graph’s enclosing scope { … } catch { … } (if any) intercepts it; otherwise it re-panics at the scope’s closing brace.

panic e requires e: Panic (per errors.md §“The Panic face”). The auto-derived bridge in errors.md gives every type fitting Error an automatic Panic fit, so panic e works directly when the stage’s fallible inner call returns Result<T, E> with E: Error. For an E that does not fit Error (e.g. a bare str), wrap it explicitly in a Panic-fitting payload before unwinding — typically by constructing a stage-local error type:

pub struct ParseFailure(str)
pub fit ParseFailure : Display { fn fmt(self) -> str { "parse failure: {self.0}" } }
pub fit ParseFailure : Error { } // bridge auto-fits Panic

A panic e site whose payload doesn’t fit Panic is TYP306 (per errors.md’s diagnostic table); the stage-error case surfaces it the same way as any other call site.

@stage
fn decode(input: Stream<Bytes, R>) -> Stream<Frame, R> {
input.map(|b| match Frame.parse(b) { // Frame.parse: -> Result<Frame, ParseError>
Ok(f) -> f,
Err(e) -> panic e, // ParseError fits Error → Panic via the bridge
})
}
scope {
let g = graph audio { mic_input() |> decode |> play }
g.start().await()
} catch (e: Cancelled) {
env.out("graph shut down cleanly")
} catch (e: Panic) {
log.error("audio graph stopped: {e.fmt()}")
bring_up_silence()
}

This is deliberate: q64 has one error-handling story (the errors.md Result<T, E> + panic model), and graphs reuse it. There is no second mechanism for “in-stream errors” (no Stream<Result<T, E>> convention at the language level, though user code can use it). Inside a stage body, fallible chains use the same try propagation as any other function (the Err value flows out of the stage only when an explicit panic e is issued).

Cancelled (from concurrency.md’s cancellation model) is the expected way for a graph to shut down cleanly — caught as its own arm above so the application doesn’t conflate cleanup with crash.

Effects on stages

The effect markers from effects.md apply to stages. The compiler enforces them across the graph:

| Marker | Effect on stages |
| --- | --- |
| @realtime | Bounded execution; no alloc; pins the stage to a real-time-capable thread. |
| @no_alloc | The stage’s body cannot allocate (linear or managed). |
| @pure | No mutation, no observable side effects. Composes safely. |
| @no_suspend | The stage cannot yield mid-body. |
| @send | The stage’s outputs may cross thread boundaries. |
| @cancel | The stage observes cancellation. Declared explicitly: a stage that wants to observe cancellation lists ctx: Cancel in its signature (acquiring @cancel per effects.md). Stages without ctx cannot call ctx.cancelled(); the graph still shuts them down at scope close. |

A @realtime graph (all stages @realtime) pins to a real-time-capable thread (audio worklet on browser, low-latency pool on native). Mixing @realtime and non-@realtime stages in one graph is allowed; only the @realtime segment pins.

The compiler verifies effect compatibility across |>:

@stage
fn play(a: Audio, pcm: Signal<PCM<f32>, 48.kHz>) @realtime { … }
@stage
fn http_post(n: Net, url: Url, body: Bytes) { … } // not @realtime
scope {
let g = graph audio {
let _ = mic_input(env.audio) |> play(env.audio) |> http_post(env.net, url"…") // ← STR060
}
}

STR060: @realtime stage piped into non-@realtime stage (this example), or Stream<T, R> piped into a @realtime stage (would block on the resulting Backpressure channel). Both shapes carry the same code; the diagnostic message distinguishes them.

Stream runtime = task scheduler

Per concurrency.md §“Stream runtime = task scheduler”, there is no separate “reactive runtime” alongside the task scheduler. They are the same. The contract:

  • A stage is a task. Stage spawning, ctx propagation, and panic unwinding follow the rules in concurrency.md unchanged.

  • A |> between stages is a channel<T>(…) from concurrency.md. The channel’s policy is determined by the dataflow type of the LHS, per this table:

    | LHS dataflow type | Channel policy | Capacity |
    | --- | --- | --- |
    | Stream<T, R> | Backpressure | inferred from the consumer’s @stage budget; default 8 |
    | Signal<T, R> | LatestValue | single slot (policy property) |
    | Event<T> | RingBuffer | the event’s declared history bound; default 16 |

    This mapping is normative: a Signal piped to a stage always produces a LatestValue channel, a Stream always a Backpressure channel, an Event always a RingBuffer channel. User code does not name the policy at the |> site; it’s a property of the dataflow type. A consequence rule composes with concurrency.md §“Effects across concurrency”: a @realtime stage cannot consume from a Backpressure channel (which would block), so piping Stream<T, R> into a @realtime stage is STR060. Use Signal or Event (non-blocking policies) for @realtime inputs.

  • Fused stages share a task body (one suspension point per fused group). A panic raised inside a fused group unwinds the shared task body and is delivered to the graph’s enclosing scope as a single panic — siblings see one cancellation event, not one per fused stage.

  • A graph’s Handle<Out> (from g.start()) is the Handle<T> from concurrency.md. g.stop() is semantically h.cancel() on that handle (per concurrency.md §“Where ctx comes from”); the graph’s root ctx propagates to every stage.

  • A graph’s panic propagates via scope { … } catch { … } — same mechanism as any task-internal panic.

  • A graph’s cancellation observes the enclosing scope’s ctx — cancelling the scope cancels the graph as a unit.
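The three channel policies in the table above differ mainly in what happens when the producer outruns the consumer. A Python sketch, assuming that Backpressure refuses a send when full (where a real producer would suspend), that LatestValue overwrites its single slot, and that RingBuffer drops the oldest entry; the class and method names are hypothetical, and the drop-oldest behaviour is inferred from the policy name rather than stated in the document.

```python
from collections import deque


class Backpressure:
    """Bounded queue: a full channel pushes back on the producer."""

    def __init__(self, capacity=8):
        self.capacity = capacity
        self.items = deque()

    def try_send(self, item):
        if len(self.items) >= self.capacity:
            return False              # a real producer would suspend here
        self.items.append(item)
        return True

    def recv(self):
        return self.items.popleft()


class LatestValue:
    """Single slot: sends overwrite, reads never block or consume."""

    def __init__(self, initial):
        self.slot = initial

    def send(self, item):
        self.slot = item

    def recv(self):
        return self.slot


class RingBuffer:
    """Bounded history: when full, the oldest entry is dropped (assumed)."""

    def __init__(self, capacity=16):
        self.items = deque(maxlen=capacity)

    def send(self, item):
        self.items.append(item)

    def recv(self):
        return self.items.popleft()


stream_ch = Backpressure(capacity=2)
accepted = [stream_ch.try_send(n) for n in (1, 2, 3)]

signal_ch = LatestValue(initial=0)
for n in (1, 2, 3):
    signal_ch.send(n)

event_ch = RingBuffer(capacity=2)
for n in (1, 2, 3):
    event_ch.send(n)
```

Only the Backpressure send can fail to make progress, which is why the text above keeps it away from @realtime consumers.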

Examples

Voice agent (mic → ASR → LLM → TTS → speaker)

graph voice_agent {
let pcm: Signal<PCM<f32>, 48.kHz> = mic_input()
let denoised: Signal<PCM<f32>, 48.kHz> = pcm |> denoise(threshold: 0.05)
let prepped: Signal<PCM<f32>, 16.kHz> = denoised |> resample(target: 16.kHz)
let tokens: Stream<Token<WhisperVocab>, 20.Hz>
= prepped |> whisper_asr
let text: Event<str> = tokens |> assemble_utterances()
let response: Stream<Token<LlamaVocab>, 50.Hz>
= text |> llama_complete
let synth: Signal<PCM<f32>, 24.kHz> = response |> tts
let out: Signal<PCM<f32>, 48.kHz> = synth |> resample(target: 48.kHz)
let _ = out |> play
}
scope {
let h = voice_agent.start()
select {
_ = h.await() -> {},
_ = ctx.cancelled() -> h.cancel(),
}
} catch (e: Panic) {
log.error("voice agent crashed: {e}")
}

Every Signal↔Stream↔Event crossing is explicit. Every rate change is a named conversion. The compiler verifies the whole graph at build time.

Reactive UI counter

@stage
fn counter(clicks: Event<()>) -> Signal<i64, 60.Hz> {
clicks.fold(0, |n, _| n + 1)
}
@stage @fuse
fn render(count: Signal<i64, 60.Hz>) -> Signal<Frame, 60.Hz> @realtime {
count.map(|n| render_button(text: "Clicks: {n}"))
}
graph ui {
let clicks = env.ui.button("Click me").clicks()
let count = counter(clicks)
let frames = render(count)
let _ = frames |> blit // reads env.ui
}

Audio with IIR feedback

@stage
fn iir(input: Signal<f32, 48.kHz>, alpha: f32) -> Signal<f32, 48.kHz> {
var y: Signal<f32, 48.kHz> = 0.0
y = input + alpha * y.pre() // one-tick feedback
y
}
graph audio_path {
let pcm = mic_input()
let smoothed = pcm |> iir(alpha: 0.9)
let _ = smoothed |> play
}

Cross-thread visualization with SharedSignal

@shared
struct Meter {
rms: SharedSignal<f32, 100.Hz>,
}
graph audio_engine(meter: ref Meter) {
let pcm = mic_input()
meter.rms = pcm |> rms_envelope(window: 10.ms) // writer (audio thread)
let _ = pcm |> play
}
scope {
let meter = Meter.new()
spawn { audio_engine.start(ref meter).await() } // audio thread
spawn { // UI thread (reader)
loop {
draw_vu_meter(meter.rms.current())
sleep(16.ms)
}
}
}

The audio thread is @realtime (via @realtime on audio_engine); the UI thread reads via meter.rms.current() at its own pace. Latency from RMS update to UI read: one 48 kHz tick plus one atomic load (~20 µs + 10 ns).
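The latency figure is the tick period plus the cost of one load. A short Python check of the arithmetic, taking the 10 ns atomic-load cost from the text as given:

```python
def tick_period_us(rate_hz):
    """Length of one tick in microseconds for a given rate in hertz."""
    return 1_000_000 / rate_hz


ATOMIC_LOAD_NS = 10                      # estimate quoted in the text

audio_tick_us = tick_period_us(48_000)   # about 20.8 microseconds
total_us = audio_tick_us + ATOMIC_LOAD_NS / 1000
meter_tick_us = tick_period_us(100)      # a 100 Hz signal ticks every 10 ms
```

The atomic load is roughly three orders of magnitude smaller than the 48 kHz tick, so the tick period dominates. The same helper gives 10 ms for a 100 Hz signal, the rate declared for meter.rms in the example.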

LLM token pipeline with completion

graph chat(prompt: str) -> Stream<str, 20.Hz> {
let toks: Stream<Token<LlamaVocab>, 20.Hz> = llama_complete(prompt)
toks |> detokenize
}
scope {
let g = chat.start("explain monads")
for_each(g.output()) |chunk| {
env.out.write(chunk)
}
match g.completion().await() {
Ok(()) -> env.out.write("\n"),
Err(e) -> log.warn("generation failed: {e}"),
}
}

g.output() and g.completion() are the two queryable surfaces of a graph that produces a Stream<T, R>.

Diagnostic codes

All stream-and-graph diagnostics use the STR prefix. Numbers stable, never reused. STR070-STR099 reserved for expansion.

| Code | Short message | When |
| --- | --- | --- |
| STR010 | @stage on function with no dataflow input | An @stage-annotated fn has no Signal / Event / Stream parameter. |
| STR011 | @stage on function with non-dataflow return | Return type isn’t a dataflow type, a tuple of them, or (). |
| STR012 | @stage body uses a forbidden feature | E.g. capturing an outer mutable variable in a way the graph analyzer can’t model. |
| STR020 | dataflow type mismatch in pipeline | Signal piped into a Stream consumer, or similar. |
| STR021 | rate mismatch | Two signals at different rates in a stage signature without explicit resample. |
| STR022 | rate parameter must be const | A rate type parameter declared without const. |
| STR030 | direct stage recursion | A stage calls itself. Use pre() for feedback. |
| STR031 | sink stage upstream of another stage | A -> () stage appears in the middle of a pipeline. |
| STR032 | source stage downstream of another stage | A no-input source appears after a `\|>`. |
| STR040 | non-stage in pipeline | The LHS of a `\|>` is a dataflow type but the RHS is not a @stage-annotated function. |
| STR050 | unbroken feedback cycle | A graph cycle with no pre() to break it. |
| STR051 | pre() on Event<T> | Events have no previous-tick notion. |
| STR052 | pre() outside a stage body | pre() is only meaningful inside graph evaluation. |
| STR060 | @realtime × `\|>` effect violation | A @realtime stage is piped into a non-@realtime stage, or a Stream<T, R> is piped into a @realtime stage. |
| STR061 | non-@send payload crossing thread boundary | A stage’s output is consumed on a different thread; payload isn’t @send. |
| STR062 | SharedSignal with non-@send T | The wrapped type must be @send. |
| STR063 | multiple writers on SharedSignal | A SharedSignal<T, R> may have at most one writer. |
| STR064 | .output() on a sink-terminated graph | A Graph<()> exposes no output(); only graphs whose body returns a dataflow value do. |

All codes use the envelope from diagnostics.md.

Open items deferred

  • Hot-swap stages at runtime. A graph today is static; runtime reconfiguration (replacing a stage in a running graph) is deferred. Use stop + start until the design lands.
  • Persistent stream state across restarts. Checkpointing a graph’s accumulator state to disk; resuming. Library-level for now (q64.streams.persist).
  • Distributed stages. A @stage whose body runs on another machine. Out of scope for v0; the type system can model it later.
  • Graph composition operators beyond |>. Parallel splits (fork), joins, fan-in / fan-out. Currently expressible by binding the upstream to a let and reusing; sugar deferred.
  • Latency bounds in the type system. Signal<T, R> could carry a max-latency parameter; today latency is a property of the runtime, not the type. Tracked as a research direction.
  • @stage introspection at comptime. Programmatically reading a stage’s signature, inputs, outputs, effects. Useful for meta-programming; deferred until the comptime story lands more broadly.

Cross-references

  • concurrency.md: the task scheduler that runs the graph; the channel mechanism that |> desugars to; the scope { … } catch { … } form for stage panics.
  • effects.md: @realtime, @no_alloc, @pure, @send, @cancel on stages.
  • memory.md: @shared, Atomic<T>, SharedSignal<T, R>’s mem.shared backing; the implicit scope arena that stage bodies allocate into.
  • generics.md: const generics for the rate parameter R; type parameters on Signal<T, R> / Event<T> / Stream<T, R>.
  • types.md: units of measure (48.kHz, 60.Hz, 1.MB.per.s) used as rate values.
  • faces.md: Stage, Source, Sink, RateAware as blessed faces (additions tracked in faces.md’s “Open items deferred”).
  • env.md: the WASIp3 (WASI 0.3) Component Model mapping. Under the default preview3 target a Stream<T, R> lowers to WIT stream<T> and a Future<T> to WIT future<T> at the component boundary; q64 tracks the WASIp3 RC snapshot pinned there.
  • errors.md: Result<T, E> and panic semantics that stage error propagation reuses.
  • modules.md: Signal, Event, Stream, SharedSignal, @stage, graph, pre, |>, conversion primitives (changes / hold / fold / sample) are auto-prelude.
  • diagnostics.md: envelope format for the STR* codes.