Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Schema with Web Streams

Use the WHATWG Streams API with type-safe schema encoding and decoding.

Type-Safe Transform Streams

Use createSchemaEncoderStream and createSchemaDecoderStream:

// examples/stream/schema-web-streams.ts
// Demonstrates: Schema-aware Web Streams with automatic type conversion

import { createSchemaEncoderStream, createSchemaDecoderStream } from "@grounds/stream";
import { RStruct, RString, field } from "@grounds/schema";
import { type Static } from "@sinclair/typebox";

// Define a Message schema using RStruct
const MessageSchema = RStruct({
  sender: field(0, RString()),
  content: field(1, RString()),
});

type Message = Static<typeof MessageSchema>;

async function example(): Promise<void> {
  console.log("=== Schema-aware Web Streams ===\n");

  // Create typed Message values
  const messages: Array<Message> = [
    { sender: "alice", content: "hello" },
    { sender: "bob", content: "world" },
    { sender: "charlie", content: "how are you?" },
  ];

  console.log("Original messages:");
  for (const msg of messages) {
    console.log(`  ${msg.sender}: ${msg.content}`);
  }
  console.log();

  // Step 1: Create a readable stream of typed Message values
  const messageStream = new ReadableStream<Message>({
    start(controller) {
      for (const msg of messages) {
        controller.enqueue(msg);
      }
      controller.close();
    },
  });

  // Step 2: Pipe through schema encoder (accepts Message, outputs Uint8Array)
  console.log("Encoding messages through schema encoder stream...");
  const encodedStream = messageStream.pipeThrough(createSchemaEncoderStream(MessageSchema));

  // Collect encoded chunks
  const chunks: Array<Uint8Array> = [];
  const encodedReader = encodedStream.getReader();

  while (true) {
    const { done, value } = await encodedReader.read();
    if (done) break;
    chunks.push(value);
    console.log(`  Encoded chunk: ${value.length} bytes`);
  }

  console.log(`Total encoded: ${chunks.length} chunks\n`);

  // Step 3: Create readable stream from encoded chunks
  const chunkStream = new ReadableStream<Uint8Array>({
    start(controller) {
      for (const chunk of chunks) {
        controller.enqueue(chunk);
      }
      controller.close();
    },
  });

  // Step 4: Pipe through schema decoder (accepts Uint8Array, outputs typed Message)
  console.log("Decoding messages through schema decoder stream...");
  const decodedStream = chunkStream.pipeThrough(createSchemaDecoderStream(MessageSchema));

  // Collect decoded typed values
  const decodedMessages: Array<Message> = [];
  const decodedReader = decodedStream.getReader();

  while (true) {
    const { done, value } = await decodedReader.read();
    if (done) break;
    decodedMessages.push(value);
    console.log(`  Decoded message from ${value.sender}: "${value.content}"`);
  }

  console.log(`\nSuccessfully decoded ${decodedMessages.length} messages\n`);

  // Step 5: Verify round-trip
  console.log("=== Verification ===");
  console.log("Decoded messages:");
  for (const msg of decodedMessages) {
    console.log(`  ${msg.sender}: ${msg.content}`);
  }

  // Check if all messages match
  let allMatch = true;
  for (let i = 0; i < messages.length; i++) {
    if (
      messages[i].sender !== decodedMessages[i].sender ||
      messages[i].content !== decodedMessages[i].content
    ) {
      allMatch = false;
      break;
    }
  }

  console.log(`\nRound-trip successful: ${allMatch ? "YES" : "NO"}`);
}

await example();

createSchemaEncoderStream

Creates a TransformStream that encodes typed values to Uint8Array:

import { createSchemaEncoderStream } from "@grounds/stream";
import { RStruct, RString, field } from "@grounds/schema";

const MessageSchema = RStruct({
  sender: field(0, RString()),
  content: field(1, RString()),
});

const encoderStream = createSchemaEncoderStream(MessageSchema);

// Stream accepts typed Message values, outputs Uint8Array
messageStream.pipeThrough(encoderStream).pipeTo(networkSink);

The encoder stream:

  • Accepts values matching the schema type
  • Automatically converts to Relish wire format
  • Outputs Uint8Array chunks
  • Validates values against schema

createSchemaDecoderStream

Creates a TransformStream that decodes Uint8Array to typed values:

const decoderStream = createSchemaDecoderStream(MessageSchema);

// Stream accepts Uint8Array, outputs typed Message values
networkSource.pipeThrough(decoderStream).pipeTo(messageHandler);

The decoder stream:

  • Accepts Uint8Array chunks
  • Parses Relish wire format
  • Outputs typed values matching the schema
  • Validates decoded data against schema

Pipeline Composition

Chain schema transforms with other streams:

sourceStream
  .pipeThrough(createSchemaEncoderStream(MessageSchema))
  .pipeThrough(compressionStream)
  .pipeTo(networkSink);

networkSource
  .pipeThrough(decompressionStream)
  .pipeThrough(createSchemaDecoderStream(MessageSchema))
  .pipeTo(messageHandler);

Type Safety

TypeScript enforces schema types throughout pipelines:

  • Encoder input must match schema type
  • Decoder output is typed by schema
  • Compile-time errors for type mismatches
  • No manual validation needed

Browser Compatibility

Schema streams work in the same environments as basic streams:

  • Modern browsers (Chrome, Firefox, Safari, Edge)
  • Node.js 16+ (with --experimental-fetch or Node 18+)
  • Deno
  • Cloudflare Workers

Next Steps

See the Reference section for wire format details.