-
Notifications
You must be signed in to change notification settings - Fork 58
WebSocket proxy for the grpc-gateway #752
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
7cd3254
Squashed commit of the following:
hariso b51b9d0
send scanner error
hariso 596753a
send scanner error
hariso 9895167
Merge branch 'main' into haris/grpc-gateway-websockets
hariso 80c504d
rename
hariso 692a988
read loop
hariso e3a87a1
refactor + ping write loop
hariso 2072f7d
refactor
hariso a16ca05
fix pingWriteLoop
hariso 2ae9538
fixes
hariso eb7e986
comments
hariso b584fcf
remove ping-pong
hariso 600ef2f
Squashed commit of the following:
hariso b030581
Merge branch 'main' into haris/grpc-gateway-websockets
hariso b55a906
remove tests
hariso 47f6221
refactor
hariso b0ff958
fix concurrency
lovromazgon 7d4b410
comments
hariso 9fac27d
test
hariso b5da901
tests
hariso 2959aff
lint
hariso b8f1110
Update pkg/foundation/grpcutil/websocket.go
hariso 2601ad8
Update pkg/foundation/grpcutil/websocket.go
hariso File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,252 @@ | ||
| // Copyright © 2022 Meroxa, Inc. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package grpcutil | ||
|
|
||
| import ( | ||
| "bufio" | ||
| "context" | ||
| "io" | ||
| "net/http" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/conduitio/conduit/pkg/foundation/log" | ||
| "github.com/gorilla/websocket" | ||
| ) | ||
|
|
||
| type inMemoryResponseWriter struct { | ||
| io.Writer | ||
| header http.Header | ||
| } | ||
|
|
||
| func newInMemoryResponseWriter(writer io.Writer) *inMemoryResponseWriter { | ||
| return &inMemoryResponseWriter{ | ||
| Writer: writer, | ||
| header: http.Header{}, | ||
| } | ||
| } | ||
|
|
||
| func (w *inMemoryResponseWriter) Write(b []byte) (int, error) { | ||
| return w.Writer.Write(b) | ||
| } | ||
| func (w *inMemoryResponseWriter) Header() http.Header { | ||
| return w.header | ||
| } | ||
| func (w *inMemoryResponseWriter) WriteHeader(int) { | ||
| // we don't have a use for the code | ||
| } | ||
| func (w *inMemoryResponseWriter) Flush() {} | ||
|
|
||
| var ( | ||
| defaultWriteWait = 10 * time.Second | ||
| defaultPongWait = 60 * time.Second | ||
| ) | ||
|
|
||
| // webSocketProxy is a proxy around a http.Handler which | ||
| // redirects the response data from the http.Handler | ||
| // to a WebSocket connection. | ||
| type webSocketProxy struct { | ||
| handler http.Handler | ||
| logger log.CtxLogger | ||
| upgrader websocket.Upgrader | ||
|
|
||
| // Time allowed to write a message to the peer. | ||
| writeWait time.Duration | ||
| // Time allowed to read the next pong message from the peer. | ||
| pongWait time.Duration | ||
| // Send pings to peer with this period. Must be less than pongWait. | ||
| pingPeriod time.Duration | ||
| } | ||
|
|
||
| func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *webSocketProxy { | ||
| proxy := &webSocketProxy{ | ||
| handler: handler, | ||
| logger: logger.WithComponent("grpcutil.webSocketProxy"), | ||
| upgrader: websocket.Upgrader{ | ||
| ReadBufferSize: 1024, | ||
| WriteBufferSize: 1024, | ||
| }, | ||
| writeWait: defaultWriteWait, | ||
| pongWait: defaultPongWait, | ||
| pingPeriod: (defaultPongWait * 9) / 10, | ||
| } | ||
|
|
||
| return proxy | ||
| } | ||
|
|
||
| func (p *webSocketProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||
| if !websocket.IsWebSocketUpgrade(r) { | ||
| p.handler.ServeHTTP(w, r) | ||
| return | ||
| } | ||
| p.proxy(w, r) | ||
| } | ||
|
|
||
| // proxy creates a "pipeline" from the underlying response | ||
| // to a WebSocket connection. The pipeline is constructed in | ||
| // the following way: | ||
| // | ||
| // underlying response | ||
| // -> inMemoryResponseWriter | ||
| // -> scanner | ||
| // -> messages channel | ||
| // -> connection writer | ||
| // | ||
| // In the case of an error due to which we need to abort the request | ||
| // and close the WebSocket connection, we need to cancel the request context | ||
| // and stop writing any data to the WebSocket connection. This will | ||
| // automatically halt all the "pipeline nodes" after the underlying response. | ||
| func (p *webSocketProxy) proxy(w http.ResponseWriter, r *http.Request) { | ||
| ctx, cancelFn := context.WithCancel(r.Context()) | ||
| defer cancelFn() | ||
| r = r.WithContext(ctx) | ||
|
|
||
| // Upgrade connection to WebSocket | ||
| conn, err := p.upgrader.Upgrade(w, r, http.Header{}) | ||
| if err != nil { | ||
| p.logger.Err(ctx, err).Msg("error upgrading websocket") | ||
| return | ||
| } | ||
| defer conn.Close() | ||
|
|
||
| // We use a pipe to read the data being written to the underlying http.Handler | ||
| // and then write it to the WebSocket connection. | ||
| responseR, responseW := io.Pipe() | ||
| response := newInMemoryResponseWriter(responseW) | ||
|
|
||
| // Start the "underlying" http.Handler | ||
| go func() { | ||
| p.handler.ServeHTTP(response, r) | ||
| p.logger.Debug(ctx).Err(ctx.Err()).Msg("closing pipes") | ||
| responseW.CloseWithError(io.EOF) | ||
| }() | ||
|
|
||
| messages := make(chan []byte) | ||
| // startWebSocketRead and startWebSocketWrite need to cancel the context | ||
| // if they encounter an error reading from or writing to the WS connection | ||
| go p.startWebSocketRead(ctx, conn, cancelFn) | ||
| go p.readFromHTTPResponse(ctx, responseR, messages) | ||
| p.startWebSocketWrite(ctx, messages, conn, cancelFn) | ||
| } | ||
|
|
||
| // startWebSocketRead starts a read loop on the proxy's WebSocket connection. | ||
| // The read loop will stop if there's been an error reading a message. | ||
| func (p *webSocketProxy) startWebSocketRead(ctx context.Context, conn *websocket.Conn, onDone func()) { | ||
| defer onDone() | ||
|
|
||
| conn.SetReadLimit(512) | ||
| err := conn.SetReadDeadline(time.Now().Add(p.pongWait)) | ||
| if err != nil { | ||
| p.logger.Warn(ctx).Err(err).Msgf("couldn't set read deadline %v", p.pongWait) | ||
| return | ||
| } | ||
|
|
||
| conn.SetPongHandler(func(string) error { | ||
| err := conn.SetReadDeadline(time.Now().Add(p.pongWait)) | ||
| if err != nil { | ||
| // todo return err? | ||
| p.logger.Warn(ctx).Err(err).Msgf("couldn't set read deadline %v", p.pongWait) | ||
| } | ||
| return nil | ||
| }) | ||
|
|
||
| for { | ||
| // The only use we have for reads right now | ||
| // is for ping, pong and close messages. | ||
| // https://pkg.go.dev/github.com/gorilla/websocket#hdr-Control_Messages | ||
| // Also, a read loop can detect client disconnects much quicker: | ||
| // https://groups.google.com/g/golang-nuts/c/FFzQO26jEoE/m/mYhcsK20EwAJ | ||
| _, _, err := conn.ReadMessage() | ||
| if err != nil { | ||
| if p.isClosedConnErr(err) { | ||
| p.logger.Debug(ctx).Err(err).Msg("closed connection") | ||
| } | ||
|
|
||
| p.logger.Warn(ctx).Err(err).Msg("read error") | ||
| break | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (p *webSocketProxy) isClosedConnErr(err error) bool { | ||
| str := err.Error() | ||
| if strings.Contains(str, "use of closed network connection") { | ||
| return true | ||
| } | ||
| return websocket.IsCloseError( | ||
| err, | ||
| websocket.CloseNormalClosure, | ||
| websocket.CloseGoingAway, | ||
| websocket.CloseAbnormalClosure, | ||
| ) | ||
| } | ||
|
|
||
| func (p *webSocketProxy) readFromHTTPResponse(ctx context.Context, responseReader io.Reader, c chan []byte) { | ||
| defer close(c) | ||
| scanner := bufio.NewScanner(responseReader) | ||
|
|
||
| for scanner.Scan() { | ||
| if len(scanner.Bytes()) == 0 { | ||
| p.logger.Warn(ctx).Err(scanner.Err()).Msg("[write] empty scan") | ||
| continue | ||
| } | ||
|
|
||
| p.logger.Trace(ctx).Msgf("[write] scanned %v", scanner.Text()) | ||
| c <- scanner.Bytes() | ||
| } | ||
|
|
||
| if sErr := scanner.Err(); sErr != nil { | ||
| p.logger.Err(ctx, sErr).Msg("failed reading data from original response") | ||
| c <- []byte(sErr.Error()) | ||
| } | ||
|
|
||
| p.logger.Debug(ctx).Msg("scanner reached end of input data") | ||
| } | ||
|
|
||
| func (p *webSocketProxy) startWebSocketWrite(ctx context.Context, messages chan []byte, conn *websocket.Conn, cancelFn func()) { | ||
| ticker := time.NewTicker(p.pingPeriod) | ||
| defer func() { | ||
| ticker.Stop() | ||
| cancelFn() | ||
| for range messages { | ||
| // throw away | ||
| } | ||
| }() | ||
|
|
||
| for { | ||
| select { | ||
| case message, ok := <-messages: | ||
| conn.SetWriteDeadline(time.Now().Add(p.writeWait)) //nolint:errcheck // always returns nil | ||
| if !ok { | ||
| // readFromHTTPResponse closed the channel. | ||
| err := conn.WriteMessage(websocket.CloseMessage, []byte{}) | ||
| if err != nil { | ||
| p.logger.Warn(ctx).Err(err).Msg("[write] failed sending close message") | ||
| } | ||
| return | ||
| } | ||
|
|
||
| if err := conn.WriteMessage(websocket.TextMessage, message); err != nil { | ||
| p.logger.Warn(ctx).Err(err).Msg("[write] error writing websocket message") | ||
| return | ||
| } | ||
| case <-ticker.C: | ||
| conn.SetWriteDeadline(time.Now().Add(p.writeWait)) //nolint:errcheck // always returns nil | ||
| if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { | ||
| return | ||
| } | ||
| } | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.