Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -484,9 +484,9 @@ The Apache Software License, Version 2.0
* Apache Yetus
- org.apache.yetus-audience-annotations-0.12.0.jar
* Kubernetes Client
- io.kubernetes-client-java-23.0.0.jar
- io.kubernetes-client-java-api-23.0.0.jar
- io.kubernetes-client-java-proto-23.0.0.jar
- io.kubernetes-client-java-26.0.0.jar
- io.kubernetes-client-java-api-26.0.0.jar
- io.kubernetes-client-java-proto-26.0.0.jar
* Dropwizard
- io.dropwizard.metrics-metrics-core-4.1.12.1.jar
- io.dropwizard.metrics-metrics-graphite-4.1.12.1.jar
Expand Down
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ skyscreamer = "1.5.0"
zstd-jni = "1.5.7-3"
lz4java = "1.10.3"
spring = "6.2.12"
kubernetesclient = "23.0.0"
kubernetesclient = "26.0.0"
aws-sdk = "1.12.788"
hadoop3 = "3.5.0"
jclouds = "2.6.0"
Expand Down
4 changes: 3 additions & 1 deletion pulsar-broker-auth-oidc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ dependencies {
implementation(libs.asynchttpclient)
implementation(libs.jackson.databind)
implementation(libs.jackson.annotations)
implementation(libs.kubernetes.client.java)
implementation(libs.kubernetes.client.java) {
exclude(group = "software.amazon.awssdk")
}
implementation(libs.okhttp3)
implementation(libs.commons.lang3)
implementation(libs.opentelemetry.api)
Expand Down
1 change: 1 addition & 0 deletions pulsar-functions/runtime/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ dependencies {
exclude(group = "org.bouncycastle", module = "bcutil-jdk18on")
exclude(group = "org.bouncycastle", module = "bcprov-jdk18on")
exclude(group = "javax.annotation", module = "javax.annotation-api")
exclude(group = "software.amazon.awssdk")
}
implementation(libs.simpleclient.hotspot)
implementation(libs.prometheus.jmx.collector)
Expand Down
4 changes: 3 additions & 1 deletion pulsar-functions/secrets/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ plugins {

dependencies {
implementation(project(":pulsar-functions:pulsar-functions-proto"))
implementation(libs.kubernetes.client.java)
implementation(libs.kubernetes.client.java) {
exclude(group = "software.amazon.awssdk")
}
implementation(libs.gson)
implementation(libs.commons.lang3)
}
31 changes: 22 additions & 9 deletions tests/integration/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,18 @@ dependencies {
testImplementation(libs.restassured)
testImplementation(libs.testcontainers.k3s)
testImplementation(libs.jetty.websocket.jetty.client)
testImplementation(libs.joda.time)
testImplementation(libs.kubernetes.client.java) {
exclude(group = "io.prometheus", module = "simpleclient_httpserver")
exclude(group = "org.bouncycastle")
exclude(group = "javax.annotation", module = "javax.annotation-api")
exclude(group = "software.amazon.awssdk")
}
testImplementation(libs.kubernetes.client.java.api.fluent) {
exclude(group = "io.prometheus", module = "simpleclient_httpserver")
exclude(group = "org.bouncycastle")
exclude(group = "javax.annotation", module = "javax.annotation-api")
exclude(group = "software.amazon.awssdk")
}
}

Expand All @@ -79,26 +82,36 @@ tasks.test {
}

// Register a task for each integration test suite
val integrationTestSuiteFile = providers.gradleProperty("integrationTestSuiteFile").getOrElse("pulsar.xml")
val integrationTestSuiteFileProperty = providers.gradleProperty("integrationTestSuiteFile")
val integrationTestSuiteFile = integrationTestSuiteFileProperty.getOrElse("pulsar.xml")
val integrationTestSuiteFileExplicit = integrationTestSuiteFileProperty.isPresent
val integrationTestGroups = providers.gradleProperty("testGroups").orNull
val integrationTestExcludedGroups = providers.gradleProperty("excludedTestGroups").orNull
val ideaActive = providers.systemProperty("idea.active").map { it.toBoolean() }.getOrElse(false)
// When `--tests` is passed on the CLI, let TestNG discover tests directly from the classpath
// instead of restricting discovery to the suite XML — unless -PintegrationTestSuiteFile was
// set explicitly, in which case the user-selected suite still wins.
val hasCliTestsFilter = gradle.startParameter.taskRequests
.flatMap { it.args }
.any { it == "--tests" }
val integrationTest by tasks.registering(Test::class) {
testClassesDirs = sourceSets.test.get().output.classesDirs
classpath = sourceSets.test.get().runtimeClasspath

useTestNG {
suites("src/test/resources/${integrationTestSuiteFile}")
if (!integrationTestGroups.isNullOrEmpty()) {
includeGroups(integrationTestGroups)
}
if (!integrationTestExcludedGroups.isNullOrEmpty()) {
excludeGroups(integrationTestExcludedGroups)
if (!ideaActive && (!hasCliTestsFilter || integrationTestSuiteFileExplicit)) {
useTestNG {
suites("src/test/resources/${integrationTestSuiteFile}")
if (!integrationTestGroups.isNullOrEmpty()) {
includeGroups(integrationTestGroups)
}
if (!integrationTestExcludedGroups.isNullOrEmpty()) {
excludeGroups(integrationTestExcludedGroups)
}
}
}

val failFastValue = providers.gradleProperty("testFailFast").getOrElse("true").toBoolean()
failFast = failFastValue
val ideaActive = providers.systemProperty("idea.active").map { it.toBoolean() }.getOrElse(false)
val defaultTestRetryCount = if (ideaActive) "0" else "1"
systemProperty("testRetryCount", providers.gradleProperty("testRetryCount").getOrElse(defaultTestRetryCount))
systemProperty("testFailFast", failFastValue.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import lombok.CustomLog;
import lombok.Getter;
import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.secretsproviderconfigurator.KubernetesSecretsProviderConfigurator;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.tools.tar.TarEntry;
import org.apache.tools.tar.TarInputStream;
import org.testcontainers.containers.wait.strategy.Wait;
Expand All @@ -73,14 +75,25 @@
* with the deployed Pulsar instance and Kubernetes cluster.
* The main reason to use this base class is to test features in Pulsar which are integrated into Kubernetes
* APIs.
*
* For debugging purposes, it is useful to have the ability to leave containers running.
* This mode can be activated by setting environment variables
* PULSAR_CONTAINERS_LEAVE_RUNNING=true and TESTCONTAINERS_REUSE_ENABLE=true
* For example:
* PULSAR_CONTAINERS_LEAVE_RUNNING=true TESTCONTAINERS_REUSE_ENABLE=true ./gradlew \
* :tests:integration:integrationTest --rerun --tests PulsarFunctionsK8STest --no-daemon
* Check the logs for KUBECONFIG file location to connect to the k3d cluster for debugging. For example:
* KUBECONFIG=/tmp/kubeconfig10863493890794345578.yaml k9s
* After debugging, one can use this command to kill all containers that were left running:
* docker kill $(docker ps -q --filter "label=pulsarcontainer=true")
*/
@CustomLog
public abstract class AbstractPulsarStandaloneK8STest {
private static final String DEFAULT_IMAGE_NAME = System.getenv().getOrDefault("PULSAR_TEST_IMAGE_NAME",
"apachepulsar/java-test-image:latest");
private static final int PULSAR_NODE_PORT = 30101;
private static final int PULSAR_HTTP_NODE_PORT = 30102;
private static final String K3S_IMAGE_NAME = "rancher/k3s:v1.33.5-k3s1";
private static final String K3S_IMAGE_NAME = "rancher/k3s:v1.34.8-k3s1";
private static final String PULSAR_STANDALONE_POD = "pulsar-standalone-pod";
K3sContainer k3sContainer;
KubeConfig kubeConfig;
Expand All @@ -100,6 +113,14 @@ public final void setupCluster() throws IOException, ApiException, InterruptedEx
k3sContainer = new K3sContainer(DockerImageName.parse(K3S_IMAGE_NAME));
k3sContainer.addExposedPort(PULSAR_NODE_PORT);
k3sContainer.addExposedPort(PULSAR_HTTP_NODE_PORT);
if (PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING) {
// use Testcontainers reuse containers feature to leave the container running
k3sContainer.withReuse(true);
// add label that can be used to find containers that are left running.
k3sContainer.withLabel("pulsarcontainer", "true");
// add a random label to prevent reuse of containers
k3sContainer.withLabel("pulsarcontainer.random", UUID.randomUUID().toString());
}
k3sContainer.start();
dockerHostName = k3sContainer.getHost();
pulsarBrokerUrl = "pulsar://" + dockerHostName + ":" + k3sContainer.getMappedPort(PULSAR_NODE_PORT);
Expand All @@ -110,8 +131,8 @@ public final void setupCluster() throws IOException, ApiException, InterruptedEx
apiClient = Config.fromConfig(kubeConfig);
kubeConfigFile = File.createTempFile("kubeconfig", ".yaml");
Files.writeString(kubeConfigFile.toPath(), kubeConfigYaml);
log.info().attr("uRL", pulsarBrokerUrl).attr("uRL", pulsarWebServiceUrl).log("Pulsar broker URL: http URL");
log.info().attr("kUBECONFIG", kubeConfigFile.getAbsolutePath()).log("For debugging k8s, use KUBECONFIG");
log.info().attr("URL", pulsarBrokerUrl).attr("URL", pulsarWebServiceUrl).log("Pulsar broker URL: http URL");
log.info().attr("KUBECONFIG", kubeConfigFile.getAbsolutePath()).log("For debugging k8s, use KUBECONFIG");
importPulsarImage();
deployPulsarStandalonePod();
log.info("Waiting for Pulsar cluster to be ready");
Expand All @@ -129,6 +150,10 @@ public final void setupCluster() throws IOException, ApiException, InterruptedEx
public final void cleanupCluster() throws InterruptedException {
if (k3sContainer != null) {
copyLogsToTargetDirectory();
if (PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING) {
log.warn("Ignoring stop due to PULSAR_CONTAINERS_LEAVE_RUNNING=true.");
return;
}
k3sContainer.stop();
kubeConfigFile.delete();
}
Expand Down
Loading