mstar.graph.base

Contents

mstar.graph.base#

Classes

GraphEdge(next_node, name, tensor_info, ...)

GraphNode(name, input_names, outputs, ...)

GraphSection()

GraphStateRegistry(graph_section)

Loop(section, max_iters, outputs, name, ...)

LoopStateRegistry(loop)

NodeAndGraphWalk(node, graph_walk)

Pair of node name and graph walk, e.g., (LLM, decode) or (flow, image_gen).

NodeCompletionOutput(output_edges, ...)

NodeInputsOutputs(ext_inputs, ext_outputs, ...)

Parallel(sections)

ReadySignals(node_name, input_names, ...[, ...])

Readiness state for one 'slot' of a GraphNode.

Sequential(sections)

SpeculativeNodeInfo(node_name, is_new_loop_iter)

TensorPointerInfo(dims, dtype, nbytes, ...)

WorkerGraphStateRegistry(graph_section)

class mstar.graph.base.GraphEdge(next_node: str, name: str, tensor_info: list[mstar.graph.base.TensorPointerInfo] = <factory>, persist: bool = False, conductor_new_token: bool = False, is_streaming: bool = False, output_modality: str = '', _persist_for_loop: bool = False, _final_stream_chunk: bool = False, _total_fanin: int = 1, _shard_dim: int | None = None)[source]#

Bases: object

Parameters:
clone()[source]#
conductor_new_token: bool = False#
is_streaming: bool = False#
name: str#
next_node: str#
output_modality: str = ''#
persist: bool = False#
tensor_info: list[TensorPointerInfo]#
class mstar.graph.base.GraphNode(name: str, input_names: set[str], outputs: list[mstar.graph.base.GraphEdge], consumes_stream: bool = False, enable_async_scheduling: bool = True, _speculatively_scheduled: bool = False, _streaming_inputs: set[str] = <factory>, _managing_registry: 'GraphStateRegistry | None' = None)[source]#

Bases: GraphSection

Parameters:
clear()[source]#
complete()[source]#
Return type:

NodeCompletionOutput

consumes_stream: bool = False#
enable_async_scheduling: bool = True#
get_inputs_outputs()[source]#

Return the I/O signature of this section.

Construction-time only — used to derive Loop._external_inputs / _loop_back_inputs. Not for runtime introspection.

get_loops()[source]#

Flat map of all Loops in this section, keyed by name.

get_nodes()[source]#

Flat map of all GraphNodes in this section, keyed by name.

ingest_input(edge, can_buffer=True)[source]#

Try to ingest an arriving edge.

Returns True on success and False when the edge is rejected, e.g., when it is a streaming edge that the node is not ready for and that needs to be re-queued.

Parameters:
Return type:

bool

input_names: set[str]#
is_ready_for_speculation(check_next_iter=False, allow_streaming=True)[source]#
Parameters:
  • check_next_iter (bool)

  • allow_streaming (bool)

name: str#
outputs: list[GraphEdge]#
register_communication_info(communication_manager, request_id)[source]#
Parameters:

request_id (str)

reset_for_outer_iter()[source]#
reset_outputs()[source]#
class mstar.graph.base.GraphSection[source]#

Bases: ABC

abstractmethod get_inputs_outputs()[source]#

Return the I/O signature of this section.

Construction-time only — used to derive Loop._external_inputs / _loop_back_inputs. Not for runtime introspection.

Return type:

NodeInputsOutputs

abstractmethod get_loops()[source]#

Flat map of all Loops in this section, keyed by name.

Return type:

dict[str, Loop]

abstractmethod get_nodes()[source]#

Flat map of all GraphNodes in this section, keyed by name.

Return type:

dict[str, GraphNode]

class mstar.graph.base.GraphStateRegistry(graph_section)[source]#

Bases: ABC

Parameters:

graph_section (GraphSection)

clear()[source]#
managed_entities: dict[str, GraphNode | Loop]#
mark_entity_complete(entity_name)[source]#

Record that an entity has finished; no-ops if already done (safeguard only).

Parameters:

entity_name (str)

Return type:

NodeCompletionOutput

abstractmethod register_ingested_input(graph_edge)[source]#
Parameters:

