diff --git a/README.md b/README.md index 14bacbf..c2f7b91 100644 --- a/README.md +++ b/README.md @@ -97,9 +97,10 @@ addresses and access credentials. This schema registry config file contains the URI of the service and access credentials. -> We currently support just SASL authentication though it will be easy -> to add other authentication options (or no auth). Please let us know if -> you have a requirement here. +> We currently support no authentication, SASL/PLAIN authentication, and +> TLS client authentication, though it will be easy to add other +> authentication options. Please let us know if you have a requirement +> here. ## Description diff --git a/cli/flags.go b/cli/flags.go index 91fcb10..097d52f 100644 --- a/cli/flags.go +++ b/cli/flags.go @@ -2,6 +2,7 @@ package cli import ( "crypto/tls" + "crypto/x509" "encoding/json" "flag" "fmt" @@ -62,11 +63,15 @@ func getKey() (apiKey, error) { } type config struct { - BootstrapServers string `json:"bootstrap_servers"` - SecurityProtocol string `json:"security_protocol"` - SaslMechanisms string `json:"sasl_mechanisms"` - SaslUsername string `json:"sasl_username"` - SaslPassword string `json:"sasl_password"` + BootstrapServers string `json:"bootstrap_servers"` + SecurityProtocol string `json:"security_protocol"` + SaslMechanisms string `json:"sasl_mechanisms"` + SaslUsername string `json:"sasl_username"` + SaslPassword string `json:"sasl_password"` + TLSClientCertFile string `json:"tls_client_cert_file"` + TLSClientKeyFile string `json:"tls_client_key_file"` + TLSServerCACertFile string `json:"tls_server_ca_cert_file"` + TLSServerInsecureSkipVerify bool `json:"tls_server_insecure_skip_verify"` } func LoadKafkaConfig() ([]kgo.Opt, error) { @@ -90,10 +95,31 @@ func LoadKafkaConfig() ([]kgo.Opt, error) { switch c.SecurityProtocol { case "", "PLAINTEXT", "SASL_PLAINTEXT": case "SSL", "SASL_SSL": + var tlsConfig tls.Config + if c.TLSClientCertFile != "" && c.TLSClientKeyFile != "" { + cert, err := tls.LoadX509KeyPair(c.TLSClientCertFile, c.TLSClientKeyFile) + if err != nil { + return nil, fmt.Errorf("failed to load key pair from tls_client_cert_file and tls_client_key_file: %w", err) + } + tlsConfig.Certificates = []tls.Certificate{cert} + } + if c.TLSServerCACertFile != "" { + caCert, err := os.ReadFile(c.TLSServerCACertFile) + if err != nil { + return nil, fmt.Errorf("failed to read tls_server_ca_cert_file: %w", err) + } + p := x509.NewCertPool() + if !p.AppendCertsFromPEM(caCert) { + return nil, fmt.Errorf("failed to append certificates from tls_server_ca_cert_file") + } + tlsConfig.RootCAs = p + } + tlsConfig.InsecureSkipVerify = c.TLSServerInsecureSkipVerify d := &tls.Dialer{ NetDialer: &net.Dialer{ Timeout: 10 * time.Second, }, + Config: &tlsConfig, } opts = append(opts, kgo.Dialer(d.DialContext)) default: diff --git a/kafka.json b/kafka.json index 34d93e6..50a30bc 100644 --- a/kafka.json +++ b/kafka.json @@ -1,7 +1,11 @@ { - "bootstrap_servers": "", - "security_protocol": "SASL_SSL", - "sasl_mechanisms": "PLAIN", - "sasl_username": "", - "sasl_password": "" + "bootstrap_servers": "comma-separated list of Kafka bootstrap servers", + "security_protocol": "PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, or SSL (default: PLAINTEXT)", + "sasl_mechanisms": "must be PLAIN for SASL_PLAINTEXT and SASL_SSL", + "sasl_username": "Kafka username for SASL_PLAINTEXT and SASL_SSL", + "sasl_password": "Kafka password for SASL_PLAINTEXT and SASL_SSL", + "tls_client_cert_file": "path to certificate file for client authentication for SASL_SSL and SSL", + "tls_client_key_file": "path to private key file for client authentication for SASL_SSL and SSL", + "tls_server_ca_cert_file": "path to root CA certificate file for server certificate verification for SASL_SSL and SSL", + "tls_server_insecure_skip_verify": "set to true to skip server certificate verification for SASL_SSL and SSL (default: false)" }