11package opendota ;
22
33import java .io .BufferedReader ;
4+ import java .io .ByteArrayInputStream ;
45import java .io .ByteArrayOutputStream ;
56import java .io .IOException ;
67import java .io .InputStream ;
910import java .io .UnsupportedEncodingException ;
1011import java .net .InetSocketAddress ;
1112import java .net .URI ;
12- import java .net .URL ;
1313import java .net .URLDecoder ;
14+ import java .net .http .HttpClient ;
15+ import java .net .http .HttpRequest ;
16+ import java .net .http .HttpResponse ;
17+ import java .time .Duration ;
1418import java .util .LinkedHashMap ;
1519import java .util .Map ;
1620import java .util .Timer ;
@@ -43,11 +47,9 @@ public void handle(HttpExchange t) throws IOException {
4347 InputStream is = t .getRequestBody ();
4448 OutputStream os = t .getResponseBody ();
4549 try {
46- new Parse (is , os , t .getRequestURI ().getRawQuery () != null ? t .getRequestURI ().getRawQuery ().contains ("blob" ) : false );
47- }
48- catch (Exception e )
49- {
50- e .printStackTrace ();
50+ new Parse (is , os , false );
51+ } catch (Exception e ) {
52+ e .printStackTrace ();
5153 }
5254 os .close ();
5355 }
@@ -68,64 +70,106 @@ static class BlobHandler implements HttpHandler {
6870 public void handle (HttpExchange t ) throws IOException {
6971 try {
7072 Map <String , String > query = splitQuery (t .getRequestURI ());
71- URL replayUrl = URI .create (query .get ("replay_url" )). toURL ( );
73+ URI replayUrl = URI .create (query .get ("replay_url" ));
7274 // boolean v2 = t.getRequestURI().getRawQuery() != null ? t.getRequestURI().getRawQuery().contains("v2") : false;
73- boolean v2 = true ;
74- String cmd = String .format ("""
75- curl --max-time 145 --fail -L %s | %s | curl -X POST -T - "localhost:5600%s" %s
76- """ ,
77- replayUrl ,
78- replayUrl .toString ().endsWith (".bz2" ) ? "bunzip2" : "cat" ,
79- v2 ? "?blob" : "" ,
80- v2 ? "" : " | node processors/createParsedDataBlob.mjs"
81- );
82- System .err .println (cmd );
83- // Download, unzip, parse, aggregate
84- Process proc = new ProcessBuilder (new String [] {"bash" , "-c" , cmd })
85- .start ();
86- ByteArrayOutputStream output = new ByteArrayOutputStream ();
87- ByteArrayOutputStream error = new ByteArrayOutputStream ();
88- copy (proc .getInputStream (), output );
89- // Write error to console
90- copy (proc .getErrorStream (), error );
91- System .err .println (error .toString ());
92- int exitCode = proc .waitFor ();
93- if (exitCode != 0 ) {
94- // We can send 200 status here and no response if expected error (read the error string)
95- // Maybe we can pass the specific error info in the response headers
96- int status = 500 ;
97- if (error .toString ().contains ("curl: (28) Operation timed out" )) {
98- // Parse took too long, maybe China replay?
99- status = 200 ;
100- }
101- if (error .toString ().contains ("curl: (22) The requested URL returned error: 502" )) {
102- // Google-Edge-Cache: origin retries exhausted Error: 2010
103- // Server error, don't retry
104- status = 200 ;
105- }
106- if (error .toString ().contains ("bunzip2: Data integrity error when decompressing" )) {
107- // Corrupted replay, don't retry
108- status = 200 ;
109- }
110- if (error .toString ().contains ("bunzip2: Compressed file ends unexpectedly" )) {
111- // Corrupted replay, don't retry
112- status = 200 ;
113- }
114- if (error .toString ().contains ("bunzip2: (stdin) is not a bzip2 file." )) {
115- // Tried to unzip a non-bz2 file
116- status = 200 ;
117- }
118- t .sendResponseHeaders (status , 0 );
119- t .getResponseBody ().close ();
120- } else {
121- t .sendResponseHeaders (200 , output .size ());
122- output .writeTo (t .getResponseBody ());
123- t .getResponseBody ().close ();
75+ // boolean v2 = true;
76+
77+ // Get the replay as a byte[]
78+ HttpClient client = HttpClient .newHttpClient ();
79+ HttpRequest request = HttpRequest .newBuilder ()
80+ .timeout (Duration .ofSeconds (145 ))
81+ .uri (replayUrl )
82+ .build ();
83+ HttpResponse <byte []> response = client .send (request , HttpResponse .BodyHandlers .ofByteArray ());
84+
85+ byte [] bzIn = response .body ();
86+ byte [] bzOut = bzIn ;
87+
88+ if (replayUrl .toString ().endsWith (".bz2" )) {
89+ // Write byte[] to bunzip, get back decompressed byte[]
90+ Process bz = new ProcessBuilder (new String [] {"bunzip2" }).start ();
91+
92+ // Start separate thread so we can consume output while sending input
93+ Thread thread = new Thread (() -> {
94+ try {
95+ copy (new ByteArrayInputStream (bzIn ), bz .getOutputStream ());
96+ bz .getOutputStream ().close ();
97+ } catch (IOException ex ) {
98+ ex .printStackTrace ();
99+ }
100+ });
101+ thread .start ();
102+
103+ bzOut = bz .getInputStream ().readAllBytes ();
104+ System .err .println (new String (bz .getErrorStream ().readAllBytes ()));
124105 }
125- }
126- catch (InterruptedException e ) {
127- e .printStackTrace ();
106+
107+ // Start parser with input stream created from byte[]
108+ ByteArrayOutputStream baos2 = new ByteArrayOutputStream ();
109+ new Parse (new ByteArrayInputStream (bzOut ), baos2 , true );
110+ byte [] parseOut = baos2 .toByteArray ();
111+
112+ t .sendResponseHeaders (200 , parseOut .length );
113+ t .getResponseBody ().write (parseOut );
114+ t .getResponseBody ().close ();
115+ } catch (Exception ex ) {
116+ // TODO handle timeouts and corrupted replays (don't retry in those cases)
117+ ex .printStackTrace ();
118+ t .sendResponseHeaders (500 , 0 );
119+ t .getResponseBody ().close ();
128120 }
121+
122+ // String cmd = String.format("""
123+ // curl --max-time 145 --fail -L %s | %s | curl -X POST -T - "localhost:5600%s" %s
124+ // """,
125+ // replayUrl,
126+ // replayUrl.toString().endsWith(".bz2") ? "bunzip2" : "cat",
127+ // v2 ? "?blob" : "",
128+ // v2 ? "" : " | node processors/createParsedDataBlob.mjs"
129+ // );
130+ // System.err.println(cmd);
131+ // // Download, unzip, parse, aggregate
132+ // Process proc = new ProcessBuilder(new String[] {"bash", "-c", cmd})
133+ // .start();
134+ // ByteArrayOutputStream output = new ByteArrayOutputStream();
135+ // ByteArrayOutputStream error = new ByteArrayOutputStream();
136+ // copy(proc.getInputStream(), output);
137+ // // Write error to console
138+ // copy(proc.getErrorStream(), error);
139+ // System.err.println(error.toString());
140+ // int exitCode = proc.waitFor();
141+ // if (exitCode != 0) {
142+ // // We can send 200 status here and no response if expected error (read the error string)
143+ // // Maybe we can pass the specific error info in the response headers
144+ // int status = 500;
145+ // if (error.toString().contains("curl: (28) Operation timed out")) {
146+ // // Parse took too long, maybe China replay?
147+ // status = 200;
148+ // }
149+ // if (error.toString().contains("curl: (22) The requested URL returned error: 502")) {
150+ // // Google-Edge-Cache: origin retries exhausted Error: 2010
151+ // // Server error, don't retry
152+ // status = 200;
153+ // }
154+ // if (error.toString().contains("bunzip2: Data integrity error when decompressing")) {
155+ // // Corrupted replay, don't retry
156+ // status = 200;
157+ // }
158+ // if (error.toString().contains("bunzip2: Compressed file ends unexpectedly")) {
159+ // // Corrupted replay, don't retry
160+ // status = 200;
161+ // }
162+ // if (error.toString().contains("bunzip2: (stdin) is not a bzip2 file.")) {
163+ // // Tried to unzip a non-bz2 file
164+ // status = 200;
165+ // }
166+ // t.sendResponseHeaders(status, 0);
167+ // t.getResponseBody().close();
168+ // } else {
169+ // t.sendResponseHeaders(200, output.size());
170+ // output.writeTo(t.getResponseBody());
171+ // t.getResponseBody().close();
172+ // }
129173 }
130174 }
131175
0 commit comments