Skip to content

Commit d22ed1d

Browse files
committed
Extract EventStream into external class so behavior can be overidden
1 parent 0719f19 commit d22ed1d

File tree

4 files changed

+416
-360
lines changed

4 files changed

+416
-360
lines changed
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package net.servicestack.client.sse;
2+
3+
import net.servicestack.client.Log;
4+
import net.servicestack.client.Utils;
5+
6+
import java.io.BufferedInputStream;
7+
import java.io.IOException;
8+
import java.io.InputStream;
9+
import java.net.HttpURLConnection;
10+
import java.net.URL;
11+
12+
/**
13+
* Created by mythz on 3/3/2017.
14+
*/
15+
16+
public class EventStream implements Runnable {
17+
static int BufferSize = 1024 * 64;
18+
19+
ServerEventsClient client;
20+
ServerEventMessage currentMsg;
21+
22+
public EventStream(ServerEventsClient client) {
23+
this.client = client;
24+
}
25+
26+
protected InputStream getInputStream(URL streamUri) throws IOException {
27+
HttpURLConnection req = (HttpURLConnection) streamUri.openConnection();
28+
return new BufferedInputStream(req.getInputStream());
29+
}
30+
31+
public void close(){
32+
}
33+
34+
@Override
35+
public void run() {
36+
try {
37+
if (client.running.get())
38+
return;
39+
client.running.set(true);
40+
41+
URL streamUri = new URL(client.getEventStreamUri());
42+
InputStream is = getInputStream(streamUri);
43+
client.errorsCount.set(0);
44+
readStream(is);
45+
} catch (InterruptedException ie){
46+
Log.i("EventStream.run(): Caught InterruptedException"); //thrown by interruptBackgroundThread()
47+
return;
48+
} catch (Exception e) {
49+
Log.e("Error reading from event-stream, continuous errors: " + client.errorsCount.incrementAndGet(), e);
50+
Log.e(Utils.getStackTrace(e));
51+
} finally {
52+
client.running.set(false);
53+
}
54+
55+
if (!client.running.get()){
56+
client.restart();
57+
}
58+
}
59+
60+
protected int readFromStream(InputStream inputStream, byte[] buffer) throws IOException, InterruptedException {
61+
int len;
62+
while (true) {
63+
int available = inputStream.available();
64+
if (available > 0) break;
65+
Thread.sleep(1);
66+
}
67+
68+
len = inputStream.read(buffer);
69+
return len;
70+
}
71+
72+
protected void readStream(InputStream inputStream) throws IOException, InterruptedException {
73+
byte[] buffer = new byte[BufferSize];
74+
String overflowText = "";
75+
76+
int len = 0;
77+
while (true) {
78+
len = readFromStream(inputStream, buffer);
79+
80+
if (len <= 0)
81+
break;
82+
83+
String text = overflowText + new String(buffer, 0, len, "UTF-8");
84+
85+
int pos;
86+
while ((pos = text.indexOf('\n')) >= 0) {
87+
if (pos == 0) {
88+
if (currentMsg != null)
89+
processEventMessage(currentMsg);
90+
currentMsg = null;
91+
92+
text = text.substring(pos + 1);
93+
94+
if (!Utils.isEmpty(text))
95+
continue;
96+
97+
break;
98+
}
99+
100+
String line = text.substring(0, pos);
101+
if (!Utils.isNullOrWhiteSpace(line))
102+
processLine(line);
103+
if (text.length() > pos + 1)
104+
text = text.substring(pos + 1);
105+
}
106+
107+
overflowText = text;
108+
}
109+
110+
if (Log.isDebugEnabled())
111+
Log.d("Connection ended on " + client.getConnectionDisplayName());
112+
}
113+
114+
protected void processLine(String line) {
115+
if (line == null || line.length() == 0)
116+
return;
117+
118+
if (currentMsg == null)
119+
currentMsg = new ServerEventMessage();
120+
121+
String[] parts = Utils.splitOnFirst(line, ':');
122+
String label = parts[0];
123+
String data = parts[1];
124+
if (data.length() > 0 && data.charAt(0) == ' ')
125+
data = data.substring(1);
126+
127+
if ("id".equals(label)) {
128+
currentMsg.setEventId(Long.parseLong(data));
129+
} else if ("data".equals(label)) {
130+
currentMsg.setData(data);
131+
}
132+
}
133+
134+
protected void processEventMessage(ServerEventMessage e) {
135+
String[] parts = Utils.splitOnFirst(e.getData(), ' ');
136+
e.setSelector(parts[0]);
137+
String[] selParts = Utils.splitOnFirst(e.getSelector(), '@');
138+
if (selParts.length > 1) {
139+
e.setChannel(selParts[0]);
140+
e.setSelector(selParts[1]);
141+
}
142+
143+
e.setJson(parts[1]);
144+
145+
if (!Utils.isNullOrEmpty(e.getSelector())) {
146+
parts = Utils.splitOnFirst(e.getSelector(), '.');
147+
if (parts.length < 2)
148+
throw new IllegalArgumentException("Invalid Selector '" + e.getSelector() + "'");
149+
150+
e.setOp(parts[0]);
151+
String target = parts[1].replace("%20", " ");
152+
153+
String[] tokens = Utils.splitOnFirst(target, '$');
154+
e.setTarget(tokens[0]);
155+
if (tokens.length > 1)
156+
e.setCssSelector(tokens[1]);
157+
158+
if ("cmd".equals(e.getOp())) {
159+
target = e.getTarget();
160+
if ("onConnect".equals(target)) {
161+
client.processOnConnectMessage(e);
162+
return;
163+
} else if ("onJoin".equals(target)) {
164+
client.processOnJoinMessage(e);
165+
return;
166+
} else if ("onLeave".equals(target)) {
167+
client.processOnLeaveMessage(e);
168+
return;
169+
} else if ("onUpdate".equals(target)) {
170+
client.processOnUpdateMessage(e);
171+
return;
172+
} else if ("onHeartbeat".equals(target)) {
173+
client.processOnHeartbeatMessage(e);
174+
return;
175+
} else {
176+
ServerEventCallback cb = client.getHandlers().get(e.getTarget());
177+
if (cb != null) {
178+
cb.execute(this.client, e);
179+
}
180+
}
181+
}
182+
183+
ServerEventCallback receiver = client.getNamedReceivers().get(e.getOp());
184+
if (receiver != null) {
185+
receiver.execute(this.client, e);
186+
}
187+
}
188+
189+
client.onMessageReceived(e);
190+
}
191+
}

0 commit comments

Comments
 (0)