Mozaik

Participants

The single abstraction for humans, agents, observers, and tools. Capabilities, BaseHumanParticipant, BaseAgentParticipant, and reaction patterns.

In Mozaik, everything that produces or consumes context items is a Participant: humans, agents, transcript loggers, guardrails, UI streams. They all share one abstraction and one intercept point.

import { Participant, ContextItem } from '@mozaik-ai/core';

class MyParticipant extends Participant {
  async onContextItem(source: Participant, item: ContextItem): Promise<void> {
    // observe, react, or ignore
  }
}

onContextItem(source, item) is the single intercept point. A participant can:

  • Observe items from other participants (telemetry, audit, UI streaming).
  • React to items by triggering its own capabilities (turn a UserMessageItem into an inference run, turn a FunctionCallItem into a tool execution).
  • Ignore items it does not care about — it is just a method call.

Capabilities

Capabilities describe what a participant can produce. Each capability is a non-blocking method that streams typed items into the environment.

CapabilityMethodYields
InputCapablestreamInput(environment)UserMessageItem, DeveloperMessageItem, SystemMessageItem
InferenceCapablerunInference(environment, context, model)ReasoningItem, FunctionCallItem, ModelMessageItem
FunctionCallCapableexecuteFunctionCall(environment, call)FunctionCallOutputItem

Every capability method returns Promise<void> and pumps its underlying generator into the environment item-by-item. Callers never await a capability for its result — they await only if they care about completion. This is what allows multiple participants to act on the same environment concurrently.

BaseHumanParticipant

A ready-to-use participant for human input. Wraps an InputItemSource.

import { BaseHumanParticipant } from '@mozaik-ai/core';

const human = new BaseHumanParticipant(humanInputSource);
human.join(environment);
human.streamInput(environment);
CapabilityYields
InputCapableUserMessageItem, DeveloperMessageItem, SystemMessageItem

Use it for stdin, websockets, an HTTP queue, or anything that produces user/developer/system messages over time.

BaseAgentParticipant

A ready-to-use agent participant. Wraps three generator interfaces — input, inference, and tool execution — and exposes one capability method per concern.

import { BaseAgentParticipant } from '@mozaik-ai/core';

const agent = new BaseAgentParticipant(
  agentInputSource,
  inferenceRunner,
  functionCallRunner,
);

agent.join(environment);
agent.runInference(environment, context, model);
CapabilityMethodYields
InputCapablestreamInput(environment)client items via InputItemSource
InferenceCapablerunInference(environment, context, model)model items via InferenceRunner
FunctionCallCapableexecuteFunctionCall(environment, call)FunctionCallOutputItem via FunctionCallRunner

Customize what the agent does by overriding onContextItem (decide when to call inference, tools, etc.). Customize how it does it by swapping the generators.

Patterns

Passive observer

Subclass Participant and just consume items. No capabilities required.

import { Participant, ContextItem } from '@mozaik-ai/core';

export class TranscriptLogger extends Participant {
  async onContextItem(source: Participant, item: ContextItem): Promise<void> {
    console.log(`[${source.constructor.name}]`, item.toJSON());
  }
}

Drop one of these into any environment and you have a live transcript. Add another that pushes to a websocket and you have a UI stream. None of the existing participants change.

Reactive agent

A reactive agent extends BaseAgentParticipant and uses incoming items from other participants to decide when to run inference or execute a tool call.

import {
  BaseAgentParticipant,
  Participant,
  ContextItem,
  UserMessageItem,
  FunctionCallItem,
  AgenticEnvironment,
  ModelContext,
  GenerativeModel,
  InputItemSource,
  InferenceRunner,
  FunctionCallRunner,
} from '@mozaik-ai/core';

export class ReactiveAgent extends BaseAgentParticipant {
  constructor(
    inputSource: InputItemSource,
    inferenceRunner: InferenceRunner,
    functionCallRunner: FunctionCallRunner,
    private readonly environment: AgenticEnvironment,
    private readonly context: ModelContext,
    private readonly model: GenerativeModel,
  ) {
    super(inputSource, inferenceRunner, functionCallRunner);
  }

  async onContextItem(source: Participant, item: ContextItem): Promise<void> {
    if (source === this) return;

    this.context.addContextItem(item);

    if (item instanceof UserMessageItem) {
      this.runInference(this.environment, this.context, this.model);
      return;
    }

    if (item instanceof FunctionCallItem) {
      this.executeFunctionCall(this.environment, item);
      return;
    }
  }
}

Two things to note:

  1. The agent never awaits its own capability calls inside onContextItem. The methods are non-blocking, so the environment keeps delivering items while inference and tool execution run in the background.
  2. Behaviors compose by reaction, not orchestration. Add a second agent that listens for ModelMessageItems and you get a critique loop. Add a TranscriptLogger and you get a UI stream. Neither change touches the existing participants.

Discriminating items

Items are discriminated by the ContextItem subclass and, for messages, the role field:

  • Client-produced: UserMessageItem, DeveloperMessageItem, SystemMessageItem, FunctionCallOutputItem
  • Model-produced: ModelMessageItem, FunctionCallItem, ReasoningItem
async onContextItem(source: Participant, item: ContextItem): Promise<void> {
  if (item instanceof FunctionCallItem) {
    // model asked for a tool
  } else if (item instanceof ModelMessageItem) {
    // model produced a final message
  }
}

On this page