Skip to content

Commit 91f5777

Browse files
author
Nikita Sokolov
committed
nexmark.Generator
1 parent ddda42a commit 91f5777

6 files changed

Lines changed: 62 additions & 51 deletions

File tree

.gitmodules

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[submodule "examples/nexmark"]
2+
path = examples/nexmark
3+
url = https://github.com/nexmark/nexmark/
4+
branch = master

examples/nexmark

Submodule nexmark added at 4d952a8

examples/pom.xml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,35 @@
7676
<artifactId>jblas</artifactId>
7777
<version>1.2.4</version>
7878
</dependency>
79+
<dependency>
80+
<groupId>com.github.nexmark</groupId>
81+
<artifactId>nexmark-flink</artifactId>
82+
<version>0.1-SNAPSHOT</version>
83+
</dependency>
84+
<dependency>
85+
<groupId>org.apache.flink</groupId>
86+
<artifactId>flink-table-api-java</artifactId>
87+
<version>1.11.1</version>
88+
<scope>compile</scope>
89+
</dependency>
90+
<dependency>
91+
<groupId>org.apache.flink</groupId>
92+
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
93+
<version>1.11.1</version>
94+
<scope>compile</scope>
95+
</dependency>
96+
<dependency>
97+
<groupId>org.apache.flink</groupId>
98+
<artifactId>flink-core</artifactId>
99+
<version>1.11.1</version>
100+
<scope>compile</scope>
101+
</dependency>
102+
<dependency>
103+
<groupId>org.apache.flink</groupId>
104+
<artifactId>flink-streaming-java_2.11</artifactId>
105+
<version>1.11.1</version>
106+
<scope>compile</scope>
107+
</dependency>
79108
</dependencies>
80109

81110
<build>

examples/src/main/java/com/spbsu/flamestream/example/nexmark/Query8.java

Lines changed: 16 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.spbsu.flamestream.example.nexmark;
22

3+
import com.github.nexmark.flink.model.Auction;
4+
import com.github.nexmark.flink.model.Person;
35
import com.spbsu.flamestream.core.graph.SerializableBiFunction;
46
import com.spbsu.flamestream.core.graph.SerializableFunction;
57
import com.spbsu.flamestream.example.labels.Flow;
@@ -10,46 +12,20 @@
1012
import scala.runtime.AbstractFunction1;
1113
import scala.util.Either;
1214

15+
import java.time.Instant;
1316
import java.util.ArrayList;
1417
import java.util.Collections;
1518
import java.util.Objects;
1619
import java.util.function.ToLongFunction;
1720
import java.util.stream.Stream;
1821

