Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,19 @@ subprojects {

extensions.configure<SpotlessExtension> {
kotlin {
// by default the target is every '.kt' and '.kts' file in the java source sets
ktfmt().kotlinlangStyle().configure {
it.setMaxWidth(120)
}
ktlint()
.editorConfigOverride(
mapOf("max_line_length" to "120", "ij_kotlin_indent_before_arrow_on_new_line" to true)
)
licenseHeaderFile("${project.rootDir}/license-template.kt", "package")
.updateYearWithLatest(false)
}
kotlinGradle {
ktlint()
.editorConfigOverride(
mapOf("max_line_length" to "120", "ij_kotlin_indent_before_arrow_on_new_line" to true)
)
}
}

extensions.configure<PublishingExtension> {
Expand Down
8 changes: 4 additions & 4 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
avro = "1.12.1"
avroPlugin = "1.9.1"
dependencyManagement = "1.1.7"
kotlin = "2.3.10"
kotlin = "2.3.20"
kotlinLogging = "8.0.01"
mockk = "1.14.9"
msal4j = "1.24.0"
sonarqube = "7.2.2.6593"
spotless = "8.2.1"
springBoot = "4.0.2"
sonarqube = "7.2.3.7755"
spotless = "8.3.0"
springBoot = "4.0.3"
springmockk = "5.0.1"

[libraries]
Expand Down
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionSha256Sum=b266d5ff6b90eada6dc3b20cb090e3731302e553a27c5d3e4df1f0d76beaff06
distributionUrl=https\://services.gradle.org/distributions/gradle-9.3.1-bin.zip
distributionSha256Sum=60ea723356d81263e8002fec0fcf9e2b0eee0c0850c7a3d7ab0a63f2ccc601f3
distributionUrl=https\://services.gradle.org/distributions/gradle-9.4.0-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
2 changes: 1 addition & 1 deletion gradlew

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion kafka-avro/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ dependencies {

tasks.withType<KotlinCompile> {
dependsOn(
tasks.withType<GenerateAvroJavaTask>()
tasks.withType<GenerateAvroJavaTask>(),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class AvroDeserializer(deserializerSchemas: List<Schema>) : Deserializer<Specifi
logger.trace("Deserializing for {}", topic)
return decoder.decode(payload)
} catch (ex: Exception) {
throw SerializationException("Error deserializing Avro message for topic: ${topic}", ex)
throw SerializationException("Error deserializing Avro message for topic: $topic", ex)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
// SPDX-License-Identifier: Apache-2.0
package com.gxf.utilities.kafka.avro

import java.io.IOException
import java.io.OutputStream
import kotlin.reflect.KClass
import org.apache.avro.message.BinaryMessageEncoder
import org.apache.avro.specific.SpecificData
import org.apache.avro.specific.SpecificRecordBase
import org.slf4j.LoggerFactory
import java.io.IOException
import java.io.OutputStream
import kotlin.reflect.KClass

object AvroEncoder {
val encoders: HashMap<KClass<out SpecificRecordBase>, BinaryMessageEncoder<SpecificRecordBase>> = HashMap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
// SPDX-License-Identifier: Apache-2.0
package com.gxf.utilities.kafka.avro

import java.io.ByteArrayOutputStream
import org.apache.avro.specific.SpecificRecordBase
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Serializer
import org.slf4j.LoggerFactory
import java.io.ByteArrayOutputStream

class AvroSerializer : Serializer<SpecificRecordBase> {
companion object {
Expand All @@ -22,7 +22,7 @@ class AvroSerializer : Serializer<SpecificRecordBase> {
AvroEncoder.encode(data, outputStream)
return outputStream.toByteArray()
} catch (ex: Exception) {
throw SerializationException("Error serializing Avro message for topic: ${topic}", ex)
throw SerializationException("Error serializing Avro message for topic: $topic", ex)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ package com.gxf.utilities.kafka.avro

import com.alliander.gxf.utilities.kafka.avro.AvroSchema1
import com.alliander.gxf.utilities.kafka.avro.AvroSchema2
import java.io.ByteArrayOutputStream
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import java.io.ByteArrayOutputStream

class AvroEncoderTest {
@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@ package com.gxf.utilities.kafka.oauth.handler
import com.microsoft.aad.msal4j.ClientCredentialFactory
import com.microsoft.aad.msal4j.ClientCredentialParameters
import com.microsoft.aad.msal4j.ConfidentialClientApplication
import java.io.File
import java.io.IOException
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import javax.security.auth.callback.Callback
import javax.security.auth.callback.UnsupportedCallbackException
import javax.security.auth.login.AppConfigurationEntry
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
Expand All @@ -21,6 +14,13 @@ import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback
import org.apache.kafka.common.security.oauthbearer.internals.secured.BasicOAuthBearerToken
import org.slf4j.LoggerFactory
import java.io.File
import java.io.IOException
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import javax.security.auth.callback.Callback
import javax.security.auth.callback.UnsupportedCallbackException
import javax.security.auth.login.AppConfigurationEntry

class OAuthAuthenticateCallbackHandler : AuthenticateCallbackHandler {
internal lateinit var tokenFilePath: String
Expand All @@ -47,7 +47,7 @@ class OAuthAuthenticateCallbackHandler : AuthenticateCallbackHandler {

private fun getOptions(saslMechanism: String, jaasConfigEntries: List<AppConfigurationEntry>): Map<String, Any?> {
require(saslMechanism == OAuthBearerLoginModule.OAUTHBEARER_MECHANISM) {
"Unexpected SASL mechanism: ${saslMechanism}"
"Unexpected SASL mechanism: $saslMechanism"
}
require(jaasConfigEntries.size == 1) {
"Must supply exactly 1 non-null JAAS mechanism configuration (size was ${jaasConfigEntries.size})"
Expand Down Expand Up @@ -79,8 +79,10 @@ class OAuthAuthenticateCallbackHandler : AuthenticateCallbackHandler {
for (callback in callbacks) {
when (callback) {
is OAuthBearerTokenCallback -> callback.token(getToken())

is OAuthBearerValidatorCallback ->
throw UnsupportedCallbackException(callback, "Validate callback not yet implemented")

else -> throw UnsupportedCallbackException(callback, "Unknown callback type ${callback.javaClass.name}")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import com.gxf.utilities.kafka.oauth.handler.OAuthAuthenticateCallbackHandler.Co
import com.gxf.utilities.kafka.oauth.handler.OAuthAuthenticateCallbackHandler.Companion.SCOPE_CONFIG
import com.gxf.utilities.kafka.oauth.handler.OAuthAuthenticateCallbackHandler.Companion.TOKEN_ENDPOINT_CONFIG
import com.gxf.utilities.kafka.oauth.handler.OAuthAuthenticateCallbackHandler.Companion.TOKEN_FILE_CONFIG
import java.lang.IllegalArgumentException
import javax.security.auth.login.AppConfigurationEntry
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
import org.assertj.core.api.Assertions
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import java.lang.IllegalArgumentException
import javax.security.auth.login.AppConfigurationEntry
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED

class OAuthAuthenticateCallbackHandlerTest {

Expand Down Expand Up @@ -45,8 +45,8 @@ class OAuthAuthenticateCallbackHandlerTest {
val handler = OAuthAuthenticateCallbackHandler()

Assertions.assertThatThrownBy {
handler.configure(emptyMap<String?, Any>(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM, emptyList())
}
handler.configure(emptyMap<String?, Any>(), OAuthBearerLoginModule.OAUTHBEARER_MECHANISM, emptyList())
}
.isInstanceOf(IllegalArgumentException::class.java)
.hasMessage("Must supply exactly 1 non-null JAAS mechanism configuration (size was 0)")
}
Expand All @@ -67,11 +67,10 @@ class OAuthAuthenticateCallbackHandlerTest {
.hasMessage("Could not read Token file from: non-existent-file")
}

private fun options() =
mapOf(
CLIENT_ID_CONFIG to clientId,
TOKEN_ENDPOINT_CONFIG to tokenEndpoint,
SCOPE_CONFIG to scopes,
TOKEN_FILE_CONFIG to tokenFilePath,
)
private fun options() = mapOf(
CLIENT_ID_CONFIG to clientId,
TOKEN_ENDPOINT_CONFIG to tokenEndpoint,
SCOPE_CONFIG to scopes,
TOKEN_FILE_CONFIG to tokenFilePath,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package com.gxf.utilities.kafka.message

import com.gxf.utilities.kafka.avro.AvroDeserializer
import com.gxf.utilities.kafka.avro.AvroSerializer
import java.util.UUID
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecordBase
import org.apache.kafka.clients.consumer.Consumer
Expand All @@ -19,6 +18,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.utils.KafkaTestUtils
import java.util.UUID

object IntegrationTestHelper {
fun createByteArrayKafkaConsumer(
Expand Down Expand Up @@ -71,28 +71,25 @@ object IntegrationTestHelper {
return producerFactory.createProducer()
}

private fun producerProps(brokers: String): Map<String, Any> {
return mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to brokers,
ProducerConfig.BATCH_SIZE_CONFIG to "16384",
ProducerConfig.LINGER_MS_CONFIG to 1,
ProducerConfig.BUFFER_MEMORY_CONFIG to "33554432",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to AvroSerializer::class.java,
)
}
private fun producerProps(brokers: String): Map<String, Any> = mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to brokers,
ProducerConfig.BATCH_SIZE_CONFIG to "16384",
ProducerConfig.LINGER_MS_CONFIG to 1,
ProducerConfig.BUFFER_MEMORY_CONFIG to "33554432",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to AvroSerializer::class.java,
)
}

class Message(private var message: String?) : SpecificRecordBase() {
@Suppress("unused") // Used reflectively by Spring Kafka
constructor() : this(null)

companion object {
fun getClassSchema(): Schema =
Schema.Parser()
.parse(
"""{"type":"record","name":"Message","namespace":"com.gxf.utilities.kafka.message","fields":[{"name":"message","type":{"type":"string","avro.java.string":"String"}}]}"""
)
fun getClassSchema(): Schema = Schema.Parser()
.parse(
"""{"type":"record","name":"Message","namespace":"com.gxf.utilities.kafka.message","fields":[{"name":"message","type":{"type":"string","avro.java.string":"String"}}]}""",
)
}

override fun getSchema() = getClassSchema()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import com.gxf.utilities.kafka.message.IntegrationTestHelper.createByteArrayKafk
import com.gxf.utilities.kafka.message.signing.MessageSigner
import com.gxf.utilities.kafka.message.signing.MessageSigningAutoConfiguration
import com.gxf.utilities.kafka.message.signing.interceptors.MessageSigningInterceptorAutoConfiguration
import java.time.Duration
import org.apache.avro.specific.SpecificRecordBase
import org.apache.kafka.clients.producer.ProducerRecord
import org.assertj.core.api.Assertions.assertThat
Expand All @@ -22,22 +21,26 @@ import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.test.annotation.DirtiesContext
import java.time.Duration

@SpringBootTest(
classes =
[
MessageSigningAutoConfiguration::class,
MessageSigningInterceptorAutoConfiguration::class,
SslAutoConfiguration::class,
KafkaProperties::class,
]
[
MessageSigningAutoConfiguration::class,
MessageSigningInterceptorAutoConfiguration::class,
SslAutoConfiguration::class,
KafkaProperties::class,
],
)
@EmbeddedKafka(topics = ["test-topic"])
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
class MessageSigningInterceptorIT {
@Autowired private lateinit var embeddedKafkaBroker: EmbeddedKafkaBroker

@Autowired private lateinit var messageSigner: MessageSigner

@Autowired private lateinit var producerPropertiesForByteArrayRecords: Map<String, Any>

@Autowired private lateinit var producerPropertiesForAvroRecords: Map<String, Any>

val topic = "test-topic"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import com.gxf.utilities.kafka.avro.AvroEncoder
import com.gxf.utilities.kafka.message.wrapper.FlexibleSignableMessageWrapper
import com.gxf.utilities.kafka.message.wrapper.SignableMessageWrapper
import io.github.oshai.kotlinlogging.KotlinLogging
import org.apache.avro.specific.SpecificRecordBase
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.ssl.pem.PemContent
import org.springframework.core.io.Resource
import org.springframework.stereotype.Component
import java.io.IOException
import java.io.UncheckedIOException
import java.nio.ByteBuffer
Expand All @@ -21,13 +28,6 @@ import java.security.SignatureException
import java.security.spec.X509EncodedKeySpec
import java.util.Base64
import java.util.regex.Pattern
import org.apache.avro.specific.SpecificRecordBase
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.ssl.pem.PemContent
import org.springframework.core.io.Resource
import org.springframework.stereotype.Component

@Component
// Only instantiate when no other bean has been configured
Expand Down Expand Up @@ -213,9 +213,7 @@ class MessageSigner(properties: MessageSigningProperties) {
* @return `true` if this message signer is configured for signature verification and has a public key; `false` if
* not.
*/
fun canVerifyMessageSignatures(key: PublicKey? = verificationKey): Boolean {
return signingEnabled && key != null
}
fun canVerifyMessageSignatures(key: PublicKey? = verificationKey): Boolean = signingEnabled && key != null

/**
* Verifies the signature of the provided `message` using the signature in a message field.
Expand Down Expand Up @@ -345,11 +343,9 @@ class MessageSigner(properties: MessageSigningProperties) {
return verificationSignature.verify(signatureBytes.array())
}

private fun hasAvroHeader(bytes: ByteBuffer): Boolean {
return (bytes.array().size >= AVRO_HEADER_LENGTH) &&
((bytes[0].toInt() and 0xFF) == 0xC3) &&
((bytes[1].toInt() and 0xFF) == 0x01)
}
private fun hasAvroHeader(bytes: ByteBuffer): Boolean = (bytes.array().size >= AVRO_HEADER_LENGTH) &&
((bytes[0].toInt() and 0xFF) == 0xC3) &&
((bytes[1].toInt() and 0xFF) == 0x01)

private fun stripAvroHeader(bytes: ByteBuffer): ByteBuffer {
if (hasAvroHeader(bytes)) {
Expand All @@ -374,16 +370,14 @@ class MessageSigner(properties: MessageSigningProperties) {
}
}

override fun toString(): String {
return String.format(
"MessageSigner[algorithm=\"%s\"-\"%s\", provider=\"%s\", sign=%b, verify=%b]",
signatureAlgorithm,
keyAlgorithm,
signatureProvider,
canSignMessages(),
canVerifyMessageSignatures(),
)
}
override fun toString(): String = String.format(
"MessageSigner[algorithm=\"%s\"-\"%s\", provider=\"%s\", sign=%b, verify=%b]",
signatureAlgorithm,
keyAlgorithm,
signatureProvider,
canSignMessages(),
canVerifyMessageSignatures(),
)

companion object {
// Two magic bytes (0xC3, 0x01) followed by an 8-byte fingerprint
Expand Down
Loading
Loading