Add support for streaming#157
Open
joefreeman wants to merge 25 commits into
Open
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This adds support for streaming values from a producer to consumers.
A stream can be created explicitly with
cf.stream(...), where the argument is a generator. Or implicitly by having the task yield values. A stream can be configured with a buffer size and a timeout. By default there's no buffer or timeout, which means the producer will produce one item at a time to meet demand from consumers. If a buffer is configured, the producer will eagerly produce items to fill the buffer (or continuously if the buffer is set toNone). If a timeout is configured then the stream will be closed if no items are produced for that interval - this could be either because items aren't being produced or because items aren't being consumed.A
@taskcan be configured withstreams=cf.Streams(...)(plural) to configure default buffer/timeout for streams (including an implicitly created stream).A stream can be sliced or partitioned to select the items to be received, and then iterated to consume them - so a stream can be consumed in parallel by multiple consumers.
Item values are persisted to the orchestration server (either as raw values or references to blobs). Executions that have streams that are successfully exhausted (i.e., closed by the producer) can be cached/memoised.