Skip to content

Commit 7cd3254

Browse files
committed
Squashed commit of the following:
commit 541cd83 Author: Haris Osmanagic <haris@meroxa.io> Date: Tue Nov 29 14:32:49 2022 +0100 simplify tests commit 6737ab4 Author: Haris Osmanagic <haris@meroxa.io> Date: Tue Nov 29 13:57:14 2022 +0100 cosmetics commit 9dbc8b1 Author: Haris Osmanagic <haris@meroxa.io> Date: Tue Nov 29 13:36:32 2022 +0100 rename commit f218ed2 Author: Haris Osmanagic <haris@meroxa.io> Date: Tue Nov 29 13:18:46 2022 +0100 tests commit b7f2baf Author: Haris Osmanagic <haris@meroxa.io> Date: Tue Nov 29 11:40:25 2022 +0100 simplify commit fd699c1 Author: Haris Osmanagic <haris@meroxa.io> Date: Mon Nov 28 13:02:55 2022 +0100 improvements commit 27bf6ec Author: Haris Osmanagic <haris@meroxa.io> Date: Mon Nov 28 12:47:54 2022 +0100 rename method commit 7ec409d Author: Haris Osmanagic <haris@meroxa.io> Date: Mon Nov 28 12:09:34 2022 +0100 linter commit 3593282 Author: Haris Osmanagic <haris@meroxa.io> Date: Mon Nov 28 12:02:16 2022 +0100 init copy commit 52ffbc7 Author: Haris Osmanagic <haris@meroxa.io> Date: Tue Nov 22 20:24:40 2022 +0100 setup commit 623ca25 Author: Haris Osmanagic <haris@meroxa.io> Date: Tue Nov 22 12:10:43 2022 +0100 pr feedback 3 commit 83c6152 Author: Haris Osmanagic <haris@meroxa.io> Date: Tue Nov 22 11:34:29 2022 +0100 Revert "Update pkg/web/api/connector_v1_test.go" This reverts commit 8f40c22. commit 6df36fc Merge: dd8b1b4 8f40c22 Author: Haris Osmanagic <haris@meroxa.io> Date: Tue Nov 22 11:29:21 2022 +0100 Merge branch 'haris/stream-inspector-destination' of github.com:ConduitIO/conduit into haris/stream-inspector-destination commit dd8b1b4 Author: Haris Osmanagic <haris@meroxa.io> Date: Tue Nov 22 11:28:27 2022 +0100 linter commit 7865b60 Author: Haris Osmanagic <haris@meroxa.io> Date: Tue Nov 22 11:26:07 2022 +0100 pr feedback 2 commit 8f40c22 Author: Haris Osmanagić <haris@meroxa.io> Date: Tue Nov 22 11:18:58 2022 +0100 Update pkg/web/api/connector_v1_test.go Co-authored-by: Lovro Mažgon <lovro.mazgon@gmail.com> commit 5067241 Author: Haris Osmanagić <haris@meroxa.io> Date: Tue Nov 22 11:08:57 2022 +0100 Update pkg/inspector/inspector_test.go Co-authored-by: Lovro Mažgon <lovro.mazgon@gmail.com> commit e7b3306 Author: Haris Osmanagić <haris@meroxa.io> Date: Tue Nov 22 11:08:17 2022 +0100 Update pkg/inspector/inspector_test.go Co-authored-by: Lovro Mažgon <lovro.mazgon@gmail.com> commit 3a0b31b Author: Haris Osmanagić <haris@meroxa.io> Date: Tue Nov 22 10:58:40 2022 +0100 Update pkg/inspector/inspector_test.go Co-authored-by: Lovro Mažgon <lovro.mazgon@gmail.com> commit 1a94458 Author: Haris Osmanagic <haris@meroxa.io> Date: Mon Nov 21 14:06:27 2022 +0100 linter commit ae1408f Author: Haris Osmanagic <haris@meroxa.io> Date: Mon Nov 21 14:05:46 2022 +0100 linter commit 1848a05 Author: Haris Osmanagic <haris@meroxa.io> Date: Mon Nov 21 14:05:01 2022 +0100 Stream inspector for destinations
1 parent 88ef86b commit 7cd3254

6 files changed

Lines changed: 225 additions & 13 deletions

File tree

