Mozaik

Generators

Swap how input is produced, how the model is called, and how tools are resolved — without changing the participant or the environment.

BaseAgentParticipant and BaseHumanParticipant are deliberately thin shells around up to three generator interfaces. Replacing an implementation changes how items are produced; the environment, the other participants, and any downstream consumers stay untouched.

| Interface | Used by | Yields |
| --- | --- | --- |
| InputItemSource | BaseHumanParticipant, BaseAgentParticipant | UserMessageItem, DeveloperMessageItem, SystemMessageItem |
| InferenceRunner | BaseAgentParticipant | ReasoningItem, FunctionCallItem, ModelMessageItem |
| FunctionCallRunner | BaseAgentParticipant | FunctionCallOutputItem |

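All three are AsyncIterable-shaped: each exposes a single method (stream() on InputItemSource, run() on the two runners) that returns an AsyncIterable. Items are yielded as they become available, and the participant streams them into the environment one by one.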
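As a rough illustration of that contract, the sketch below uses only standard TypeScript. The `Item` type and the `produce` and `forward` functions are hypothetical stand-ins and are not part of @mozaik-ai/core; they only show the yield-then-forward pattern that the participants are described as following.

```typescript
// Hypothetical stand-in for a context item; not a type from '@mozaik-ai/core'.
type Item = { kind: string; text: string };

// A generator yields items as they become available instead of returning a batch.
async function* produce(): AsyncIterable<Item> {
  yield { kind: 'reasoning', text: 'thinking' };
  await new Promise((resolve) => setTimeout(resolve, 10)); // simulated latency
  yield { kind: 'message', text: 'done' };
}

// Roughly what a participant does: hand each item on as soon as it arrives.
async function forward(
  source: AsyncIterable<Item>,
  deliver: (item: Item) => void,
): Promise<number> {
  let count = 0;
  for await (const item of source) {
    deliver(item);
    count += 1;
  }
  return count;
}
```

Because the consumer only depends on `AsyncIterable`, any producer with that shape can be substituted without changing `forward`.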

Custom InputItemSource

Anything that can produce client items over time can be an InputItemSource. Here is a pushable in-memory queue:

import {
  InputItemSource,
  UserMessageItem,
  DeveloperMessageItem,
  SystemMessageItem,
} from '@mozaik-ai/core';

type InputItem = UserMessageItem | DeveloperMessageItem | SystemMessageItem;

export class QueueInputSource implements InputItemSource {
  private readonly queue: InputItem[] = [];
  private resolveNext?: () => void;

  push(item: InputItem) {
    this.queue.push(item);
    this.resolveNext?.();
    this.resolveNext = undefined;
  }

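  async *stream(signal?: AbortSignal): AsyncIterable<InputItem> {
    while (!signal?.aborted) {
      // Drain everything that is already queued.
      while (this.queue.length > 0) {
        yield this.queue.shift()!;
      }
      if (signal?.aborted) return;
      // Sleep until the next push() or until the signal aborts, whichever comes first.
      // Without the abort listener the generator would wait forever after cancellation.
      await new Promise<void>((resolve) => {
        const wake = () => {
          signal?.removeEventListener('abort', wake);
          resolve();
        };
        this.resolveNext = wake;
        signal?.addEventListener('abort', wake, { once: true });
      });
    }
  }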
}

Use the same shape for stdin, websockets, an HTTP queue, or anything that produces user/developer/system messages over time.
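For line-oriented inputs such as stdin, one possible adaptation is sketched below. `UserMessage`, `InputSource`, and `LineInputSource` are hypothetical stand-ins defined locally so the example runs on its own; in real code the item and interface types would come from '@mozaik-ai/core', whose item constructors are not shown here.

```typescript
// Hypothetical stand-ins so the sketch is self-contained; the real types
// (UserMessageItem, InputItemSource) live in '@mozaik-ai/core'.
type UserMessage = { role: 'user'; text: string };

interface InputSource {
  stream(signal?: AbortSignal): AsyncIterable<UserMessage>;
}

// Adapts any async stream of text lines into an input source.
class LineInputSource implements InputSource {
  private readonly lines: AsyncIterable<string>;

  constructor(lines: AsyncIterable<string>) {
    this.lines = lines;
  }

  async *stream(signal?: AbortSignal): AsyncIterable<UserMessage> {
    for await (const line of this.lines) {
      if (signal?.aborted) return; // stop forwarding once cancelled
      const text = line.trim();
      if (text.length === 0) continue; // skip blank lines
      yield { role: 'user', text };
    }
  }
}
```

In Node.js, the object returned by `readline.createInterface({ input: process.stdin })` is async-iterable over lines, so it can be passed as `lines` directly.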

Custom InferenceRunner

Wrap any model runtime — including OpenAIResponses — and decide how its output becomes a stream of items. Here we expand a single InferenceResponse into per-item delivery:

import {
  InferenceRunner,
  InferenceRequest,
  ModelContext,
  GenerativeModel,
  OpenAIResponses,
  ReasoningItem,
  FunctionCallItem,
  ModelMessageItem,
} from '@mozaik-ai/core';

type InferenceItem = ReasoningItem | FunctionCallItem | ModelMessageItem;

export class OpenAIInferenceRunner implements InferenceRunner {
  private readonly runtime = new OpenAIResponses();

  async *run(
    context: ModelContext,
    model: GenerativeModel,
    signal?: AbortSignal,
  ): AsyncIterable<InferenceItem> {
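    // infer() resolves once the whole response is available; the items are then replayed one at a time.
    const response = await this.runtime.infer(new InferenceRequest(model, context));
    for (const item of response.contextItems) {
      if (signal?.aborted) return; // stop delivering once the caller cancels
      yield item as InferenceItem;
    }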
  }
}

Replace the body with a streaming runtime and items will flow into the environment as soon as the model produces them.
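A minimal sketch of what such a streaming body could look like follows. The `StreamingRuntime` interface and its `inferStream` method are assumptions made for illustration, not an API of @mozaik-ai/core or of any OpenAI client, and the item union is a simplified local stand-in for the real item classes.

```typescript
// Hypothetical stand-ins; the real item and context types live in '@mozaik-ai/core'.
type InferenceItem =
  | { type: 'reasoning'; text: string }
  | { type: 'function_call'; name: string; args: string }
  | { type: 'message'; text: string };

// Assumed shape of a streaming runtime: completed items arrive as an async stream.
interface StreamingRuntime {
  inferStream(prompt: string, signal?: AbortSignal): AsyncIterable<InferenceItem>;
}

class StreamingInferenceRunner {
  private readonly runtime: StreamingRuntime;

  constructor(runtime: StreamingRuntime) {
    this.runtime = runtime;
  }

  async *run(prompt: string, signal?: AbortSignal): AsyncIterable<InferenceItem> {
    for await (const item of this.runtime.inferStream(prompt, signal)) {
      if (signal?.aborted) return; // stop early when the caller cancels
      yield item; // forwarded immediately, before the model has finished
    }
  }
}
```

The only difference from the buffered version is that nothing waits for a complete response: each item is yielded inside the `for await` loop as the runtime produces it.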

Custom FunctionCallRunner

Resolve a FunctionCallItem against a tool registry and yield its output:

import {
  FunctionCallRunner,
  FunctionCallItem,
  FunctionCallOutputItem,
  Tool,
} from '@mozaik-ai/core';

export class ToolRegistryFunctionCallRunner implements FunctionCallRunner {
  constructor(private readonly tools: Tool[]) {}

  async *run(
    call: FunctionCallItem,
    signal?: AbortSignal,
  ): AsyncIterable<FunctionCallOutputItem> {
    const tool = this.tools.find((t) => t.name === call.name);
    if (!tool) throw new Error(`Unknown tool: ${call.name}`);

    const result = await tool.invoke(JSON.parse(call.args));
    yield FunctionCallOutputItem.create(call.callId, JSON.stringify(result));
  }
}

If you need fan-out (e.g. partial outputs, progress events), yield as many items as you like — they will all stream into the environment in order.
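The fan-out case can be sketched as follows. `Call`, `CallOutput`, `ProgressTool`, and `ProgressFunctionCallRunner` are hypothetical local stand-ins (real code would use FunctionCallItem and FunctionCallOutputItem from '@mozaik-ai/core'), and the idea of a tool that reports progress as an async stream is an assumption made for this example.

```typescript
// Hypothetical stand-ins so the sketch runs on its own.
type Call = { callId: string; name: string; args: string };
type CallOutput = { callId: string; output: string };

// Assumed tool shape: emits progress steps, the last one marked done.
type ProgressTool = (args: unknown) => AsyncIterable<{ done: boolean; value: unknown }>;

class ProgressFunctionCallRunner {
  private readonly tools: Map<string, ProgressTool>;

  constructor(tools: Map<string, ProgressTool>) {
    this.tools = tools;
  }

  async *run(call: Call, signal?: AbortSignal): AsyncIterable<CallOutput> {
    const tool = this.tools.get(call.name);
    if (!tool) throw new Error(`Unknown tool: ${call.name}`);

    // One output item per progress step, emitted in the order the tool produces them.
    for await (const step of tool(JSON.parse(call.args))) {
      if (signal?.aborted) return; // stop emitting once cancelled
      yield { callId: call.callId, output: JSON.stringify(step) };
    }
  }
}
```

Every yielded output carries the same `callId`, so a consumer can group partial results with the call that produced them.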

Wiring it together

Drop the three generators into BaseAgentParticipant and join the environment:

import { BaseAgentParticipant, AgenticEnvironment } from '@mozaik-ai/core';

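// `tools` is the Tool[] the agent is allowed to call, defined elsewhere.
// Keep a reference to the input source so items can be pushed to it later.
const input = new QueueInputSource();

const agent = new BaseAgentParticipant(
  input,
  new OpenAIInferenceRunner(),
  new ToolRegistryFunctionCallRunner(tools),
);

agent.join(new AgenticEnvironment());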

You now own input, inference, and tool execution end-to-end while keeping the same Participant contract — and any other participant in the environment can still observe and react to everything the agent emits.
