Skip to content

Commit 40462a7

Browse files
committed
Add expected trailer behavior for grpc-web-text case
Signed-off-by: Matt Rkiouak <mrkiouak@gmail.com>
1 parent c3e40fb commit 40462a7

4 files changed

Lines changed: 260 additions & 36 deletions

File tree

src/protojure/pedestal/interceptors/grpc.clj

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,9 @@
117117
assoc
118118
:headers {"Content-Type" "application/grpc+proto"}
119119
:status 200
120-
:body ""
121-
:trailers (generate-trailers {:grpc-status status :grpc-message msg})))
120+
:body (async/close! (async/chan 1))
121+
:trailers (generate-trailers {:grpc-status status :grpc-message msg})
122+
:grpc-error true))
122123

123124
(def error-interceptor
124125
(err/error-dispatch

src/protojure/pedestal/interceptors/grpc_web.clj

Lines changed: 188 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,12 @@
66
"A [Pedestal](http://pedestal.io/) [interceptor](http://pedestal.io/reference/interceptors) for the [GRPC-WEB](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) protocol"
77
(:require [io.pedestal.interceptor :refer [->Interceptor]]
88
[clojure.core.async :as async]
9-
[clojure.data])
10-
(:refer-clojure :exclude [proxy]))
9+
[clojure.data]
10+
[promesa.core :as p]
11+
[clojure.tools.logging :as log])
12+
(:refer-clojure :exclude [proxy])
13+
(:import (org.apache.commons.codec.binary Base64InputStream Base64OutputStream)
14+
(java.io PipedOutputStream PipedInputStream)))
1115

1216
(set! *warn-on-reflection* true)
1317

