|
3 | 3 | Producer and exchange stream examples demonstrating stateful batch-oriented data processing. |
4 | 4 |
|
5 | 5 | ```cpp title="examples/streaming.cpp" |
6 | | -#include "vgi_rpc/server.h" |
7 | | -#include "vgi_rpc/stream.h" |
8 | | -#include "vgi_rpc/metadata.h" |
9 | | -#include "vgi_rpc/arrow_utils.h" |
10 | | - |
11 | | -#include <arrow/array.h> |
12 | | -#include <arrow/builder.h> |
13 | | -#include <arrow/type.h> |
14 | | - |
15 | | -using namespace vgi_rpc; |
16 | | - |
17 | | -// --- Counter producer: emits {index, value} batches --- |
18 | | - |
19 | | -static auto counter_schema() { |
20 | | - return arrow::schema({ |
21 | | - arrow::field("index", arrow::int64()), |
22 | | - arrow::field("value", arrow::int64()), |
23 | | - }); |
24 | | -} |
25 | | - |
26 | | -class CounterState : public ProducerState { |
27 | | -public: |
28 | | - CounterState(int64_t count) : count_(count) {} |
29 | | - |
30 | | - void produce(OutputCollector& out, CallContext& /*ctx*/) override { |
31 | | - if (current_ >= count_) { |
32 | | - out.finish(); |
33 | | - return; |
34 | | - } |
35 | | - |
36 | | - arrow::Int64Builder idx_builder, val_builder; |
37 | | - VGI_RPC_THROW_NOT_OK(idx_builder.Append(current_)); |
38 | | - VGI_RPC_THROW_NOT_OK(val_builder.Append(current_ * 10)); |
39 | | - |
40 | | - auto idx_arr = unwrap(idx_builder.Finish()); |
41 | | - auto val_arr = unwrap(val_builder.Finish()); |
42 | | - |
43 | | - out.emit_arrays({idx_arr, val_arr}); |
44 | | - ++current_; |
45 | | - } |
46 | | - |
47 | | -private: |
48 | | - int64_t count_; |
49 | | - int64_t current_ = 0; |
50 | | -}; |
51 | | - |
52 | | -static Stream make_counter(const Request& req, CallContext& /*ctx*/) { |
53 | | - auto count = req.get<int64_t>("count"); |
54 | | - |
55 | | - Stream s; |
56 | | - s.output_schema = counter_schema(); |
57 | | - s.input_schema = empty_schema(); |
58 | | - s.state = std::make_shared<CounterState>(count); |
59 | | - return s; |
60 | | -} |
61 | | - |
62 | | -// --- Scale exchange: multiplies input values by factor --- |
63 | | - |
64 | | -static auto scale_input_schema() { |
65 | | - return arrow::schema({arrow::field("value", arrow::float64())}); |
66 | | -} |
67 | | - |
68 | | -static auto scale_output_schema() { |
69 | | - return arrow::schema({arrow::field("value", arrow::float64())}); |
70 | | -} |
71 | | - |
72 | | -class ScaleState : public ExchangeState { |
73 | | -public: |
74 | | - ScaleState(double factor) : factor_(factor) {} |
75 | | - |
76 | | - void exchange(const AnnotatedBatch& input, |
77 | | - OutputCollector& out, CallContext& /*ctx*/) override { |
78 | | - auto col = std::static_pointer_cast<arrow::DoubleArray>( |
79 | | - input.batch->column(0)); |
80 | | - |
81 | | - arrow::DoubleBuilder builder; |
82 | | - for (int64_t i = 0; i < col->length(); ++i) { |
83 | | - VGI_RPC_THROW_NOT_OK(builder.Append(col->Value(i) * factor_)); |
84 | | - } |
85 | | - auto result_arr = unwrap(builder.Finish()); |
86 | | - out.emit_arrays({result_arr}); |
87 | | - } |
88 | | - |
89 | | -private: |
90 | | - double factor_; |
91 | | -}; |
92 | | - |
93 | | -static Stream make_scale(const Request& req, CallContext& /*ctx*/) { |
94 | | - auto factor = req.get<double>("factor"); |
95 | | - |
96 | | - Stream s; |
97 | | - s.output_schema = scale_output_schema(); |
98 | | - s.input_schema = scale_input_schema(); |
99 | | - s.state = std::make_shared<ScaleState>(factor); |
100 | | - return s; |
101 | | -} |
102 | | - |
103 | | -int main() { |
104 | | - auto server = ServerBuilder() |
105 | | - .add_producer( |
106 | | - "produce_n", |
107 | | - arrow::schema({arrow::field("count", arrow::int64())}), |
108 | | - counter_schema(), |
109 | | - make_counter, |
110 | | - "Produce N batches with index and value=index*10") |
111 | | - .add_exchange( |
112 | | - "exchange_scale", |
113 | | - arrow::schema({arrow::field("factor", arrow::float64())}), |
114 | | - scale_input_schema(), |
115 | | - scale_output_schema(), |
116 | | - make_scale, |
117 | | - "Scale input values by a factor") |
118 | | - .enable_describe("StreamingExample") |
119 | | - .build(); |
120 | | - |
121 | | - server->run(); |
122 | | - return 0; |
123 | | -} |
| 6 | +--8<-- "examples/streaming.cpp" |
124 | 7 | ``` |
125 | 8 |
|
126 | 9 | ## Producer Pattern |
|
0 commit comments