Skip to content

Commit 88ef86b

Browse files
authored
Stream inspector for destinations (#718)
1 parent 7cde058 commit 88ef86b

19 files changed

Lines changed: 806 additions & 4 deletions

File tree

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ require (
3232
github.com/prometheus/client_model v0.3.0
3333
github.com/prometheus/common v0.37.0
3434
github.com/rs/zerolog v1.28.0
35+
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75
3536
go.buf.build/grpc/go/conduitio/conduit-connector-protocol v1.4.3
3637
go.buf.build/protocolbuffers/go/grpc-ecosystem/grpc-gateway v1.3.50
3738
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91
@@ -83,6 +84,7 @@ require (
8384
github.com/golang/protobuf v1.5.2 // indirect
8485
github.com/golang/snappy v0.0.4 // indirect
8586
github.com/google/flatbuffers v2.0.0+incompatible // indirect
87+
github.com/gorilla/websocket v1.4.2 // indirect
8688
github.com/hashicorp/yamux v0.1.1 // indirect
8789
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
8890
github.com/jackc/pgconn v1.13.0 // indirect
@@ -107,6 +109,7 @@ require (
107109
github.com/pkg/errors v0.9.1 // indirect
108110
github.com/prometheus/procfs v0.8.0 // indirect
109111
github.com/segmentio/kafka-go v0.4.35 // indirect
112+
github.com/sirupsen/logrus v1.8.1 // indirect
110113
github.com/xdg/scram v1.0.5 // indirect
111114
github.com/xdg/stringprep v1.0.3 // indirect
112115
github.com/xitongsys/parquet-go v1.6.2 // indirect

go.sum

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
308308
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
309309
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
310310
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
311+
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
312+
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
311313
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
312314
github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0 h1:t7uX3JBHdVwAi3G7sSSdbsk8NfgA+LnUS88V/2EKaA0=
313315
github.com/grpc-ecosystem/grpc-gateway/v2 v2.14.0/go.mod h1:4OGVnY4qf2+gw+ssiHbW+pq4mo2yko94YxxMmXZ7jCA=
@@ -530,6 +532,8 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx
530532
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
531533
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
532534
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
535+
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
536+
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
533537
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
534538
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
535539
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
@@ -555,6 +559,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
555559
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
556560
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
557561
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
562+
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 h1:6fotK7otjonDflCTK0BCfls4SPy3NcCVb5dqqmbRknE=
563+
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75/go.mod h1:KO6IkyS8Y3j8OdNO85qEYBsRPuteD+YciPomcXdrMnk=
558564
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
559565
github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw=
560566
github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
@@ -704,6 +710,7 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
704710
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
705711
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
706712
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
713+
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
707714
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
708715
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
709716
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=

pkg/conduit/runtime.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import (
5757
"google.golang.org/grpc"
5858
"google.golang.org/grpc/credentials/insecure"
5959
"google.golang.org/grpc/health/grpc_health_v1"
60+
"google.golang.org/grpc/reflection"
6061
"google.golang.org/grpc/stats"
6162
"gopkg.in/tomb.v2"
6263

@@ -320,6 +321,10 @@ func (r *Runtime) serveGRPCAPI(ctx context.Context, t *tomb.Tomb) (net.Addr, err
320321

321322
info := api.NewInformation(Version(false))
322323
info.Register(grpcServer)
324+
// Makes it easier to use command line tools to interact
325+
// with the gRPC API.
326+
// https://github.com/grpc/grpc/blob/master/doc/server-reflection.md
327+
reflection.Register(grpcServer)
323328

324329
healthService := api.NewHealthChecker()
325330
grpc_health_v1.RegisterHealthServer(grpcServer, healthService)
@@ -436,14 +441,18 @@ func (r *Runtime) serveHTTPAPI(
436441
return nil, cerrors.Errorf("failed to register metrics handler: %w", err)
437442
}
438443

444+
handler := grpcutil.WithWebsockets(
445+
grpcutil.WithDefaultGatewayMiddleware(
446+
r.logger, allowCORS(gwmux, "http://localhost:4200"),
447+
),
448+
)
449+
439450
return r.serveHTTP(
440451
ctx,
441452
t,
442453
&http.Server{
443-
Addr: r.Config.HTTP.Address,
444-
Handler: grpcutil.WithDefaultGatewayMiddleware(
445-
r.logger, allowCORS(gwmux, "http://localhost:4200"),
446-
),
454+
Addr: r.Config.HTTP.Address,
455+
Handler: handler,
447456
ReadHeaderTimeout: 10 * time.Second,
448457
},
449458
)

pkg/connector/builder.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ import (
2020

2121
"github.com/conduitio/conduit/pkg/foundation/cerrors"
2222
"github.com/conduitio/conduit/pkg/foundation/log"
23+
"github.com/conduitio/conduit/pkg/inspector"
2324
"github.com/conduitio/conduit/pkg/plugin"
2425
)
2526

27+
const inspectorBufferSize = 1000
28+
2629
// Builder represents an object that can build a connector.
2730
// The main use of this interface is to be able to switch out the connector
2831
// implementations for mocks in tests.
@@ -99,6 +102,7 @@ func (b *DefaultBuilder) Init(c Connector, id string, config Config) error {
99102
v.persister = b.persister
100103
v.pluginDispenser = p
101104
v.errs = make(chan error)
105+
v.inspector = inspector.New(v.logger, inspectorBufferSize)
102106
default:
103107
return ErrInvalidConnectorType
104108
}

pkg/connector/connector.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"context"
2222
"time"
2323

24+
"github.com/conduitio/conduit/pkg/inspector"
2425
"github.com/conduitio/conduit/pkg/record"
2526
)
2627

@@ -65,6 +66,10 @@ type Connector interface {
6566
// asynchronously (e.g. persisting state).
6667
Errors() <-chan error
6768

69+
// Inspect returns an inspector.Session which exposes the records
70+
// coming into or out of this connector (depending on the connector type).
71+
Inspect(context.Context) *inspector.Session
72+
6873
// Open will start the plugin process and call the Open method on the
6974
// plugin. After the connector has been successfully opened it is considered
7075
// as running (IsRunning returns true) and can be stopped again with

pkg/connector/destination.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/conduitio/conduit/pkg/foundation/cerrors"
2323
"github.com/conduitio/conduit/pkg/foundation/log"
24+
"github.com/conduitio/conduit/pkg/inspector"
2425
"github.com/conduitio/conduit/pkg/plugin"
2526
"github.com/conduitio/conduit/pkg/record"
2627
)
@@ -57,6 +58,8 @@ type destination struct {
5758
// stopStream is a function that closes the context of the stream
5859
stopStream context.CancelFunc
5960

61+
inspector *inspector.Inspector
62+
6063
// m can lock a destination from concurrent access (e.g. in connector persister).
6164
m sync.Mutex
6265
// wg tracks the number of in flight calls to the plugin.
@@ -115,6 +118,10 @@ func (d *destination) Errors() <-chan error {
115118
return d.errs
116119
}
117120

121+
func (d *destination) Inspect(ctx context.Context) *inspector.Session {
122+
return d.inspector.NewSession(ctx)
123+
}
124+
118125
func (d *destination) Validate(ctx context.Context, settings map[string]string) (err error) {
119126
dest, err := d.pluginDispenser.DispenseDestination()
120127
if err != nil {
@@ -227,6 +234,7 @@ func (d *destination) Write(ctx context.Context, r record.Record) error {
227234
return err
228235
}
229236

237+
d.inspector.Send(ctx, r)
230238
err = d.plugin.Write(ctx, r)
231239
if err != nil {
232240
return cerrors.Errorf("error writing record: %w", err)

pkg/connector/mock/connector.go

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/connector/source.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/conduitio/conduit/pkg/foundation/cerrors"
2323
"github.com/conduitio/conduit/pkg/foundation/log"
24+
"github.com/conduitio/conduit/pkg/inspector"
2425
"github.com/conduitio/conduit/pkg/plugin"
2526
"github.com/conduitio/conduit/pkg/record"
2627
)
@@ -114,6 +115,11 @@ func (s *source) Errors() <-chan error {
114115
return s.errs
115116
}
116117

118+
func (s *source) Inspect(_ context.Context) *inspector.Session {
119+
// TODO implement me
120+
panic("implement me")
121+
}
122+
117123
func (s *source) Validate(ctx context.Context, settings map[string]string) (err error) {
118124
src, err := s.pluginDispenser.DispenseSource()
119125
if err != nil {

pkg/foundation/grpcutil/gateway.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/conduitio/conduit/pkg/foundation/log"
2222
"github.com/google/uuid"
2323
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
24+
"github.com/tmc/grpc-websocket-proxy/wsproxy"
2425
"google.golang.org/protobuf/encoding/protojson"
2526
)
2627

@@ -109,6 +110,10 @@ func WithHTTPEndpointHeader(h http.Handler) http.Handler {
109110
})
110111
}
111112

113+
func WithWebsockets(h http.Handler) http.Handler {
114+
return wsproxy.WebsocketProxy(h)
115+
}
116+
112117
func extractEndpoint(r *http.Request) string {
113118
return r.Method + " " + r.URL.Path
114119
}

pkg/foundation/log/fields.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,6 @@ const (
3232
PluginTypeField = "plugin_type"
3333
PluginNameField = "plugin_name"
3434
PluginPathField = "plugin_path"
35+
36+
InspectorSessionID = "inspector_session_id"
3537
)

0 commit comments

Comments
 (0)