|
6 | 6 | "A [Pedestal](http://pedestal.io/) [interceptor](http://pedestal.io/reference/interceptors) for the [GRPC-WEB](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) protocol" |
7 | 7 | (:require [io.pedestal.interceptor :refer [->Interceptor]] |
8 | 8 | [clojure.core.async :as async] |
9 | | - [clojure.data]) |
10 | | - (:refer-clojure :exclude [proxy])) |
| 9 | + [clojure.data] |
| 10 | + [promesa.core :as p]) |
| 11 | + (:refer-clojure :exclude [proxy]) |
| 12 | + (:import (org.apache.commons.codec.binary Base64InputStream Base64OutputStream) |
| 13 | + (java.io PipedOutputStream PipedInputStream))) |
11 | 14 |
|
12 | 15 | (set! *warn-on-reflection* true) |
13 | 16 |
|
|
29 | 32 | [{:keys [body-ch] :as request}] |
30 | 33 | (let [dec-ch (async/chan 4056) |
31 | 34 | decoder (java.util.Base64/getDecoder)] |
32 | | - (async/go-loop [[final encoded] (async/<! (read-n body-ch 4))] |
33 | | - (if (and (empty? encoded) final) |
34 | | - (async/close! dec-ch) |
35 | | - (do |
36 | | - (doseq [b (.decode decoder (byte-array encoded))] |
37 | | - (async/>! dec-ch b)) |
38 | | - (recur (async/<! (read-n body-ch 4)))))) |
39 | | - (assoc request :body-ch dec-ch))) |
| 35 | + (let [b64-decode-error-promise (p/promise (fn [resolve reject] |
| 36 | + (async/go-loop [[final encoded] (async/<! (read-n body-ch 4))] |
| 37 | + (if (and (empty? encoded) final) |
| 38 | + (do |
| 39 | + (resolve nil) |
| 40 | + (async/close! dec-ch)) |
| 41 | + (do |
| 42 | + (try |
| 43 | + (doseq [b (.decode decoder (byte-array encoded))] |
| 44 | + (async/>! dec-ch b)) |
| 45 | + (catch Exception e |
| 46 | + (async/close! dec-ch) |
| 47 | + (reject e))) |
| 48 | + (recur (async/<! (read-n body-ch 4))))))))] |
| 49 | + (-> (assoc request :body-ch dec-ch) |
| 50 | + ;;FIXME the below promise is never checked for an error, but should be prior to responding -- this will present |
| 51 | + ;; as a bug where b64 decode failures aren't reported to the client, rather it will be as if the request body |
| 52 | + ;; was nil |
| 53 | + (assoc :b64-decode-error-promise b64-decode-error-promise))))) |
40 | 54 |
|
41 | | -(defn- encode-body |
42 | | - "Consumes bytes from the response body channel and base64 encodes the payload" |
43 | | - [{{:keys [body] :as response} :response :as ctx}] |
44 | | - (let [encoder (java.util.Base64/getEncoder) |
45 | | - out-ch (async/chan 4056)] |
| 55 | +(defn- num->bytes |
| 56 | + "Serializes an integer to a byte-array." |
| 57 | + [num] |
| 58 | + (byte-array (for [i (range 4)] |
| 59 | + (-> (unsigned-bit-shift-right num |
| 60 | + (* 8 (- 4 i 1))) |
| 61 | + (bit-and 0x0FF))))) |
| 62 | + |
| 63 | +(defn- make-grpc-web-trailers-string [trailers] |
| 64 | + (reduce (fn [s [k v]] |
| 65 | + (str s k ":" v "\r\n")) "" trailers)) |
| 66 | + |
| 67 | +(defn- make-grpc-web-trailers-frame [trailers] |
| 68 | + (let [trailer-bytes (.getBytes ^String (make-grpc-web-trailers-string trailers))] |
| 69 | + (byte-array |
| 70 | + (concat |
| 71 | + [0x80] |
| 72 | + (into [] (num->bytes (count trailer-bytes))) |
| 73 | + (into [] trailer-bytes))))) |
| 74 | + |
| 75 | +(defmulti encode-body "Consumes bytes from the response body and base64 encodes the payload" |
| 76 | + (fn [x] (type (-> x :response :body)))) |
| 77 | + |
| 78 | +(defmethod encode-body clojure.core.async.impl.channels.ManyToManyChannel |
| 79 | + [{{:keys [body trailers] :as response} :response :as ctx}] |
| 80 | + (let [pos (PipedOutputStream.) |
| 81 | + pis (PipedInputStream. pos) |
| 82 | + ;; N.B. passing a string instead of nil in the last position (the line end) caused no data to send |
| 83 | + b64-is (Base64InputStream. pis true -1 nil)] |
46 | 84 | (async/go-loop [s (async/<! body)] |
47 | 85 | (if (not s) |
48 | | - (async/close! out-ch) |
| 86 | + (let [frame (make-grpc-web-trailers-frame (async/<! trailers))] |
| 87 | + ;;Write trailer frame |
| 88 | + (.write pos ^bytes frame) |
| 89 | + (.flush pos) |
| 90 | + (.close pos)) |
49 | 91 | (do |
50 | | - (async/>! out-ch (.encode encoder ^bytes s)) |
| 92 | + (.write pos ^bytes s) |
51 | 93 | (recur (async/<! body))))) |
52 | | - (-> (assoc-in ctx [:response :body] out-ch) |
| 94 | + (-> (assoc-in ctx [:response :body] b64-is) |
53 | 95 | (update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web-text"}))))) |
54 | 96 |
|
| 97 | +(defmethod encode-body nil |
| 98 | + [{{:keys [trailers] :as response} :response :as ctx}] |
| 99 | + (let [frame (make-grpc-web-trailers-frame trailers) |
| 100 | + pos (PipedOutputStream.) |
| 101 | + pis (PipedInputStream. pos) |
| 102 | + b64-is (Base64InputStream. pis true -1 nil)] |
| 103 | + ;;Write trailer frame |
| 104 | + (.write pos ^bytes frame) |
| 105 | + (.flush pos) |
| 106 | + (.close pos) |
| 107 | + (-> (assoc-in ctx [:response :body] b64-is) |
| 108 | + (update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web-text"}))))) |
| 109 | + |
| 110 | +(defmethod encode-body :default |
| 111 | + [{{:keys [body] :as response} :response :as ctx}] |
| 112 | + (throw (ex-info "grpc-web interceptor encountered an unexpected body type on-leave" |
| 113 | + {:causes #{:incompatible-body-value-type} |
| 114 | + :body-value-type (type body)}))) |
| 115 | + |
55 | 116 | (def ^{:no-doc true :const true} content-types |
56 | 117 | #{"application/grpc-web-text"}) |
57 | 118 |
|
|
84 | 145 | (def proxy |
85 | 146 | "Interceptor that provides a transparent proxy for the [GRPC-WEB](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) protocol to standard protojure grpc protocol" |
86 | 147 | (->Interceptor ::proxy enter-handler leave-handler exception-handler)) |
| 148 | + |
| 149 | +(defn error-leave-handler [{{:keys [grpc-error]} :response :as ctx}] |
| 150 | + (if grpc-error |
| 151 | + (pred-> ctx accept-web-text? encode-body) |
| 152 | + ctx)) |
| 153 | +;;FIXME when HTTP/3 has a grpc specification |
| 154 | +;; since we rely on protojure.pedestal.interceptors.grpc/error-interceptor to form the grpc compliant trailers, |
| 155 | +;; we expose this error interceptor (and insert in protojure.pedestal.routes/->tablesyntax prior to |
| 156 | +;; interceptors.grpc/error-interceptor) so that this interceptor can check for the grpc-web-text accept content type |
| 157 | +;; and encode appropriately when an exception is thrown. |
| 158 | +;; Once we have a third grpc specification based on transport, better to fix these abstractions such that we have |
| 159 | +;; HTTP1.1/HTTP2/HTTP3 based encoding |
| 160 | +(def error-interceptor |
| 161 | + "Interceptor that writes grpc exception information in a grpc-web compatible encoding" |
| 162 | + (->Interceptor ::grpc-web-error |
| 163 | + identity |
| 164 | + error-leave-handler |
| 165 | + exception-handler)) |
0 commit comments