Reference · Integrations
Peer-Consult & TaskFlow
rfml-moe-hub is designed to slot into structures you already run. A dispatch is a single TaskFlow node, and Peer-Consult plugs in on both ends - as an expert that participates in arbitration, and as the escalation path when consensus is weak.
TaskFlow: orchestration as a node
To TaskFlow, the hub is just another step. Wrap hub.dispatch() in a @node and the entire MoE pass - route, gate, fan-out, arbitrate - becomes a single unit in your DAG, with retries, fan-out, and failure handling supplied by TaskFlow as usual.
```python
# TaskFlow node: the hub is a drop-in step in an existing DAG.
from taskflow import node, Flow
from moe_hub import Hub

@node(inputs=["spec"], outputs=["patch", "trace"])
def synthesize(spec, *, hub: Hub):
    # the TaskFlow run id rides along as the dispatch trace, so the
    # whole MoE pass shows up inline in your existing TaskFlow trace view.
    result = hub.dispatch(
        task=spec.summary,
        context={"files": spec.files},
        trace=spec.run_id,
    )
    return {"patch": result.consensus, "trace": result.trace.as_json()}

flow = (
    Flow("ship-patch")
    .step(plan)          # existing TaskFlow steps
    .step(synthesize)    # <- MoE orchestration as one node
    .step(run_tests)
    .on_fail(synthesize, retry=2)
)
```
Traces line up
Pass run_id as the dispatch trace and the hub threads it through every stage. The MoE decision record nests inside your existing TaskFlow trace - no separate observability stack to wire up.
Peer-Consult: two integration points
Peer-Consult connects to the hub in two complementary ways. They are independent - use either or both.
As an escalation target
When arbitration returns weak support, the hub can open a Peer-Consult round, attach the competing candidates and the dissenting experts' rationale, and fold the peer verdict back in as one more weighted proposal. The MoE layer handles the easy cases cheaply and reserves the peer mesh for genuine disagreement.
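To make "weak support" and "one more weighted proposal" concrete, here is a minimal, self-contained sketch of weighted arbitration. It does not use the hub's API: `ToyProposal`, `arbitrate`, and the scoring rule (confidence times route weight, with support as the winner's share of the total) are assumptions chosen for illustration, and the hub's real arbiter may score proposals differently.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyProposal:
    """Hypothetical stand-in for a hub proposal (not the moe_hub class)."""
    expert_id: str
    answer: str
    confidence: float    # the expert's own confidence, 0..1
    route_weight: float  # weight assigned by the router


def arbitrate(proposals):
    """Return (winning answer, support).

    Assumed rule: each proposal votes confidence * route_weight for its
    answer; support is the winner's share of all votes cast.
    """
    votes = defaultdict(float)
    for p in proposals:
        votes[p.answer] += p.confidence * p.route_weight
    winner = max(votes, key=votes.get)
    return winner, votes[winner] / sum(votes.values())


# Two local experts disagree, so support is weak (about 0.60).
local = [
    ToyProposal("expert.a", "patch-1", confidence=0.6, route_weight=0.5),
    ToyProposal("expert.b", "patch-2", confidence=0.4, route_weight=0.5),
]
winner, support = arbitrate(local)

# The peer verdict re-enters as one more proposal with a high route weight,
# which lifts support for patch-1 to about 0.86 under this toy rule.
peer = ToyProposal("peer.consult", "patch-1", confidence=0.95, route_weight=1.0)
winner_after, support_after = arbitrate(local + [peer])
```

Under this toy rule the first pass falls below a 0.85 threshold and the second clears it, which is the shape of the escalation flow: the peer verdict does not override arbitration, it only adds weight to one side.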
```python
# Peer-Consult: route weak consensus out to a peer mesh, then fold the
# peer's verdict back in as one more weighted proposal.
from moe_hub import Hub, Proposal
from peer_consult import Session

def dispatch_with_escalation(hub: Hub, task, peers: Session):
    decision = hub.dispatch(task)
    if decision.support >= 0.85:
        return decision  # confident: commit as-is

    # weak consensus -> open a Peer-Consult round with the dissent attached
    verdict = peers.consult(
        question=task,
        candidates=decision.candidates,  # the competing answers
        rationale=decision.dissenting,
    )
    # the peer verdict re-enters arbitration as a high-weight proposal
    return hub.arbiter.arbitrate(
        decision.proposals + [Proposal.from_peer(verdict, route_weight=1.0)]
    )
```
As an expert
Because an Expert is just an adapter that returns a Proposal, an entire Peer-Consult session can register as a single expert. The router scores it like any other expert, the gate may or may not engage it, and its verdict competes in arbitration on the same weighted terms as every other proposal.
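The "router scores it, gate may or may not engage it" behaviour can be pictured with a small standalone sketch. Everything here is hypothetical: `ToyExpert`, `route_score`, and `gate` are illustrative names, and scoring by capability overlap with a top-k cutoff is an assumed policy, not a description of how the hub actually routes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyExpert:
    """Hypothetical expert descriptor: an id plus declared capabilities."""
    id: str
    capabilities: frozenset


def route_score(expert, required):
    """Fraction of the required capabilities this expert covers."""
    if not required:
        return 0.0
    return len(expert.capabilities & required) / len(required)


def gate(experts, required, top_k=2, min_score=0.5):
    """Keep at most top_k experts scoring at least min_score, best first."""
    scored = [(route_score(e, required), e.id) for e in experts]
    eligible = [(score, eid) for score, eid in scored if score >= min_score]
    eligible.sort(key=lambda pair: (-pair[0], pair[1]))
    return [eid for _, eid in eligible[:top_k]]


experts = [
    ToyExpert("local.codegen", frozenset({"generate", "refactor"})),
    ToyExpert("local.linter", frozenset({"review"})),
    ToyExpert("peer.consult", frozenset({"review", "adjudicate"})),
]

# A review-and-adjudicate task engages the peer session (score 1.0)
# ahead of the linter (score 0.5); codegen scores 0.0 and is gated out.
review_task = gate(experts, frozenset({"review", "adjudicate"}))

# A pure generation task never touches the peer mesh.
codegen_task = gate(experts, frozenset({"generate"}))
```

The point of the sketch is that the peer session gets no special handling: it is scored and gated through the same path as the local experts, so it only costs a round when its capabilities match the task.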
```python
# An Expert is just an adapter around anything that returns a Proposal -
# a local model, a remote agent, or an entire Peer-Consult sub-session.
from moe_hub import Expert, Proposal

class PeerConsultExpert(Expert):
    id = "peer.consult"
    capabilities = ["review", "adjudicate"]

    def __init__(self, session):
        self._session = session

    async def run(self, dispatch) -> Proposal:
        verdict = await self._session.consult_async(dispatch.context)
        return Proposal(
            expert_id=self.id,
            payload=verdict.answer,
            confidence=verdict.agreement,  # peer agreement as confidence
            route_weight=dispatch.weight,
            cost=verdict.elapsed_ms,
            digest=Proposal.hash(verdict.answer),
        )
```
What stays the hub's job
- Routing & sparsity. The hub decides which experts run. TaskFlow decides what happens around the dispatch; Peer-Consult is one expert or one escalation among many.
- One trace, one decision record. Every dispatch produces a single replayable audit trail, regardless of how many TaskFlow steps or Peer-Consult rounds it touched.
- No lock-in on transport. In-process, gRPC to the Go data plane, or across a Peer-Consult mesh - the dispatch envelope is identical.
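As a rough illustration of the last point, the sketch below sends one envelope through two toy transports and checks that the receiver sees the same thing either way. `ToyEnvelope`, `handle`, `send_in_process`, and `send_over_wire` are hypothetical names, and JSON stands in for whatever wire format the real transports use; the field names mirror the `task`, `context`, and `trace` arguments passed to `hub.dispatch()` earlier on this page.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class ToyEnvelope:
    """Hypothetical dispatch envelope; not the hub's actual wire schema."""
    task: str
    context: dict
    trace: str


def handle(envelope):
    """Stand-in for an expert: echoes the task, tagged with the trace id."""
    return {"trace": envelope.trace, "handled": envelope.task}


def send_in_process(envelope):
    """In-process transport: hand the envelope straight to the handler."""
    return handle(envelope)


def send_over_wire(envelope):
    """Simulated remote transport: serialize, then rebuild on the far side."""
    wire_bytes = json.dumps(asdict(envelope)).encode("utf-8")
    received = ToyEnvelope(**json.loads(wire_bytes.decode("utf-8")))
    return handle(received)


envelope = ToyEnvelope(
    task="review patch",
    context={"files": ["a.py"]},
    trace="run-123",
)
local_result = send_in_process(envelope)
remote_result = send_over_wire(envelope)
```

Because the handler only ever sees the envelope, swapping the transport changes nothing about what gets recorded under the trace id, which is what keeps a single decision record possible.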