-
Notifications
You must be signed in to change notification settings - Fork 86
Expand file tree
/
Copy pathAsyncRawSocketSender.java
More file actions
139 lines (110 loc) · 3.83 KB
/
AsyncRawSocketSender.java
File metadata and controls
139 lines (110 loc) · 3.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package org.fluentd.logger.sender;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.fluentd.logger.errorhandler.ErrorHandler;
import org.fluentd.logger.sender.ExponentialDelayReconnector;
import org.fluentd.logger.sender.RawSocketSender;
import org.fluentd.logger.sender.Reconnector;
import org.fluentd.logger.sender.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An asynchronous wrapper around RawSocketSender
* <br>
* This feature is highly experimental.
*
* @author mxk
*
*/
public class AsyncRawSocketSender implements Sender {
private final class EmitRunnable implements Runnable {
private final String tag;
private final Map<String, Object> data;
private final RawSocketSender sender;
private final long timestamp;
private EmitRunnable(String tag, Map<String, Object> data,
RawSocketSender sender, long timestamp) {
this.tag = tag;
this.data = data;
this.sender = sender;
this.timestamp = timestamp;
}
@Override
public void run() {
sender.emit(tag, timestamp, data);
}
}
private final class FlushRunnable implements Runnable {
private final RawSocketSender sender;
private FlushRunnable(RawSocketSender sender) {
this.sender = sender;
}
@Override
public void run() {
sender.flush();
}
}
private RawSocketSender sender;
private Reconnector reconnector;
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(AsyncRawSocketSender.class);
private final ExecutorService senderTask = Executors.newSingleThreadExecutor();
private static final ErrorHandler DEFAULT_ERROR_HANDLER = new ErrorHandler() {};
private ErrorHandler errorHandler = DEFAULT_ERROR_HANDLER;
public AsyncRawSocketSender() {
this("localhost", 24224);
}
public AsyncRawSocketSender(String host, int port) {
this(host, port, 3 * 1000, 8 * 1024 * 1024);
}
public AsyncRawSocketSender(String host, int port, int timeout,
int bufferCapacity) {
this(host, port, timeout, bufferCapacity,
new ExponentialDelayReconnector());
}
public AsyncRawSocketSender(String host, int port, int timeout,
int bufferCapacity, Reconnector reconnector) {
this.reconnector = reconnector;
this.sender = new RawSocketSender(host, port, timeout, bufferCapacity,
reconnector);
}
@Override
public synchronized void flush() {
final RawSocketSender sender = this.sender;
senderTask.execute(new FlushRunnable(sender));
}
@Override
public void close() {
sender.close();
}
@Override
public boolean emit(String tag, Map<String, Object> data) {
return emit(tag, System.currentTimeMillis() / 1000, data);
}
@Override
public boolean emit(final String tag, final long timestamp, final Map<String, Object> data) {
final RawSocketSender sender = this.sender;
senderTask.execute(new EmitRunnable(tag, data, sender, timestamp));
return true;
}
@Override
public String getName() {
return sender.getName();
}
@Override
public synchronized boolean isConnected() {
return sender.isConnected();
}
@Override
public void setErrorHandler(ErrorHandler errorHandler) {
if (errorHandler == null) {
throw new IllegalArgumentException("errorHandler is null");
}
this.errorHandler = errorHandler;
}
@Override
public void removeErrorHandler() {
this.errorHandler = DEFAULT_ERROR_HANDLER;
}
}