Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,10 @@ addresses and access credentials.
This schema registry config file contains the URI of the service and
access credentials.

> We currently support just SASL authentication though it will be easy
> to add other authentication options (or no auth). Please let us know if
> you have a requirement here.
> We currently support no authentication, SASL/PLAIN authentication, and
> TLS client authentication, though it will be easy to add other
> authentication options. Please let us know if you have a requirement
> here.

## Description

Expand Down
36 changes: 31 additions & 5 deletions cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cli

import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"flag"
"fmt"
Expand Down Expand Up @@ -62,11 +63,15 @@ func getKey() (apiKey, error) {
}

type config struct {
BootstrapServers string `json:"bootstrap_servers"`
SecurityProtocol string `json:"security_protocol"`
SaslMechanisms string `json:"sasl_mechanisms"`
SaslUsername string `json:"sasl_username"`
SaslPassword string `json:"sasl_password"`
BootstrapServers string `json:"bootstrap_servers"`
SecurityProtocol string `json:"security_protocol"`
SaslMechanisms string `json:"sasl_mechanisms"`
SaslUsername string `json:"sasl_username"`
SaslPassword string `json:"sasl_password"`
TLSClientCertFile string `json:"tls_client_cert_file"`
TLSClientKeyFile string `json:"tls_client_key_file"`
TLSServerCACertFile string `json:"tls_server_ca_cert_file"`
TLSServerInsecureSkipVerify bool `json:"tls_server_insecure_skip_verify"`
}

func LoadKafkaConfig() ([]kgo.Opt, error) {
Expand All @@ -90,10 +95,31 @@ func LoadKafkaConfig() ([]kgo.Opt, error) {
switch c.SecurityProtocol {
case "", "PLAINTEXT", "SASL_PLAINTEXT":
case "SSL", "SASL_SSL":
var tlsConfig tls.Config
if c.TLSClientCertFile != "" && c.TLSClientKeyFile != "" {
cert, err := tls.LoadX509KeyPair(c.TLSClientCertFile, c.TLSClientKeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load key pair from tls_client_cert_file and tls_client_key_file: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
if c.TLSServerCACertFile != "" {
caCert, err := os.ReadFile(c.TLSServerCACertFile)
if err != nil {
return nil, fmt.Errorf("failed to read tls_server_ca_cert_file: %w", err)
}
p := x509.NewCertPool()
if !p.AppendCertsFromPEM(caCert) {
return nil, fmt.Errorf("failed to append certificates from tls_server_ca_cert_file")
}
tlsConfig.RootCAs = p
}
tlsConfig.InsecureSkipVerify = c.TLSServerInsecureSkipVerify
d := &tls.Dialer{
NetDialer: &net.Dialer{
Timeout: 10 * time.Second,
},
Config: &tlsConfig,
}
opts = append(opts, kgo.Dialer(d.DialContext))
default:
Expand Down
14 changes: 9 additions & 5 deletions kafka.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
{
"bootstrap_servers": "<YOUR KAFKA BOOTSTRAP SERVERS>",
"security_protocol": "SASL_SSL",
"sasl_mechanisms": "PLAIN",
"sasl_username": "<YOUR KAFKA USERNAME>",
"sasl_password": "<YOUR KAFKA PASSWORD>"
"bootstrap_servers": "comma-separated list of Kafka bootstrap servers",
"security_protocol": "PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, or SSL (default: PLAINTEXT)",
"sasl_mechanisms": "must be PLAIN for SASL_PLAINTEXT and SASL_SSL",
"sasl_username": "Kafka username for SASL_PLAINTEXT and SASL_SSL",
"sasl_password": "Kafka password for SASL_PLAINTEXT and SASL_SSL",
"tls_client_cert_file": "path to certificate file for client authentication for SASL_SSL and SSL",
"tls_client_key_file": "path to private key file for client authentication for SASL_SSL and SSL",
"tls_server_ca_cert_file": "path to root CA certificate file for server certificate verification for SASL_SSL and SSL",
"tls_server_insecure_skip_verify": "set to true to skip server certificate verification for SASL_SSL and SSL (default: false)"
}