graph_edge (GraphEdge)

reset_for_iter()[source]#
class mstar.graph.base.Loop(section: mstar.graph.base.GraphSection, max_iters: int, outputs: list[mstar.graph.base.GraphEdge], name: str = <factory>, accumulated_outputs: list[mstar.graph.base.GraphEdge] = <factory>, curr_iter: int = 0, is_done: bool = False, _finish_signal: bool = False, _managing_registry: 'GraphStateRegistry | None' = None, _ingested_external_inputs: list[mstar.graph.base.GraphEdge] = <factory>, _ingested_external_input_names: set[str] = <factory>, _cached_outputs: dict[str, list[mstar.graph.base.TensorPointerInfo]] = <factory>, _accumulated_cache: dict[str, list[mstar.graph.base.TensorPointerInfo]] = <factory>, _accumulated_output_names: set[str] = <factory>, _external_inputs: set[tuple[str, str]] | None = None, _loop_back_inputs: set[tuple[str, str]] | None = None)[source]#

Bases: GraphSection

Parameters:
accumulated_outputs: list[GraphEdge]#
clear()[source]#
complete_iter()[source]#

Called when every entity in the loop’s section has finished for this iteration.

If the loop is done (last iter or finish signal received): populates output tensor_info from the cache, marks itself complete in the outer registry, and returns the loop’s declared outputs with loop-back signals in filtered_signals. Otherwise: advances curr_iter, resets the inner registry, and returns the saved external inputs so they can be re-routed into the next iteration.

curr_iter: int = 0#
get_inputs_outputs()[source]#

Return the I/O signature of this section.

Construction-time only — used to derive Loop._external_inputs / _loop_back_inputs. Not for runtime introspection.

get_loops()[source]#

Flat map of all Loops in this section, keyed by name.

get_nodes()[source]#

Flat map of all GraphNodes in this section, keyed by name.

ingest_external_input(graph_edge)[source]#
Parameters:

graph_edge (GraphEdge)

is_done: bool = False#
max_iters: int#
maybe_cache_output(edges)[source]#

Snapshot tensor_info for any edge that matches a declared loop output.

Called after every entity completion so the most recent iteration’s tensor_info is available when complete_iter populates self.outputs. Accumulated output tensor_info is appended across iterations. Deduplicates by name so multi-destination edges don’t double-count.

Parameters:

edges (list[GraphEdge])

name: str#
outputs: list[GraphEdge]#
register_communication_info(communication_manager, request_id)[source]#
Parameters:

request_id (str)

register_finished()[source]#
reset_for_outer_iter()[source]#
section: GraphSection#
class mstar.graph.base.LoopStateRegistry(loop)[source]#

Bases: GraphStateRegistry

Parameters:

loop (Loop)

mark_entity_complete(entity_name)[source]#

Record that an entity has finished; no-ops if already done (safeguard only).

Parameters:

entity_name (str)

Return type:

NodeCompletionOutput

register_ingested_input(graph_edge)[source]#
Parameters:

graph_edge (GraphEdge)

class mstar.graph.base.NodeAndGraphWalk(node, graph_walk)[source]#

Bases: object

Pair of node name and graph walk, e.g., (LLM, decode) or (flow, image_gen). graph_walk may be None for streaming-consumer lookups where the graph walk is not known a priori.

Parameters:
  • node (str)

  • graph_walk (str | None)

graph_walk: str | None#
node: str#
class mstar.graph.base.NodeCompletionOutput(output_edges: list[mstar.graph.base.GraphEdge] = <factory>, filtered_signals: set[tuple[str, str]] = <factory>)[source]#

Bases: object

Parameters:
filtered_signals: set[tuple[str, str]]#
output_edges: list[GraphEdge]#
class mstar.graph.base.NodeInputsOutputs(ext_inputs: set[tuple[str, str]], ext_outputs: list[mstar.graph.base.GraphEdge], loop_back: set[tuple[str, str]])[source]#

Bases: object

Parameters:
ext_inputs: set[tuple[str, str]]#
ext_outputs: list[GraphEdge]#
loop_back: set[tuple[str, str]]#
class mstar.graph.base.Parallel(sections: list[mstar.graph.base.GraphSection])[source]#

