-
Notifications
You must be signed in to change notification settings - Fork 86
Expand file tree
/
Copy pathAsyncRawSocketSender.java
More file actions
129 lines (103 loc) · 3.54 KB
/
AsyncRawSocketSender.java
File metadata and controls
129 lines (103 loc) · 3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package org.fluentd.logger.sender;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.fluentd.logger.errorhandler.ErrorHandler;
import org.fluentd.logger.sender.ExponentialDelayReconnector;
import org.fluentd.logger.sender.RawSocketSender;
import org.fluentd.logger.sender.Reconnector;
import org.fluentd.logger.sender.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * An asynchronous wrapper around {@link RawSocketSender}.
 *
 * <p>{@link #emit(String, long, Map)} and {@link #flush()} do not call the
 * wrapped sender on the calling thread. They queue the work on a single
 * background thread that is shared by every instance of this class, so queued
 * operations run one at a time, in submission order. All other methods
 * delegate to the wrapped sender directly on the calling thread.
 *
 * @author mxk
 *
 */
public class AsyncRawSocketSender implements Sender {

    /**
     * Queued task that forwards one event to the wrapped sender.
     *
     * <p>Declared static: it carries everything it needs in its own fields,
     * so it must not keep a hidden reference to the enclosing instance.
     */
    private static final class EmitRunnable implements Runnable {
        private final String tag;
        private final Map<String, Object> data;
        private final RawSocketSender sender;
        private final long timestamp;

        private EmitRunnable(String tag, Map<String, Object> data,
                RawSocketSender sender, long timestamp) {
            this.tag = tag;
            this.data = data;
            this.sender = sender;
            this.timestamp = timestamp;
        }

        @Override
        public void run() {
            // The boolean result of the underlying emit is intentionally not
            // propagated; the caller has already returned by the time this runs.
            sender.emit(tag, timestamp, data);
        }
    }

    /**
     * Queued task that flushes the wrapped sender.
     *
     * <p>Declared static for the same reason as {@link EmitRunnable}.
     */
    private static final class FlushRunnable implements Runnable {
        private final RawSocketSender sender;

        private FlushRunnable(RawSocketSender sender) {
            this.sender = sender;
        }

        @Override
        public void run() {
            sender.flush();
        }
    }

    @SuppressWarnings("unused")
    private static final Logger logger = LoggerFactory.getLogger(AsyncRawSocketSender.class);

    /**
     * Single worker thread shared by all instances; executes queued emit and
     * flush tasks sequentially. This class never shuts it down.
     * NOTE(review): the default thread factory creates a non-daemon thread,
     * which may keep the JVM alive after the first task - confirm that this is
     * acceptable for callers.
     */
    private static final ExecutorService flusher = Executors.newSingleThreadExecutor();

    /** The synchronous sender that performs the actual work. */
    private final RawSocketSender sender;

    /** Reconnection policy; also handed to the wrapped sender. */
    private final Reconnector reconnector;

    /**
     * Creates a sender for {@code localhost:24224} with the default timeout
     * and buffer capacity.
     */
    public AsyncRawSocketSender() {
        this("localhost", 24224);
    }

    /**
     * Creates a sender for the given endpoint with a timeout of 3000 and a
     * buffer capacity of 8 * 1024 * 1024 (presumably milliseconds and bytes;
     * the units are defined by {@link RawSocketSender}).
     *
     * @param host host to connect to
     * @param port port to connect to
     */
    public AsyncRawSocketSender(String host, int port) {
        this(host, port, 3 * 1000, 8 * 1024 * 1024);
    }

    /**
     * Creates a sender that uses a new {@link ExponentialDelayReconnector}.
     *
     * @param host           host to connect to
     * @param port           port to connect to
     * @param timeout        timeout passed through to {@link RawSocketSender}
     * @param bufferCapacity buffer capacity passed through to {@link RawSocketSender}
     */
    public AsyncRawSocketSender(String host, int port, int timeout,
            int bufferCapacity) {
        this(host, port, timeout, bufferCapacity,
                new ExponentialDelayReconnector());
    }

    /**
     * Creates a sender with an explicit reconnection policy.
     *
     * @param host           host to connect to
     * @param port           port to connect to
     * @param timeout        timeout passed through to {@link RawSocketSender}
     * @param bufferCapacity buffer capacity passed through to {@link RawSocketSender}
     * @param reconnector    reconnection policy, shared with the wrapped sender
     *                       and consulted by {@link #emit(String, long, Map)}
     */
    public AsyncRawSocketSender(String host, int port, int timeout,
            int bufferCapacity, Reconnector reconnector) {
        this.reconnector = reconnector;
        this.sender = new RawSocketSender(host, port, timeout, bufferCapacity,
                reconnector);
    }

    /**
     * Queues a flush of the wrapped sender on the background thread and
     * returns without waiting for it to run.
     */
    @Override
    public synchronized void flush() {
        flusher.execute(new FlushRunnable(sender));
    }

    /**
     * Closes the wrapped sender on the calling thread.
     *
     * <p>NOTE(review): this call does not go through the background queue, so
     * it does not wait for emits or flushes that are still queued - confirm
     * whether callers need those drained before closing.
     */
    @Override
    public void close() {
        sender.close();
    }

    /**
     * Queues an event stamped with the current time in seconds since the epoch.
     *
     * @param tag  event tag
     * @param data event payload
     * @return see {@link #emit(String, long, Map)}
     */
    @Override
    public boolean emit(String tag, Map<String, Object> data) {
        return emit(tag, System.currentTimeMillis() / 1000, data);
    }

    /**
     * Queues an event for sending on the background thread.
     *
     * <p>The map is handed to the background thread as-is (it is not copied),
     * so it should not be modified after this call.
     *
     * @param tag       event tag
     * @param timestamp event time passed through to the wrapped sender
     * @param data      event payload
     * @return {@code true} if the wrapped sender is currently connected or the
     *         reconnector permits a reconnection attempt now; this is evaluated
     *         right after queuing and is not a confirmation that the event was
     *         sent
     */
    @Override
    public boolean emit(final String tag, final long timestamp, final Map<String, Object> data) {
        flusher.execute(new EmitRunnable(tag, data, sender, timestamp));
        return sender.isConnected() || reconnector.enableReconnection(System.currentTimeMillis());
    }

    /**
     * @return the name reported by the wrapped sender
     */
    @Override
    public String getName() {
        return sender.getName();
    }

    /**
     * @return whether the wrapped sender reports itself as connected
     */
    @Override
    public boolean isConnected() {
        return sender.isConnected();
    }

    /**
     * Installs an error handler on the wrapped sender.
     *
     * @param errorHandler handler to install
     */
    @Override
    public void setErrorHandler(ErrorHandler errorHandler) {
        sender.setErrorHandler(errorHandler);
    }

    /** Removes the error handler from the wrapped sender. */
    @Override
    public void removeErrorHandler() {
        sender.removeErrorHandler();
    }
}