This repository was archived by the owner on Jun 30, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathmsgpack_rpc_stream.lua
More file actions
156 lines (138 loc) · 4.58 KB
/
msgpack_rpc_stream.lua
File metadata and controls
156 lines (138 loc) · 4.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
local mpack = require('mpack')
-- temporary hack to be able to manipulate buffer/window/tabpage
--- Minimal stand-in for a buffer handle: an object holding only an `id`.
local Buffer = {}
Buffer.__index = Buffer

--- Wrap `id` in a table whose metatable is `Buffer`.
-- @param id value stored in the `id` field
-- @return the new Buffer object
function Buffer.new(id)
  local handle = { id = id }
  return setmetatable(handle, Buffer)
end
--- Minimal stand-in for a window handle: an object holding only an `id`.
local Window = {}
Window.__index = Window

--- Wrap `id` in a table whose metatable is `Window`.
-- @param id value stored in the `id` field
-- @return the new Window object
function Window.new(id)
  local handle = { id = id }
  return setmetatable(handle, Window)
end
--- Minimal stand-in for a tabpage handle: an object holding only an `id`.
local Tabpage = {}
Tabpage.__index = Tabpage

--- Wrap `id` in a table whose metatable is `Tabpage`.
-- @param id value stored in the `id` field
-- @return the new Tabpage object
function Tabpage.new(id)
  local handle = { id = id }
  return setmetatable(handle, Tabpage)
