This repository provides a step-by-step demo of the Confluent Cloud feature Client-Side Payload Encryption.
- Confluent Cloud cluster with Advanced Stream Governance package
We will produce personal data to Confluent Cloud in the following form:

```json
{
  "id": "0",
  "name": "Anna",
  "birthday": "1993-08-01",
  "timestamp": "2023-10-07T19:54:21.884Z"
}
```

However, we configure the producer to encrypt the entire payload, and we then start a consumer with the corresponding configurations to decrypt the payload again.
In order to have a realistic scenario, we do not produce and consume via the CLI but instead develop producer and consumer applications in Kotlin.
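For orientation, the record above maps onto the `PersonalData` Avro record registered with Schema Registry below. As a sketch only (the demo's actual application code may use Avro-generated classes instead of this plain data class):

```kotlin
// Illustrative sketch: a plain Kotlin data class mirroring the PersonalData
// Avro record registered later in this walkthrough.
data class PersonalData(
    val id: String,
    val name: String,
    val birthday: String,
    val timestamp: String? // nullable, matching ["string", "null"] in the schema
)

fun main() {
    val record = PersonalData("0", "Anna", "1993-08-01", "2023-10-07T19:54:21.884Z")
    println(record.name)
}
```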
In Azure AD (Entra ID), we create an app with a secret; see this Quickstart documentation. Copy the
- tenant ID
- client ID
- secret value (you need to copy it directly after creation)
Create a Key Vault and a key, and copy the Key Identifier.
In the Key Vault, we use Access policies to grant the registered application permission for the key; see the documentation. We grant "All Key Permissions" (in production, we recommend following the principle of least privilege).
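The three values we copied (tenant ID, client ID, secret value) are what the encryption rule executor later uses to authenticate against Key Vault. As an illustration of what they map to in the Azure SDK (a sketch using the real `ClientSecretCredentialBuilder` from azure-identity, not part of the demo code, since the Confluent serializer builds this internally):

```kotlin
import com.azure.identity.ClientSecretCredentialBuilder

// Sketch: how the tenant ID, client ID, and secret value map onto an
// Azure AD credential; the Confluent encryption executor does this for us.
val credential = ClientSecretCredentialBuilder()
    .tenantId("<tenant ID>")
    .clientId("<client ID>")
    .clientSecret("<secret value>")
    .build()
```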
We register the schema (the encryption rule itself is added in a second request):
```shell
curl --request POST --url 'https://psrc-abc.westeurope.azure.confluent.cloud/subjects/pneff-cspe-test-value/versions' \
  --header 'Authorization: Basic <SR API Key>:<SR API Secret>' \
  --header 'content-type: application/json' \
  --data '{
    "schemaType": "AVRO",
    "schema": "{ \"name\": \"PersonalData\", \"type\": \"record\", \"namespace\": \"com.cspeExample\", \"fields\": [{\"name\": \"id\", \"type\": \"string\"}, {\"name\": \"name\", \"type\": \"string\"},{\"name\": \"birthday\", \"type\": \"string\"},{\"name\": \"timestamp\",\"type\": [\"string\", \"null\"]}]}",
    "metadata": {
      "properties": {
        "owner": "Patrick Neff",
        "email": "pneff@confluent.io"
      }
    }
  }'
```

Note that the Basic credentials (`<SR API Key>:<SR API Secret>`) must be base64-encoded. Be aware that for CSPE you need to configure `encodingRules`.
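Both requests authenticate with base64-encoded Basic credentials; a small Kotlin helper (an illustration, not part of the demo code) shows how that header value is built:

```kotlin
import java.util.Base64

// Build the HTTP Basic Authorization header value from the SR API key pair.
fun basicAuthHeader(apiKey: String, apiSecret: String): String {
    val encoded = Base64.getEncoder().encodeToString("$apiKey:$apiSecret".toByteArray())
    return "Basic $encoded"
}

fun main() {
    println(basicAuthHeader("key", "secret")) // prints "Basic a2V5OnNlY3JldA=="
}
```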
```shell
curl --request POST --url 'https://psrc-abc.westeurope.azure.confluent.cloud/subjects/pneff-cspe-test-value/versions' \
  --header 'Authorization: Basic <SR API Key>:<SR API Secret>' \
  --header 'Content-Type: application/vnd.schemaregistry.v1+json' \
  --data '{
    "ruleSet": {
      "encodingRules": [
        {
          "name": "encryptPayload",
          "kind": "TRANSFORM",
          "type": "ENCRYPT_PAYLOAD",
          "mode": "WRITEREAD",
          "params": {
            "encrypt.kek.name": "pneff-cspe",
            "encrypt.kms.key.id": "<Azure Key Identifier>",
            "encrypt.kms.type": "azure-kms"
          },
          "onFailure": "ERROR,NONE"
        }
      ]
    }
  }'
```

We can check that everything is registered correctly by executing
```shell
curl --request GET \
  --url 'https://psrc-abc.westeurope.azure.confluent.cloud/subjects/pneff-cspe-test-value/versions/latest' \
  --header 'Authorization: Basic <SR API Key>:<SR API Secret>' | jq
```

We need to add
```kotlin
implementation("io.confluent:kafka-avro-serializer:8.1.0")
implementation("io.confluent:kafka-schema-registry-client-encryption-azure:8.1.0")
```

We need to adjust the configuration by adding
```kotlin
// Encryption
settings.setProperty("rule.executors._default_.param.tenant.id", "<tenant ID>")
settings.setProperty("rule.executors._default_.param.client.id", "<client ID>")
settings.setProperty("rule.executors._default_.param.client.secret", "<secret value>")
// Required since we manually create schemas
settings.setProperty("use.latest.version", "true")
settings.setProperty("auto.register.schemas", "false")
```

We continuously produce data with encryption (the topic `pneff-cspe-test` needs to be created beforehand) by executing
```shell
./gradlew run
```

We can see in the logs that everything is working fine:
```text
[ForkJoinPool.commonPool-worker-1] INFO com.microsoft.aad.msal4j.AcquireTokenSilentSupplier - Returning token from cache
[ForkJoinPool.commonPool-worker-1] INFO com.azure.identity.ClientSecretCredential - Azure Identity => getToken() result for scopes [https://vault.azure.net/.default]: SUCCESS
[ForkJoinPool.commonPool-worker-1] INFO com.azure.core.implementation.AccessTokenCache - Acquired a new access token.
[kafka-producer-network-thread | producer-2] INFO KafkaProducer - event produced to pneff-cspe-test
```

We configure the consumer with the corresponding configurations and just log the consumed event. We can run it again with
```shell
./gradlew run
```

It may take a few seconds, but then we can see all events with the decrypted payload:
```text
[main] INFO KafkaConsumer - We consumed the event {"id": "0", "name": "Anna", "birthday": "1993-08-01", "timestamp": "2023-10-07T20:48:02.624Z"}
[main] INFO KafkaConsumer - We consumed the event {"id": "1", "name": "Joe", "birthday": "1996-09-11", "timestamp": "2023-10-07T20:48:18.005Z"}
```

If we do not provide the KMS access configuration, we receive the exception `Failed to decrypt with all KEKs`.
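For reference, the consumer configuration mirrors the producer's. A sketch of how the settings might fit together (bootstrap server, group id, and the auth placeholders are illustrative assumptions; topic, Schema Registry URL, and the `rule.executors` params follow the walkthrough):

```kotlin
import java.util.Properties

// Sketch of consumer settings: decryption happens transparently inside the
// KafkaAvroDeserializer because the rule params below grant KMS access.
val settings = Properties().apply {
    setProperty("bootstrap.servers", "<bootstrap server>")
    setProperty("group.id", "cspe-demo") // illustrative group id
    setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer")
    setProperty("schema.registry.url", "https://psrc-abc.westeurope.azure.confluent.cloud")
    // Without these three params, decryption fails with "Failed to decrypt with all KEKs"
    setProperty("rule.executors._default_.param.tenant.id", "<tenant ID>")
    setProperty("rule.executors._default_.param.client.id", "<client ID>")
    setProperty("rule.executors._default_.param.client.secret", "<secret value>")
}
```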