Bases: GraphSection

Parameters:

sections (list[GraphSection])

get_inputs_outputs()[source]#

Return the I/O signature of this section.

Construction-time only — used to derive Loop._external_inputs / _loop_back_inputs. Not for runtime introspection.

get_loops()[source]#

Flat map of all Loops in this section, keyed by name.

get_nodes()[source]#

Flat map of all GraphNodes in this section, keyed by name.

sections: list[GraphSection]#
class mstar.graph.base.ReadySignals(node_name, input_names, streaming_inputs, ready_inputs=<factory>, ready_names=<factory>, is_ready=False, is_ready_for_streaming=False)[source]#

Bases: object

Readiness state for one ‘slot’ of a GraphNode.

Each node holds two instances — ready_signals (current iteration) and ready_next_iter — so that loop-back inputs arriving during execution can be buffered without overwriting the current slot.

Parameters:
clear()[source]#
input_names: set[str]#
is_ready: bool = False#
is_ready_for_streaming: bool = False#
node_name: str#
ready_inputs: dict[str, GraphEdge]#
ready_names: set[str]#
register_communication_info(communication_manager, request_id)[source]#
Parameters:

request_id (str)

remove(edge_name)[source]#

Reverse update(edge) without dereferencing tensor_info.

Used by the worker’s speculation rollback path when a streaming chunk was ingested for a readiness check and the rid was subsequently dropped. The chunk is handed back to its StreamBuffer’s uningested cache; the StreamBuffer remains responsible for the tensor lifecycle, so we must not dereference here.

Returns the removed edge, or None if no such name was present.

Parameters:

edge_name (str)

Return type:

GraphEdge | None

streaming_inputs: set[str]#
update(edge)[source]#
Parameters:

edge (GraphEdge)

class mstar.graph.base.Sequential(sections: list[mstar.graph.base.GraphSection])[source]#

Bases: GraphSection

Parameters:

sections (list[GraphSection])

get_inputs_outputs()[source]#

Return the I/O signature of this section.

Construction-time only — used to derive Loop._external_inputs / _loop_back_inputs. Not for runtime introspection.

get_loops()[source]#

Flat map of all Loops in this section, keyed by name.

get_nodes()[source]#

Flat map of all GraphNodes in this section, keyed by name.

sections: list[GraphSection]#
class mstar.graph.base.SpeculativeNodeInfo(node_name: str, is_new_loop_iter: bool, loop_name: str | None = None)[source]#

Bases: object

Parameters:
  • node_name (str)

  • is_new_loop_iter (bool)

  • loop_name (str | None)

is_new_loop_iter: bool#
loop_name: str | None = None#
node_name: str#
class mstar.graph.base.TensorPointerInfo(dims: list[int], dtype: str, nbytes: int, address: int, stride: list[int], uuid: str, source_session_id: str, source_entity: str, offset: int = 0, source_tp_size: int = 1, source_tp_rank: int = 0, _source_node_name: str | None = None, _source_graph_walk: str | None = None)[source]#

Bases: object

Parameters:
  • dims (list[int])

  • dtype (str)

  • nbytes (int)

  • address (int)

  • stride (list[int])

  • uuid (str)

  • source_session_id (str)

  • source_entity (str)

  • offset (int)

  • source_tp_size (int)

  • source_tp_rank (int)

  • _source_node_name (str | None)

  • _source_graph_walk (str | None)

address: int#
clone()[source]#
dims: list[int]#
dtype: str#
nbytes: int#
offset: int = 0#
source_entity: str#
source_session_id: str#
source_tp_rank: int = 0#
source_tp_size: int = 1#
stride: list[int]#
uuid: str#
class mstar.graph.base.WorkerGraphStateRegistry(graph_section)[source]#

Bases: GraphStateRegistry

Parameters:

graph_section (GraphSection)

clear()[source]#
mark_entity_complete(entity_name)[source]#

Record that an entity has finished; no-ops if already done (safeguard only).

Parameters:

entity_name (str)

Return type:

NodeCompletionOutput

register_ingested_input(graph_edge)[source]#
Parameters:

graph_edge (GraphEdge)

reset_for_iter()[source]#