forked from telehash/telehash-js
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathswitch.js
More file actions
260 lines (212 loc) · 8.46 KB
/
switch.js
File metadata and controls
260 lines (212 loc) · 8.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
var async = require('async');
var hlib = require('./hash');
// registry of every known switch, keyed by its ipp ("ip:port") string
var network = {};

// application hooks, replaced wholesale through setCallbacks():
//   data(switch, telex) - handed telexes that carry application-level keys
//   sock.send(...)      - the udp socket send used for outgoing packets
//   news(switch)        - told about every newly created switch
// the defaults are no-ops so nothing blows up before the real hooks are installed
var master = {
  data: function(){},
  sock: {
    send: function(){}
  },
  news: function(){}
};
// install the application's hooks; `m` must provide data(), sock.send() and news().
// the object is kept by reference and replaces whatever hooks were set before
exports.setCallbacks = function(m)
{
  master = m;
};
// list every switch currently in the registry.
// returns a fresh array on each call (in Object.keys order); the switch objects themselves are not copied
function getSwitches()
{
  return Object.keys(network).map(function(ipp){
    return network[ipp];
  });
}
exports.getSwitches = getSwitches;
// fetch the switch registered for an ipp; when there is none yet, create a new one
function getSwitch(ipp)
{
  var known = network[ipp];
  return known ? known : new Switch(ipp);
}
exports.getSwitch = getSwitch;
// find the known switches closest to a hash.
//   endh - the target; must offer distanceTo(otherHash) returning a number
//   s    - optional optimized starting switch (not used yet)
//   num  - how many to return; anything falsy means the default of 5
// returns the registry keys (ipp strings) of the nearest switches, closest first
function getNear(endh, s, num)
{
  // for now just sort everything; TODO use the mesh, possibly mixed with a dirty list
  var limit = num ? num : 5;
  var ipps = Object.keys(network);
  ipps.sort(function(left, right){
    var toLeft = endh.distanceTo(network[left].hash);
    var toRight = endh.distanceTo(network[right].hash);
    return toLeft - toRight;
  });
  return ipps.slice(0, limit);
}
exports.getNear = getNear;
// every seen IPP becomes a switch object that maintains itself.
//   ipp - the "ip:port" string of the remote
//   via - optional, ipp of the switch that introduced us
// constructing one stores it in the registry under its ipp (replacing any entry
// already there) and reports it through master.news
function Switch(ipp, via)
{
  // only the bare minimum is set up here, this runs all the time and must stay cheap
  var self = this;
  var hash = new hlib.Hash(ipp);

  self.ipp = ipp;
  self.hash = hash;
  network[ipp] = self;
  self.end = hash.toString();
  self.via = via;
  self.ATinit = Date.now(); // creation time, in ms

  master.news(self);
  return self;
}
exports.Switch = Switch;
// handle one telex that arrived from this switch.
//   telex  - the parsed telex object
//   rawlen - size of the raw packet, added to this switch's received-bytes counter
// telexes that fail validate() are ignored; accepted ones are queued so each switch
// works through its telexes one at a time
Switch.prototype.process = function(telex, rawlen)
{
  var self = this;

  // integrity and line checks come first
  if(!validate(self, telex)) return;

  // running total of bytes received from this switch
  self.BR = (self.BR || 0) + rawlen;

  // they've sent over 12000 bytes beyond the count we last reported to them (BRout), bad!
  if(self.BRout && self.BR - self.BRout > 12000) return;

  // what they say they've received from us, if they said anything
  if(telex._br) self.BRin = parseInt(telex._br);
  else self.BRin = undefined;

  // a negative count is an intentional "line dropped" signal (experimental)
  if(self.BRin < 0) delete self.line;

  // TODO, if no ATrecv yet but we sent only a single +end last (dialing) and a +pop request for this ip,
  // this could be a NAT pingback and we should re-send our dial immediately

  // remember when we last heard from them
  self.ATrecv = Date.now();

  // any response means healthy: forget the pending expectation and the miss count
  delete self.ATexpected;
  delete self.misses;

  // the queue worker doesn't get our `this`, so the telex carries its owner along
  telex._ = self;
  if(!self.queue) self.queue = async.queue(worker, 1); // concurrency 1 => serial per switch
  self.queue.push(telex);
};
// queue worker: react to a single telex, then signal completion.
//   telex    - telex carrying its owning switch in `_` (put there by process())
//   callback - invoked with no arguments once the telex has been handled
function worker(telex, callback)
{
  // pull the owning switch back out and restore the telex to how it arrived
  var s = telex._;
  delete telex._;

  // +end is only answered when it hasn't been relayed (no _hop, or a _hop of 0)
  var end = telex['+end'];
  if(end)
  {
    var hop = telex._hop;
    if(!hop || parseInt(hop) == 0) doEnd(s, new hlib.Hash(null, end));
  }

  // commands
  var see = telex['.see'];
  if(Array.isArray(see)) doSee(s, see);
  var tap = telex['.tap'];
  if(Array.isArray(tap)) doTap(s, tap);

  // any "+" key is a signal; look for taps to relay to unless it has already hopped 4 or more times
  var hasSignal = Object.keys(telex).some(function(key){
    return key.charAt(0) == '+';
  });
  if(hasSignal)
  {
    var hoppedTooFar = parseInt(telex['_hop']) >= 4;
    if(!hoppedTooFar) doSignals(s, telex);
  }

  // a key that is not a signal (+), command (.) or header (_) is raw data for the app
  var hasData = Object.keys(telex).some(function(key){
    var first = key.charAt(0);
    return first != '+' && first != '.' && first != '_';
  });
  if(hasData) master.data(s, telex);

  callback();
}
// answer a +end lookup by telling the asking switch which known ipps are closest to that end.
//   s   - the switch that asked
//   end - hash of the end being looked for
function doEnd(s, end)
{
  var near = getNear(end);
  // the list has to travel as the ".see" command: that is the key worker() hands to doSee()
  // on the receiving side, while "_"-prefixed keys are headers (_to, _line, _ring, _br, _hop)
  // and a "_see" would simply be ignored
  s.send({'.see': near});
}
// automatically turn every new ipp into a switch, important for getNear being useful too.
//   s   - the switch that sent the .see
//   see - array of ipp strings it told us about
function doSee(s, see)
{
  see.forEach(function(ipp){
    // .see arrives off the wire; anything that isn't a non-empty string can't be an ipp
    if(typeof ipp != 'string' || !ipp) return;
    // already known
    if(network[ipp]) return;
    // the constructor registers the switch and calls master.news itself,
    // announcing it again here would notify the app twice for one switch
    new Switch(ipp, s.ipp);
  });
}
// remember the tap rules a switch sent us; a new .tap replaces the previous rules entirely.
//   s   - the switch that sent the .tap
//   tap - the array of rules, kept as received
// TODO: validate the rules, and index them for much faster matching
function doTap(s, tap)
{
  s.rules = tap;
}
// relay signals to interested switches -- not implemented yet, currently a no-op.
//   s     - the switch the telex came from
//   telex - the telex containing at least one "+" signal
function doSignals(s, telex)
{
  // TODO: go through every network.*.rules, and where one matches relay just the signals
  // TODO: check our master.NAT rule; if it matches, parse the th:ipp and send them an empty telex to pop!
}
// send telex to switch, arg.ephemeral === true means don't have to send _ring
// (arg is accepted but not looked at here).
//   telex - plain object to send; NOTE it is modified in place: _to, _line or _ring,
//           and (when we have received bytes and the caller didn't set one) _br are added
// nothing is sent to ourselves, nor when more than 10000 sent bytes are still unacknowledged
Switch.prototype.send = function(telex, arg)
{
  if(this.self) return; // flag to not send to ourselves!

  // if last time we sent there was an expected response and never got it, count it as a miss
  // for the health check (process() clears ATexpected whenever a valid telex arrives)
  if(this.ATexpected < Date.now()) this.misses = this.misses + 1 || 1;
  delete this.ATexpected;

  // if we expect a response, in 10sec we should count it as a miss if nothing came
  if(telex['+end'] || telex['.tap']) this.ATexpected = Date.now() + 10000;

  // check bytes sent vs received and drop if too much so we don't flood
  // (until the other side has reported a _br, BRin is undefined and this never trips)
  if(!this.Bsent) this.Bsent = 0;
  if(this.Bsent - this.BRin > 10000)
  {
    console.error("FLOODING "+this.ipp+", dropping "+JSON.stringify(telex));
    return;
  }

  if(!this.ring) this.ring = Math.floor((Math.random() * 32768) + 1);
  telex._to = this.ipp;

  // always try to handshake in case we need to talk again
  if(this.line) telex._line = this.line;
  else telex._ring = this.ring;

  // send the bytes we've received, if any -- but never overwrite a _br the caller set
  // on purpose, e.g. the negative _br that drop() sends to signal the line is going away
  if(this.BR && typeof telex._br == 'undefined') telex._br = this.BRout = this.BR;

  // NOTE(review): new Buffer() is deprecated in current Node releases (Buffer.from / Buffer.alloc
  // replace it); kept as-is for whatever runtime this file targets. msg is fully overwritten below.
  var json = new Buffer(JSON.stringify(telex)+'\n', "utf8"); // \n is nice for testing w/ netcat

  // create actual packet with length first (signed 16 bit, big endian)
  var msg = new Buffer(json.length+2);
  msg.writeInt16BE(json.length,0);
  json.copy(msg,2);
  if(msg.length > 1400) console.error("WARNING, large datagram might not survive MTU "+msg.length);

  // track bytes we've sent
  this.Bsent += msg.length;
  this.ATsent = Date.now();

  // convenience to parse out once
  if(!this.ip)
  {
    var colon = this.ipp.indexOf(':');
    this.ip = this.ipp.substr(0, colon);
    this.port = parseInt(this.ipp.substr(colon+1), 10);
  }

  console.error("-->\t"+ this.ipp+"\t"+msg.toString());
  master.sock.send(msg, 0, msg.length, this.port, this.ip);
};
// necessary utility to see if the switch is in a known healthy state.
// returns true or false
Switch.prototype.healthy = function()
{
  if(this.self) return true; // we're always healthy haha
  if(this.ATinit > (Date.now() - 10000)) return true; // new switches are healthy for 10 seconds!
  if(!this.ATrecv) return false; // no packet, no love
  // three strikes: send() counts unanswered requests in `misses` and process() clears it
  // when a valid telex arrives. `drops` is still honoured in case code outside this file sets it
  if(this.misses > 2 || this.drops > 2) return false;
  if(this.Bsent - this.BRin > 10000) return false; // more than 10k hasn't been acked
  return true; // <3 everyone else
};
// destroy/drop: forget this switch, telling the other side first when it still looks alive
Switch.prototype.drop = function()
{
  // a negative _br is what process() treats as an intentional line drop
  var alive = this.healthy();
  if(alive)
  {
    this.send({_br:-10000});
  }

  // remove the registry's reference to us; with no others left we get garbage collected
  delete network[this.ipp];

  // TODO: if meshed, remove all back references
};
// make sure this telex is valid coming from this switch, and twiddle our bits.
//   s - the switch (its ring, ringin, line and ATline are read and may be set here)
//   t - the incoming telex; _line and _ring are replaced by their parsed integer values
// returns true when the telex may be processed, false when it must be ignored
function validate(s, t)
{
  // first, if it's been more than 10 seconds after a line opened,
  // be super strict, no more ringing allowed, _line absolutely required
  // (ATline and Date.now() are both in milliseconds)
  if (s.ATline && s.ATline + 10000 < Date.now() && t._line != s.line) return false;

  // second, process incoming _line
  if (t._line) {
    // can't get a _line w/o having sent a _ring
    if (s.ring == undefined) return false;

    // be nice in what we accept, strict in what we send
    t._line = parseInt(t._line, 10);

    // must match if exist
    if (s.line && t._line != s.line) return false;

    // must be a product of our sent ring!! (a NaN _line fails here too)
    if (t._line % s.ring != 0) return false;

    // we can set up the line now if needed
    if (!s.line) {
      // the other factor is their ring, which has to be a legal ring value itself
      var theirRing = t._line / s.ring;
      if (!(theirRing >= 1 && theirRing <= 32768)) return false;
      s.ringin = theirRing;
      s.line = t._line;
      s.ATline = Date.now();
    }
  }

  // last, process any incoming _ring's (remember, could be out of order after a _line and still be valid)
  if (t._ring) {
    // be nice in what we accept, strict in what we send
    t._ring = parseInt(t._ring, 10);

    // already had a ring and this one doesn't match, should be rare
    if (s.ringin && t._ring != s.ringin) return false;

    // make sure within valid range (written so that NaN is rejected as well)
    if (!(t._ring > 0 && t._ring <= 32768)) return false;

    // we can set up the line now if needed, same condition as the _line branch above
    // (ATline starts out unset, so it can't be compared against 0)
    if (!s.line) {
      s.ringin = t._ring;
      if (!s.ring) s.ring = Math.floor((Math.random() * 32768) + 1);
      s.line = s.ringin * s.ring;
      s.ATline = Date.now();
    }
  }

  // we're valid at this point, line or otherwise
  return true;
}