Mozaik

Agentic environment

A broadcast bus for typed context items. No central scheduler — participants react to whatever flows through.

AgenticEnvironment is the heart of Mozaik. It is a broadcast bus for typed ContextItems, not a pipeline. Participants join() it, capability methods stream items into it, and every joined participant sees every item via its onContextItem(source, item) callback.

There is no central scheduler. The environment does not decide who runs next, who is allowed to speak, or in what order. Behavior emerges from the participants' reactions to delivered items.
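A joined participant is, at minimum, an object with an onContextItem callback. As a hypothetical sketch (the ContextItem and Participant shapes below are local stand-ins for illustration, not the library's actual exports), a passive observer might look like:

```typescript
// Local stand-ins for the library types (assumptions for illustration).
interface ContextItem {
  readonly kind: string;
}

interface Participant {
  onContextItem(source: Participant, item: ContextItem): void;
}

// A passive observer: it never produces items, it only watches the bus.
class LoggingObserver implements Participant {
  readonly seen: ContextItem[] = [];

  // Called by the environment for every item any participant produces.
  onContextItem(_source: Participant, item: ContextItem): void {
    this.seen.push(item);
  }
}
```

After joining, every delivered item lands in seen, regardless of which participant produced it.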

flowchart LR
  Human[BaseHumanParticipant] -->|streamInput| Env(("AgenticEnvironment"))
  Agent[BaseAgentParticipant] -->|"runInference / executeFunctionCall"| Env
  Observer[Custom Participant] -->|join| Env
  Env -->|onContextItem| Human
  Env -->|onContextItem| Agent
  Env -->|onContextItem| Observer

Lifecycle

import { AgenticEnvironment } from '@mozaik-ai/core';

const environment = new AgenticEnvironment();

human.join(environment);
agent.join(environment);

environment.start();

// ...participants stream items...

environment.stop();
Call and effect:

  • new AgenticEnvironment(): creates an empty environment with no participants.
  • participant.join(environment): registers the participant. From now on it will receive every item via onContextItem.
  • environment.start(): opens the bus for delivery. Capability calls before this point are typically buffered or no-ops, depending on the participant.
  • environment.deliverContextItem(initiator, item): fans the item out to every joined participant's onContextItem. Usually called for you by deliverStream while a capability is running.
  • environment.stop(): shuts the bus down.

Fan-out semantics

When a participant produces an item, the environment dispatches onContextItem(source, item) on every joined participant synchronously and without awaiting them. Two practical consequences:

  • A slow listener cannot block producers. Even if a logger or a downstream agent is busy, the environment keeps fanning out items.
  • Multiple capabilities can produce items into the same environment concurrently. Each capability call is a fresh promise wrapping an async iterable, and items are streamed in via deliverStream one at a time.
// Both participants produce items in parallel — neither awaits the other.
human.streamInput(environment);
agent.runInference(environment, context, model);

How items get into the environment

Capability methods on participants do not return their items — they stream them. Internally, every capability is wrapped with the deliverStream helper, which pulls items from an AsyncIterable and forwards each one to the environment:

export async function deliverStream<T extends ContextItem>(
  environment: AgenticEnvironment,
  initiator: Participant,
  stream: AsyncIterable<T>,
): Promise<void> {
  for await (const item of stream) {
    await environment.deliverContextItem(initiator, item);
  }
}

This is what makes the system non-blocking: as soon as the underlying generator yields, the item is on the bus and every other participant sees it.
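To make the streaming path concrete, here is a hypothetical, self-contained sketch. The one-method environment object below is an assumption standing in for AgenticEnvironment, and greetCapability is an invented capability; only the deliverStream loop matches the helper shown above.

```typescript
// Stand-in for AgenticEnvironment (assumption for illustration):
// only deliverContextItem is modeled.
type Item = { text: string };

const delivered: Item[] = [];

const env = {
  async deliverContextItem(_initiator: unknown, item: Item): Promise<void> {
    delivered.push(item); // a real environment would fan out to participants here
  },
};

// A capability body is just an async generator: each yield puts one item
// on the bus before the next one is even computed.
async function* greetCapability(): AsyncIterable<Item> {
  yield { text: "hello" };
  yield { text: "world" };
}

// Same shape as the deliverStream helper above, specialized to the stand-ins.
async function deliverStream(stream: AsyncIterable<Item>): Promise<void> {
  for await (const item of stream) {
    await env.deliverContextItem(null, item);
  }
}
```

Calling `await deliverStream(greetCapability())` delivers the two items in yield order; nothing waits for the generator to finish before the first item is visible.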

What flows on the bus

Every item dispatched via onContextItem is a typed ContextItem:

  • Client-produced: UserMessageItem, DeveloperMessageItem, SystemMessageItem, FunctionCallOutputItem
  • Model-produced: ModelMessageItem, FunctionCallItem, ReasoningItem

Participants discriminate between them with instanceof (and, for messages, the role field) inside onContextItem.
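That discrimination is ordinary instanceof branching. A hypothetical sketch, with local classes standing in for the library's item types:

```typescript
// Local stand-ins for the library's item classes (assumptions for illustration).
class ContextItem {}

class MessageItem extends ContextItem {
  constructor(readonly role: "user" | "system" | "model", readonly text: string) {
    super();
  }
}

class FunctionCallItem extends ContextItem {
  constructor(readonly name: string) {
    super();
  }
}

// The kind of branching a participant does inside onContextItem:
// check the most specific classes first, fall through for the rest.
function describe(item: ContextItem): string {
  if (item instanceof FunctionCallItem) return `function call: ${item.name}`;
  if (item instanceof MessageItem) return `${item.role} message: ${item.text}`;
  return "unhandled item";
}
```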

What the environment is not

  • It is not an FSM. There are no states, no transitions, no before / after hooks. The only intercept point is onContextItem.
  • It is not an orchestrator. It does not decide whose turn it is. If you want a turn-taking convention, encode it in the participants' reactions.
  • It is not a queue. Items are delivered as they are produced; backpressure, if any, must be implemented inside a participant.
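For example, a turn-taking convention can live entirely inside one participant's reaction. A hypothetical sketch (the item shape and class are local assumptions for illustration):

```typescript
// Turn-taking encoded in the participant, not in the environment.
// The item shape here is a local assumption for illustration.
type Item = { role: "user" | "model"; text: string };

class TurnTakingAgent {
  replies = 0;

  // React only to user items; ignore model items, including our own output.
  onContextItem(_source: unknown, item: Item): void {
    if (item.role === "user") {
      this.replies += 1; // a real agent would start runInference here
    }
  }
}
```

The environment still delivers every item to this agent; the "turn" is nothing more than the agent choosing which items it responds to.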
