Generators
Swap how input is produced, how the model is called, and how tools are resolved — without changing the participant or the environment.
BaseAgentParticipant and BaseHumanParticipant are deliberately thin shells around three generator interfaces. Swap any of them to change how items are produced without touching the environment, the participant itself, or any downstream consumers.
| Interface | Used by | Yields |
|---|---|---|
| InputItemSource | BaseHumanParticipant, BaseAgentParticipant | UserMessageItem, DeveloperMessageItem, SystemMessageItem |
| InferenceRunner | BaseAgentParticipant | ReasoningItem, FunctionCallItem, ModelMessageItem |
| FunctionCallRunner | BaseAgentParticipant | FunctionCallOutputItem |
All three are AsyncIterable-shaped: they yield items as they become available, and the participant streams those items into the environment one by one.
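For orientation, here is a hypothetical restatement of the three shapes as they are used in the examples below; the real declarations ship with @mozaik-ai/core and may differ in detail:

```ts
import {
  UserMessageItem, DeveloperMessageItem, SystemMessageItem,
  ReasoningItem, FunctionCallItem, ModelMessageItem,
  FunctionCallOutputItem, ModelContext, GenerativeModel,
} from '@mozaik-ai/core';

// Sketches of the three contracts, inferred from usage; not the library source.
interface InputItemSourceShape {
  stream(
    signal?: AbortSignal,
  ): AsyncIterable<UserMessageItem | DeveloperMessageItem | SystemMessageItem>;
}

interface InferenceRunnerShape {
  run(
    context: ModelContext,
    model: GenerativeModel,
    signal?: AbortSignal,
  ): AsyncIterable<ReasoningItem | FunctionCallItem | ModelMessageItem>;
}

interface FunctionCallRunnerShape {
  run(call: FunctionCallItem, signal?: AbortSignal): AsyncIterable<FunctionCallOutputItem>;
}
```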
Custom InputItemSource
Anything that can produce client items over time can be an InputItemSource. Here is a pushable in-memory queue:
```ts
import {
  InputItemSource,
  UserMessageItem,
  DeveloperMessageItem,
  SystemMessageItem,
} from '@mozaik-ai/core';

type InputItem = UserMessageItem | DeveloperMessageItem | SystemMessageItem;

export class QueueInputSource implements InputItemSource {
  private readonly queue: InputItem[] = [];
  private resolveNext?: () => void;

  // Enqueue an item and wake the stream if it is parked waiting for one.
  push(item: InputItem) {
    this.queue.push(item);
    this.resolveNext?.();
    this.resolveNext = undefined;
  }

  async *stream(signal?: AbortSignal): AsyncIterable<InputItem> {
    while (!signal?.aborted) {
      // Drain everything already queued.
      while (this.queue.length > 0) {
        yield this.queue.shift()!;
      }
      // Park until the next push. Resolving on abort too means the generator
      // cannot hang forever on a promise nobody will resolve.
      await new Promise<void>((resolve) => {
        this.resolveNext = resolve;
        signal?.addEventListener('abort', () => resolve(), { once: true });
      });
    }
  }
}
```

Use the same shape for stdin, WebSockets, an HTTP queue, or anything else that produces user, developer, or system messages over time.
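For instance, here is a minimal stdin-backed sketch for Node.js. UserMessageItem.create is an assumed factory (mirroring the FunctionCallOutputItem.create used below); substitute however your item types are actually constructed:

```ts
import * as readline from 'node:readline';
import { InputItemSource, UserMessageItem } from '@mozaik-ai/core';

// Sketch only: every line typed on stdin becomes a user message.
// UserMessageItem.create is an assumption, not a confirmed API.
export class StdinInputSource implements InputItemSource {
  async *stream(signal?: AbortSignal): AsyncIterable<UserMessageItem> {
    const rl = readline.createInterface({ input: process.stdin });
    try {
      for await (const line of rl) {
        if (signal?.aborted) return;
        yield UserMessageItem.create(line);
      }
    } finally {
      rl.close();
    }
  }
}
```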
Custom InferenceRunner
Wrap any model runtime — including OpenAIResponses — and decide how its output becomes a stream of items. Here we expand a single InferenceResponse into per-item delivery:
```ts
import {
  InferenceRunner,
  InferenceRequest,
  ModelContext,
  GenerativeModel,
  OpenAIResponses,
  ReasoningItem,
  FunctionCallItem,
  ModelMessageItem,
} from '@mozaik-ai/core';

type InferenceItem = ReasoningItem | FunctionCallItem | ModelMessageItem;

export class OpenAIInferenceRunner implements InferenceRunner {
  private readonly runtime = new OpenAIResponses();

  async *run(
    context: ModelContext,
    model: GenerativeModel,
    signal?: AbortSignal,
  ): AsyncIterable<InferenceItem> {
    // One blocking inference call, then per-item delivery: yielding each
    // context item separately lets the participant stream them into the
    // environment individually rather than as a single batch.
    const response = await this.runtime.infer(new InferenceRequest(model, context));
    for (const item of response.contextItems) {
      if (signal?.aborted) return; // stop delivering once the run is cancelled
      yield item as InferenceItem;
    }
  }
}
```

Replace the body with a streaming runtime and items will flow into the environment as soon as the model produces them.
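A sketch of that streaming variant. streamInfer is hypothetical, standing in for whatever incremental API your runtime exposes; the point is that yield* forwards each item the moment it arrives:

```ts
import {
  InferenceRunner,
  InferenceRequest,
  ModelContext,
  GenerativeModel,
  ReasoningItem,
  FunctionCallItem,
  ModelMessageItem,
} from '@mozaik-ai/core';

type InferenceItem = ReasoningItem | FunctionCallItem | ModelMessageItem;

// `streamInfer` is an assumed method, not part of any confirmed runtime API.
interface StreamingRuntime {
  streamInfer(request: InferenceRequest, signal?: AbortSignal): AsyncIterable<InferenceItem>;
}

export class StreamingInferenceRunner implements InferenceRunner {
  constructor(private readonly runtime: StreamingRuntime) {}

  async *run(
    context: ModelContext,
    model: GenerativeModel,
    signal?: AbortSignal,
  ): AsyncIterable<InferenceItem> {
    // Forward items the moment the runtime produces them.
    yield* this.runtime.streamInfer(new InferenceRequest(model, context), signal);
  }
}
```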
Custom FunctionCallRunner
Resolve a FunctionCallItem against a tool registry and yield its output:
```ts
import {
  FunctionCallRunner,
  FunctionCallItem,
  FunctionCallOutputItem,
  Tool,
} from '@mozaik-ai/core';

export class ToolRegistryFunctionCallRunner implements FunctionCallRunner {
  constructor(private readonly tools: Tool[]) {}

  async *run(
    call: FunctionCallItem,
    signal?: AbortSignal,
  ): AsyncIterable<FunctionCallOutputItem> {
    // Look the tool up by name, invoke it with the parsed arguments, and
    // yield a single output item keyed to the originating call.
    const tool = this.tools.find((t) => t.name === call.name);
    if (!tool) throw new Error(`Unknown tool: ${call.name}`);
    const result = await tool.invoke(JSON.parse(call.args));
    yield FunctionCallOutputItem.create(call.callId, JSON.stringify(result));
  }
}
```

If you need fan-out (e.g. partial outputs, progress events), yield as many items as you like; they will all stream into the environment in order.
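A sketch of such fan-out, assuming a hypothetical tool with an incremental invokeStreaming method; each partial result becomes its own output item tied to the same call:

```ts
import { FunctionCallItem, FunctionCallOutputItem } from '@mozaik-ai/core';

// `invokeStreaming` is an assumed method on a streaming-capable tool.
interface StreamingTool {
  invokeStreaming(args: unknown): AsyncIterable<unknown>;
}

export async function* runWithProgress(
  tool: StreamingTool,
  call: FunctionCallItem,
): AsyncIterable<FunctionCallOutputItem> {
  // One output item per partial result, all keyed to the originating call.
  for await (const partial of tool.invokeStreaming(JSON.parse(call.args))) {
    yield FunctionCallOutputItem.create(call.callId, JSON.stringify(partial));
  }
}
```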
Wiring it together
Drop the three generators into BaseAgentParticipant and join the environment:
```ts
import { BaseAgentParticipant, AgenticEnvironment } from '@mozaik-ai/core';

// `tools` is the Tool[] registry built for ToolRegistryFunctionCallRunner.
const agent = new BaseAgentParticipant(
  new QueueInputSource(),
  new OpenAIInferenceRunner(),
  new ToolRegistryFunctionCallRunner(tools),
);

agent.join(new AgenticEnvironment());
```

You now own input, inference, and tool execution end-to-end while keeping the same Participant contract, and any other participant in the environment can still observe and react to everything the agent emits.
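To drive the agent, keep a reference to the input source and push messages as they arrive. As above, UserMessageItem.create is an assumed factory:

```ts
const source = new QueueInputSource();
const agent = new BaseAgentParticipant(
  source,
  new OpenAIInferenceRunner(),
  new ToolRegistryFunctionCallRunner(tools),
);
agent.join(new AgenticEnvironment());

// Anything pushed here streams into the environment through the agent.
source.push(UserMessageItem.create('What changed in the last release?'));
```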