Participants
The single abstraction for humans, agents, observers, and tools. Capabilities, BaseHumanParticipant, BaseAgentParticipant, and reaction patterns.
In Mozaik, everything that produces or consumes context items is a Participant: humans, agents, transcript loggers, guardrails, UI streams. They all share one abstraction and one intercept point.
import { Participant, ContextItem } from '@mozaik-ai/core';
class MyParticipant extends Participant {
  async onContextItem(source: Participant, item: ContextItem): Promise<void> {
    // observe, react, or ignore
  }
}

onContextItem(source, item) is the single intercept point. A participant can:
- Observe items from other participants (telemetry, audit, UI streaming).
- React to items by triggering its own capabilities (turn a UserMessageItem into an inference run, turn a FunctionCallItem into a tool execution).
- Ignore items it does not care about; it is just a method call.
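
The three options above can be sketched with local stand-ins. The `Item`, `SketchParticipant`, `Auditor`, `Responder`, and `deliver` names are hypothetical placeholders defined in the snippet, not the `@mozaik-ai/core` classes, and delivering each item to every participant is an assumption made for illustration. The point is only that observing, reacting, and ignoring are branches inside one method.

```typescript
// Hypothetical stand-ins, NOT the @mozaik-ai/core types.
type Item = { kind: "user_message" | "function_call" | "reasoning"; text: string };

abstract class SketchParticipant {
  abstract onContextItem(source: SketchParticipant, item: Item): Promise<void>;
}

class Auditor extends SketchParticipant {
  readonly seen: string[] = [];

  // Observe: record every item, produce nothing.
  async onContextItem(_source: SketchParticipant, item: Item): Promise<void> {
    this.seen.push(item.kind);
  }
}

class Responder extends SketchParticipant {
  readonly started: string[] = [];

  async onContextItem(_source: SketchParticipant, item: Item): Promise<void> {
    if (item.kind === "user_message") {
      // React: a real agent would start an inference run here.
      this.started.push(`inference for "${item.text}"`);
    }
    // Ignore: every other kind falls through and nothing happens.
  }
}

// Assumed delivery rule for this sketch: every participant receives every item.
async function deliver(
  all: SketchParticipant[],
  source: SketchParticipant,
  item: Item,
): Promise<void> {
  for (const participant of all) {
    await participant.onContextItem(source, item);
  }
}
```

Adding a third participant to the array changes neither `Auditor` nor `Responder`, which is the property the single intercept point is meant to provide.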
Capabilities
Capabilities describe what a participant can produce. Each capability is a non-blocking method that streams typed items into the environment.
| Capability | Method | Yields |
|---|---|---|
| InputCapable | streamInput(environment) | UserMessageItem, DeveloperMessageItem, SystemMessageItem |
| InferenceCapable | runInference(environment, context, model) | ReasoningItem, FunctionCallItem, ModelMessageItem |
| FunctionCallCapable | executeFunctionCall(environment, call) | FunctionCallOutputItem |
Every capability method returns Promise<void> and pumps its underlying generator into the environment item-by-item. Callers never await a capability for its result — they await only if they care about completion. This is what allows multiple participants to act on the same environment concurrently.
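A minimal sketch of that shape, under stated assumptions: `SketchEnvironment`, `produce`, and `pump` are hypothetical names defined here, not part of `@mozaik-ai/core`, and the real environment's delivery method is not shown in this page. The sketch only illustrates how a `Promise<void>` method can drain an async generator item by item while a second one runs alongside it.

```typescript
// Hypothetical stand-ins, not the @mozaik-ai/core API.
type Item = { from: string; text: string };

class SketchEnvironment {
  readonly log: Item[] = [];

  publish(item: Item): void {
    this.log.push(item);
  }
}

const pause = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// A generator that yields items over time, like a model streaming output.
async function* produce(from: string, count: number, delayMs: number): AsyncGenerator<Item> {
  for (let i = 0; i < count; i++) {
    await pause(delayMs);
    yield { from, text: `${from}-${i}` };
  }
}

// Capability-shaped function: resolves with no value once the generator is drained.
async function pump(env: SketchEnvironment, source: AsyncGenerator<Item>): Promise<void> {
  for await (const item of source) {
    env.publish(item);
  }
}

async function demo(): Promise<SketchEnvironment> {
  const env = new SketchEnvironment();
  // Start both without awaiting either, so the two streams share env.log.
  const first = pump(env, produce("a", 3, 10));
  const second = pump(env, produce("b", 3, 10));
  // Await only because this demo cares about completion.
  await Promise.all([first, second]);
  return env;
}
```

Because `pump` resolves to `void`, the only thing a caller learns by awaiting it is that the stream has ended; the items themselves are only ever seen through the environment.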
BaseHumanParticipant
A ready-to-use participant for human input. Wraps an InputItemSource.
import { BaseHumanParticipant } from '@mozaik-ai/core';
const human = new BaseHumanParticipant(humanInputSource);
human.join(environment);
human.streamInput(environment);

| Capability | Yields |
|---|---|
| InputCapable | UserMessageItem, DeveloperMessageItem, SystemMessageItem |
Use it for stdin, websockets, an HTTP queue, or anything that produces user/developer/system messages over time.
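Sources like these are push-based, while a streaming capability pulls from a generator. One way to bridge the two is a small queue exposed as an async generator. The `MessageQueue` class below is a hypothetical helper, and whether `InputItemSource` accepts an async generator in this form is an assumption; check the interface before wiring it in.

```typescript
// Hypothetical helper: a push-based queue exposed as an async generator.
class MessageQueue<T> {
  private readonly buffered: T[] = [];
  private readonly waiting: Array<(result: IteratorResult<T>) => void> = [];
  private closed = false;

  // Called by the transport (stdin handler, websocket callback, HTTP route).
  push(value: T): void {
    if (this.closed) return;
    const resolve = this.waiting.shift();
    if (resolve) resolve({ value, done: false });
    else this.buffered.push(value);
  }

  // Ends the stream; anything already buffered is still delivered first.
  close(): void {
    this.closed = true;
    for (const resolve of this.waiting.splice(0)) {
      resolve({ value: undefined, done: true });
    }
  }

  private async next(): Promise<IteratorResult<T>> {
    if (this.buffered.length > 0) {
      return { value: this.buffered.shift() as T, done: false };
    }
    if (this.closed) return { value: undefined, done: true };
    // Nothing buffered yet: park until push() or close() resolves us.
    return new Promise<IteratorResult<T>>((resolve) => {
      this.waiting.push(resolve);
    });
  }

  async *items(): AsyncGenerator<T> {
    while (true) {
      const result = await this.next();
      if (result.done) return;
      yield result.value;
    }
  }
}
```

A transport callback would call `push` for each incoming message and `close` on disconnect, while the consumer iterates `items()` with `for await`.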
BaseAgentParticipant
A ready-to-use agent participant. Wraps three generator interfaces — input, inference, and tool execution — and exposes one capability method per concern.
import { BaseAgentParticipant } from '@mozaik-ai/core';
const agent = new BaseAgentParticipant(
  agentInputSource,
  inferenceRunner,
  functionCallRunner,
);
agent.join(environment);
agent.runInference(environment, context, model);

| Capability | Method | Yields |
|---|---|---|
| InputCapable | streamInput(environment) | client items via InputItemSource |
| InferenceCapable | runInference(environment, context, model) | model items via InferenceRunner |
| FunctionCallCapable | executeFunctionCall(environment, call) | FunctionCallOutputItem via FunctionCallRunner |
Customize what the agent does by overriding onContextItem (decide when to call inference, tools, etc.). Customize how it does it by swapping the generators.
Patterns
Passive observer
Subclass Participant and just consume items. No capabilities required.
import { Participant, ContextItem } from '@mozaik-ai/core';
export class TranscriptLogger extends Participant {
  async onContextItem(source: Participant, item: ContextItem): Promise<void> {
    console.log(`[${source.constructor.name}]`, item.toJSON());
  }
}

Drop one of these into any environment and you have a live transcript. Add another that pushes to a websocket and you have a UI stream. None of the existing participants change.
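The websocket variant might look like the sketch below. It uses local stand-ins (`SketchItem`, `SketchParticipant`) so it runs without the library, and the `TextSink` interface and `SinkStreamer` class are hypothetical names. Anything with a `send(string)` method, such as a browser `WebSocket`, would fit the sink role.

```typescript
// Hypothetical stand-ins so the snippet runs without @mozaik-ai/core.
interface SketchItem {
  toJSON(): unknown;
}

abstract class SketchParticipant {
  abstract onContextItem(source: SketchParticipant, item: SketchItem): Promise<void>;
}

// Minimal sink contract; a WebSocket's send(data) satisfies it for string payloads.
interface TextSink {
  send(data: string): void;
}

class SinkStreamer extends SketchParticipant {
  constructor(private readonly sink: TextSink) {
    super();
  }

  async onContextItem(source: SketchParticipant, item: SketchItem): Promise<void> {
    // Serialize and forward; the observer never produces items itself.
    const payload = { from: source.constructor.name, item: item.toJSON() };
    this.sink.send(JSON.stringify(payload));
  }
}
```

Keeping the sink behind a one-method interface also makes the observer easy to test with an in-memory array instead of a live connection.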
Reactive agent
A reactive agent extends BaseAgentParticipant and uses incoming items from other participants to decide when to run inference or execute a tool call.
import {
  BaseAgentParticipant,
  Participant,
  ContextItem,
  UserMessageItem,
  FunctionCallItem,
  AgenticEnvironment,
  ModelContext,
  GenerativeModel,
  InputItemSource,
  InferenceRunner,
  FunctionCallRunner,
} from '@mozaik-ai/core';
export class ReactiveAgent extends BaseAgentParticipant {
  constructor(
    inputSource: InputItemSource,
    inferenceRunner: InferenceRunner,
    functionCallRunner: FunctionCallRunner,
    private readonly environment: AgenticEnvironment,
    private readonly context: ModelContext,
    private readonly model: GenerativeModel,
  ) {
    super(inputSource, inferenceRunner, functionCallRunner);
  }
  async onContextItem(source: Participant, item: ContextItem): Promise<void> {
    // Skip items this agent produced itself.
    if (source === this) return;
    // Add everything else to the context that is passed to runInference.
    this.context.addContextItem(item);
    if (item instanceof UserMessageItem) {
      // Not awaited on purpose: inference streams in the background.
      this.runInference(this.environment, this.context, this.model);
      return;
    }
    if (item instanceof FunctionCallItem) {
      // Not awaited on purpose: the tool call runs in the background.
      this.executeFunctionCall(this.environment, item);
      return;
    }
  }
}

Two things to note:
- The agent never awaits its own capability calls inside onContextItem. The methods are non-blocking, so the environment keeps delivering items while inference and tool execution run in the background.
- Behaviors compose by reaction, not orchestration. Add a second agent that listens for ModelMessageItems and you get a critique loop. Add a TranscriptLogger and you get a live transcript. Neither change touches the existing participants.
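
The critique loop can be sketched with stand-ins to show the composition. Everything here is hypothetical (`SketchEnvironment`, `Writer`, `Critic`, `Human`, the `kind` strings, and the queue-and-drain delivery), and none of it is the `@mozaik-ai/core` implementation. The `maxRounds` bound is also an assumption, added so the loop terminates.

```typescript
// Hypothetical stand-ins so the sketch runs without @mozaik-ai/core.
type Kind = "user_message" | "model_message" | "developer_message";
interface Item { kind: Kind; text: string }

abstract class SketchParticipant {
  abstract onContextItem(source: SketchParticipant, item: Item): Promise<void>;
}

class SketchEnvironment {
  readonly transcript: string[] = [];
  private readonly participants: SketchParticipant[] = [];
  private readonly pending: Array<{ source: SketchParticipant; item: Item }> = [];

  join(participant: SketchParticipant): void {
    this.participants.push(participant);
  }

  // Enqueue only, so producers never wait on consumers.
  publish(source: SketchParticipant, item: Item): void {
    this.pending.push({ source, item });
  }

  // Deliver queued items to everyone except their source until the queue is empty.
  async drain(): Promise<void> {
    while (true) {
      const next = this.pending.shift();
      if (!next) return;
      this.transcript.push(next.item.text);
      for (const participant of this.participants) {
        if (participant !== next.source) {
          await participant.onContextItem(next.source, next.item);
        }
      }
    }
  }
}

class Human extends SketchParticipant {
  async onContextItem(): Promise<void> {
    // In this sketch the human only produces items and reacts to nothing.
  }
}

class Writer extends SketchParticipant {
  private version = 0;
  constructor(private readonly env: SketchEnvironment) { super(); }

  async onContextItem(_source: SketchParticipant, item: Item): Promise<void> {
    // Draft on a user request, redraft on each critique.
    if (item.kind === "user_message" || item.kind === "developer_message") {
      this.version += 1;
      this.env.publish(this, { kind: "model_message", text: `draft v${this.version}` });
    }
  }
}

class Critic extends SketchParticipant {
  private rounds = 0;
  constructor(private readonly env: SketchEnvironment, private readonly maxRounds: number) { super(); }

  async onContextItem(_source: SketchParticipant, item: Item): Promise<void> {
    // React only to model messages, and stop after maxRounds critiques.
    if (item.kind !== "model_message" || this.rounds >= this.maxRounds) return;
    this.rounds += 1;
    this.env.publish(this, { kind: "developer_message", text: `critique ${this.rounds}` });
  }
}
```

`Writer` holds no reference to `Critic`. Removing the critic from the environment leaves a single-draft writer, and adding it back restores the loop.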
Discriminating items
Items are discriminated by the ContextItem subclass and, for messages, the role field:
- Client-produced: UserMessageItem, DeveloperMessageItem, SystemMessageItem, FunctionCallOutputItem
- Model-produced: ModelMessageItem, FunctionCallItem, ReasoningItem
async onContextItem(source: Participant, item: ContextItem): Promise<void> {
  if (item instanceof FunctionCallItem) {
    // model asked for a tool
  } else if (item instanceof ModelMessageItem) {
    // model produced a final message
  }
}
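The two-level check (class first, then role for messages) can be sketched with stand-ins. The classes below are hypothetical, and the role strings in particular are assumptions rather than confirmed values from `@mozaik-ai/core`; the sketch only shows the order of the checks.

```typescript
// Hypothetical stand-ins; the role strings are assumptions, not confirmed values.
abstract class SketchItem {}

class SketchMessage extends SketchItem {
  constructor(
    readonly role: "user" | "developer" | "system" | "model",
    readonly text: string,
  ) {
    super();
  }
}

class SketchFunctionCall extends SketchItem {
  constructor(readonly name: string) {
    super();
  }
}

function classify(item: SketchItem): string {
  // First level: which class is it?
  if (item instanceof SketchFunctionCall) return `tool request: ${item.name}`;
  if (item instanceof SketchMessage) {
    // Second level: same class, so fall back to the role field.
    return item.role === "model" ? "model output" : `client input (${item.role})`;
  }
  return "other";
}
```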