Schema with Async Generators

Encode and decode streams of typed messages with schema-aware codecs and async generators.

Type-Safe Streaming

Use createCodec for schema-aware encoding and decoding:

// examples/stream/schema-async-generators.ts
// Demonstrates: Schema-aware streaming with async generators using createCodec

import { RStruct, RString, field, createCodec } from "@grounds/schema";
import { type Static } from "@sinclair/typebox";

// Define a Message schema using RStruct
const MessageSchema = RStruct({
  sender: field(0, RString()),
  content: field(1, RString()),
});

type Message = Static<typeof MessageSchema>;

// Generate typed message values using an async generator
async function* generateMessages(): AsyncGenerator<Message> {
  yield { sender: "alice", content: "hello" };
  yield { sender: "bob", content: "world" };
  yield { sender: "charlie", content: "how are you?" };
}

// Example: Encode typed messages using codec, then decode back
async function example(): Promise<void> {
  console.log("=== Schema-aware Async Generators ===\n");

  // Create a codec for the Message schema
  // This provides encode/decode with full type safety
  const codec = createCodec(MessageSchema);

  // Step 1: Encode messages using the codec
  console.log("Encoding messages...");
  const chunks: Array<Uint8Array> = [];

  for await (const message of generateMessages()) {
    const encodeResult = codec.encode(message);
    encodeResult.match(
      (bytes) => {
        chunks.push(bytes);
        console.log(`  Encoded message from ${message.sender}: ${bytes.length} bytes`);
      },
      (err) => console.error("  Encode error:", err.message),
    );
  }

  console.log(`\nSuccessfully encoded ${chunks.length} messages\n`);

  // Step 2: Decode bytes back to typed messages
  console.log("Decoding messages...");
  const decodedMessages: Array<Message> = [];

  for (const chunk of chunks) {
    const decodeResult = codec.decode(chunk);
    decodeResult.match(
      (message) => {
        decodedMessages.push(message);
        console.log(`  Decoded message from ${message.sender}: "${message.content}"`);
      },
      (err) => {
        console.error("  Decode error:", err.message);
      },
    );
  }

  console.log(`\nSuccessfully decoded ${decodedMessages.length} messages\n`);

  // Step 3: Verify round-trip
  console.log("=== Results ===");
  console.log("Original messages:");
  for await (const msg of generateMessages()) {
    console.log(`  ${msg.sender}: ${msg.content}`);
  }

  console.log("\nDecoded messages:");
  for (const msg of decodedMessages) {
    console.log(`  ${msg.sender}: ${msg.content}`);
  }

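  // Verify all match: same messages, in the same order, with none extra
  let allMatch = true;
  let index = 0;
  for await (const origMsg of generateMessages()) {
    const decoded = decodedMessages[index];
    if (
      decoded === undefined ||
      decoded.sender !== origMsg.sender ||
      decoded.content !== origMsg.content
    ) {
      allMatch = false;
      break;
    }
    index += 1;
  }
  if (index !== decodedMessages.length) {
    allMatch = false;
  }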

  console.log(`\nRound-trip successful: ${allMatch ? "YES" : "NO"}`);
}

await example();

Creating a Codec

Define a schema and create a typed codec:

import { RStruct, RString, field, createCodec } from "@grounds/schema";
import { type Static } from "@sinclair/typebox";

const MessageSchema = RStruct({
  sender: field(0, RString()),
  content: field(1, RString()),
});

type Message = Static<typeof MessageSchema>;

const codec = createCodec(MessageSchema);

The codec provides:

  • codec.encode(value): Encodes a typed value and returns a Result that holds a Uint8Array on success
  • codec.decode(bytes): Decodes bytes and returns a Result that holds a typed value on success
  • Full TypeScript type inference from the schema

Encoding with Codecs

Encode typed messages using the codec:

async function* generateMessages(): AsyncGenerator<Message> {
  yield { sender: "alice", content: "hello" };
  yield { sender: "bob", content: "world" };
}

const chunks: Array<Uint8Array> = [];

for await (const message of generateMessages()) {
  const result = codec.encode(message);
  result.match(
    (bytes) => chunks.push(bytes),
    (err) => console.error(err.message),
  );
}
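
In a streaming pipeline it can be more convenient to yield each encoded chunk as it is produced than to collect chunks in an array. The following is a minimal sketch of that idea and is not part of the library: ResultLike, EncoderLike, standInEncoder, and encodeStream are hypothetical names introduced here. The stand-in encoder uses JSON over UTF-8 only so the block runs without @grounds/schema. The assumption is that a codec from createCodec(MessageSchema) could be passed in its place, because it exposes the same encode()/match() shape.

```typescript
// Hypothetical structural types: they mirror only the encode()/match() shape
// used on this page, not the actual @grounds/schema type definitions.
interface ResultLike<T, E> {
  match<A>(onOk: (value: T) => A, onErr: (error: E) => A): A;
}
interface EncoderLike<T> {
  encode(value: T): ResultLike<Uint8Array, Error>;
}

type Message = { sender: string; content: string };

// Stand-in encoder (JSON over UTF-8, not the real wire format) so the sketch
// runs on its own. It always succeeds.
const standInEncoder: EncoderLike<Message> = {
  encode: (value) => {
    const bytes = new TextEncoder().encode(JSON.stringify(value));
    return { match: (onOk) => onOk(bytes) };
  },
};

// Wraps any async source of values and yields one encoded chunk per value.
// Values that fail to encode are logged and skipped.
async function* encodeStream<T>(
  source: AsyncIterable<T>,
  encoder: EncoderLike<T>,
): AsyncGenerator<Uint8Array> {
  for await (const value of source) {
    const bytes = encoder.encode(value).match<Uint8Array | null>(
      (encoded) => encoded,
      (error) => {
        console.error("Encode error:", error.message);
        return null;
      },
    );
    if (bytes !== null) {
      yield bytes;
    }
  }
}
```

A consumer can then iterate with for await (const bytes of encodeStream(generateMessages(), standInEncoder)). Skipping failed values is one possible policy; yielding the error to the consumer is another.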

Decoding with Codecs

Decode bytes back to typed messages:

const decodedMessages: Array<Message> = [];

for (const chunk of chunks) {
  const result = codec.decode(chunk);
  result.match(
    (message) => decodedMessages.push(message),
    (err) => console.error(err.message),
  );
}

Type Safety

TypeScript enforces schema types:

  • codec.encode() accepts only values matching the schema type
  • codec.decode() returns values with the correct TypeScript type
  • Compile-time errors prevent type mismatches
  • No manual type casting required
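
The points above can be illustrated with a small sketch. It does not use the library's own types: CodecLike, ResultLike, and standInCodec are hypothetical stand-ins that copy only the encode()/decode()/match() shape shown on this page, and the JSON stand-in is not the real wire format. The @ts-expect-error comments mark calls that the compiler is expected to reject.

```typescript
// Hypothetical structural types mirroring the encode()/decode()/match() shape
// used on this page; they are not the @grounds/schema type definitions.
interface ResultLike<T, E> {
  match<A>(onOk: (value: T) => A, onErr: (error: E) => A): A;
}
interface CodecLike<T> {
  encode(value: T): ResultLike<Uint8Array, Error>;
  decode(bytes: Uint8Array): ResultLike<T, Error>;
}

type Message = { sender: string; content: string };

// JSON stand-in so the sketch runs without the library. The cast below lives
// inside the stand-in only; callers of decode() never need one.
const standInCodec: CodecLike<Message> = {
  encode: (value) => {
    const bytes = new TextEncoder().encode(JSON.stringify(value));
    return { match: (onOk) => onOk(bytes) };
  },
  decode: (bytes) => {
    const value = JSON.parse(new TextDecoder().decode(bytes)) as Message;
    return { match: (onOk) => onOk(value) };
  },
};

// Never called: exists only so the compiler checks these lines.
function compileTimeChecks(): void {
  // @ts-expect-error: `content` is missing, so this call is rejected.
  standInCodec.encode({ sender: "alice" });
  // @ts-expect-error: `content` must be a string, not a number.
  standInCodec.encode({ sender: "alice", content: 42 });
}

// The success callback receives a Message, so its fields are read without a cast.
function senderOf(bytes: Uint8Array): string {
  return standInCodec.decode(bytes).match(
    (message) => message.sender,
    (error) => `<error: ${error.message}>`,
  );
}
```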

Error Handling

Codecs return Result types for explicit error handling:

  • Encoding errors for invalid schema values
  • Decoding errors for malformed binary data
  • Per-message error handling in streams
  • Continue processing after recoverable errors
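
Per-message handling that continues after a failure can be sketched as an async generator that yields one outcome per chunk. This is an illustration under stated assumptions, not the library's API: ResultLike, DecoderLike, Outcome, standInDecoder, and decodeStream are hypothetical names, and the stand-in decoder parses JSON (not the real wire format) so that a malformed chunk produces an error value.

```typescript
// Hypothetical structural types mirroring the decode()/match() shape used on
// this page; they are not the @grounds/schema type definitions.
interface ResultLike<T, E> {
  match<A>(onOk: (value: T) => A, onErr: (error: E) => A): A;
}
interface DecoderLike<T> {
  decode(bytes: Uint8Array): ResultLike<T, Error>;
}

type Message = { sender: string; content: string };

// JSON stand-in decoder so the sketch runs on its own.
// Parse failures are returned as values instead of being thrown.
const standInDecoder: DecoderLike<Message> = {
  decode: (bytes) => {
    try {
      const value = JSON.parse(new TextDecoder().decode(bytes)) as Message;
      return { match: (onOk) => onOk(value) };
    } catch (cause) {
      const error = cause instanceof Error ? cause : new Error(String(cause));
      return { match: (_onOk, onErr) => onErr(error) };
    }
  },
};

// One outcome per input chunk, tagged so callers can branch on `ok`.
type Outcome<T> =
  | { ok: true; index: number; value: T }
  | { ok: false; index: number; error: Error };

async function* decodeStream<T>(
  chunks: AsyncIterable<Uint8Array> | Iterable<Uint8Array>,
  decoder: DecoderLike<T>,
): AsyncGenerator<Outcome<T>> {
  let index = 0;
  for await (const chunk of chunks) {
    // A failed chunk becomes an error outcome; the loop then moves on.
    yield decoder.decode(chunk).match<Outcome<T>>(
      (value) => ({ ok: true, index, value }),
      (error) => ({ ok: false, index, error }),
    );
    index += 1;
  }
}
```

Given three chunks where the second is malformed, decodeStream yields a value for index 0, an error for index 1, and a value for index 2. Whether an error is recoverable is left to the consumer, which can stop iterating at any point.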

Next Steps

See Schema with Web Streams to use type-safe codecs with the WHATWG Streams API.