Skip to content

Commit c3e40fb

Browse files
committed
Update support for current application/grpc-web-text, grpc-web js
behavior Signed-off-by: Matt Rkiouak <mrkiouak@gmail.com>
1 parent 93bc8da commit c3e40fb

3 files changed

Lines changed: 176 additions & 76 deletions

File tree

src/protojure/pedestal/interceptors/grpc_web.clj

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,65 @@
44

55
(ns protojure.pedestal.interceptors.grpc-web
66
"A [Pedestal](http://pedestal.io/) [interceptor](http://pedestal.io/reference/interceptors) for the [GRPC-WEB](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) protocol"
7-
(:require [io.pedestal.interceptor :refer [->Interceptor]])
8-
(:import (org.apache.commons.codec.binary Base64InputStream))
7+
(:require [io.pedestal.interceptor :refer [->Interceptor]]
8+
[clojure.core.async :as async]
9+
[clojure.data])
910
(:refer-clojure :exclude [proxy]))
1011

1112
(set! *warn-on-reflection* true)
1213

14+
(defn read-n [from-chan n]
15+
"Convenience method for consuming n [n] or less values from a channel [from-chan]"
16+
(async/go-loop [res []]
17+
(let [v (async/<! from-chan)]
18+
(if (nil? v)
19+
[true res]
20+
(let [new-res (conj res v)]
21+
(if (= n (count new-res))
22+
[false new-res]
23+
(recur new-res)))))))
24+
1325
(defn- decode-body
14-
[{:keys [body] :as request}]
15-
(assoc request :body (Base64InputStream. body)))
26+
"Consumes 4 base64 encoded bytes from the body, and writes to a new channel that replaces the prior :body-ch on the
27+
request. Note that any consumption from the :body inputstream that is not followed by a write to the :body-ch key
28+
results in that data being lost to the grpc interceptor :grpc-params/:body-ch key."
29+
[{:keys [body-ch] :as request}]
30+
(let [dec-ch (async/chan 4056)
31+
decoder (java.util.Base64/getDecoder)]
32+
(async/go-loop [[final encoded] (async/<! (read-n body-ch 4))]
33+
(if (and (empty? encoded) final)
34+
(async/close! dec-ch)
35+
(do
36+
(doseq [b (.decode decoder (byte-array encoded))]
37+
(async/>! dec-ch b))
38+
(recur (async/<! (read-n body-ch 4))))))
39+
(assoc request :body-ch dec-ch)))
40+
41+
(defn- encode-body
42+
"Consumes bytes from the response body channel and base64 encodes the payload"
43+
[{{:keys [body] :as response} :response :as ctx}]
44+
(let [encoder (java.util.Base64/getEncoder)
45+
out-ch (async/chan 4056)]
46+
(async/go-loop [s (async/<! body)]
47+
(if (not s)
48+
(async/close! out-ch)
49+
(do
50+
(async/>! out-ch (.encode encoder ^bytes s))
51+
(recur (async/<! body)))))
52+
(-> (assoc-in ctx [:response :body] out-ch)
53+
(update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web-text"})))))
1654

1755
(def ^{:no-doc true :const true} content-types
18-
#{"application/grpc-web-text"
19-
"application/grpc-web-text+proto"})
56+
#{"application/grpc-web-text"})
2057

2158
(defn- web-text?
2259
[{{:strs [content-type]} :headers}]
2360
(contains? content-types content-type))
2461

62+
(defn- accept-web-text?
63+
[{{{:strs [accept]} :headers} :request}]
64+
(contains? content-types accept))
65+
2566
(defn- pred->
2667
"Threads 'item' through both the predicate and, when 'pred' evaluates true, 'xform' functions. Else, just returns 'item'"
2768
[item pred xform]
@@ -32,9 +73,9 @@
3273
(assoc ctx :request (pred-> request web-text? decode-body)))
3374

3475
(defn- leave-handler
35-
[ctx]
76+
[{:keys [response] :as ctx}]
3677
;; TODO "Clarify & implement grpc-web trailer behavior"
37-
ctx)
78+
(pred-> ctx accept-web-text? encode-body))
3879

3980
(defn- exception-handler
4081
[ctx e]

test/protojure/grpc_web_test.clj

Lines changed: 113 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -11,80 +11,125 @@
1111
[io.pedestal.http.body-params :as body-params]
1212
[example.types :as example]
1313
[protojure.protobuf :as pb]
14-
[clojure.data.codec.base64 :as b64]))
14+
[clojure.data.codec.base64 :as b64]
15+
[protojure.pedestal.interceptors.grpc :as grpc]
16+
[clojure.core.async :as async]
17+
[example.hello.Greeter :as greeter]
18+
[clojure.core.async :refer [<!! >!! <! >! go go-loop] :as async]
19+
[protojure.test.utils :as test.utils]
20+
[protojure.grpc.client.api :as grpc-api]
21+
[protojure.grpc.client.providers.http2 :as grpc.http2]
22+
[protojure.grpc.client.utils :as client.utils]
23+
[promesa.core :as p]
24+
[protojure.internal.grpc.client.providers.http2.jetty :as jetty-client]
25+
[protojure.grpc.status :as grpc.status]
26+
[protojure.pedestal.routes :as pedestal.routes]
27+
[example.hello :refer [new-HelloRequest pb->HelloReply]]
28+
[clj-http.client :as client]
29+
[protojure.grpc.codec.lpm :as lpm]))
1530

