Core API Reference

This page contains the auto-generated API documentation for the core ArbiterOS module.

The main components are implemented in arbiteros_alpha.core and re-exported from the package root for convenience.

ArbiterOSAlpha

ArbiterOSAlpha(backend='langgraph')

Main ArbiterOS coordinator for policy-driven LangGraph execution.

ArbiterOSAlpha provides a lightweight governance layer on top of LangGraph, enabling policy-based validation and dynamic routing without modifying the underlying graph structure.

Attributes:

    backend: The execution backend in use.
    history (History): List of execution history entries with timestamps and I/O.
    policy_checkers (list[PolicyChecker]): List of PolicyChecker instances for validation.
    policy_routers (list[PolicyRouter]): List of PolicyRouter instances for dynamic routing.
    evaluators (list[NodeEvaluator]): List of NodeEvaluator instances for quality assessment.

Example
from arbiteros_alpha.instructions import CognitiveCore

os = ArbiterOSAlpha(backend="langgraph")
os.add_policy_checker(
    HistoryPolicyChecker("require_verification", ["generate", "execute"])
)
os.add_policy_router(ConfidencePolicyRouter("confidence", 0.5, "retry"))

@os.instruction(CognitiveCore.GENERATE)
def generate(state):
    return {"result": "output"}

Initialize the ArbiterOSAlpha instance.

Parameters:

    backend (Literal['langgraph', 'native', 'vanilla'], default 'langgraph'): The execution backend to use.
        - "langgraph": Use an agent based on the LangGraph framework.
        - "native": Use the framework-less ('from scratch') agent implementation.
        - "vanilla": (Deprecated) Alias for "native".
Source code in arbiteros_alpha/core.py
def __init__(
    self, backend: Literal["langgraph", "native", "vanilla"] = "langgraph"
):
    """Initialize the ArbiterOSAlpha instance.

    Args:
        backend: The execution backend to use.
            - "langgraph": Use an agent based on the LangGraph framework.
            - "native": Use the framework-less ('from scratch') agent implementation.
            - "vanilla": (Deprecated) Alias for "native".
    """
    if backend == "vanilla":
        import warnings

        warnings.warn(
            "The 'vanilla' backend is deprecated and will be removed in a future version. "
            "Please use 'native' instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        self.backend = "native"
    else:
        self.backend = backend

    self.history: History = History()
    self.policy_checkers: list[PolicyChecker] = []
    self.policy_routers: list[PolicyRouter] = []
    self.evaluators: list[NodeEvaluator] = []
    self._in_rollout: bool = False

    if self.backend == "langgraph":
        self._patch_pregel_loop()

    self.langfuse = Langfuse()
    self.span = None
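
A minimal sketch of the deprecation path, assuming ArbiterOSAlpha is importable from the package root as noted above. Note that construction also creates a Langfuse client, so Langfuse must be importable and configured in your environment.

import warnings

from arbiteros_alpha import ArbiterOSAlpha

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    os = ArbiterOSAlpha(backend="vanilla")  # deprecated alias

# The alias is rewritten to the supported backend name.
assert os.backend == "native"
assert any(issubclass(w.category, DeprecationWarning) for w in caught)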

add_policy_checker(checker)

Register a policy checker for validation.

Parameters:

    checker (PolicyChecker, required): A PolicyChecker instance to validate execution constraints.
Source code in arbiteros_alpha/core.py
def add_policy_checker(self, checker: PolicyChecker) -> None:
    """Register a policy checker for validation.

    Args:
        checker: A PolicyChecker instance to validate execution constraints.
    """
    logger.debug(f"Adding policy checker: {checker}")
    self.policy_checkers.append(checker)
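
For illustration, a custom checker only needs the interface the coordinator relies on here: a name attribute and a check_before(history) method returning a bool. The sketch below assumes PolicyChecker can be subclassed this way and that the import paths are as shown; treat the class and import paths as hypothetical.

# Hypothetical sketch; import paths assumed from the note that core
# components are re-exported at the package root.
from arbiteros_alpha import ArbiterOSAlpha, PolicyChecker
from arbiteros_alpha.history import History

class MaxSuperstepsChecker(PolicyChecker):
    """Hypothetical checker that fails once the run grows too long."""

    def __init__(self, name: str, max_supersteps: int):
        self.name = name
        self.max_supersteps = max_supersteps

    def check_before(self, history: History) -> bool:
        # Pass only while the number of recorded supersteps stays in bounds.
        return len(history.entries) <= self.max_supersteps

os = ArbiterOSAlpha(backend="langgraph")
os.add_policy_checker(MaxSuperstepsChecker("max_supersteps", max_supersteps=10))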

add_policy_router(router)

Register a policy router for dynamic flow control.

Policy routers are only supported when using the "langgraph" backend.

Parameters:

    router (PolicyRouter, required): A PolicyRouter instance to dynamically route execution.

Raises:

    RuntimeError: If the backend is not "langgraph".

Source code in arbiteros_alpha/core.py
def add_policy_router(self, router: PolicyRouter) -> None:
    """Register a policy router for dynamic flow control.

    Policy routers are only supported when using the "langgraph" backend.

    Args:
        router: A PolicyRouter instance to dynamically route execution.

    Raises:
        RuntimeError: If the backend is not "langgraph".
    """
    if self.backend != "langgraph":
        raise RuntimeError(
            "Policy routers are only supported with the 'langgraph' backend."
        )
    logger.debug(f"Adding policy router: {router}")
    self.policy_routers.append(router)
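
As with checkers, the coordinator only reads each router's name and calls its route_after(history) method, which returns a target node name or None. A hypothetical custom router could therefore be sketched as follows (class name and import paths are assumptions):

# Hypothetical sketch; import paths assumed.
from arbiteros_alpha import ArbiterOSAlpha, PolicyRouter
from arbiteros_alpha.history import History

class ErrorRetryRouter(PolicyRouter):
    """Hypothetical router that sends execution back to a retry node on error."""

    def __init__(self, name: str, retry_node: str):
        self.name = name
        self.retry_node = retry_node

    def route_after(self, history: History) -> str | None:
        last_item = history.entries[-1][-1]
        # Reroute if the most recent output flagged an error.
        if last_item.output_state.get("error"):
            return self.retry_node
        return None

os = ArbiterOSAlpha(backend="langgraph")
os.add_policy_router(ErrorRetryRouter("error_retry", retry_node="generate"))
# With backend="native", the same call raises RuntimeError.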

add_evaluator(evaluator)

Register a node evaluator for quality assessment.

Evaluators assess node execution quality after completion. Unlike policy checkers, they do not block execution but provide feedback and scores for monitoring, RL training, or self-improvement.

Parameters:

    evaluator (NodeEvaluator, required): A NodeEvaluator instance to assess execution quality.
Source code in arbiteros_alpha/core.py
def add_evaluator(self, evaluator: NodeEvaluator) -> None:
    """Register a node evaluator for quality assessment.

    Evaluators assess node execution quality after completion. Unlike
    policy checkers, they do not block execution but provide feedback
    and scores for monitoring, RL training, or self-improvement.

    Args:
        evaluator: A NodeEvaluator instance to assess execution quality.
    """
    logger.debug(f"Adding evaluator: {evaluator}")
    self.evaluators.append(evaluator)
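
A hypothetical evaluator sketch, assuming NodeEvaluator can be subclassed with a name, an optional target_instructions filter, and an evaluate(history) method, and that EvaluationResult carries the score/passed/feedback fields referenced elsewhere on this page. Class name and import paths are illustrative.

# Hypothetical sketch; import paths and EvaluationResult fields assumed.
from arbiteros_alpha import ArbiterOSAlpha, NodeEvaluator, EvaluationResult
from arbiteros_alpha.history import History

class NonEmptyOutputEvaluator(NodeEvaluator):
    """Hypothetical evaluator that scores whether the node produced any output."""

    def __init__(self, name: str, target_instructions=None):
        self.name = name
        self.target_instructions = target_instructions  # None means "evaluate every node"

    def evaluate(self, history: History) -> EvaluationResult:
        output = history.entries[-1][-1].output_state
        passed = bool(output)
        return EvaluationResult(
            score=1.0 if passed else 0.0,
            passed=passed,
            feedback="output present" if passed else "node returned an empty output",
        )

os = ArbiterOSAlpha(backend="langgraph")
os.add_evaluator(NonEmptyOutputEvaluator("non_empty_output"))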

_check_before()

Execute all policy checkers before instruction execution.

Returns:

    dict[str, bool]: A dictionary mapping checker names to their validation results.
    bool: A final boolean indicating whether all checkers passed.

Source code in arbiteros_alpha/core.py
def _check_before(self) -> tuple[dict[str, bool], bool]:
    """Execute all policy checkers before instruction execution.

    Returns:
        A dictionary mapping checker names to their validation results.
        A final boolean indicating if all checkers passed.
    """
    results = {}
    logger.debug(f"Running {len(self.policy_checkers)} policy checkers (before)")
    for checker in self.policy_checkers:
        result = checker.check_before(self.history)

        if result is False:
            results[checker.name] = result
            logger.error(f"Policy checker {checker} failed validation.")

    return results, all(results.values())

_route_after()

Determine if execution should be routed to a different node.

Consults all registered policy routers in order. Returns the first non-None routing decision.

Returns:

    dict[str, str | None]: A dictionary mapping router names to their route destinations.
    str | None: The final route destination, or None if no router triggered a reroute.

Source code in arbiteros_alpha/core.py
def _route_after(self) -> tuple[dict[str, str | None], str | None]:
    """Determine if execution should be routed to a different node.

    Consults all registered policy routers in order. Returns the first
    non-None routing decision.

    Returns:
        A dictionary mapping checker names to their route destination.
        A final str indicating the final route destination.
    """
    results = {}
    destination = None
    used_router = None
    logger.debug(f"Checking {len(self.policy_routers)} policy routers")
    for router in self.policy_routers:
        decision = router.route_after(self.history)

        if decision:
            results[router.name] = decision
            used_router = router
            destination = decision

    decision_count = sum(1 for v in results.values() if v is not None)
    if decision_count > 1:
        logger.error(
            "Multiple routers decided to route. Fallback to first decision."
        )

    if destination is not None:
        logger.warning(f"Router {used_router} decision made to: {destination}")
    return results, destination

_evaluate_node()

Execute all evaluators on the most recent node.

Evaluators assess the quality of the node execution that just completed. The node's HistoryItem (including output_state) has already been added to history and can be accessed via history.entries[-1][-1].

Only evaluators whose target_instructions match the current node's instruction type will be executed. If target_instructions is None, the evaluator runs on all nodes.

Returns:

    dict[str, EvaluationResult]: A dictionary mapping evaluator names to their evaluation results.

Note

Evaluator failures are logged but do not raise exceptions or interrupt execution. This ensures evaluation does not break the workflow.

Source code in arbiteros_alpha/core.py
def _evaluate_node(self) -> dict[str, EvaluationResult]:
    """Execute all evaluators on the most recent node.

    Evaluators assess the quality of the node execution that just completed.
    The node's HistoryItem (including output_state) has already been added
    to history and can be accessed via `history.entries[-1][-1]`.

    Only evaluators whose target_instructions match the current node's
    instruction type will be executed. If target_instructions is None,
    the evaluator runs on all nodes.

    Returns:
        A dictionary mapping evaluator names to their evaluation results.

    Note:
        Evaluator failures are logged but do not raise exceptions or
        interrupt execution. This ensures evaluation does not break
        the workflow.
    """
    results = {}
    current_item = self.history.entries[-1][-1]
    current_instruction = current_item.instruction

    logger.debug(f"Running evaluators for instruction: {current_instruction.name}")

    for evaluator in self.evaluators:
        # Check if this evaluator should run for this instruction type
        if evaluator.target_instructions is not None:
            if current_instruction not in evaluator.target_instructions:
                logger.debug(
                    f"Skipping evaluator {evaluator.name} "
                    f"(not targeting {current_instruction.name})"
                )
                continue

        try:
            result = evaluator.evaluate(self.history)
            results[evaluator.name] = result
            if result.passed:
                logger.debug(
                    f"Evaluator {evaluator.name}: score={result.score:.2f}, "
                    f"passed={result.passed}, feedback={result.feedback}"
                )
            else:
                logger.error(
                    f"Evaluator {evaluator.name}: score={result.score:.2f}, "
                    f"passed={result.passed}, feedback={result.feedback}"
                )
        except Exception as e:
            logger.error(
                f"Evaluator {evaluator.name} failed with error: {e}",
                exc_info=True,
            )
            # Evaluation failures should not interrupt execution
    return results

instruction(instruction_type, input_schema=None, output_schema=None)

Decorator to wrap LangGraph node functions with policy governance.

This decorator adds policy validation, execution history tracking, and dynamic routing to LangGraph node functions. It's the core integration point between ArbiterOS and LangGraph.

Parameters:

    instruction_type (InstructionType, required): An instruction type from one of the Core enums (CognitiveCore, MemoryCore, ExecutionCore, NormativeCore, MetacognitiveCore, AdaptiveCore, SocialCore, or AffectiveCore).
    input_schema (type[BaseModel] | None, default None): Optional Pydantic model to validate the input state. If provided, the input state will be validated against this schema before execution.
    output_schema (type[BaseModel] | None, default None): Optional Pydantic model to validate the output state. If provided, the result will be validated against this schema after execution.

Returns:

    Callable[[Callable], Callable]: A decorator function that wraps the target node function.

Example
from arbiteros_alpha.instructions import CognitiveCore

@os.instruction(CognitiveCore.GENERATE)
def generate(state: State) -> State:
    return {"field": "value"}
# Function now includes policy checks and history tracking
Source code in arbiteros_alpha/core.py
def instruction(
    self,
    instruction_type: InstructionType,
    input_schema: type[BaseModel] | None = None,
    output_schema: type[BaseModel] | None = None,
) -> Callable[[Callable], Callable]:
    """Decorator to wrap LangGraph node functions with policy governance.

    This decorator adds policy validation, execution history tracking,
    and dynamic routing to LangGraph node functions. It's the core
    integration point between ArbiterOS and LangGraph.

    Args:
        instruction_type: An instruction type from one of the Core enums
            (CognitiveCore, MemoryCore, ExecutionCore, NormativeCore,
            MetacognitiveCore, AdaptiveCore, SocialCore, or AffectiveCore).
        input_schema: Optional Pydantic model to validate the input state.
            If provided, the input state will be validated against this schema
            before execution.
        output_schema: Optional Pydantic model to validate the output state.
            If provided, the result will be validated against this schema
            after execution.

    Returns:
        A decorator function that wraps the target node function.

    Example:
        ```python
        from arbiteros_alpha.instructions import CognitiveCore

        @os.instruction(CognitiveCore.GENERATE)
        def generate(state: State) -> State:
            return {"field": "value"}
        # Function now includes policy checks and history tracking
        ```
    """
    # Validate that instruction_type is a valid InstructionType enum
    if not isinstance(instruction_type, InstructionType.__args__):
        raise TypeError(
            f"instruction_type must be an instance of one of the Core enums, got {type(instruction_type)}"
        )

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            if not self._in_rollout or self.span is None:
                raise RuntimeError(
                    "Instructions must be executed within a @arbiter_os.rollout context."
                )

            # Capture input state from arguments
            sig = inspect.signature(func)
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()
            input_state: Mapping = bound_args.arguments

            # Capture output fields from output_schema
            if output_schema is not None:
                output_fields: list[str] = list(output_schema.model_fields.keys())

            if self.backend == "langgraph":
                # For langgraph backend, the first argument is the state
                input_state: Mapping | BaseModel = next(iter(input_state.values()))

            # input_schema validation
            if input_schema is not None:
                try:
                    input_schema.model_validate(input_state)
                except ValidationError as ve:
                    logger.error(
                        f"Input validation failed for {func.__name__}: {ve}",  # type: ignore[attr-defined]
                        exc_info=True,
                    )

            if isinstance(input_state, BaseModel):
                input_state: Mapping = input_state.model_dump()

            history_item = HistoryItem(
                timestamp=datetime.datetime.now(),
                instruction=instruction_type,
                input_state=input_state,
            )

            if self.backend == "native":
                self.history.enter_next_superstep([instruction_type.name])

            self.history.add_entry(history_item)

            # langfuse record
            observation_type = self._get_observation_type(instruction_type)

            with self.span.start_as_current_observation(
                as_type=observation_type,
                name=func.__name__,  # type: ignore[attr-defined]
            ) as generation:
                logger.info(
                    f"instruction: {instruction_type.__class__.__name__}.{instruction_type.name} started with input: {input_state}"
                )
                history_item.check_policy_results, all_passed = self._check_before()

                result = func(*args, **kwargs)

                logger.info(
                    f"instruction: {instruction_type.__class__.__name__}.{instruction_type.name} returned output: {result}"
                )

                if result is None:
                    return

                # Convert result to output_state based on return type
                if isinstance(result, dict):
                    output_state: Mapping = result
                elif isinstance(result, BaseModel):
                    output_state: Mapping = result.model_dump()
                else:
                    # Convert single value or tuple to tuple
                    result_tuple = (
                        (result,) if not isinstance(result, tuple) else result
                    )

                    # Map tuple to dict using schema fields or enumeration
                    if output_schema is not None:
                        output_state: Mapping = dict(
                            zip(output_fields, result_tuple)
                        )
                    else:
                        output_state = dict(enumerate(result_tuple))
                        logger.warning(
                            f"Function {func.__name__} returned a non-dict/non-BaseModel value without output_schema. "  # type: ignore[attr-defined]
                            f"Using numeric indices as keys in history: {list(output_state.keys())}. "
                            f"Consider adding output_schema or returning a dict for better readability."
                        )

                # Validate output if schema is provided
                if output_schema is not None:
                    try:
                        output_schema.model_validate(output_state)
                    except ValidationError as ve:
                        logger.error(
                            f"Output validation failed for {func.__name__}: {ve}",  # type: ignore[attr-defined]
                            exc_info=True,
                        )

                history_item.output_state = output_state

                # Evaluate node execution quality
                if self.evaluators:
                    history_item.evaluation_results = self._evaluate_node()

                if self.backend == "langgraph":
                    history_item.route_policy_results, destination = (
                        self._route_after()
                    )
                metadata = (
                    history_item.check_policy_results
                    | getattr(history_item, "evaluation_results", {})
                    | history_item.route_policy_results
                )

                generation.update(
                    input=input_state, output=result, metadata=metadata
                )

            if self.backend == "langgraph" and destination:
                from langgraph.types import Command

                return Command(update=result, goto=destination)

            return result

        return wrapper

    return decorator
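
The wrapper's output handling is easiest to see with an output_schema: a dict return is recorded as-is, a BaseModel is recorded via model_dump(), and a bare value or tuple is zipped with the output_schema field names. A short sketch follows; the state and model names are illustrative, and the decorated function must still be called inside the rollout context mentioned in the wrapper (otherwise it raises RuntimeError).

from pydantic import BaseModel

from arbiteros_alpha import ArbiterOSAlpha
from arbiteros_alpha.instructions import CognitiveCore

# Illustrative schemas; field names are assumptions for this sketch.
class GenerateInput(BaseModel):
    prompt: str

class GenerateOutput(BaseModel):
    draft: str
    confidence: float

os = ArbiterOSAlpha(backend="langgraph")

@os.instruction(
    CognitiveCore.GENERATE,
    input_schema=GenerateInput,
    output_schema=GenerateOutput,
)
def generate(state):
    # Returning a tuple is allowed: the wrapper zips it with the
    # output_schema fields, so history records
    # {"draft": "a draft answer", "confidence": 0.9}.
    return "a draft answer", 0.9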

History

History()

Initialize an empty execution history.

Source code in arbiteros_alpha/history.py
def __init__(self) -> None:
    """Initialize an empty execution history."""
    self.entries: list[SuperStep] = []
    self.next_superstep: list[str] = []

enter_next_superstep(nodes)
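
Record the node names scheduled for the next superstep and open a new, empty superstep in entries. Calls that include the "__start__" or "__end__" sentinel nodes are ignored.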

Source code in arbiteros_alpha/history.py
def enter_next_superstep(self, nodes: list[str]) -> None:
    if "__start__" in nodes or "__end__" in nodes:
        return
    logger.debug(f"Entering next superstep with nodes: {nodes}")
    self.next_superstep = nodes
    self.entries.append([])

add_entry(entry)
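
Append a HistoryItem to the current superstep. Raises RuntimeError if no superstep is open or if every node scheduled for the current superstep has already recorded an entry.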

Source code in arbiteros_alpha/history.py
def add_entry(self, entry: HistoryItem) -> None:
    if not self.entries or len(self.entries[-1]) >= len(self.next_superstep):
        raise RuntimeError(
            "All nodes for the current superstep have already recorded entries.\n"
            "Hint: Did you forget to call \n"
            "    - register_compiled_graph() for langgraph backend or \n"
            "    - enter_next_superstep() for native backend?"
        )
    self.entries[-1].append(entry)

pprint()
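
Pretty-print the execution history to the console, showing each entry's input and output states, policy check results, routing decisions, and evaluation results, grouped by superstep.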

Source code in arbiteros_alpha/history.py
def pprint(self) -> None:
    import yaml
    from rich.console import Console

    console = Console()
    console.print("\n[bold cyan]📋 Arbiter OS Execution History[/bold cyan]")
    console.print("=" * 80)

    for superstep_idx, superstep in enumerate(self.entries, 1):
        console.print(
            f"\n[bold magenta]╔═══ SuperStep {superstep_idx} ═══╗[/bold magenta]"
        )

        for entry_idx, entry in enumerate(superstep, 1):
            # Format policy results
            check_results = entry.check_policy_results
            route_results = entry.route_policy_results

            # Header with instruction name
            console.print(
                f"\n[bold cyan]  [{superstep_idx}.{entry_idx}] {entry.instruction.name}[/bold cyan]"
            )
            console.print(f"[dim]    Timestamp: {entry.timestamp}[/dim]")

            # Format input state as YAML
            console.print("    [yellow]Input:[/yellow]")
            input_yaml = yaml.dump(
                entry.input_state, default_flow_style=False, sort_keys=False
            )
            for line in input_yaml.strip().split("\n"):
                console.print(f"      [dim]{line}[/dim]")

            # Format output state as YAML
            console.print("    [yellow]Output:[/yellow]")
            output_yaml = yaml.dump(
                entry.output_state, default_flow_style=False, sort_keys=False
            )
            for line in output_yaml.strip().split("\n"):
                console.print(f"      [dim]{line}[/dim]")

            # Show detailed policy check results
            console.print("    [yellow]Policy Checks:[/yellow]")
            if check_results:
                for policy_name, result in check_results.items():
                    status = "[green]✓[/green]" if result else "[red]✗[/red]"
                    console.print(f"      {status} {policy_name}")
            else:
                console.print("      [dim](none)[/dim]")

            # Show detailed policy route results
            console.print("    [yellow]Policy Routes:[/yellow]")
            if route_results:
                for policy_name, destination in route_results.items():
                    if destination:
                        console.print(
                            f"      [magenta]→[/magenta] {policy_name} [bold magenta]⇒ {destination}[/bold magenta]"
                        )
                    else:
                        console.print(f"      [dim]— {policy_name}[/dim]")
            else:
                console.print("      [dim](none)[/dim]")

            # Show evaluation results
            console.print("    [yellow]Evaluations:[/yellow]")
            eval_results = entry.evaluation_results
            if eval_results:
                for eval_name, eval_result in eval_results.items():
                    status = (
                        "[green]✓[/green]" if eval_result.passed else "[red]✗[/red]"
                    )
                    console.print(
                        f"      {status} {eval_name}: "
                        f"[cyan]score={eval_result.score:.2f}[/cyan] - {eval_result.feedback}"
                    )
            else:
                console.print("      [dim](none)[/dim]")

        console.print(
            f"[bold magenta]╚{'═' * (len(f'SuperStep {superstep_idx}') + 9)}╝[/bold magenta]"
        )

    console.print("\n" + "=" * 80 + "\n")
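
A minimal sketch of the bookkeeping the coordinator performs for the native backend, assuming History and HistoryItem are importable from arbiteros_alpha.history and the instruction enums from arbiteros_alpha.instructions; in normal use the instruction wrapper does all of this for you.

import datetime

from arbiteros_alpha.history import History, HistoryItem
from arbiteros_alpha.instructions import CognitiveCore

history = History()

# Open a superstep for a single node, then record its entry.
history.enter_next_superstep(["generate"])
history.add_entry(
    HistoryItem(
        timestamp=datetime.datetime.now(),
        instruction=CognitiveCore.GENERATE,
        input_state={"prompt": "hello"},
        output_state={"result": "output"},
    )
)

history.pprint()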

HistoryItem

HistoryItem(timestamp, instruction, input_state, output_state=dict(), check_policy_results=dict(), route_policy_results=dict(), evaluation_results=dict()) dataclass

The minimal OS metadata for tracking instruction execution.

Attributes:

    timestamp (datetime): When the instruction was executed.
    instruction (InstructionType): The instruction type that was executed.
    input_state (dict[str, Any]): The state passed to the instruction.
    output_state (Any): The state returned by the instruction.
    check_policy_results (dict[str, bool]): Results of policy checkers (name -> passed/failed).
    route_policy_results (dict[str, str | None]): Results of policy routers (name -> target or None).
    evaluation_results (dict[str, EvaluationResult]): Results of node evaluators (name -> EvaluationResult).