|
6 | 6 | "A [Pedestal](http://pedestal.io/) [interceptor](http://pedestal.io/reference/interceptors) for the [GRPC-WEB](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) protocol" |
7 | 7 | (:require [io.pedestal.interceptor :refer [->Interceptor]] |
8 | 8 | [clojure.core.async :as async] |
9 | | - [clojure.data]) |
10 | | - (:refer-clojure :exclude [proxy])) |
| 9 | + [clojure.data] |
| 10 | + [promesa.core :as p] |
| 11 | + [clojure.tools.logging :as log]) |
| 12 | + (:refer-clojure :exclude [proxy]) |
| 13 | + (:import (org.apache.commons.codec.binary Base64InputStream Base64OutputStream) |
| 14 | + (java.io PipedOutputStream PipedInputStream))) |
11 | 15 |
|
12 | 16 | (set! *warn-on-reflection* true) |
13 | 17 |
|
|
29 | 33 | [{:keys [body-ch] :as request}] |
30 | 34 | (let [dec-ch (async/chan 4056) |
31 | 35 | decoder (java.util.Base64/getDecoder)] |
32 | | - (async/go-loop [[final encoded] (async/<! (read-n body-ch 4))] |
33 | | - (if (and (empty? encoded) final) |
34 | | - (async/close! dec-ch) |
| 36 | + (let [b64-decode-error-promise |
| 37 | + (p/promise (fn [resolve reject] |
| 38 | + (when-not (instance? clojure.core.async.impl.channels.ManyToManyChannel body-ch) |
| 39 | + (log/error "grpc-web/proxy did not receive an appropriate body-ch") |
| 40 | + (async/close! dec-ch) |
| 41 | + (log/error "grpc-web interceptor encountered an unexpected body type on-leave") |
| 42 | + (resolve (ex-info "grpc-web interceptor encountered an unexpected body type on-leave" |
| 43 | + {:causes #{:incompatible-body-ch-value-type} |
| 44 | + :body-ch-value-type (type body-ch)}))) |
| 45 | + (async/go-loop [[final encoded] (async/<! (read-n body-ch 4))] |
| 46 | + (if (and (empty? encoded) final) |
| 47 | + (do |
| 48 | + (resolve nil) |
| 49 | + (async/close! dec-ch)) |
| 50 | + (do |
| 51 | + (try |
| 52 | + (doseq [b (.decode decoder (byte-array encoded))] |
| 53 | + (async/>! dec-ch b)) |
| 54 | + (catch Exception e |
| 55 | + (async/close! dec-ch) |
| 56 | + (resolve e))) |
| 57 | + (recur (async/<! (read-n body-ch 4))))))))] |
| 58 | + (-> (assoc request :body-ch dec-ch) |
| 59 | + (assoc :b64-decode-error-promise b64-decode-error-promise))))) |
| 60 | + |
| 61 | +(defn- num->bytes |
| 62 | + "Serializes an integer to a byte-array." |
| 63 | + [num] |
| 64 | + (byte-array (for [i (range 4)] |
| 65 | + (-> (unsigned-bit-shift-right num |
| 66 | + (* 8 (- 4 i 1))) |
| 67 | + (bit-and 0x0FF))))) |
| 68 | + |
| 69 | +(defn- make-grpc-web-trailers-string [trailers] |
| 70 | + (reduce (fn [s [k v]] |
| 71 | + (str s k ":" v "\r\n")) "" trailers)) |
| 72 | + |
| 73 | +(defn- make-grpc-web-trailers-frame [trailers] |
| 74 | + "This is the lightly documented handling of trailers from |
| 75 | + https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md#protocol-differences-vs-grpc-over-http2 |
| 76 | + See section beginning `Message framing (vs. http2-transport-mapping`" |
| 77 | + (let [trailer-bytes (.getBytes ^String (make-grpc-web-trailers-string trailers))] |
| 78 | + (byte-array |
| 79 | + (concat |
| 80 | + [0x80] |
| 81 | + (into [] (num->bytes (count trailer-bytes))) |
| 82 | + (into [] trailer-bytes))))) |
| 83 | + |
| 84 | +(defn- generate-trailers |
| 85 | + [b64-ex] |
| 86 | + (let [b64-ex (bean b64-ex) |
| 87 | + {:keys [grpc-status grpc-message]} (cond (= |
| 88 | + (:message b64-ex) |
| 89 | + "Input byte[] should at least have 2 bytes for base64 bytes") |
| 90 | + {:grpc-status 3 :grpc-message "Bad Base64 Encoded Request"} |
| 91 | + true {:grpc-status 13 :grpc-message "Internal Error"})] |
| 92 | + (-> {"grpc-status" grpc-status} |
| 93 | + (cond-> (some? grpc-message) (assoc "grpc-message" grpc-message))))) |
| 94 | + |
| 95 | +(defmulti encode-web-body "Writes trailers to body per grpc-web specification" |
| 96 | + (fn [x] (type (-> x :response :body)))) |
| 97 | + |
| 98 | +(defmethod encode-web-body clojure.core.async.impl.channels.ManyToManyChannel |
| 99 | + [{{:keys [body trailers]} :response :as ctx}] |
| 100 | + (let [body-w-trailers (async/chan 256)] |
| 101 | + (async/go-loop [s (async/<! body)] |
| 102 | + (if (not s) |
| 103 | + ;;TODO Note blocking on this promise only works because grpc-web only supports server-side streaming -- e.g., |
| 104 | + ;; we can count on the request body decode having consumed all bytes prior to responding in the grpc-web-text |
| 105 | + ;; case |
| 106 | + (let [trailers (async/<! trailers) |
| 107 | + frame (make-grpc-web-trailers-frame trailers)] |
| 108 | + ;;Write trailer frame |
| 109 | + (async/>! body-w-trailers ^bytes frame) |
| 110 | + (async/close! body-w-trailers)) |
35 | 111 | (do |
36 | | - (doseq [b (.decode decoder (byte-array encoded))] |
37 | | - (async/>! dec-ch b)) |
38 | | - (recur (async/<! (read-n body-ch 4)))))) |
39 | | - (assoc request :body-ch dec-ch))) |
| 112 | + (async/>! body-w-trailers ^bytes s) |
| 113 | + (recur (async/<! body))))) |
| 114 | + (-> (assoc-in ctx [:response :body] body-w-trailers) |
| 115 | + (update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web+proto"}))))) |
| 116 | + |
| 117 | +(defmethod encode-web-body nil |
| 118 | + [{{:keys [trailers] :as response} :response :as ctx}] |
| 119 | + ;;TODO Note blocking on this promise only works because grpc-web only supports server-side streaming -- e.g., |
| 120 | + ;; we can count on the request body decode having consumed all bytes prior to responding in the grpc-web-text |
| 121 | + ;; case |
| 122 | + (let [body-w-trailers (async/chan 256)] |
| 123 | + ;;Write trailer frame |
| 124 | + (async/go [] |
| 125 | + (let [frame (make-grpc-web-trailers-frame trailers)] |
| 126 | + (async/>! body-w-trailers ^bytes frame)) |
| 127 | + (async/close! body-w-trailers)) |
| 128 | + (-> (assoc-in ctx [:response :body] body-w-trailers) |
| 129 | + (update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web+proto"}))))) |
40 | 130 |
|
41 | | -(defn- encode-body |
42 | | - "Consumes bytes from the response body channel and base64 encodes the payload" |
| 131 | +(defmethod encode-web-body :default |
43 | 132 | [{{:keys [body] :as response} :response :as ctx}] |
44 | | - (let [encoder (java.util.Base64/getEncoder) |
45 | | - out-ch (async/chan 4056)] |
| 133 | + (throw (ex-info "grpc-web interceptor encountered an unexpected body type on-leave" |
| 134 | + {:causes #{:incompatible-body-value-type} |
| 135 | + :body-value-type (type body)}))) |
| 136 | + |
| 137 | +(defmulti encode-web-text-body "Consumes bytes from the response body and base64 encodes the payload" |
| 138 | + (fn [x] (type (-> x :response :body)))) |
| 139 | + |
| 140 | +(defmethod encode-web-text-body clojure.core.async.impl.channels.ManyToManyChannel |
| 141 | + [{{:keys [body trailers]} :response {:keys [b64-decode-error-promise]} :request :as ctx}] |
| 142 | + (let [pos (PipedOutputStream.) |
| 143 | + pis (PipedInputStream. pos) |
| 144 | + ;; N.B. passing a string instead of nil in the last position (the line end) caused no data to send |
| 145 | + b64-is (Base64InputStream. pis true -1 nil)] |
46 | 146 | (async/go-loop [s (async/<! body)] |
47 | 147 | (if (not s) |
48 | | - (async/close! out-ch) |
| 148 | + ;;TODO Note blocking on this promise only works because grpc-web only supports server-side streaming -- e.g., |
| 149 | + ;; we can count on the request body decode having consumed all bytes prior to responding in the grpc-web-text |
| 150 | + ;; case |
| 151 | + (let [b64-ex @b64-decode-error-promise |
| 152 | + trailers (async/<! trailers) |
| 153 | + frame (make-grpc-web-trailers-frame (if b64-ex |
| 154 | + (generate-trailers b64-ex) |
| 155 | + trailers))] |
| 156 | + ;;Write trailer frame |
| 157 | + (.write pos ^bytes frame) |
| 158 | + (.flush pos) |
| 159 | + (.close pos)) |
49 | 160 | (do |
50 | | - (async/>! out-ch (.encode encoder ^bytes s)) |
| 161 | + (.write pos ^bytes s) |
51 | 162 | (recur (async/<! body))))) |
52 | | - (-> (assoc-in ctx [:response :body] out-ch) |
| 163 | + (-> (assoc-in ctx [:response :body] b64-is) |
53 | 164 | (update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web-text"}))))) |
54 | 165 |
|
55 | | -(def ^{:no-doc true :const true} content-types |
| 166 | +(defmethod encode-web-text-body nil |
| 167 | + [{{:keys [trailers] :as response} :response {:keys [b64-decode-error-promise]} :request :as ctx}] |
| 168 | + ;;TODO Note blocking on this promise only works because grpc-web only supports server-side streaming -- e.g., |
| 169 | + ;; we can count on the request body decode having consumed all bytes prior to responding in the grpc-web-text |
| 170 | + ;; case |
| 171 | + (let [b64-ex @b64-decode-error-promise |
| 172 | + frame (make-grpc-web-trailers-frame (if b64-ex |
| 173 | + (generate-trailers b64-ex) |
| 174 | + trailers)) |
| 175 | + pos (PipedOutputStream.) |
| 176 | + pis (PipedInputStream. pos) |
| 177 | + b64-is (Base64InputStream. pis true -1 nil)] |
| 178 | + ;;Write trailer frame |
| 179 | + (.write pos ^bytes frame) |
| 180 | + (.flush pos) |
| 181 | + (.close pos) |
| 182 | + (-> (assoc-in ctx [:response :body] b64-is) |
| 183 | + (update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web-text"}))))) |
| 184 | + |
| 185 | +(defmethod encode-web-text-body :default |
| 186 | + [{{:keys [body] :as response} :response :as ctx}] |
| 187 | + (throw (ex-info "grpc-web interceptor encountered an unexpected body type on-leave" |
| 188 | + {:causes #{:incompatible-body-value-type} |
| 189 | + :body-value-type (type body)}))) |
| 190 | + |
| 191 | +(def ^{:no-doc true :const true} content-types-text |
56 | 192 | #{"application/grpc-web-text"}) |
57 | 193 |
|
| 194 | +(def ^{:no-doc true :const true} content-types-web |
| 195 | + #{"application/grpc-web" |
| 196 | + "application/grpc-web+proto"}) |
| 197 | + |
58 | 198 | (defn- web-text? |
59 | 199 | [{{:strs [content-type]} :headers}] |
60 | | - (contains? content-types content-type)) |
| 200 | + (contains? content-types-text content-type)) |
61 | 201 |
|
62 | 202 | (defn- accept-web-text? |
63 | 203 | [{{{:strs [accept]} :headers} :request}] |
64 | | - (contains? content-types accept)) |
| 204 | + (contains? content-types-text accept)) |
| 205 | + |
| 206 | +(defn- accept-web? |
| 207 | + "The grpc-web js bindings currently set the `Accept:` header to \"*/*\" which complicates handling trailers. We |
| 208 | + fallback to relying on the content-type to determine a client is likely a browser and requires special response |
| 209 | + content-type handling" |
| 210 | + [{{{:strs [content-type]} :headers} :request}] |
| 211 | + (contains? content-types-web content-type)) |
65 | 212 |
|
66 | 213 | (defn- pred-> |
67 | 214 | "Threads 'item' through both the predicate and, when 'pred' evaluates true, 'xform' functions. Else, just returns 'item'" |
|
74 | 221 |
|
75 | 222 | (defn- leave-handler |
76 | 223 | [{:keys [response] :as ctx}] |
77 | | - ;; TODO "Clarify & implement grpc-web trailer behavior" |
78 | | - (pred-> ctx accept-web-text? encode-body)) |
| 224 | + (-> (pred-> ctx accept-web-text? encode-web-text-body) |
| 225 | + (pred-> accept-web? encode-web-body))) |
79 | 226 |
|
80 | 227 | (defn- exception-handler |
81 | 228 | [ctx e] |
|
84 | 231 | (def proxy |
85 | 232 | "Interceptor that provides a transparent proxy for the [GRPC-WEB](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) protocol to standard protojure grpc protocol" |
86 | 233 | (->Interceptor ::proxy enter-handler leave-handler exception-handler)) |
| 234 | + |
| 235 | +(defn error-leave-handler [{{:keys [grpc-error]} :response :as ctx}] |
| 236 | + (if grpc-error |
| 237 | + (-> (pred-> ctx accept-web-text? encode-web-text-body) |
| 238 | + (pred-> accept-web? encode-web-body)) |
| 239 | + ctx)) |
| 240 | +;;FIXME when HTTP/3 has a grpc specification |
| 241 | +;; since we rely on protojure.pedestal.interceptors.grpc/error-interceptor to form the grpc compliant trailers, |
| 242 | +;; we expose this error interceptor (and insert in protojure.pedestal.routes/->tablesyntax prior to |
| 243 | +;; interceptors.grpc/error-interceptor) so that this interceptor can check for the grpc-web-text accept content type |
| 244 | +;; and encode appropriately when an exception is thrown. |
| 245 | +;; Once we have a third grpc specification based on transport, better to fix these abstractions such that we have |
| 246 | +;; HTTP1.1/HTTP2/HTTP3 based encoding |
| 247 | +(def error-interceptor |
| 248 | + "Interceptor that writes grpc exception information in a grpc-web compatible encoding" |
| 249 | + (->Interceptor ::grpc-web-error |
| 250 | + identity |
| 251 | + error-leave-handler |
| 252 | + exception-handler)) |
0 commit comments