1010import com .spbsu .flamestream .runtime .edge .EdgeContext ;
1111import com .spbsu .flamestream .runtime .master .acker .api .Heartbeat ;
1212import com .spbsu .flamestream .runtime .master .acker .api .registry .UnregisterFront ;
13+ import org .apache .commons .lang3 .SerializationUtils ;
1314
1415import java .time .Duration ;
1516import java .time .Instant ;
1617import java .util .Map ;
18+ import java .util .concurrent .Callable ;
1719import java .util .concurrent .Executors ;
1820import java .util .function .Consumer ;
1921
@@ -65,21 +67,58 @@ public void onStart(Consumer<Object> consumer, GlobalTime from) {
6567 consumer .accept (new UnregisterFront (edgeContext .edgeId ()));
6668 return ;
6769 }
68- final var nexmarkConfiguration = type .nexmarkConfiguration ;
69- final var generatorConfig = new GeneratorConfig (
70- nexmarkConfiguration ,
71- type .baseTime ,
72- 1 ,
73- type .maxEvents ,
74- 1
75- ).split (type .nodePartition .size ()).get (partition );
7670 final var executor =
7771 Executors .newSingleThreadExecutor (runnable -> new Thread (runnable , edgeContext .edgeId ().toString ()));
78- executor .submit (() -> {
79- try {
80- Meta basicMeta = null ;
81- int childId = 0 ;
82- final var generator = new NexmarkGenerator (generatorConfig );
72+ executor .submit (new Callable <Object >() {
73+ private int childId ;
74+ private Meta basicMeta ;
75+
76+ @ Override
77+ public Object call () throws Exception {
78+ try {
79+ final var warmUpRateReduction = 10 ;
80+ generate (
81+ generate (
82+ 1 ,
83+ type .baseTime ,
84+ type .nexmarkConfiguration .windowSizeSec * type .nexmarkConfiguration .firstEventRate * type .nodePartition .size (),
85+ slower (type .nexmarkConfiguration , warmUpRateReduction )
86+ ),
87+ type .baseTime + type .nexmarkConfiguration .windowSizeSec * warmUpRateReduction * 1000 ,
88+ type .maxEvents ,
89+ type .nexmarkConfiguration
90+ );
91+ consumer .accept (new Heartbeat (new GlobalTime (Long .MAX_VALUE , edgeContext .edgeId ())));
92+ return null ;
93+ } catch (Throwable throwable ) {
94+ throwable .printStackTrace ();
95+ throw throwable ;
96+ } finally {
97+ executor .shutdown ();
98+ }
99+ }
100+
101+ private NexmarkConfiguration slower (NexmarkConfiguration nexmarkConfiguration , int times ) {
102+ nexmarkConfiguration = SerializationUtils .clone (nexmarkConfiguration );
103+ nexmarkConfiguration .firstEventRate /= times ;
104+ nexmarkConfiguration .nextEventRate /= times ;
105+ return nexmarkConfiguration ;
106+ }
107+
108+ private long generate (
109+ long firstEventId ,
110+ long baseTime ,
111+ long maxEvents ,
112+ NexmarkConfiguration nexmarkConfiguration
113+ ) throws InterruptedException {
114+ final var generatorConfig = new GeneratorConfig (
115+ nexmarkConfiguration ,
116+ baseTime ,
117+ firstEventId ,
118+ maxEvents ,
119+ 1
120+ );
121+ final var generator = new NexmarkGenerator (generatorConfig .split (type .nodePartition .size ()).get (partition ));
83122 while (generator .hasNext ()) {
84123 final var nextEvent = generator .nextEvent ();
85124 final var event = nextEvent .event ;
@@ -101,13 +140,8 @@ public void onStart(Consumer<Object> consumer, GlobalTime from) {
101140 }
102141 consumer .accept (new PayloadDataItem (new Meta (basicMeta , 0 , childId ++), event ));
103142 }
104- consumer .accept (new Heartbeat (new GlobalTime (Long .MAX_VALUE , edgeContext .edgeId ())));
105- return null ;
106- } catch (Throwable throwable ) {
107- throwable .printStackTrace ();
108- throw throwable ;
109- } finally {
110- executor .shutdown ();
143+ System .out .println ("hi" );
144+ return generatorConfig .getStopEventId ();
111145 }
112146 });
113147 }
0 commit comments