mstar.graph.graph_io

Functions

format_graph_edge_list(lst)

Classes

WorkerGraphIO(graph[, wg_id])

Primary interface between the worker execution loop and a computation graph.

class mstar.graph.graph_io.WorkerGraphIO(graph, wg_id=None)

Bases: object

Primary interface between the worker execution loop and a computation graph.

The worker calls ingest_input with arriving edges, reads ready_node_names to pick a node to run, calls mark_node_complete, and routes the returned output edges. register_loop_finish_signal handles externally-signalled loop termination (e.g. EOS).
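The call order described above can be sketched with a toy stand-in. Everything here other than the method and attribute names taken from this page (ingest_input, ready_node_names, mark_node_complete, output_edges, filtered_signals) is hypothetical: ToyEdge, ToyCompletion, ToyGraphIO, and run_worker are illustration-only names, and the sketch assumes that filtered_signals holds edge objects. It does not reproduce the real WorkerGraphIO behaviour.

```python
from dataclasses import dataclass, field


@dataclass
class ToyEdge:
    """Hypothetical stand-in for a graph edge: a name plus a destination node."""
    name: str
    next_node: str


@dataclass
class ToyCompletion:
    """Hypothetical stand-in for NodeCompletionOutput."""
    output_edges: list
    filtered_signals: list = field(default_factory=list)


class ToyGraphIO:
    """Hypothetical stand-in that mirrors only the documented call order."""

    def __init__(self, needs, emits):
        self.needs = needs  # node name -> set of required input edge names
        self.emits = emits  # node name -> edges produced when the node completes
        self.inbox = {node: set() for node in needs}

    def ingest_input(self, graph_edge):
        required = self.needs.get(graph_edge.next_node)
        if required is None or graph_edge.name not in required:
            return False  # edge is not claimed here
        self.inbox[graph_edge.next_node].add(graph_edge.name)
        return True

    @property
    def ready_node_names(self):
        # A node is ready once every required input has arrived.
        return [n for n, req in self.needs.items() if req and self.inbox[n] == req]

    def mark_node_complete(self, node_name):
        self.inbox[node_name] = set()  # consume the inputs
        return ToyCompletion(output_edges=list(self.emits.get(node_name, [])))


def run_worker(io, arriving):
    """Ingest edges, run ready nodes, and route the edges they return."""
    pending = list(arriving)
    executed, unclaimed = [], []
    while pending:
        edge = pending.pop(0)
        if not io.ingest_input(edge):
            unclaimed.append(edge)  # a real worker would try another destination
            continue
        while io.ready_node_names:
            node = io.ready_node_names[0]
            executed.append(node)  # a real worker would execute the node here
            result = io.mark_node_complete(node)
            pending.extend(e for e in result.output_edges
                           if e not in result.filtered_signals)
    return executed, unclaimed


io = ToyGraphIO(
    needs={"encode": {"tokens"}, "decode": {"hidden"}},
    emits={"encode": [ToyEdge("hidden", "decode")],
           "decode": [ToyEdge("text", "client")]},
)
executed, unclaimed = run_worker(io, [ToyEdge("tokens", "encode")])
print(executed)                     # ['encode', 'decode']
print([e.name for e in unclaimed])  # ['text']
```

The final edge is addressed to a node outside the toy graph, so it ends up in the unclaimed list, which is where cross-worker routing would take over.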

Parameters:

graph

wg_id (defaults to None)

clear()
clear_speculative_inputs()

Clear all speculative buffers. Call this when discarding a speculative schedule.

Return type:

None

get_loop_indices()
get_nested_loop_idxs(target_loop_name)
Parameters:

target_loop_name (str)

get_nested_loop_idxs_for_node(node_name)
Parameters:

node_name (str)

get_node(name)
Parameters:

name (str)

ingest_for_speculation(edges, source_node)

Ingest a set of anticipated output edges into speculative buffers.

Returns a list of nodes that are ready to be speculatively executed. A node is ready when:

  (A) it is the same as source_node, and speculative, next_iter, and streaming together cover all of its inputs; or

  (B) it is not the same as source_node, and speculative, inputs, and streaming together cover all of its inputs.
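One way to read the readiness rule above is as a set-coverage check. The sketch below is an interpretation, not the library's implementation: it assumes each buffer can be viewed as a set of input names and that "cover" means the union of the three buffers contains every required input. The function name and its parameters are hypothetical.

```python
def is_speculatively_ready(node_name, source_node, required,
                           speculative, streaming, next_iter, inputs):
    """Return True when the relevant buffers together cover every required input.

    Assumption: each buffer is modelled as a set of input names.
    """
    # The source node itself draws on next_iter; any other node draws on inputs.
    third = next_iter if node_name == source_node else inputs
    available = set(speculative) | set(streaming) | set(third)
    return set(required) <= available


# The source node: next_iter supplies the missing input "b".
print(is_speculatively_ready(
    "loop_body", "loop_body", {"a", "b"},
    speculative={"a"}, streaming=set(), next_iter={"b"}, inputs=set()))  # True

# A different node: next_iter is ignored, so "b" is still missing.
print(is_speculatively_ready(
    "sampler", "loop_body", {"a", "b"},
    speculative={"a"}, streaming=set(), next_iter={"b"}, inputs=set()))  # False
```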

Parameters:

edges

source_node

Return type:

list[SpeculativeNodeInfo]

ingest_input(graph_edge, can_buffer=True)

Route an arriving edge to its destination node.

Returns False when the edge isn't claimed here: either next_node is not in this graph, or the node exists but rejected the edge (name mismatch or both ready slots full; see GraphNode.ingest_input). Callers should treat a False return as "try the next destination" (cross-worker routing or StreamBuffer re-queue).
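A caller might handle the False return roughly as follows. This is a minimal sketch under stated assumptions: StubGraphIO and route_edge are hypothetical names, edges are modelled as plain dicts, and a deque stands in for the StreamBuffer re-queue. Only the ingest_input(graph_edge, can_buffer=True) signature comes from this page.

```python
from collections import deque


class StubGraphIO:
    """Hypothetical stand-in: claims only edges addressed to nodes it knows."""

    def __init__(self, node_names):
        self.node_names = set(node_names)
        self.claimed = []

    def ingest_input(self, graph_edge, can_buffer=True):
        if graph_edge["next_node"] not in self.node_names:
            return False  # next_node is not in this graph
        self.claimed.append(graph_edge)
        return True


def route_edge(graph_edge, destinations, requeue):
    """Offer the edge to each destination in turn; re-queue it if none claims it."""
    for dest in destinations:
        if dest.ingest_input(graph_edge):
            return dest
    requeue.append(graph_edge)  # stand-in for a StreamBuffer re-queue
    return None


local = StubGraphIO(["encode"])
remote = StubGraphIO(["decode"])
backlog = deque()

print(route_edge({"next_node": "decode"}, [local, remote], backlog) is remote)  # True
print(route_edge({"next_node": "missing"}, [local, remote], backlog))           # None
print(len(backlog))                                                             # 1
```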

Parameters:

graph_edge

can_buffer (defaults to True)

Return type:

bool

mark_node_complete(node_name)

Signal that a node has finished executing.

The caller must route each edge in output_edges, skipping any in filtered_signals.

Parameters:

node_name (str)

Return type:

NodeCompletionOutput

property ready_for_streaming
property ready_node_names
register_communication_info(communication_manager, request_id)
Parameters:

request_id (str)

register_loop_finish_signal(loop_name)

Register an external loop-finish signal (e.g., when an EOS has been seen).

Parameters:

loop_name (str)

mstar.graph.graph_io.format_graph_edge_list(lst)
Parameters:

lst (list[GraphEdge])

Return type:

str