Schema with Async Generators
Stream encode and decode with type-safe schemas using async generators.
Type-Safe Streaming
Use createCodec for schema-aware encoding and decoding:
// examples/stream/schema-async-generators.ts
// Demonstrates: Schema-aware streaming with async generators using createCodec
import { RStruct, RString, field, createCodec } from "@grounds/schema";
import { type Static } from "@sinclair/typebox";
// Define a Message schema using RStruct
const MessageSchema = RStruct({
  sender: field(0, RString()),
  content: field(1, RString()),
});
type Message = Static<typeof MessageSchema>;
// Generate typed message values using an async generator
async function* generateMessages(): AsyncGenerator<Message> {
  yield { sender: "alice", content: "hello" };
  yield { sender: "bob", content: "world" };
  yield { sender: "charlie", content: "how are you?" };
}
// Example: Encode typed messages using codec, then decode back
async function example(): Promise<void> {
  console.log("=== Schema-aware Async Generators ===\n");

  // Create a codec for the Message schema
  // This provides encode/decode with full type safety
  const codec = createCodec(MessageSchema);

  // Step 1: Encode messages using the codec
  console.log("Encoding messages...");
  const chunks: Array<Uint8Array> = [];
  for await (const message of generateMessages()) {
    const encodeResult = codec.encode(message);
    encodeResult.match(
      (bytes) => {
        chunks.push(bytes);
        console.log(` Encoded message from ${message.sender}: ${bytes.length} bytes`);
      },
      (err) => console.error(" Encode error:", err.message),
    );
  }
  console.log(`\nSuccessfully encoded ${chunks.length} messages\n`);

  // Step 2: Decode bytes back to typed messages
  console.log("Decoding messages...");
  const decodedMessages: Array<Message> = [];
  for (const chunk of chunks) {
    const decodeResult = codec.decode(chunk);
    decodeResult.match(
      (message) => {
        decodedMessages.push(message);
        console.log(` Decoded message from ${message.sender}: "${message.content}"`);
      },
      (err) => {
        console.error(" Decode error:", err.message);
      },
    );
  }
  console.log(`\nSuccessfully decoded ${decodedMessages.length} messages\n`);

  // Step 3: Verify round-trip
  console.log("=== Results ===");
  console.log("Original messages:");
  for await (const msg of generateMessages()) {
    console.log(` ${msg.sender}: ${msg.content}`);
  }
  console.log("\nDecoded messages:");
  for (const msg of decodedMessages) {
    console.log(` ${msg.sender}: ${msg.content}`);
  }

  // Verify all match
  let allMatch = true;
  for await (const origMsg of generateMessages()) {
    const found = decodedMessages.find(
      (m) => m.sender === origMsg.sender && m.content === origMsg.content,
    );
    if (!found) {
      allMatch = false;
      break;
    }
  }
  console.log(`\nRound-trip successful: ${allMatch ? "YES" : "NO"}`);
}
await example();
Creating a Codec
Define a schema and create a typed codec:
import { RStruct, RString, field, createCodec } from "@grounds/schema";
import { type Static } from "@sinclair/typebox";
const MessageSchema = RStruct({
  sender: field(0, RString()),
  content: field(1, RString()),
});
type Message = Static<typeof MessageSchema>;
const codec = createCodec(MessageSchema);
The codec provides:
- codec.encode(value): Encodes typed values to Uint8Array
- codec.decode(bytes): Decodes bytes to typed values
- Full TypeScript type inference from the schema
Encoding with Codecs
Encode typed messages using the codec:
async function* generateMessages(): AsyncGenerator<Message> {
  yield { sender: "alice", content: "hello" };
  yield { sender: "bob", content: "world" };
}

const chunks: Array<Uint8Array> = [];
for await (const message of generateMessages()) {
  const result = codec.encode(message);
  result.match(
    (bytes) => chunks.push(bytes),
    (err) => console.error(err.message),
  );
}
Decoding with Codecs
Decode bytes back to typed messages:
const decodedMessages: Array<Message> = [];
for (const chunk of chunks) {
  const result = codec.decode(chunk);
  result.match(
    (message) => decodedMessages.push(message),
    (err) => console.error(err.message),
  );
}
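The encode and decode loops collect every chunk in an array before moving on. When messages should flow through one at a time, the same two calls can be wrapped in async generators and chained. The following is a minimal, self-contained sketch of that idea, not the library's implementation: it assumes a stand-in JSON codec and a hand-rolled ResultLike type in place of createCodec and its Result, and the names ResultLike, jsonCodec, mapOk, and roundTrip are hypothetical.

```typescript
// ASSUMPTION: minimal stand-in for the Result returned by the codec.
// Only the match() method used in this guide is modeled.
type ResultLike<T> = {
  match<U>(onOk: (value: T) => U, onErr: (err: Error) => U): U;
};

const ok = <T>(value: T): ResultLike<T> => ({
  match: (onOk, _onErr) => onOk(value),
});
const fail = <T>(err: Error): ResultLike<T> => ({
  match: (_onOk, onErr) => onErr(err),
});

type Message = { sender: string; content: string };

// ASSUMPTION: stand-in codec (JSON text as UTF-8 bytes), used only so the
// sketch runs on its own. A real codec would come from createCodec.
const jsonCodec = {
  encode(value: Message): ResultLike<Uint8Array> {
    return ok<Uint8Array>(new TextEncoder().encode(JSON.stringify(value)));
  },
  decode(bytes: Uint8Array): ResultLike<Message> {
    try {
      return ok<Message>(JSON.parse(new TextDecoder().decode(bytes)) as Message);
    } catch (e) {
      return fail<Message>(e instanceof Error ? e : new Error(String(e)));
    }
  },
};

// Lazily apply a Result-returning step to each item of an async source.
// Successful values are yielded; failures are logged and skipped.
async function* mapOk<A, B>(
  source: AsyncIterable<A>,
  step: (value: A) => ResultLike<B>,
): AsyncGenerator<B> {
  for await (const item of source) {
    const outcome = step(item).match<{ value: B } | null>(
      (value) => ({ value }),
      (err) => {
        console.error(err.message);
        return null;
      },
    );
    if (outcome !== null) {
      yield outcome.value;
    }
  }
}

async function* generateMessages(): AsyncGenerator<Message> {
  yield { sender: "alice", content: "hello" };
  yield { sender: "bob", content: "world" };
}

// Chain encode and decode; each message is encoded and decoded as it is pulled.
async function roundTrip(): Promise<Array<Message>> {
  const encoded = mapOk(generateMessages(), (m) => jsonCodec.encode(m));
  const decoded = mapOk(encoded, (b) => jsonCodec.decode(b));
  const out: Array<Message> = [];
  for await (const message of decoded) {
    out.push(message);
  }
  return out;
}

roundTrip().then((messages) => console.log(messages));
```

Because async generators are pull-based, nothing is encoded until the consumer asks for the next decoded message, so memory use stays bounded by a single message rather than the whole stream.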
Type Safety
TypeScript enforces schema types:
- codec.encode() accepts only values matching the schema type
- codec.decode() returns values with the correct TypeScript type
- Compile-time errors prevent type mismatches
- No manual type casting required
Error Handling
Codecs return Result types for explicit error handling:
- Encoding errors for invalid schema values
- Decoding errors for malformed binary data
- Per-message error handling in streams
- Continue processing after recoverable errors
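One way to handle errors per message and keep going is to record each failure alongside its position in the stream instead of stopping at the first one. The sketch below illustrates this under stated assumptions: ResultLike is a hand-rolled stand-in for the codec's Result type, decodeJson is a stand-in decoder (JSON over UTF-8) rather than codec.decode, and decodeAll, chunkSource, and Failure are hypothetical names.

```typescript
// ASSUMPTION: minimal stand-in for the codec's Result type (match() only).
type ResultLike<T> = {
  match<U>(onOk: (value: T) => U, onErr: (err: Error) => U): U;
};

type Message = { sender: string; content: string };
type Failure = { index: number; error: Error };

// ASSUMPTION: stand-in decoder so the sketch runs without the library.
function decodeJson(bytes: Uint8Array): ResultLike<Message> {
  try {
    const value = JSON.parse(new TextDecoder().decode(bytes)) as Message;
    return { match: (onOk, _onErr) => onOk(value) };
  } catch (e) {
    const err = e instanceof Error ? e : new Error(String(e));
    return { match: (_onOk, onErr) => onErr(err) };
  }
}

// Decode every chunk; a failed chunk is recorded and the loop continues.
async function decodeAll(
  chunks: AsyncIterable<Uint8Array>,
  decode: (bytes: Uint8Array) => ResultLike<Message>,
): Promise<{ messages: Array<Message>; failures: Array<Failure> }> {
  const messages: Array<Message> = [];
  const failures: Array<Failure> = [];
  let index = 0;
  for await (const chunk of chunks) {
    const position = index++;
    decode(chunk).match(
      (message) => {
        messages.push(message);
      },
      (error) => {
        failures.push({ index: position, error });
      },
    );
  }
  return { messages, failures };
}

// Three chunks, the middle one deliberately malformed.
async function* chunkSource(): AsyncGenerator<Uint8Array> {
  const encoder = new TextEncoder();
  yield encoder.encode(JSON.stringify({ sender: "alice", content: "hello" }));
  yield encoder.encode("not valid json");
  yield encoder.encode(JSON.stringify({ sender: "bob", content: "world" }));
}

decodeAll(chunkSource(), decodeJson).then(({ messages, failures }) => {
  console.log(`decoded ${messages.length}, failed ${failures.length}`);
});
```

Whether a given error is recoverable depends on the transport: skipping a chunk is only safe when chunk boundaries are preserved, as they are in the array-of-chunks examples here.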
Next Steps
Learn about Schema with Web Streams for the WHATWG Streams API with type safety.