1922
public class Query8 {
20-
public static class Input {
21-
private Input() {
22-
}
23-
24-
public static class Auction extends Input {
25-
public final String seller;
26-
public final long dateTime;
27-
28-
public Auction(String seller, long dateTime) {
29-
this.seller = seller;
30-
this.dateTime = dateTime;
31-
}
32-
}
33-
34-
public static class Person extends Input {
35-
public final String id;
36-
public final String name;
37-
public final long dateTime;
38-
39-
public Person(String id, String name, long dateTime) {
40-
this.id = id;
41-
this.name = name;
42-
this.dateTime = dateTime;
43-
}
44-
}
45-
}
46-
4723
public static final class PersonGroupingKey {
48-
public final String id;
24+
public final long id;
4925
public final String name;
5026
public final long startTime;
5127

52-
private PersonGroupingKey(String id, String name, long startTime) {
28+
private PersonGroupingKey(long id, String name, long startTime) {
5329
this.id = id;
5430
this.name = name;
5531
this.startTime = startTime;
@@ -79,10 +55,10 @@ public String toString() {
7955
}
8056

8157
private static final class AuctionGroupingKey {
82-
public final String seller;
58+
public final long seller;
8359
public final long startTime;
8460

85-
private AuctionGroupingKey(String seller, long startTime) {
61+
private AuctionGroupingKey(long seller, long startTime) {
8662
this.seller = seller;
8763
this.startTime = startTime;
8864
}
@@ -99,7 +75,7 @@ public boolean equals(Object obj) {
9975
}
10076
if (obj instanceof AuctionGroupingKey) {
10177
final var that = (AuctionGroupingKey) obj;
102-
return Objects.equals(seller, that.seller) && startTime == that.startTime;
78+
return seller == that.seller && startTime == that.startTime;
10379
}
10480
return false;
10581
}
@@ -110,21 +86,21 @@ public String toString() {
11086
}
11187
}
11288

113-
public static Flow<Input, PersonGroupingKey> create(long timeWindow) {
114-
final var inputs = new Operator.Input<>(Input.class);
89+
public static Flow<Object, PersonGroupingKey> create() {
90+
final var inputs = new Operator.Input<>(Object.class);
11591
return new Flow<>(inputs, selectJoinOn(
11692
PersonGroupingKey.class,
11793
(p, a) -> p,
11894
selectGroupBy(
11995
PersonGroupingKey.class,
120-
person -> new PersonGroupingKey(person.id, person.name, tumbleStart(person.dateTime, timeWindow)),
121-
filterType(inputs, Input.Person.class),
96+
person -> new PersonGroupingKey(person.id, person.name, tumbleStart(person.dateTime, 10)),
97+
filterType(inputs, Person.class),
12298
person -> person.startTime
12399
),
124100
selectGroupBy(
125101
AuctionGroupingKey.class,
126-
auction -> new AuctionGroupingKey(auction.seller, tumbleStart(auction.dateTime, timeWindow)),
127-
filterType(inputs, Input.Auction.class),
102+
auction -> new AuctionGroupingKey(auction.seller, tumbleStart(auction.dateTime, 10)),
103+
filterType(inputs, Auction.class),
128104
auction -> auction.startTime
129105
),
130106
p -> p.id, a -> a.seller, p -> p.startTime, a -> a.startTime
@@ -237,7 +213,7 @@ public Output apply(Right right) {
237213
);
238214
}
239215

240-
private static long tumbleStart(long dateTime, long interval) {
241-
return (dateTime / interval + 1) * interval;
216+
private static long tumbleStart(Instant dateTime, long interval) {
217+
return (dateTime.getEpochSecond() / interval + 1) * interval;
242218
}
243219
}

examples/src/test/java/com/spbsu/flamestream/example/nexmark/Query8Test.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package com.spbsu.flamestream.example.nexmark;
22

3+
import com.github.nexmark.flink.model.Auction;
4+
import com.github.nexmark.flink.model.Person;
35
import com.spbsu.flamestream.core.Graph;
46
import com.spbsu.flamestream.example.labels.Materializer;
57
import com.spbsu.flamestream.runtime.FlameRuntime;
68
import com.spbsu.flamestream.runtime.LocalRuntime;
79
import com.spbsu.flamestream.runtime.acceptance.FlameAkkaSuite;
8-
import com.spbsu.flamestream.runtime.edge.akka.AkkaFront;
910
import com.spbsu.flamestream.runtime.edge.akka.AkkaFrontType;
1011
import com.spbsu.flamestream.runtime.edge.akka.AkkaRearType;
1112
import com.spbsu.flamestream.runtime.utils.AwaitResultConsumer;
1213
import org.testng.annotations.Test;
1314

14-
import java.util.List;
15+
import java.time.Instant;
1516
import java.util.Queue;
1617
import java.util.concurrent.ConcurrentLinkedQueue;
1718
import java.util.concurrent.TimeUnit;
@@ -20,25 +21,24 @@
2021
public class Query8Test extends FlameAkkaSuite {
2122
@Test
2223
public void test() throws InterruptedException {
23-
final Graph graph = Materializer.materialize(Query8.create(2));
24+
final Graph graph = Materializer.materialize(Query8.create());
2425

2526
try (final LocalRuntime runtime = new LocalRuntime.Builder().maxElementsInGraph(2)
2627
.millisBetweenCommits(500)
2728
.build()) {
2829
try (final FlameRuntime.Flame flame = runtime.run(graph)) {
29-
final Queue<Query8.Input> input = new ConcurrentLinkedQueue<>();
30-
input.add(new Query8.Input.Person("person", "name", 0));
31-
input.add(new Query8.Input.Auction("person", 1));
32-
input.add(new Query8.Input.Auction("person", 2));
30+
final Queue<Object> input = new ConcurrentLinkedQueue<>();
31+
input.add(new Person(0, "name", null, null, null, null, Instant.ofEpochSecond(0), null));
32+
input.add(new Auction(0, null, null, 0, 0, Instant.ofEpochSecond(1), null, 0, 0, null));
33+
input.add(new Auction(0, null, null, 0, 0, Instant.ofEpochSecond(10), null, 0, 0, null));
3334

3435
final AwaitResultConsumer<Query8.PersonGroupingKey> awaitConsumer =
3536
new AwaitResultConsumer<>(1);
3637
flame.attachRear("rear", new AkkaRearType<>(runtime.system(), Query8.PersonGroupingKey.class))
3738
.forEach(r -> r.addListener(awaitConsumer));
38-
final List<AkkaFront.FrontHandle<Query8.Input>> handles = flame
39-
.attachFront("front", new AkkaFrontType<Query8.Input>(runtime.system()))
40-
.collect(Collectors.toList());
41-
applyDataToAllHandlesAsync(input, handles);
39+
applyDataToAllHandlesAsync(input, flame
40+
.attachFront("front", new AkkaFrontType<>(runtime.system()))
41+
.collect(Collectors.toList()));
4242
awaitConsumer.await(200, TimeUnit.SECONDS);
4343
}
4444
}

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,6 @@
1717
<module>examples</module>
1818
<module>client</module>
1919
<module>benchmark/flink-benchmark</module>
20+
<module>examples/nexmark/nexmark-flink</module>
2021
</modules>
2122
</project>

0 commit comments

Comments
 (0)