go.mod

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ require (
2020
github.com/golang/mock v1.6.0
2121
github.com/google/go-cmp v0.5.9
2222
github.com/google/uuid v1.3.0
23+
github.com/gorilla/websocket v1.5.0
2324
github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0
2425
github.com/hashicorp/go-hclog v1.3.1
2526
github.com/hashicorp/go-plugin v1.4.6
@@ -32,7 +33,6 @@ require (
3233
github.com/prometheus/client_model v0.3.0
3334
github.com/prometheus/common v0.37.0
3435
github.com/rs/zerolog v1.28.0
35-
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75
3636
go.buf.build/grpc/go/conduitio/conduit-connector-protocol v1.4.3
3737
go.buf.build/protocolbuffers/go/grpc-ecosystem/grpc-gateway v1.3.50
3838
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91
@@ -84,7 +84,6 @@ require (
8484
github.com/golang/protobuf v1.5.2 // indirect
8585
github.com/golang/snappy v0.0.4 // indirect
8686
github.com/google/flatbuffers v2.0.0+incompatible // indirect
87-
github.com/gorilla/websocket v1.4.2 // indirect
8887
github.com/hashicorp/yamux v0.1.1 // indirect
8988
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
9089
github.com/jackc/pgconn v1.13.0 // indirect
@@ -109,7 +108,6 @@ require (
109108
github.com/pkg/errors v0.9.1 // indirect
110109
github.com/prometheus/procfs v0.8.0 // indirect
111110
github.com/segmentio/kafka-go v0.4.35 // indirect
112-
github.com/sirupsen/logrus v1.8.1 // indirect
113111
github.com/xdg/scram v1.0.5 // indirect
114112
github.com/xdg/stringprep v1.0.3 // indirect
115113
github.com/xitongsys/parquet-go v1.6.2 // indirect

go.sum

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
308308
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
309309
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
310310
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
311-
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
312-
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
311+
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
312+
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
313313
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
314314
github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0 h1:t7uX3JBHdVwAi3G7sSSdbsk8NfgA+LnUS88V/2EKaA0=
315315
github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0/go.mod h1:4OGVnY4qf2+gw+ssiHbW+pq4mo2yko94YxxMmXZ7jCA=
@@ -532,8 +532,6 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx
532532
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
533533
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
534534
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
535-
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
536-
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
537535
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
538536
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
539537
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
@@ -559,8 +557,6 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
559557
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
560558
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
561559
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
562-
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7otjonDflCTK0BCfls4SPy3NcCVb5dqqmbRknE=
563-
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75/go.mod h1:KO6IkyS8Y3j8OdNO85qEYBsRPuteD+YciPomcXdrMnk=
564560
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
565561
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
566562
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
@@ -710,7 +706,6 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
710706
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
711707
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
712708
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
713-
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
714709
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
715710
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
716711
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=

pkg/conduit/runtime.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,7 @@ func (r *Runtime) serveHTTPAPI(
445445
grpcutil.WithDefaultGatewayMiddleware(
446446
r.logger, allowCORS(gwmux, "http://localhost:4200"),
447447
),
448+
r.logger,
448449
)
449450

450451
return r.serveHTTP(

pkg/foundation/grpcutil/gateway.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"github.com/conduitio/conduit/pkg/foundation/log"
2222
"github.com/google/uuid"
2323
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
24-
"github.com/tmc/grpc-websocket-proxy/wsproxy"
2524
"google.golang.org/protobuf/encoding/protojson"
2625
)
2726

@@ -110,8 +109,8 @@ func WithHTTPEndpointHeader(h http.Handler) http.Handler {
110109
})
111110
}
112111

113-
func WithWebsockets(h http.Handler) http.Handler {
114-
return wsproxy.WebsocketProxy(h)
112+
func WithWebsockets(h http.Handler, l log.CtxLogger) http.Handler {
113+
return newWebSocketProxy(h, l)
115114
}
116115

117116
func extractEndpoint(r *http.Request) string {
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright © 2022 Meroxa, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package grpcutil
16+
17+
import (
18+
"bufio"
19+
"context"
20+
"io"
21+
"net/http"
22+
23+
"github.com/conduitio/conduit/pkg/foundation/log"
24+
"github.com/gorilla/websocket"
25+
)
26+
27+
type inMemoryResponseWriter struct {
28+
io.Writer
29+
header http.Header
30+
code int
31+
closed chan bool
32+
}
33+
34+
func newInMemoryResponseWriter(writer io.Writer) *inMemoryResponseWriter {
35+
return &inMemoryResponseWriter{
36+
Writer: writer,
37+
header: http.Header{},
38+
closed: make(chan bool, 1),
39+
}
40+
}
41+
42+
func (w *inMemoryResponseWriter) Write(b []byte) (int, error) {
43+
return w.Writer.Write(b)
44+
}
45+
func (w *inMemoryResponseWriter) Header() http.Header {
46+
return w.header
47+
}
48+
func (w *inMemoryResponseWriter) WriteHeader(code int) {
49+
w.code = code
50+
}
51+
func (w *inMemoryResponseWriter) CloseNotify() <-chan bool {
52+
return w.closed
53+
}
54+
func (w *inMemoryResponseWriter) Flush() {}
55+
56+
// wsProxy is a proxy around a http.Handler which
57+
// redirects the response data from the http.Handler
58+
// to a WebSocket connection.
59+
type wsProxy struct {
60+
handler http.Handler
61+
logger log.CtxLogger
62+
upgrader websocket.Upgrader
63+
}
64+
65+
func newWebSocketProxy(handler http.Handler, logger log.CtxLogger) *wsProxy {
66+
return &wsProxy{
67+
handler: handler,
68+
logger: logger.WithComponent("grpcutil.websocket"),
69+
upgrader: websocket.Upgrader{
70+
ReadBufferSize: 1024,
71+
WriteBufferSize: 1024,
72+
},
73+
}
74+
}
75+
76+
func (p *wsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
77+
if !websocket.IsWebSocketUpgrade(r) {
78+
p.handler.ServeHTTP(w, r)
79+
return
80+
}
81+
p.proxy(w, r)
82+
}
83+
84+
func (p *wsProxy) proxy(w http.ResponseWriter, r *http.Request) {
85+
ctx, cancelFn := context.WithCancel(r.Context())
86+
defer cancelFn()
87+
88+
// upgrade connection to WebSocket
89+
conn, err := p.upgrader.Upgrade(w, r, http.Header{})
90+
if err != nil {
91+
p.logger.Err(ctx, err).Msg("error upgrading websocket")
92+
return
93+
}
94+
defer conn.Close()
95+
96+
// We use a pipe to read the data
97+
// being written to the underlying http.Handler
98+
responseR, responseW := io.Pipe()
99+
response := newInMemoryResponseWriter(responseW)
100+
go func() {
101+
<-ctx.Done()
102+
p.logger.Debug(ctx).Err(ctx.Err()).Msg("closing pipes")
103+
responseW.CloseWithError(io.EOF)
104+
response.closed <- true
105+
}()
106+
107+
go func() {
108+
defer cancelFn()
109+
p.handler.ServeHTTP(response, r)
110+
}()
111+
112+
scanner := bufio.NewScanner(responseR)
113+
114+
for scanner.Scan() {
115+
if len(scanner.Bytes()) == 0 {
116+
p.logger.Warn(ctx).Err(scanner.Err()).Msg("[write] empty scan")
117+
continue
118+
}
119+
120+
p.logger.Trace(ctx).Msgf("[write] scanned %v", scanner.Text())
121+
if err := conn.WriteMessage(websocket.TextMessage, scanner.Bytes()); err != nil {
122+
p.logger.Warn(ctx).Err(err).Msg("[write] error writing websocket message")
123+
return
124+
}
125+
}
126+
// todo properly communicate the error to the client
127+
// and close the connection
128+
if sErr := scanner.Err(); sErr != nil {
129+
p.logger.Err(ctx, sErr).Msg("scanner err")
130+
}
131+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright © 2022 Meroxa, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package grpcutil
16+
17+
import (
18+
"context"
19+
"io"
20+
"net/http"
21+
"net/http/httptest"
22+
"strings"
23+
"testing"
24+
25+
"github.com/conduitio/conduit/pkg/foundation/log"
26+
"github.com/gorilla/websocket"
27+
"github.com/matryer/is"
28+
)
29+
30+
type testHandler struct {
31+
is *is.I
32+
response string
33+
}
34+
35+
func (h *testHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
36+
_, err := w.Write([]byte(h.response))
37+
h.is.NoErr(err)
38+
}
39+
40+
func TestWebSocket_NoUpgradeToWebSocket(t *testing.T) {
41+
is := is.New(t)
42+
ctx, cancel := context.WithCancel(context.Background())
43+
defer cancel()
44+
45+
h := &testHandler{
46+
is: is,
47+
response: "hi there",
48+
}
49+
s := httptest.NewServer(newWebSocketProxy(h, log.Nop()))
50+
defer s.Close()
51+
52+
req, err := http.NewRequestWithContext(ctx, "GET", s.URL, nil)
53+
is.NoErr(err)
54+
55+
resp, err := http.DefaultClient.Do(req)
56+
is.NoErr(err)
57+
is.True(resp.Body != nil) // expected response to have a body
58+
defer resp.Body.Close()
59+
60+
bytes, err := io.ReadAll(resp.Body)
61+
is.NoErr(err)
62+
is.Equal(h.response, string(bytes))
63+
}
64+
65+
func TestWebSocket_UpgradeToWebSocket(t *testing.T) {
66+
is := is.New(t)
67+
68+
h := &testHandler{
69+
is: is,
70+
response: "hi there",
71+
}
72+
s := httptest.NewServer(newWebSocketProxy(h, log.Nop()))
73+
defer s.Close()
74+
75+
// Convert http to ws
76+
wsURL := "ws" + strings.TrimPrefix(s.URL, "http")
77+
78+
// Connect to the server
79+
ws, resp, err := websocket.DefaultDialer.Dial(wsURL, nil)
80+
is.NoErr(err)
81+
defer ws.Close()
82+
defer resp.Body.Close()
83+
84+
msgType, bytes, err := ws.ReadMessage()
85+
is.NoErr(err)
86+
is.Equal(h.response, string(bytes))
87+
is.Equal(websocket.TextMessage, msgType)
88+
}

0 commit comments

Comments
 (0)