mstar.graph.base#
Classes
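GraphEdge |
GraphNode |
GraphSection |
GraphStateRegistry |
Loop |
LoopStateRegistry |
NodeAndGraphWalk | Pair of node name and graph walk, e.g., (LLM, decode) or (flow, image_gen).
NodeCompletionOutput |
NodeInputsOutputs |
Parallel |
ReadySignals | Readiness state for one 'slot' of a GraphNode.
Sequential |
SpeculativeNodeInfo |
TensorPointerInfo |
WorkerGraphStateRegistry |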
- class mstar.graph.base.GraphEdge(next_node: str, name: str, tensor_info: list[mstar.graph.base.TensorPointerInfo] = <factory>, persist: bool = False, conductor_new_token: bool = False, is_streaming: bool = False, output_modality: str = '', _persist_for_loop: bool = False, _final_stream_chunk: bool = False, _total_fanin: int = 1, _shard_dim: int | None = None)[source]#
Bases:
object
- tensor_info: list[TensorPointerInfo]#
- class mstar.graph.base.GraphNode(name: str, input_names: set[str], outputs: list[mstar.graph.base.GraphEdge], consumes_stream: bool = False, enable_async_scheduling: bool = True, _speculatively_scheduled: bool = False, _streaming_inputs: set[str] = <factory>, _managing_registry: 'GraphStateRegistry | None' = None)[source]#
Bases:
GraphSection
- get_inputs_outputs()[source]#
Return the I/O signature of this section.
Construction-time only — used to derive Loop._external_inputs / _loop_back_inputs. Not for runtime introspection.
- ingest_input(edge, can_buffer=True)[source]#
Try to ingest an arriving edge.
Returns True on success and False when the edge is rejected, e.g., when it is a streaming edge that the node is not ready for and that must be re-queued.
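A minimal sketch of how a caller might act on that boolean. `ToyEdge`, `ToyNode`, `drain_once`, and the `ready_for_stream` flag are hypothetical stand-ins invented for illustration, not part of mstar, and the real rejection rules may differ.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class ToyEdge:
    """Hypothetical stand-in for GraphEdge; only the fields used here."""
    name: str
    is_streaming: bool = False


@dataclass
class ToyNode:
    """Hypothetical stand-in for GraphNode, not the mstar implementation."""
    input_names: set[str]
    ready_for_stream: bool = False
    ingested: list[str] = field(default_factory=list)

    def ingest_input(self, edge: ToyEdge, can_buffer: bool = True) -> bool:
        # can_buffer is accepted to mirror the documented signature; unused here.
        if edge.name not in self.input_names:
            return False
        # Assumption: a streaming edge is rejected until the node is ready for it.
        if edge.is_streaming and not self.ready_for_stream:
            return False
        self.ingested.append(edge.name)
        return True


def drain_once(node: ToyNode, pending: deque) -> None:
    """Offer each pending edge once; re-queue the ones the node rejects."""
    for _ in range(len(pending)):
        edge = pending.popleft()
        if not node.ingest_input(edge):
            pending.append(edge)  # retry on a later pass


node = ToyNode(input_names={"prompt", "audio_chunk"})
pending = deque([ToyEdge("prompt"), ToyEdge("audio_chunk", is_streaming=True)])
drain_once(node, pending)      # the streaming chunk is rejected and re-queued
node.ready_for_stream = True
drain_once(node, pending)      # the chunk is accepted on the second pass
```

The point of the sketch is only that a False return is not an error: the edge stays alive and is offered again later.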
- class mstar.graph.base.GraphSection[source]#
Bases:
ABC
- abstractmethod get_inputs_outputs()[source]#
Return the I/O signature of this section.
Construction-time only — used to derive Loop._external_inputs / _loop_back_inputs. Not for runtime introspection.
- class mstar.graph.base.GraphStateRegistry(graph_section)[source]#
Bases:
ABC
- Parameters:
graph_section (GraphSection)
- class mstar.graph.base.Loop(section: mstar.graph.base.GraphSection, max_iters: int, outputs: list[mstar.graph.base.GraphEdge], name: str = <factory>, accumulated_outputs: list[mstar.graph.base.GraphEdge] = <factory>, curr_iter: int = 0, is_done: bool = False, _finish_signal: bool = False, _managing_registry: 'GraphStateRegistry | None' = None, _ingested_external_inputs: list[mstar.graph.base.GraphEdge] = <factory>, _ingested_external_input_names: set[str] = <factory>, _cached_outputs: dict[str, list[mstar.graph.base.TensorPointerInfo]] = <factory>, _accumulated_cache: dict[str, list[mstar.graph.base.TensorPointerInfo]] = <factory>, _accumulated_output_names: set[str] = <factory>, _external_inputs: set[tuple[str, str]] | None = None, _loop_back_inputs: set[tuple[str, str]] | None = None)[source]#
Bases:
GraphSection
- Parameters:
section (GraphSection)
max_iters (int)
name (str)
curr_iter (int)
is_done (bool)
_finish_signal (bool)
_managing_registry (GraphStateRegistry | None)
_cached_outputs (dict[str, list[TensorPointerInfo]])
_accumulated_cache (dict[str, list[TensorPointerInfo]])
- complete_iter()[source]#
Called when every entity in the loop’s section has finished for this iteration.
If the loop is done (last iter or finish signal received): populates output tensor_info from the cache, marks itself complete in the outer registry, and returns the loop’s declared outputs with loop-back signals in filtered_signals. Otherwise: advances curr_iter, resets the inner registry, and returns the saved external inputs so they can be re-routed into the next iteration.
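The two branches described above can be sketched as follows. `ToyLoop` is a hypothetical, heavily simplified stand-in: the registry interaction and `filtered_signals` are omitted, string tags replace the real return type, and treating `curr_iter` as zero-based is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class ToyLoop:
    """Hypothetical, simplified stand-in for Loop; not the mstar class."""
    max_iters: int
    output_names: list[str]
    curr_iter: int = 0
    is_done: bool = False
    finish_signal: bool = False
    cached_outputs: dict[str, list[str]] = field(default_factory=dict)
    saved_external_inputs: list[str] = field(default_factory=list)

    def complete_iter(self) -> tuple[str, list]:
        # Assumption: curr_iter is zero-based, so the last iteration is max_iters - 1.
        last_iter = self.curr_iter + 1 >= self.max_iters
        if last_iter or self.finish_signal:
            self.is_done = True
            # Populate the declared outputs from the most recent cached values.
            outputs = [(n, self.cached_outputs.get(n, [])) for n in self.output_names]
            return "done", outputs
        # Not done: advance and hand back the saved external inputs for re-routing.
        self.curr_iter += 1
        return "continue", list(self.saved_external_inputs)


loop = ToyLoop(max_iters=3, output_names=["tokens"], saved_external_inputs=["prompt"])
loop.cached_outputs["tokens"] = ["t0"]
first = loop.complete_iter()    # loop continues, external inputs are replayed
loop.finish_signal = True
second = loop.complete_iter()   # finish signal ends the loop before max_iters
```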
- get_inputs_outputs()[source]#
Return the I/O signature of this section.
Construction-time only — used to derive Loop._external_inputs / _loop_back_inputs. Not for runtime introspection.
- maybe_cache_output(edges)[source]#
Snapshot tensor_info for any edge that matches a declared loop output.
Called after every entity completion so the most recent iteration’s tensor_info is available when complete_iter populates self.outputs. Accumulated output tensor_info is appended across iterations. Deduplicates by name so multi-destination edges don’t double-count.
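A rough sketch of the caching behaviour described above, using hypothetical `ToyEdge` and `ToyLoopCache` types and strings in place of TensorPointerInfo. It assumes "latest wins" for ordinary outputs and "append" for accumulated outputs, as the text suggests; the field names are invented.

```python
from dataclasses import dataclass, field


@dataclass
class ToyEdge:
    """Hypothetical stand-in for GraphEdge."""
    name: str
    tensor_info: list[str]


@dataclass
class ToyLoopCache:
    """Hypothetical stand-in for the caching part of Loop."""
    output_names: set[str]
    accumulated_output_names: set[str]
    cached: dict[str, list[str]] = field(default_factory=dict)
    accumulated: dict[str, list[str]] = field(default_factory=dict)

    def maybe_cache_output(self, edges: list[ToyEdge]) -> None:
        seen: set[str] = set()
        for edge in edges:
            if edge.name in seen:
                continue  # same output fanned out to several destinations
            seen.add(edge.name)
            if edge.name in self.output_names:
                # Overwrite: only the most recent iteration's value is kept.
                self.cached[edge.name] = list(edge.tensor_info)
            if edge.name in self.accumulated_output_names:
                # Append: values from every iteration are kept in order.
                self.accumulated.setdefault(edge.name, []).extend(edge.tensor_info)


cache = ToyLoopCache(output_names={"last_token"}, accumulated_output_names={"all_tokens"})
cache.maybe_cache_output([
    ToyEdge("last_token", ["t0"]),
    ToyEdge("all_tokens", ["t0"]),
    ToyEdge("all_tokens", ["t0"]),  # duplicate name, counted once
])
cache.maybe_cache_output([ToyEdge("last_token", ["t1"]), ToyEdge("all_tokens", ["t1"])])
```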
- register_communication_info(communication_manager, request_id)[source]#
- Parameters:
request_id (str)
- section: GraphSection#
- class mstar.graph.base.LoopStateRegistry(loop)[source]#
Bases:
GraphStateRegistry
- Parameters:
loop (Loop)
- class mstar.graph.base.NodeAndGraphWalk(node, graph_walk)[source]#
Bases:
object
Pair of node name and graph walk, e.g., (LLM, decode) or (flow, image_gen). graph_walk may be None for streaming-consumer lookups where the graph walk is not known a priori.
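As an illustration of the optional `graph_walk`, the sketch below uses a hypothetical `ToyNodeAndGraphWalk` tuple and a `matches` helper. Treating None as "match any graph walk" is an assumption made for this example; the source only says the value may be None for streaming-consumer lookups.

```python
from typing import NamedTuple, Optional


class ToyNodeAndGraphWalk(NamedTuple):
    """Hypothetical stand-in for NodeAndGraphWalk."""
    node: str
    graph_walk: Optional[str] = None


def matches(key: ToyNodeAndGraphWalk, query: ToyNodeAndGraphWalk) -> bool:
    # Assumption: a query without a graph walk matches every walk of that node.
    if key.node != query.node:
        return False
    return query.graph_walk is None or key.graph_walk == query.graph_walk


registered = [
    ToyNodeAndGraphWalk("LLM", "decode"),
    ToyNodeAndGraphWalk("flow", "image_gen"),
]
exact = [k for k in registered if matches(k, ToyNodeAndGraphWalk("LLM", "decode"))]
by_node = [k for k in registered if matches(k, ToyNodeAndGraphWalk("flow"))]
miss = [k for k in registered if matches(k, ToyNodeAndGraphWalk("LLM", "prefill"))]
```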
- class mstar.graph.base.NodeCompletionOutput(output_edges: list[mstar.graph.base.GraphEdge] = <factory>, filtered_signals: set[tuple[str, str]] = <factory>)[source]#
Bases:
object
- class mstar.graph.base.NodeInputsOutputs(ext_inputs: set[tuple[str, str]], ext_outputs: list[mstar.graph.base.GraphEdge], loop_back: set[tuple[str, str]])[source]#
Bases:
object
- class mstar.graph.base.Parallel(sections: list[mstar.graph.base.GraphSection])[source]#
Bases:
GraphSection
- Parameters:
sections (list[GraphSection])
- get_inputs_outputs()[source]#
Return the I/O signature of this section.
Construction-time only — used to derive Loop._external_inputs / _loop_back_inputs. Not for runtime introspection.
- sections: list[GraphSection]#
- class mstar.graph.base.ReadySignals(node_name, input_names, streaming_inputs, ready_inputs=<factory>, ready_names=<factory>, is_ready=False, is_ready_for_streaming=False)[source]#
Bases:
object
Readiness state for one ‘slot’ of a GraphNode.
Each node holds two instances — ready_signals (current iteration) and ready_next_iter — so that loop-back inputs arriving during execution can be buffered without overwriting the current slot.
- register_communication_info(communication_manager, request_id)[source]#
- Parameters:
request_id (str)
- remove(edge_name)[source]#
Reverse update(edge) without dereferencing tensor_info.
Used by the worker’s speculation rollback path when a streaming chunk was ingested for a readiness check and the rid was subsequently dropped. The chunk is handed back to its StreamBuffer’s uningested cache; the StreamBuffer remains responsible for the tensor lifecycle, so we must not dereference here.
Returns the removed edge, or None if no such name was present.
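The contract of `remove` can be sketched with a hypothetical `ToyReadySignals` that stores edges by name. The dictionary layout and the computed `is_ready` property are assumptions for illustration (the real class exposes `is_ready` as a field), and strings stand in for tensor pointers.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ToyEdge:
    """Hypothetical stand-in for GraphEdge."""
    name: str
    tensor_info: list[str] = field(default_factory=list)


@dataclass
class ToyReadySignals:
    """Hypothetical stand-in for one readiness slot."""
    input_names: set[str]
    ready_inputs: dict[str, ToyEdge] = field(default_factory=dict)

    @property
    def is_ready(self) -> bool:
        # Ready once every expected input name has arrived.
        return self.input_names <= set(self.ready_inputs)

    def update(self, edge: ToyEdge) -> None:
        self.ready_inputs[edge.name] = edge

    def remove(self, edge_name: str) -> Optional[ToyEdge]:
        # Hand the edge back untouched: tensor_info is neither freed nor
        # dereferenced here, since its owner still manages the lifecycle.
        return self.ready_inputs.pop(edge_name, None)


slot = ToyReadySignals(input_names={"text", "chunk"})
slot.update(ToyEdge("text"))
chunk = ToyEdge("chunk", ["ptr0"])
slot.update(chunk)
was_ready = slot.is_ready          # both inputs present
removed = slot.remove("chunk")     # rollback: the same edge object comes back
missing = slot.remove("chunk")     # a second removal finds nothing
```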
- class mstar.graph.base.Sequential(sections: list[mstar.graph.base.GraphSection])[source]#
Bases:
GraphSection
- Parameters:
sections (list[GraphSection])
- get_inputs_outputs()[source]#
Return the I/O signature of this section.
Construction-time only — used to derive Loop._external_inputs / _loop_back_inputs. Not for runtime introspection.
- sections: list[GraphSection]#
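To make the idea of an I/O signature concrete, here is a sketch of deriving external inputs and outputs for a group of nodes. Everything in it is an assumption for illustration: `ToyEdge`, `ToyNode`, and `external_signature` are hypothetical, inputs are modelled as (node name, input name) pairs, and loop-back detection is left out. It is not how mstar computes NodeInputsOutputs.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyEdge:
    """Hypothetical stand-in for GraphEdge: destination node and edge name."""
    next_node: str
    name: str


@dataclass
class ToyNode:
    """Hypothetical stand-in for GraphNode."""
    name: str
    input_names: set[str]
    outputs: list[ToyEdge]


def external_signature(nodes: list[ToyNode]):
    """Return (external inputs, external outputs) for a group of nodes."""
    internal = {n.name for n in nodes}
    needed = {(n.name, i) for n in nodes for i in n.input_names}
    produced = {(e.next_node, e.name) for n in nodes for e in n.outputs}
    # Inputs nobody inside the group produces must come from outside.
    ext_inputs = needed - produced
    # Edges whose destination is outside the group leave the group.
    ext_outputs = [e for n in nodes for e in n.outputs if e.next_node not in internal]
    return ext_inputs, ext_outputs


encoder = ToyNode("enc", {"image"}, [ToyEdge("llm", "embeds")])
llm = ToyNode("llm", {"embeds", "prompt"}, [ToyEdge("detok", "tokens")])
ext_inputs, ext_outputs = external_signature([encoder, llm])
```

Here `embeds` is satisfied inside the group, so only `image` and `prompt` remain as external inputs, and `tokens` is the single edge leaving the group.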
- class mstar.graph.base.SpeculativeNodeInfo(node_name: str, is_new_loop_iter: bool, loop_name: str | None = None)[source]#
Bases:
object
- class mstar.graph.base.TensorPointerInfo(dims: list[int], dtype: str, nbytes: int, address: int, stride: list[int], uuid: str, source_session_id: str, source_entity: str, offset: int = 0, source_tp_size: int = 1, source_tp_rank: int = 0, _source_node_name: str | None = None, _source_graph_walk: str | None = None)[source]#
Bases:
object
- class mstar.graph.base.WorkerGraphStateRegistry(graph_section)[source]#
Bases:
GraphStateRegistry
- Parameters:
graph_section (GraphSection)