@@ -29,39 +33,182 @@
2933
[{:keys [body-ch] :as request}]
3034
(let [dec-ch (async/chan 4056)
3135
decoder (java.util.Base64/getDecoder)]
32-
(async/go-loop [[final encoded] (async/<! (read-n body-ch 4))]
33-
(if (and (empty? encoded) final)
34-
(async/close! dec-ch)
35-
(do
36-
(doseq [b (.decode decoder (byte-array encoded))]
37-
(async/>! dec-ch b))
38-
(recur (async/<! (read-n body-ch 4))))))
39-
(assoc request :body-ch dec-ch)))
36+
(let [b64-decode-error-promise
37+
(p/promise (fn [resolve reject]
38+
(when-not (instance? clojure.core.async.impl.channels.ManyToManyChannel body-ch)
39+
(log/error "grpc-web/proxy did not receive an appropriate body-ch")
40+
(async/close! dec-ch)
41+
(log/error "grpc-web interceptor encountered an unexpected body type on-leave")
42+
(resolve (ex-info "grpc-web interceptor encountered an unexpected body type on-leave"
43+
{:causes #{:incompatible-body-ch-value-type}
44+
:body-ch-value-type (type body-ch)})))
45+
(async/go-loop [[final encoded] (async/<! (read-n body-ch 4))]
46+
(if (and (empty? encoded) final)
47+
(do
48+
(resolve nil)
49+
(async/close! dec-ch))
50+
(do
51+
(try
52+
(doseq [b (.decode decoder (byte-array encoded))]
53+
(async/>! dec-ch b))
54+
(catch Exception e
55+
(async/close! dec-ch)
56+
(resolve e)))
57+
(recur (async/<! (read-n body-ch 4))))))))]
58+
(-> (assoc request :body-ch dec-ch)
59+
(assoc :b64-decode-error-promise b64-decode-error-promise)))))
60+
61+
(defn- num->bytes
62+
"Serializes an integer to a byte-array."
63+
[num]
64+
(byte-array (for [i (range 4)]
65+
(-> (unsigned-bit-shift-right num
66+
(* 8 (- 4 i 1)))
67+
(bit-and 0x0FF)))))
68+
69+
(defn- make-grpc-web-trailers-string [trailers]
70+
(reduce (fn [s [k v]]
71+
(str s k ":" v "\r\n")) "" trailers))
72+
73+
(defn- make-grpc-web-trailers-frame [trailers]
74+
"This is the lightly documented handling of trailers from
75+
https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md#protocol-differences-vs-grpc-over-http2
76+
See section beginning `Message framing (vs. http2-transport-mapping`"
77+
(let [trailer-bytes (.getBytes ^String (make-grpc-web-trailers-string trailers))]
78+
(byte-array
79+
(concat
80+
[0x80]
81+
(into [] (num->bytes (count trailer-bytes)))
82+
(into [] trailer-bytes)))))
83+
84+
(defn- generate-trailers
85+
[b64-ex]
86+
(let [b64-ex (bean b64-ex)
87+
{:keys [grpc-status grpc-message]} (cond (=
88+
(:message b64-ex)
89+
"Input byte[] should at least have 2 bytes for base64 bytes")
90+
{:grpc-status 3 :grpc-message "Bad Base64 Encoded Request"}
91+
true {:grpc-status 13 :grpc-message "Internal Error"})]
92+
(-> {"grpc-status" grpc-status}
93+
(cond-> (some? grpc-message) (assoc "grpc-message" grpc-message)))))
94+
95+
(defmulti encode-web-body "Writes trailers to body per grpc-web specification"
96+
(fn [x] (type (-> x :response :body))))
97+
98+
(defmethod encode-web-body clojure.core.async.impl.channels.ManyToManyChannel
99+
[{{:keys [body trailers]} :response :as ctx}]
100+
(let [body-w-trailers (async/chan 256)]
101+
(async/go-loop [s (async/<! body)]
102+
(if (not s)
103+
;;TODO Note blocking on this promise only works because grpc-web only supports server-side streaming -- e.g.,
104+
;; we can count on the request body decode having consumed all bytes prior to responding in the grpc-web-text
105+
;; case
106+
(let [trailers (async/<! trailers)
107+
frame (make-grpc-web-trailers-frame trailers)]
108+
;;Write trailer frame
109+
(async/>! body-w-trailers ^bytes frame)
110+
(async/close! body-w-trailers))
111+
(do
112+
(async/>! body-w-trailers ^bytes s)
113+
(recur (async/<! body)))))
114+
(-> (assoc-in ctx [:response :body] body-w-trailers)
115+
(update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web+proto"})))))
116+
117+
(defmethod encode-web-body nil
118+
[{{:keys [trailers] :as response} :response :as ctx}]
119+
;;TODO Note blocking on this promise only works because grpc-web only supports server-side streaming -- e.g.,
120+
;; we can count on the request body decode having consumed all bytes prior to responding in the grpc-web-text
121+
;; case
122+
(let [body-w-trailers (async/chan 256)]
123+
;;Write trailer frame
124+
(async/go []
125+
(let [frame (make-grpc-web-trailers-frame trailers)]
126+
(async/>! body-w-trailers ^bytes frame))
127+
(async/close! body-w-trailers))
128+
(-> (assoc-in ctx [:response :body] body-w-trailers)
129+
(update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web+proto"})))))
40130

41-
(defn- encode-body
42-
"Consumes bytes from the response body channel and base64 encodes the payload"
131+
(defmethod encode-web-body :default
43132
[{{:keys [body] :as response} :response :as ctx}]
44-
(let [encoder (java.util.Base64/getEncoder)
45-
out-ch (async/chan 4056)]
133+
(throw (ex-info "grpc-web interceptor encountered an unexpected body type on-leave"
134+
{:causes #{:incompatible-body-value-type}
135+
:body-value-type (type body)})))
136+
137+
(defmulti encode-web-text-body "Consumes bytes from the response body and base64 encodes the payload"
138+
(fn [x] (type (-> x :response :body))))
139+
140+
(defmethod encode-web-text-body clojure.core.async.impl.channels.ManyToManyChannel
141+
[{{:keys [body trailers]} :response {:keys [b64-decode-error-promise]} :request :as ctx}]
142+
(let [pos (PipedOutputStream.)
143+
pis (PipedInputStream. pos)
144+
;; N.B. passing a string instead of nil in the last position (the line end) caused no data to send
145+
b64-is (Base64InputStream. pis true -1 nil)]
46146
(async/go-loop [s (async/<! body)]
47147
(if (not s)
48-
(async/close! out-ch)
148+
;;TODO Note blocking on this promise only works because grpc-web only supports server-side streaming -- e.g.,
149+
;; we can count on the request body decode having consumed all bytes prior to responding in the grpc-web-text
150+
;; case
151+
(let [b64-ex @b64-decode-error-promise
152+
trailers (async/<! trailers)
153+
frame (make-grpc-web-trailers-frame (if b64-ex
154+
(generate-trailers b64-ex)
155+
trailers))]
156+
;;Write trailer frame
157+
(.write pos ^bytes frame)
158+
(.flush pos)
159+
(.close pos))
49160
(do
50-
(async/>! out-ch (.encode encoder ^bytes s))
161+
(.write pos ^bytes s)
51162
(recur (async/<! body)))))
52-
(-> (assoc-in ctx [:response :body] out-ch)
163+
(-> (assoc-in ctx [:response :body] b64-is)
53164
(update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web-text"})))))
54165

55-
(def ^{:no-doc true :const true} content-types
166+
(defmethod encode-web-text-body nil
167+
[{{:keys [trailers] :as response} :response {:keys [b64-decode-error-promise]} :request :as ctx}]
168+
;;TODO Note blocking on this promise only works because grpc-web only supports server-side streaming -- e.g.,
169+
;; we can count on the request body decode having consumed all bytes prior to responding in the grpc-web-text
170+
;; case
171+
(let [b64-ex @b64-decode-error-promise
172+
frame (make-grpc-web-trailers-frame (if b64-ex
173+
(generate-trailers b64-ex)
174+
trailers))
175+
pos (PipedOutputStream.)
176+
pis (PipedInputStream. pos)
177+
b64-is (Base64InputStream. pis true -1 nil)]
178+
;;Write trailer frame
179+
(.write pos ^bytes frame)
180+
(.flush pos)
181+
(.close pos)
182+
(-> (assoc-in ctx [:response :body] b64-is)
183+
(update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web-text"})))))
184+
185+
(defmethod encode-web-text-body :default
186+
[{{:keys [body] :as response} :response :as ctx}]
187+
(throw (ex-info "grpc-web interceptor encountered an unexpected body type on-leave"
188+
{:causes #{:incompatible-body-value-type}
189+
:body-value-type (type body)})))
190+
191+
(def ^{:no-doc true :const true} content-types-text
56192
#{"application/grpc-web-text"})
57193

194+
(def ^{:no-doc true :const true} content-types-web
195+
#{"application/grpc-web"
196+
"application/grpc-web+proto"})
197+
58198
(defn- web-text?
59199
[{{:strs [content-type]} :headers}]
60-
(contains? content-types content-type))
200+
(contains? content-types-text content-type))
61201

62202
(defn- accept-web-text?
63203
[{{{:strs [accept]} :headers} :request}]
64-
(contains? content-types accept))
204+
(contains? content-types-text accept))
205+
206+
(defn- accept-web?
207+
"The grpc-web js bindings currently set the `Accept:` header to \"*/*\" which complicates handling trailers. We
208+
fallback to relying on the content-type to determine a client is likely a browser and requires special response
209+
content-type handling"
210+
[{{{:strs [content-type]} :headers} :request}]
211+
(contains? content-types-web content-type))
65212

66213
(defn- pred->
67214
"Threads 'item' through both the predicate and, when 'pred' evaluates true, 'xform' functions. Else, just returns 'item'"
@@ -74,8 +221,8 @@
74221

75222
(defn- leave-handler
76223
[{:keys [response] :as ctx}]
77-
;; TODO "Clarify & implement grpc-web trailer behavior"
78-
(pred-> ctx accept-web-text? encode-body))
224+
(-> (pred-> ctx accept-web-text? encode-web-text-body)
225+
(pred-> accept-web? encode-web-body)))
79226

80227
(defn- exception-handler
81228
[ctx e]
@@ -84,3 +231,22 @@
84231
(def proxy
85232
"Interceptor that provides a transparent proxy for the [GRPC-WEB](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) protocol to standard protojure grpc protocol"
86233
(->Interceptor ::proxy enter-handler leave-handler exception-handler))
234+
235+
(defn error-leave-handler [{{:keys [grpc-error]} :response :as ctx}]
236+
(if grpc-error
237+
(-> (pred-> ctx accept-web-text? encode-web-text-body)
238+
(pred-> accept-web? encode-web-body))
239+
ctx))
240+
;;FIXME when HTTP/3 has a grpc specification
241+
;; since we rely on protojure.pedestal.interceptors.grpc/error-interceptor to form the grpc compliant trailers,
242+
;; we expose this error interceptor (and insert in protojure.pedestal.routes/->tablesyntax prior to
243+
;; interceptors.grpc/error-interceptor) so that this interceptor can check for the grpc-web-text accept content type
244+
;; and encode appropriately when an exception is thrown.
245+
;; Once we have a third grpc specification based on transport, better to fix these abstractions such that we have
246+
;; HTTP1.1/HTTP2/HTTP3 based encoding
247+
(def error-interceptor
248+
"Interceptor that writes grpc exception information in a grpc-web compatible encoding"
249+
(->Interceptor ::grpc-web-error
250+
identity
251+
error-leave-handler
252+
exception-handler))

src/protojure/pedestal/routes.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
name (keyword fqs (str method "-handler"))
3737
handler (handler name (partial method-fn callback-context))]
3838
[(str "/" fqs "/" method)
39-
:post (-> (consv grpc/error-interceptor interceptors)
39+
:post (-> (vec (concat [grpc.web/error-interceptor grpc/error-interceptor] interceptors))
4040
(conj grpc.web/proxy
4141
(grpc/route-interceptor rpc)
4242
handler))

test/protojure/grpc_web_test.clj

Lines changed: 68 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,18 @@
2626
[protojure.pedestal.routes :as pedestal.routes]
2727
[example.hello :refer [new-HelloRequest pb->HelloReply]]
2828
[clj-http.client :as client]
29-
[protojure.grpc.codec.lpm :as lpm]))
29+
[protojure.grpc.codec.lpm :as lpm]
30+
[protojure.test.grpc.TestService.server :as test.server]
31+
[protojure.pedestal.interceptors.grpc-web :as grpc.web]
32+
[taoensso.timbre.appenders.core :as appenders]
33+
[taoensso.timbre :as log]
34+
[taoensso.timbre.tools.logging :refer [use-timbre]]))
35+
36+
(use-timbre)
37+
38+
(log/set-config! {:level :trace
39+
:ns-whitelist ["protojure.*"]
40+
:appenders {:println (appenders/println-appender {:stream :auto})}})
3041

3142
(defonce test-env (atom {}))
3243

@@ -72,6 +83,21 @@
7283
:interceptors interceptors
7384
:callback-context (Greeter.)}))
7485

86+
;;-----------------------------------------------------------------------------
87+
;; TestService service endpoint
88+
;;-----------------------------------------------------------------------------
89+
90+
(deftype TestService []
91+
test.server/Service
92+
(Metadata
93+
[_ request]
94+
(throw (Exception. "foobar"))))
95+
96+
(defn- test-service-mock-routes [interceptors]
97+
(pedestal.routes/->tablesyntax {:rpc-metadata test.server/rpc-metadata
98+
:interceptors interceptors
99+
:callback-context (TestService.)}))
100+
75101
(defn- grpc-connect
76102
([] (grpc-connect (:port @test-env)))
77103
([port]
@@ -85,7 +111,9 @@
85111
interceptors [(body-params/body-params)
86112
pedestal/html-body]
87113
server-params {:env :prod
88-
::pedestal/routes (into #{} (greeter-mock-routes interceptors))
114+
::pedestal/routes (into #{} (concat
115+
(greeter-mock-routes interceptors)
116+
(test-service-mock-routes interceptors)))
89117
::pedestal/port port
90118

91119
::pedestal/type protojure.pedestal/config
@@ -122,14 +150,43 @@
122150
(let [lpm (async/<!! (async/into [] out))
123151
b64-encoded (-> (java.util.Base64/getEncoder)
124152
(.encode (byte-array lpm)))
125-
body (-> (java.util.Base64/getDecoder)
126-
(.decode (-> (client/post
127-
(str "http://localhost:" (:port @test-env) "/example.hello.Greeter/SayHello")
128-
{:body b64-encoded
129-
:content-type "application/grpc-web-text"
130-
:accept "application/grpc-web-text"})
131-
:body)))]
132-
(doseq [b body]
153+
body (-> (client/post
154+
(str "http://localhost:" (:port @test-env) "/example.hello.Greeter/SayHello")
155+
{:body b64-encoded
156+
:content-type "application/grpc-web-text"
157+
:accept "application/grpc-web-text"})
158+
:body)
159+
decoded-body (.decode (java.util.Base64/getDecoder) body)]
160+
(doseq [b (into [] decoded-body)]
133161
(async/>!! resp-in b))
134162
(async/close! resp-in)
135-
(is (= "Hello, World" (:message (async/<!! resp-out))))))))
163+
(is (= "Hello, World" (:message (async/<!! resp-out))))))))
164+
165+
(deftest grpc-web-exception-check
166+
(testing "Check that wff grpc-web trailers are received when a handler throws an exception"
167+
(let [resp (client/post
168+
(str "http://localhost:" (:port @test-env) "/protojure.test.grpc.TestService/Metadata")
169+
{:content-type "application/grpc-web-text"
170+
:accept "application/grpc-web-text"})]
171+
(is (= "gAAAAHxncnBjLXN0YXR1czoxMw0KZ3JwYy1tZXNzYWdlOmphdmEubGFuZy5FeGNlcHRpb24gaW4gSW50ZXJjZXB0b3IgOnByb3RvanVyZS50ZXN0LmdycGMuVGVzdFNlcnZpY2UvTWV0YWRhdGEtaGFuZGxlciAtIGZvb2Jhcg0K"
172+
(:body resp))))))
173+
174+
(deftest grpc-web-bad-request-encoding-check
175+
(testing "Check that wff grpc-web trailers are received when a handler throws an exception"
176+
(let [resp (client/post
177+
(str "http://localhost:" (:port @test-env) "/protojure.test.grpc.TestService/Metadata")
178+
{:content-type "application/grpc-web-text"
179+
:accept "application/grpc-web-text"
180+
:body (byte-array [1])})]
181+
(is (= "gAAAADhncnBjLXN0YXR1czozDQpncnBjLW1lc3NhZ2U6QmFkIEJhc2U2NCBFbmNvZGVkIFJlcXVlc3QNCg=="
182+
(:body resp))))))
183+
184+
(deftest grpc-web-bad-request-encoding-check
185+
(testing "Check that wff grpc-web trailers are received when a handler throws an exception"
186+
(let [resp (client/post
187+
(str "http://localhost:" (:port @test-env) "/protojure.test.grpc.TestService/Metadata")
188+
{:content-type "application/grpc-web"
189+
:accept "application/grpc-web"
190+
:body (byte-array [1])})]
191+
(is (clojure.string/includes?
192+
(:body resp) "grpc-status:13\r")))))

0 commit comments

Comments
 (0)