Skip to content

Commit 56749bd

Browse files
committed
Snapshots
1 parent c1a6ec9 commit 56749bd

10 files changed

Lines changed: 559 additions & 40 deletions

File tree

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package com.spbsu.flamestream.example.bl;
2+
3+
import com.google.common.hash.Hashing;
4+
import com.spbsu.flamestream.core.DataItem;
5+
import com.spbsu.flamestream.core.Graph;
6+
import com.spbsu.flamestream.core.TrackingComponent;
7+
import com.spbsu.flamestream.core.graph.FlameMap;
8+
import com.spbsu.flamestream.core.graph.HashUnit;
9+
import com.spbsu.flamestream.core.graph.SerializableFunction;
10+
import com.spbsu.flamestream.core.graph.Sink;
11+
import com.spbsu.flamestream.core.graph.Source;
12+
13+
import java.util.Arrays;
14+
import java.util.Collections;
15+
import java.util.HashMap;
16+
import java.util.List;
17+
import java.util.stream.IntStream;
18+
import java.util.stream.Stream;
19+
20+
public class WatermarksVsAckerGraph {
21+
static public abstract class Element {
22+
public final int id;
23+
24+
protected Element(int id) {
25+
this.id = id;
26+
}
27+
}
28+
29+
static public final class Data extends Element {
30+
public Data(int id) {
31+
super(id);
32+
}
33+
34+
@Override
35+
public String toString() {
36+
return "Data(id = " + id + ")";
37+
}
38+
}
39+
40+
static public final class Watermark extends Element {
41+
public final int fromPartition, toPartition;
42+
43+
public Watermark(int id, int toPartition) {
44+
this(id, 0, toPartition);
45+
}
46+
47+
public Watermark(int id, int fromPartition, int toPartition) {
48+
super(id);
49+
this.fromPartition = fromPartition;
50+
this.toPartition = toPartition;
51+
}
52+
53+
@Override
54+
public String toString() {
55+
return "Watermark(id = " + id + ", fromPartition = " + fromPartition + ", toPartition = " + toPartition + ")";
56+
}
57+
}
58+
59+
private static Stream<Element> watermarkStream(int parallelism, int id, int fromPartition) {
60+
return IntStream.range(0, parallelism).mapToObj(toPartition -> new Watermark(
61+
id,
62+
fromPartition,
63+
toPartition
64+
));
65+
}
66+
67+
private static class HashFunction implements com.spbsu.flamestream.core.HashFunction {
68+
final List<HashUnit> covering;
69+
final int iteration;
70+
71+
HashFunction(List<HashUnit> covering, int iteration) {
72+
this.covering = covering;
73+
this.iteration = iteration;
74+
}
75+
76+
@Override
77+
public int hash(DataItem dataItem) {
78+
Element payload = dataItem.payload(Element.class);
79+
if (payload instanceof Watermark) {
80+
return covering.get(((Watermark) payload).toPartition).from();
81+
}
82+
return Hashing.murmur3_32().newHasher(8).putInt(payload.id).putInt(iteration).hash().asInt();
83+
}
84+
}
85+
86+
static private Graph.Vertex iteration(
87+
int fromPartitions,
88+
int toPartitions,
89+
int defaultMinimalTime,
90+
HashFunction hashFunction
91+
) {
92+
final int[] watermarks = new int[fromPartitions];
93+
Arrays.fill(watermarks, defaultMinimalTime);
94+
return new FlameMap.Builder<Element, Element>(new SerializableFunction<>() {
95+
int lastEmitted = defaultMinimalTime;
96+
97+
@Override
98+
public Stream<Element> apply(Element element) {
99+
if (element instanceof Data) {
100+
return Stream.of(element);
101+
}
102+
if (element instanceof Watermark) {
103+
final Watermark incoming = (Watermark) element;
104+
if (watermarks[incoming.fromPartition] < incoming.id) {
105+
watermarks[incoming.fromPartition] = incoming.id;
106+
int watermarkToEmit = watermarks[0];
107+
for (final int watermark : watermarks) {
108+
watermarkToEmit = Math.min(watermarkToEmit, watermark);
109+
}
110+
if (lastEmitted < watermarkToEmit) {
111+
lastEmitted = watermarkToEmit;
112+
return watermarkStream(toPartitions, watermarkToEmit, incoming.toPartition);
113+
}
114+
}
115+
return Stream.empty();
116+
}
117+
throw new IllegalArgumentException(element.toString());
118+
}
119+
}, Element.class).hashFunction(hashFunction).build();
120+
}
121+
122+
@SuppressWarnings("Convert2Lambda")
123+
public static Graph apply(int frontsNumber, List<HashUnit> covering, int iterations, int defaultMinimalTime) {
124+
final int allIterations = iterations + 1;
125+
final Graph.Builder graphBuilder = new Graph.Builder();
126+
127+
final Source source = new Source();
128+
final HashMap<Graph.Vertex, TrackingComponent> vertexTrackingComponent = new HashMap<>();
129+
TrackingComponent currentTrackingComponent = new TrackingComponent(0, Collections.emptySet());
130+
vertexTrackingComponent.put(source, currentTrackingComponent);
131+
132+
Graph.Vertex prev = iteration(1, covering.size(), defaultMinimalTime, null);
133+
currentTrackingComponent = new TrackingComponent(
134+
currentTrackingComponent.index + 1,
135+
Collections.singleton(currentTrackingComponent)
136+
);
137+
vertexTrackingComponent.put(prev, currentTrackingComponent);
138+
graphBuilder.link(source, prev);
139+
140+
for (int iteration = 0; iteration < allIterations; iteration++) {
141+
final Graph.Vertex next = iteration(
142+
iteration == 0 ? frontsNumber : covering.size(),
143+
covering.size(),
144+
defaultMinimalTime,
145+
new HashFunction(covering, iteration)
146+
);
147+
currentTrackingComponent = new TrackingComponent(
148+
currentTrackingComponent.index + 1,
149+
Collections.singleton(currentTrackingComponent)
150+
);
151+
vertexTrackingComponent.put(next, currentTrackingComponent);
152+
graphBuilder.link(prev, next);
153+
prev = next;
154+
}
155+
156+
final Graph.Vertex end = iteration(
157+
covering.size(),
158+
1,
159+
defaultMinimalTime,
160+
new HashFunction(covering, allIterations)
161+
);
162+
currentTrackingComponent = new TrackingComponent(
163+
currentTrackingComponent.index + 1,
164+
Collections.singleton(currentTrackingComponent)
165+
);
166+
vertexTrackingComponent.put(end, currentTrackingComponent);
167+
graphBuilder.link(prev, end);
168+
169+
final Sink sink = new Sink();
170+
currentTrackingComponent = new TrackingComponent(
171+
currentTrackingComponent.index + 1,
172+
Collections.singleton(currentTrackingComponent)
173+
);
174+
vertexTrackingComponent.put(sink, currentTrackingComponent);
175+
graphBuilder.link(end, sink);
176+
177+
return graphBuilder.vertexTrackingComponent(vertexTrackingComponent::get).build(source, sink);
178+
}
179+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package com.spbsu.flamestream.example.bl.no_barrier;
2+
3+
import akka.actor.ActorSystem;
4+
import com.spbsu.flamestream.core.graph.HashGroup;
5+
import com.spbsu.flamestream.core.graph.HashUnit;
6+
import com.spbsu.flamestream.example.bl.WatermarksVsAckerGraph;
7+
import com.spbsu.flamestream.runtime.FlameRuntime;
8+
import com.spbsu.flamestream.runtime.LocalClusterRuntime;
9+
import com.spbsu.flamestream.runtime.acceptance.FlameAkkaSuite;
10+
import com.spbsu.flamestream.runtime.config.SystemConfig;
11+
import com.spbsu.flamestream.runtime.edge.akka.AkkaFront;
12+
import com.spbsu.flamestream.runtime.edge.akka.AkkaFrontType;
13+
import com.spbsu.flamestream.runtime.edge.akka.AkkaRearType;
14+
import com.spbsu.flamestream.runtime.utils.AwaitResultConsumer;
15+
import com.typesafe.config.ConfigFactory;
16+
import org.testng.annotations.DataProvider;
17+
import org.testng.annotations.Test;
18+
import scala.concurrent.Await;
19+
import scala.concurrent.duration.Duration;
20+
21+
import java.util.ArrayList;
22+
import java.util.Collection;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.TimeoutException;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.IntStream;
30+
import java.util.stream.Stream;
31+
32+
import static org.testng.Assert.assertEquals;
33+
34+
public class WatermarksVsAckerGraphTest extends FlameAkkaSuite {
35+
@DataProvider
36+
public Object[][] dataProvider() {
37+
return new Object[][]{
38+
{0},
39+
{1},
40+
{10},
41+
};
42+
}
43+
44+
@Test(dataProvider = "dataProvider")
45+
public void test(Integer iterations) throws InterruptedException, TimeoutException {
46+
final int parallelism = 4;
47+
final ActorSystem system = ActorSystem.create("testStand", ConfigFactory.load("remote"));
48+
int streamLength = 100;
49+
try (
50+
final LocalClusterRuntime runtime = new LocalClusterRuntime(
51+
parallelism,
52+
new SystemConfig.Builder().millisBetweenCommits(10000)
53+
.workersResourcesDistributor(ids -> ids.subList(0, 1)).build()
54+
);
55+
final FlameRuntime.Flame flame = runtime.run(WatermarksVsAckerGraph.apply(
56+
1,
57+
HashUnit.covering(parallelism).collect(Collectors.toCollection(ArrayList::new)),
58+
iterations,
59+
-streamLength
60+
))
61+
) {
62+
final AwaitResultConsumer<WatermarksVsAckerGraph.Data> awaitResultConsumer =
63+
new AwaitResultConsumer<>(streamLength);
64+
CountDownLatch watermarkReceived = new CountDownLatch(parallelism);
65+
flame.attachRear("Rear", new AkkaRearType<>(system, WatermarksVsAckerGraph.Element.class))
66+
.forEach(r -> r.addListener(element -> {
67+
if (element instanceof WatermarksVsAckerGraph.Data) {
68+
awaitResultConsumer.accept((WatermarksVsAckerGraph.Data) element);
69+
}
70+
if (element instanceof WatermarksVsAckerGraph.Watermark && element.id == 0) {
71+
watermarkReceived.countDown();
72+
}
73+
}));
74+
75+
final List<AkkaFront.FrontHandle<Object>> consumers =
76+
flame.attachFront("Front", new AkkaFrontType<>(system, false))
77+
.collect(Collectors.toList());
78+
for (int i = 1; i < consumers.size(); i++) {
79+
consumers.get(i).unregister();
80+
}
81+
82+
final AkkaFront.FrontHandle<Object> sink = consumers.get(0);
83+
IntStream.range(-streamLength, 0)
84+
.boxed()
85+
.flatMap(id -> Stream.of(
86+
new WatermarksVsAckerGraph.Data(id),
87+
new WatermarksVsAckerGraph.Watermark(id + 1, 0)
88+
))
89+
.forEach(sink);
90+
sink.unregister();
91+
92+
awaitResultConsumer.await(5, TimeUnit.MINUTES);
93+
watermarkReceived.await();
94+
assertEquals(awaitResultConsumer.result().count(), awaitResultConsumer.expectedSize);
95+
}
96+
Await.ready(system.terminate(), Duration.Inf());
97+
}
98+
}

runtime/src/main/java/com/spbsu/flamestream/runtime/ProcessingWatcher.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -233,10 +233,11 @@ private void startEdgeCaches() throws Exception {
233233
curator.getData().forPath(event.getData().getPath()),
234234
FlameRuntime.FrontInstance.class
235235
);
236-
self().tell(
237-
new AttachFront<>(StringUtils.substringAfterLast(event.getData().getPath(), "/"), front),
238-
self()
239-
);
236+
self().tell(new AttachFront<>(
237+
StringUtils.substringAfterLast(event.getData().getPath(), "/"),
238+
front,
239+
systemConfig.ackerWindow()
240+
), self());
240241
}
241242
});
242243
frontsCache.start();

0 commit comments

Comments
 (0)