Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Async Generators

Stream encode and decode with async generators.

Streaming Encode/Decode

Use encodeIterable and decodeIterable for streaming:

// examples/stream/async-generators.ts
// Demonstrates: Streaming encode/decode with async generators

import { encodeIterable, decodeIterable } from "@grounds/stream";
import { String_, U32, Bool, type RelishValue, type DecodedValue } from "@grounds/core";

// Generate values using an async generator
async function* generateValues(): AsyncGenerator<RelishValue> {
  yield String_("hello");
  yield U32(42);
  yield Bool(true);
}

// Encode values to byte chunks
async function example(): Promise<void> {
  const chunks: Array<Uint8Array> = [];

  // encodeIterable yields Result<Uint8Array, EncodeError> for each value
  for await (const result of encodeIterable(generateValues())) {
    result.match(
      (bytes) => chunks.push(bytes),
      (err) => console.error("Encode error:", err.message),
    );
  }

  console.log("Encoded", chunks.length, "chunks");

  // Decode chunks back to values
  async function* yieldChunks(): AsyncGenerator<Uint8Array> {
    for (const chunk of chunks) {
      yield chunk;
    }
  }

  const values: Array<DecodedValue> = [];

  // decodeIterable yields Result<DecodedValue, DecodeError> for each value
  for await (const result of decodeIterable(yieldChunks())) {
    result.match(
      (value) => values.push(value),
      (err) => console.error("Decode error:", err.message),
    );
  }

  console.log("Decoded", values.length, "values");
  console.log("Values:", values);
}

await example();

encodeIterable

Encodes values from an async generator:

async function* values(): AsyncGenerator<RelishValue> {
  yield { type: TypeCode.String, value: "hello" };
  yield { type: TypeCode.U32, value: 42 };
}

for await (const result of encodeIterable(values())) {
  result.match(
    (bytes) => sendChunk(bytes),
    (err) => console.error(err),
  );
}

decodeIterable

Decodes values from an async generator of byte chunks:

async function* chunks(): AsyncGenerator<Uint8Array> {
  yield await receiveChunk();
}

for await (const result of decodeIterable(chunks())) {
  result.match(
    (value) => processValue(value),
    (err) => console.error(err),
  );
}

Error Handling

Each yielded item is a Result, allowing per-item error handling:

  • Continue processing after recoverable errors
  • Accumulate errors for batch reporting
  • Stop on first error if needed

Next Steps

Learn about Web Streams for the WHATWG Streams API.

For type-safe streaming with schemas, see Schema with Async Generators.