66import com .spbsu .benchmark .flink .index .ops .OrderEnforcer ;
77import com .spbsu .benchmark .flink .index .ops .TwoPCKryoSocketSink ;
88import com .spbsu .benchmark .flink .index .ops .WikipediaPageToWordPositions ;
9- import com .spbsu .flamestream .example .benchmark .BenchStand ;
9+ import com .spbsu .flamestream .example .benchmark .WikiBenchStand ;
1010import com .spbsu .flamestream .example .benchmark .GraphDeployer ;
1111import com .typesafe .config .Config ;
1212import com .typesafe .config .ConfigFactory ;
2424import java .nio .file .Paths ;
2525
2626public class FlinkBench {
27- public static void main (String [] args ) throws IOException , InterruptedException {
27+ public static void main (String [] args ) throws Exception {
2828 final Config benchConfig ;
2929 final Config deployerConfig ;
3030 if (args .length == 2 ) {
@@ -34,9 +34,8 @@ public static void main(String[] args) throws IOException, InterruptedException
3434 benchConfig = ConfigFactory .load ("flink-bench.conf" ).getConfig ("benchmark" );
3535 deployerConfig = ConfigFactory .load ("flink-deployer.conf" ).getConfig ("deployer" );
3636 }
37- final BenchStand .StandConfig standConfig = new BenchStand .StandConfig (benchConfig );
38-
39- final GraphDeployer deployer = new GraphDeployer () {
37+ WikiBenchStand wikiBenchStand = new WikiBenchStand (benchConfig );
38+ wikiBenchStand .run (new GraphDeployer () {
4039 @ Override
4140 public void deploy () {
4241 final int parallelism = deployerConfig .getInt ("parallelism" );
@@ -59,12 +58,12 @@ public void deploy() {
5958 final SinkFunction <Result > sinkFunction ;
6059 if (guarantees .equals ("EXACTLY_ONCE" )) {
6160 sinkFunction = new TwoPCKryoSocketSink (
62- standConfig .benchHost () ,
63- standConfig .rearPort () ,
61+ wikiBenchStand .benchHost ,
62+ wikiBenchStand .rearPort ,
6463 environment .getConfig ()
6564 );
6665 } else {
67- sinkFunction = new KryoSocketSink (standConfig .benchHost (), standConfig .rearPort () );
66+ sinkFunction = new KryoSocketSink (wikiBenchStand .benchHost , wikiBenchStand .rearPort );
6867 }
6968
7069 if (guarantees .equals ("EXACTLY_ONCE" ) || guarantees .equals ("AT_LEAST_ONCE" )) {
@@ -81,7 +80,7 @@ public void deploy() {
8180
8281
8382 environment
84- .addSource (new KryoSocketSource (standConfig .benchHost (), standConfig .frontPort () ))
83+ .addSource (new KryoSocketSource (wikiBenchStand .benchHost , wikiBenchStand .frontPort ))
8584 .setParallelism (parallelism )
8685 .shuffle ()
8786 .flatMap (new WikipediaPageToWordPositions ())
@@ -105,11 +104,7 @@ public void deploy() {
105104 public void close () {
106105 // It will close itself on completion
107106 }
108- };
109-
110- try (BenchStand benchStand = new BenchStand (standConfig , deployer )) {
111- benchStand .run ();
112- }
107+ });
113108 System .exit (0 );
114109 }
115110}
0 commit comments