end
--- Render a byte string as a hex dump, 8 bytes per row.
--
-- Each row has the form
--   OOOO: hh hh hh hh hh hh hh hh AAAAAAAA
-- where OOOO is the zero-based offset of the row's first byte in hex, each
-- `hh ` is one byte, and the trailing column shows printable ASCII
-- (bytes 32..126) with every other byte rendered as ".".
--
-- Rows are joined with "\n"; there is no leading or trailing newline, and an
-- empty input yields an empty string. A short final row is padded with three
-- spaces per missing byte (the width of one `hh ` cell) so the ASCII column
-- lines up with the full rows above it.
--
-- @param str string of raw bytes to dump
-- @return the formatted dump as a single string
local function hexdump(str)
  local BYTES_PER_ROW = 8
  local len = #str
  local rows = {}

  for first = 1, len, BYTES_PER_ROW do
    local hex, asc = {}, {}
    for i = first, math.min(first + BYTES_PER_ROW - 1, len) do
      local ord = string.byte(str, i)
      hex[#hex + 1] = string.format("%02x ", ord)
      -- string.char never returns a falsy value, so and/or is safe here
      asc[#asc + 1] = (ord >= 32 and ord <= 126) and string.char(ord) or "."
    end
    -- Pad by the number of cells actually missing: zero for a full row.
    local pad = string.rep("   ", BYTES_PER_ROW - #hex)
    rows[#rows + 1] = string.format("%04x: ", first - 1)
      .. table.concat(hex) .. pad .. table.concat(asc)
  end

  return table.concat(rows, "\n")
end
--- Handle used to answer one incoming request.
local Response = {}
Response.__index = Response

--- Create a response handle.
-- @param msgpack_rpc_stream the stream object the reply will be written through
-- @param request_id identifier of the request being answered
-- @return the new Response object
function Response.new(msgpack_rpc_stream, request_id)
  local self = setmetatable({}, Response)
  self._msgpack_rpc_stream = msgpack_rpc_stream
  self._request_id = request_id
  return self
end
--- Write the reply for this request to the underlying stream.
--
-- The message is the session's reply header for the stored request id,
-- followed by two packed values: `value` then NIL when `is_error` is truthy,
-- otherwise NIL then `value`.
--
-- @param value the error (if `is_error`) or the result to send back
-- @param is_error truthy when `value` is an error rather than a result
function Response:send(value, is_error)
  local rpc = self._msgpack_rpc_stream
  local frame = rpc._session:reply(self._request_id)

  -- Decide which of the two trailing slots carries the payload.
  local first, second
  if is_error then
    first, second = value, mpack.NIL
  else
    first, second = mpack.NIL, value
  end

  frame = frame .. rpc._pack(first)
  frame = frame .. rpc._pack(second)
  rpc._stream:write(frame)
end
--- RPC layer that frames msgpack messages over an underlying stream object.
local MsgpackRpcStream = {}
MsgpackRpcStream.__index = MsgpackRpcStream

--- Build an RPC stream on top of `stream`.
--
-- The packer and the session's unpacker are configured with ext handlers so
-- that Buffer, Window and Tabpage objects travel as ext codes 0, 1 and 2,
-- with the object's `id` as the packed payload.
--
-- @param stream object stored as `_stream`; the other methods call
--   write/read_start/read_stop/close on it
-- @return the new MsgpackRpcStream object
function MsgpackRpcStream.new(stream)
  -- Packer-side handler: emit `code` plus the packed id of the object.
  local function ext_packer(code)
    return function(obj) return code, mpack.pack(obj.id) end
  end

  -- Unpacker-side handler: rebuild a `class` object from the packed id.
  -- The first handler argument is unused.
  local function ext_unpacker(class)
    return function(_, payload) return class.new(mpack.unpack(payload)) end
  end

  local packer = mpack.Packer({
    ext = {
      [Buffer] = ext_packer(0),
      [Window] = ext_packer(1),
      [Tabpage] = ext_packer(2),
    },
  })

  local unpacker = mpack.Unpacker({
    ext = {
      [0] = ext_unpacker(Buffer),
      [1] = ext_unpacker(Window),
      [2] = ext_unpacker(Tabpage),
    },
  })

  local self = {
    _stream = stream,
    -- description of the last message dispatched; set by read_start
    _previous_chunk = nil,
    _pack = packer,
    _session = mpack.Session({ unpack = unpacker }),
  }
  return setmetatable(self, MsgpackRpcStream)
end
--- Send a request or a notification over the stream.
--
-- With a `response_cb` the message header comes from the session's
-- request(), which is handed the callback; without one the header comes from
-- notify(). The packed `method` and `args` follow the header.
--
-- @param method value packed as the method field
-- @param args value packed as the arguments field
-- @param response_cb optional function; asserted to be a function when given
function MsgpackRpcStream:write(method, args, response_cb)
  local session = self._session
  local header
  if not response_cb then
    header = session:notify()
  else
    assert(type(response_cb) == 'function')
    header = session:request(response_cb)
  end

  local message = header .. self._pack(method) .. self._pack(args)
  self._stream:write(message)
end
--- Start reading from the underlying stream and dispatch decoded messages.
--
-- Every chunk delivered by the stream is fed to the session's receive()
-- repeatedly until the chunk has been consumed. Decoded messages are routed
-- by type:
--   * 'request'      -> request_cb(method, args, response), where `response`
--                       is a Response bound to this stream and the request id
--   * 'notification' -> notification_cb(method, args)
--   * 'response'     -> the callback returned by the session is called as
--                       cb(err, result); exactly one of the two is non-nil
--                       unless both were NIL on the wire
-- Any other type value is ignored.
--
-- If decoding raises, the whole chunk is written to './msgpack-invalid-data'
-- (best effort), a hex dump of up to 80 bytes starting at the failing
-- position is printed, and the original error is re-raised.
--
-- @param request_cb function(method, args, response)
-- @param notification_cb function(method, args)
-- @param eof_cb function() invoked when the stream delivers a nil chunk
function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb)
  self._stream:read_start(function(data)
    if not data then
      return eof_cb()
    end
    -- Every target of the multiple assignment below must be declared local,
    -- otherwise it would be created as a global.
    local status, type_, id_or_cb, method_or_error, args_or_result
    local pos = 1
    local len = #data
    while pos <= len do
      -- grab a copy of pos since pcall() will set it to nil on error
      local oldpos = pos
      status, type_, id_or_cb, method_or_error, args_or_result, pos = pcall(
        self._session.receive, self._session, data, pos)
      if not status then
        -- Write the full blob of bad data to a specific file. This is best
        -- effort: if the file cannot be opened we must still report the
        -- decode error below rather than fail on a nil handle. Binary mode
        -- keeps the bytes intact on platforms that translate newlines.
        local outfile = io.open('./msgpack-invalid-data', 'wb')
        if outfile then
          outfile:write(data)
          outfile:close()
        end
        -- build a printable representation of the bad part of the string:
        -- exactly 10 rows of 8 bytes (string.sub bounds are inclusive)
        local printable = hexdump(data:sub(oldpos, oldpos + 8 * 10 - 1))
        print(string.format("Error deserialising msgpack data stream at pos %d:\n%s\n",
          oldpos, printable))
        -- _previous_chunk is nil until a message has been dispatched;
        -- tostring() keeps '%s' from raising on interpreters that reject nil.
        print(string.format("... occurred after %s", tostring(self._previous_chunk)))
        -- on pcall failure the second result (type_) holds the error value
        error(type_)
      end
      if type_ == 'request' or type_ == 'notification' then
        self._previous_chunk = string.format('%s<%s>', type_,
          tostring(method_or_error))
        if type_ == 'request' then
          request_cb(method_or_error, args_or_result, Response.new(self,
            id_or_cb))
        else
          notification_cb(method_or_error, args_or_result)
        end
      elseif type_ == 'response' then
        self._previous_chunk = string.format('response<%s>', type(args_or_result))
        -- Translate the wire form into (err, result) for the callback:
        -- a NIL error means success, otherwise the result is dropped.
        if method_or_error == mpack.NIL then
          method_or_error = nil
        else
          args_or_result = nil
        end
        id_or_cb(method_or_error, args_or_result)
      end
    end
  end)
end
--- Stop reading by delegating to the underlying stream's read_stop().
function MsgpackRpcStream:read_stop()
  local stream = self._stream
  stream:read_stop()
end
--- Close the underlying stream.
-- @param signal forwarded unchanged to the stream's close()
function MsgpackRpcStream:close(signal)
  local stream = self._stream
  stream:close(signal)
end
return MsgpackRpcStream