mstar.streaming.chunk_policy

Classes

ChunkPolicy()

Determines when a StreamBuffer has enough items for the consumer node.

FixedChunkPolicy(chunk_size[, ...])

Release non-overlapping chunks of fixed size.

LeftContextChunkPolicy(chunk, left_context)

Chunk policy for streaming vocoders with left-context overlap.

SlidingWindowChunkPolicy(window, stride)

Fixed-size sliding window that advances by a stride.

class mstar.streaming.chunk_policy.ChunkPolicy

Bases: ABC

Determines when a StreamBuffer has enough items for the consumer node.

continue_after_producer_done()

Whether the buffer should keep producing (empty) chunks after the producer signals done and all buffered items have been consumed.

Default False: partition-done is propagated to the conductor after the last item is flushed.

Set to True for connections where the consumer must keep running after the producer finishes (e.g., Thinker→Talker: the Talker continues generating codec tokens after the Thinker hits text EOS). In this case the buffer produces empty chunks (_collate([]) returns {"data": None}), and the consumer’s partition-done is determined by its own model logic, not by the StreamBuffer.

Return type:

bool
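The effect of this flag can be illustrated with a toy generator. This is only a sketch under stated assumptions: `toy_stream` is a hypothetical helper, not part of mstar; the `{"data": item}` shape for non-empty chunks is assumed by analogy with the empty chunk `{"data": None}` described above; and the fixed `consumer_steps_after_done` count stands in for the consumer's own stopping logic.

```python
def toy_stream(buffered, continue_after_done, consumer_steps_after_done=2):
    """Yield chunks for a producer that has already finished (hypothetical helper)."""
    # Flush everything that is still buffered.
    for item in buffered:
        yield {"data": item}  # assumed chunk shape, for illustration only

    if continue_after_done:
        # The consumer keeps running and receives empty chunks. In a real
        # pipeline the consumer's own model logic decides when to stop; a
        # fixed step count stands in for that decision here.
        for _ in range(consumer_steps_after_done):
            yield {"data": None}
    # Otherwise the generator simply ends, which plays the role of
    # propagating partition-done after the last item is flushed.


if __name__ == "__main__":
    print(list(toy_stream(["a", "b"], continue_after_done=False)))
    print(list(toy_stream(["a", "b"], continue_after_done=True)))
```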

abstractmethod is_ready(buffer_len)

Return True if the buffer has enough items for a chunk.

Parameters:

buffer_len (int)

Return type:

bool

abstractmethod next_chunk_size(buffer_len)

Return the number of items to consume for the next chunk.

Only called when is_ready() returns True. For sliding-window policies this is the stride, not the window.

Parameters:

buffer_len (int)

Return type:

int

register_chunk(chunk_size)

Parameters:

chunk_size (int)

abstractmethod window_size()

Return the full window of items to include in the chunk.

For non-overlapping policies, equals next_chunk_size. For sliding-window policies, this is larger than the stride — the buffer retains older items so the chunk contains the full window.

Return type:

int
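How the three abstract methods might cooperate can be sketched with a stand-in base class and a toy consumer loop. Everything here is illustrative: `ChunkPolicySketch`, `PairPolicy`, and `drain` are hypothetical names, and the loop assumes that `buffer_len` counts unconsumed items and that a chunk starts at the consumed pointer. The real StreamBuffer may differ on both points.

```python
from abc import ABC, abstractmethod


class ChunkPolicySketch(ABC):
    """Stand-in that mirrors the documented interface (not the real class)."""

    def continue_after_producer_done(self) -> bool:
        return False  # documented default

    @abstractmethod
    def is_ready(self, buffer_len: int) -> bool: ...

    @abstractmethod
    def next_chunk_size(self, buffer_len: int) -> int: ...

    @abstractmethod
    def window_size(self) -> int: ...


class PairPolicy(ChunkPolicySketch):
    """Hypothetical policy: non-overlapping chunks of two items."""

    def is_ready(self, buffer_len: int) -> bool:
        return buffer_len >= 2

    def next_chunk_size(self, buffer_len: int) -> int:
        return 2

    def window_size(self) -> int:
        return 2  # non-overlapping, so window equals the advance


def drain(items, policy):
    """Toy loop: pop chunks for as long as the policy reports readiness."""
    chunks, start = [], 0
    while policy.is_ready(len(items) - start):
        # next_chunk_size is only consulted after is_ready returned True.
        advance = policy.next_chunk_size(len(items) - start)
        chunks.append(items[start:start + policy.window_size()])
        start += advance
    return chunks, items[start:]


if __name__ == "__main__":
    print(drain(list(range(5)), PairPolicy()))
```

In this sketch the single leftover item stays in the buffer because `is_ready` returns False for it; how a real buffer flushes such a remainder is outside what this page documents.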

class mstar.streaming.chunk_policy.FixedChunkPolicy(chunk_size, continue_after_done=False)

Bases: ChunkPolicy

Release non-overlapping chunks of fixed size.

Each pop_chunk returns exactly chunk_size items and advances by chunk_size. No overlap, no sliding window.

Parameters:
  • chunk_size (int) – number of items per chunk.

  • continue_after_done (bool) – if True, keep producing empty chunks after the producer finishes and all buffered items are consumed.
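The non-overlapping behaviour described above amounts to plain slicing. The helper below is a hypothetical illustration that does not use the real class. It assumes a chunk is released only once `chunk_size` unconsumed items are available.

```python
def fixed_chunks(items, chunk_size):
    """Split items into full, non-overlapping chunks plus a remainder (sketch)."""
    chunks = []
    start = 0
    # Release a chunk only when a full chunk_size is available (assumption).
    while len(items) - start >= chunk_size:
        chunks.append(items[start:start + chunk_size])
        start += chunk_size  # advance by exactly chunk_size: no overlap
    return chunks, items[start:]


if __name__ == "__main__":
    print(fixed_chunks(list(range(7)), 3))
```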

continue_after_producer_done()

Whether the buffer should keep producing (empty) chunks after the producer signals done and all buffered items have been consumed.

Default False: partition-done is propagated to the conductor after the last item is flushed.

Set to True for connections where the consumer must keep running after the producer finishes (e.g., Thinker→Talker: the Talker continues generating codec tokens after the Thinker hits text EOS). In this case the buffer produces empty chunks (_collate([]) returns {"data": None}), and the consumer’s partition-done is determined by its own model logic, not by the StreamBuffer.

Return type:

bool

is_ready(buffer_len)

Return True if the buffer has enough items for a chunk.

Return type:

bool

next_chunk_size(buffer_len)

Return the number of items to consume for the next chunk.

Only called when is_ready() returns True. For sliding-window policies this is the stride, not the window.

Parameters:

buffer_len (int)

Return type:

int

window_size()

Return the full window of items to include in the chunk.

For non-overlapping policies, equals next_chunk_size. For sliding-window policies, this is larger than the stride — the buffer retains older items so the chunk contains the full window.

Return type:

int

class mstar.streaming.chunk_policy.LeftContextChunkPolicy(chunk, left_context)

Bases: ChunkPolicy

Chunk policy for streaming vocoders with left-context overlap.

Matches HuggingFace’s Qwen3OmniMoeCode2Wav.chunked_decode pattern:

Iter 0: codes[0 : chunk] → emit all (no context)
Iter 1: codes[chunk-ctx : 2*chunk] → trim first ctx, emit rest
Iter 2: codes[2*chunk-ctx : 3*chunk] → trim first ctx, emit rest

The first pop returns chunk items (no context). Subsequent pops return chunk + left_context items, where the leading left_context items OVERLAP with the tail of the previous chunk. This overlap allows the causal ConvNet vocoder to “warm up” its internal state on frames it has already processed, ensuring a smooth transition at chunk boundaries.

The key invariant: the first pop advances by chunk - left_context (not chunk), so the last left_context items of the first chunk remain in the buffer as overlap for the second pop. All subsequent pops advance by chunk.
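The index arithmetic behind this invariant can be checked with a short stand-alone sketch. `left_context_ranges` is a hypothetical helper, not the class itself. It assumes a pop happens only when the full window for that pop is available, and it reports both the range handed to the vocoder and the range that remains after trimming the leading context.

```python
def left_context_ranges(n_items, chunk, left_context):
    """Return (input_ranges, emitted_ranges) as half-open index pairs (sketch)."""
    inputs, emitted = [], []
    start = 0
    window = chunk                   # first pop: chunk items, no context
    advance = chunk - left_context   # keep the tail as overlap for the next pop
    trim = 0                         # nothing to trim on the first pop
    while n_items - start >= window:
        inputs.append((start, start + window))
        emitted.append((start + trim, start + window))
        start += advance
        # Every later pop carries left_context leading items and advances by chunk.
        window = chunk + left_context
        advance = chunk
        trim = left_context
    return inputs, emitted


if __name__ == "__main__":
    ins, outs = left_context_ranges(n_items=12, chunk=4, left_context=1)
    print(ins)   # ranges passed to the vocoder
    print(outs)  # ranges kept after trimming the context
```

With `chunk=4` and `left_context=1`, the input ranges overlap by one item while the emitted ranges tile the stream without gaps or repeats, which is the point of advancing by `chunk - left_context` on the first pop.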

Parameters:
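  • chunk (int) – number of new items emitted per chunk.

  • left_context (int) – number of items from the tail of the previous chunk that are included as leading overlap on every pop after the first.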

is_ready(buffer_len)

Return True if the buffer has enough items for a chunk.

Parameters:

buffer_len (int)

Return type:

bool

next_chunk_size(buffer_len)

Return the number of items to consume for the next chunk.

Only called when is_ready() returns True. For sliding-window policies this is the stride, not the window.

Parameters:

buffer_len (int)

Return type:

int

window_size()

Return the full window of items to include in the chunk.

For non-overlapping policies, equals next_chunk_size. For sliding-window policies, this is larger than the stride — the buffer retains older items so the chunk contains the full window.

Return type:

int

class mstar.streaming.chunk_policy.SlidingWindowChunkPolicy(window, stride)

Bases: ChunkPolicy

Fixed-size sliding window that advances by a stride.

Each pop_chunk returns window items and advances the consumed pointer by stride. Old items before the window are discarded.

Example (Orpheus SNAC): window=28 tokens (4 frames), stride=7 (1 frame).
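For the Orpheus SNAC numbers above, the windows can be enumerated with a small sketch. `sliding_windows` is a hypothetical helper, not the real class. It assumes a window is released only once `window` unconsumed items are available.

```python
def sliding_windows(n_items, window, stride):
    """Return half-open (start, end) index pairs for each full window (sketch)."""
    ranges = []
    start = 0
    while n_items - start >= window:
        ranges.append((start, start + window))
        start += stride  # consumed pointer advances by the stride only
    return ranges


if __name__ == "__main__":
    # 42 tokens = 6 frames of 7 tokens; window = 4 frames, stride = 1 frame.
    print(sliding_windows(n_items=42, window=28, stride=7))
```

Consecutive windows share `window - stride` items (21 tokens, or 3 frames, in this example), so each pop contributes one new frame.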

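Parameters:
  • window (int) – number of items returned by each pop_chunk.

  • stride (int) – number of items the consumed pointer advances after each pop.

is_ready(buffer_len)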

Return True if the buffer has enough items for a chunk.

Parameters:

buffer_len (int)

Return type:

bool

next_chunk_size(buffer_len)

Return the number of items to consume for the next chunk.

Only called when is_ready() returns True. For sliding-window policies this is the stride, not the window.

Parameters:

buffer_len (int)

Return type:

int

window_size()

Return the full window of items to include in the chunk.

For non-overlapping policies, equals next_chunk_size. For sliding-window policies, this is larger than the stride — the buffer retains older items so the chunk contains the full window.

Return type:

int