16-
(defn grpc-echo [{:keys [body] :as request}]
17-
{:status 200
18-
:body (example/pb->Money body)
19-
:trailers {"grpc-status" 0 "grpc-message" "Got it!"}})
31+
(defonce test-env (atom {}))
2032

21-
(def interceptors [(body-params/body-params)
22-
grpc-web/proxy])
33+
;;-----------------------------------------------------------------------------
34+
;; "Greeter" service endpoint
35+
;;-----------------------------------------------------------------------------
36+
(deftype Greeter []
37+
greeter/Service
38+
(SayHello
39+
[this {{:keys [name]} :grpc-params :as request}]
40+
{:status 200
41+
:body {:message (str "Hello, " name)}})
42+
(SayRepeatHello
43+
[this {{:keys [name]} :grpc-params :as request}]
44+
(let [resp-chan (:grpc-out request)]
45+
(go
46+
(dotimes [_ 3]
47+
(>! resp-chan {:message (str "Hello, " name)}))
48+
(async/close! resp-chan))
49+
{:status 200
50+
:body resp-chan}))
51+
(SayHelloOnDemand
52+
[this {:keys [grpc-params] :as request}]
53+
(let [out-chan (:grpc-out request)]
54+
(go-loop [name (:name (<! grpc-params))]
55+
(if name
56+
(do
57+
(>! out-chan {:message (str "Hello, " name)})
58+
(recur (:name (<! grpc-params))))
59+
(async/close! out-chan)))
60+
{:status 200
61+
:body out-chan}))
62+
(SayHelloError
63+
[this req]
64+
{:status 200
65+
:body "This isn't a protobuf message"})
66+
(SayNil
67+
[this req]
68+
(grpc.status/error :unauthenticated)))
2369

24-
(def routes [["/" :get (conj interceptors `grpc-echo)]])
70+
(defn- greeter-mock-routes [interceptors]
71+
(pedestal.routes/->tablesyntax {:rpc-metadata greeter/rpc-metadata
72+
:interceptors interceptors
73+
:callback-context (Greeter.)}))
2574

26-
(def service (let [service-params {:env :prod
27-
::pedestal/routes (into #{} routes)
28-
::pedestal/type protojure.pedestal/config
29-
::pedestal/chain-provider protojure.pedestal/provider}]
30-
(:io.pedestal.http/service-fn (io.pedestal.http/create-servlet service-params))))
75+
(defn- grpc-connect
76+
([] (grpc-connect (:port @test-env)))
77+
([port]
78+
@(grpc.http2/connect {:uri (str "http://localhost:" port) :content-coding "gzip"})))
3179

32-
(deftest grpc-web-text-check
33-
(testing "Check that a round-trip GRPC request works"
34-
(let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))]
35-
(is
36-
(=
37-
(with-out-str (pr (example/pb->Money input-msg)))
38-
(:body (response-for
39-
service
40-
:get "/"
41-
:headers {"Content-Type" "application/grpc-web-text"}
42-
:body (clojure.java.io/input-stream (b64/encode input-msg)))))))))
80+
;;-----------------------------------------------------------------------------
81+
;; Fixtures
82+
;;-----------------------------------------------------------------------------
83+
(defn create-service []
84+
(let [port (test.utils/get-free-port)
85+
interceptors [(body-params/body-params)
86+
pedestal/html-body]
87+
server-params {:env :prod
88+
::pedestal/routes (into #{} (greeter-mock-routes interceptors))
89+
::pedestal/port port
4390

44-
(deftest grpc-web-check
45-
(testing "Check that a round-trip GRPC request works"
46-
(let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))]
47-
(is
48-
(=
49-
(with-out-str (pr (example/pb->Money input-msg)))
50-
(:body (response-for
51-
service
52-
:get "/"
53-
:headers {"Content-Type" "application/grpc-web"}
54-
:body (clojure.java.io/input-stream input-msg))))))))
91+
::pedestal/type protojure.pedestal/config
92+
::pedestal/chain-provider protojure.pedestal/provider}
93+
client-params {:port port}]
5594

56-
(deftest grpc-web-proto-check
57-
(testing "Check that a round-trip GRPC request works"
58-
(let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))]
59-
(is
60-
(=
61-
(with-out-str (pr (example/pb->Money input-msg)))
62-
(:body (response-for
63-
service
64-
:get "/"
65-
:headers {"Content-Type" "application/grpc-web+proto"}
66-
:body (clojure.java.io/input-stream input-msg))))))))
95+
(let [server (test.utils/start-pedestal-server server-params)
96+
client @(jetty-client/connect client-params)
97+
grpc-client (grpc-connect port)]
98+
(swap! test-env assoc :port port :server server :client client :grpc-client grpc-client))))
6799

68-
(deftest grpc-web-text-proto-check
69-
(testing "Check that a round-trip GRPC request works"
70-
(let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))]
71-
(is
72-
(=
73-
(with-out-str (pr (example/pb->Money input-msg)))
74-
(:body (response-for
75-
service
76-
:get "/"
77-
:headers {"Content-Type" "application/grpc-web-text+proto"}
78-
:body (clojure.java.io/input-stream (b64/encode input-msg)))))))))
100+
(defn destroy-service []
101+
(swap! test-env update :grpc-client grpc-api/disconnect)
102+
(swap! test-env update :client jetty-client/disconnect)
103+
(swap! test-env update :server pedestal/stop))
79104

80-
(deftest grpc-web-no-header-match-check
81-
(testing "Check that a round-trip GRPC request works"
82-
(let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))]
83-
(is
84-
(=
85-
(with-out-str (pr (example/pb->Money input-msg)))
86-
(:body (response-for
87-
service
88-
:get "/"
89-
:headers {"Content-Type" "application/grpc"}
90-
:body (clojure.java.io/input-stream input-msg))))))))
105+
(defn wrap-service [test-fn]
106+
(create-service)
107+
(test-fn)
108+
(destroy-service))
109+
110+
(use-fixtures :once wrap-service)
111+
112+
(deftest grpc-web-test-check
113+
(let [in (async/chan 10)
114+
out (async/chan 10)
115+
resp-in (async/chan 10)
116+
resp-out (async/chan 10)]
117+
(lpm/encode new-HelloRequest in out {:encoding identity})
118+
(lpm/decode pb->HelloReply resp-in resp-out {:encoding identity})
119+
(async/>!! in {:name "World"})
120+
(async/close! in)
121+
(testing "Check that a round-trip unary grpc-web-text request works"
122+
(let [lpm (async/<!! (async/into [] out))
123+
b64-encoded (-> (java.util.Base64/getEncoder)
124+
(.encode (byte-array lpm)))
125+
body (-> (java.util.Base64/getDecoder)
126+
(.decode (-> (client/post
127+
(str "http://localhost:" (:port @test-env) "/example.hello.Greeter/SayHello")
128+
{:body b64-encoded
129+
:content-type "application/grpc-web-text"
130+
:accept "application/grpc-web-text"})
131+
:body)))]
132+
(doseq [b body]
133+
(async/>!! resp-in b))
134+
(async/close! resp-in)
135+
(is (= "Hello, World" (:message (async/<!! resp-out))))))))
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
(ns protojure.pedestal.interceptors.grpc-web-test
2+
(:require [clojure.test :refer :all])
3+
(:require [protojure.pedestal.interceptors.grpc-web :refer [read-n]]
4+
[clojure.core.async :as async]))
5+
6+
(deftest read-n-test
7+
(testing "test that read-n reads the appropriate values and returns final on final"
8+
(let [test-ch (async/chan 10)]
9+
(doseq [n [1 2 3 4]]
10+
(async/>!! test-ch n))
11+
(async/close! test-ch)
12+
(let [result (async/<!! (read-n test-ch 3))]
13+
(is (= result [false [1 2 3]]))
14+
(is (= [true [4]] (async/<!! (read-n test-ch 3))))))))

0 commit comments

Comments
 (0)