Skip to content

Commit 27f9165

Browse files
committed
Add TCP/TLS integration tests and examples for Java SDK
Test TLS with CA cert, server cert, plain TCP failure, and message flow over TLS
1 parent 514b5c5 commit 27f9165

4 files changed

Lines changed: 519 additions & 0 deletions

File tree

examples/java/build.gradle.kts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,12 @@ tasks.register<JavaExec>("runAsyncConsumer") {
110110
mainClass.set("org.apache.iggy.examples.async.AsyncConsumer")
111111
}
112112

113+
tasks.register<JavaExec>("runTcpTlsProducer") {
114+
classpath = sourceSets["main"].runtimeClasspath
115+
mainClass.set("org.apache.iggy.examples.tcptls.producer.TcpTlsProducer")
116+
}
117+
118+
tasks.register<JavaExec>("runTcpTlsConsumer") {
119+
classpath = sourceSets["main"].runtimeClasspath
120+
mainClass.set("org.apache.iggy.examples.tcptls.consumer.TcpTlsConsumer")
121+
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iggy.examples.tcptls.consumer;
21+
22+
import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
23+
import org.apache.iggy.consumergroup.Consumer;
24+
import org.apache.iggy.identifier.StreamId;
25+
import org.apache.iggy.identifier.TopicId;
26+
import org.apache.iggy.message.Message;
27+
import org.apache.iggy.message.PolledMessages;
28+
import org.apache.iggy.message.PollingStrategy;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.math.BigInteger;
33+
import java.nio.charset.StandardCharsets;
34+
import java.util.Optional;
35+
36+
/**
37+
* TCP/TLS Consumer Example
38+
*
39+
* <p>Demonstrates consuming messages over a TLS-encrypted TCP connection
40+
* using custom certificates from core/certs/.
41+
*
42+
* <p>Prerequisites: Start the Iggy server with TLS enabled:
43+
* <pre>
44+
* IGGY_TCP_TLS_ENABLED=true \
45+
* IGGY_TCP_TLS_CERT_FILE=core/certs/iggy_cert.pem \
46+
* IGGY_TCP_TLS_KEY_FILE=core/certs/iggy_key.pem \
47+
* cargo r --bin iggy-server
48+
* </pre>
49+
*/
50+
public final class TcpTlsConsumer {
51+
52+
private static final StreamId STREAM_ID = StreamId.of("tls-stream");
53+
private static final TopicId TOPIC_ID = TopicId.of("tls-topic");
54+
55+
private static final long PARTITION_ID = 0L;
56+
57+
private static final int BATCHES_LIMIT = 5;
58+
59+
private static final long MESSAGES_PER_BATCH = 10L;
60+
private static final long INTERVAL_MS = 500;
61+
62+
private static final Logger log = LoggerFactory.getLogger(TcpTlsConsumer.class);
63+
64+
private TcpTlsConsumer() {}
65+
66+
public static void main(String[] args) {
67+
// Build a TCP client with TLS enabled.
68+
// enableTls() activates TLS on the TCP transport
69+
// tlsCertificate(...) points to the CA certificate used to verify the server cert
70+
var client = IggyTcpClient.builder()
71+
.host("localhost")
72+
.port(8090)
73+
.enableTls()
74+
.tlsCertificate("../../core/certs/iggy_ca_cert.pem")
75+
.credentials("iggy", "iggy")
76+
.buildAndLogin();
77+
78+
consumeMessages(client);
79+
}
80+
81+
private static void consumeMessages(IggyTcpClient client) {
82+
log.info(
83+
"Messages will be consumed from stream: {}, topic: {}, partition: {} with interval {}ms.",
84+
STREAM_ID,
85+
TOPIC_ID,
86+
PARTITION_ID,
87+
INTERVAL_MS);
88+
89+
BigInteger offset = BigInteger.ZERO;
90+
int consumedBatches = 0;
91+
92+
Consumer consumer = Consumer.of(0L);
93+
94+
while (true) {
95+
if (consumedBatches == BATCHES_LIMIT) {
96+
log.info("Consumed {} batches of messages, exiting.", consumedBatches);
97+
return;
98+
}
99+
100+
try {
101+
PolledMessages polledMessages = client.messages()
102+
.pollMessages(
103+
STREAM_ID,
104+
TOPIC_ID,
105+
Optional.of(PARTITION_ID),
106+
consumer,
107+
PollingStrategy.offset(offset),
108+
MESSAGES_PER_BATCH,
109+
false);
110+
111+
if (polledMessages.messages().isEmpty()) {
112+
log.info("No messages found.");
113+
Thread.sleep(INTERVAL_MS);
114+
continue;
115+
}
116+
117+
for (Message message : polledMessages.messages()) {
118+
handleMessage(message, offset);
119+
}
120+
121+
consumedBatches++;
122+
123+
offset = offset.add(BigInteger.valueOf(polledMessages.messages().size()));
124+
125+
Thread.sleep(INTERVAL_MS);
126+
127+
} catch (InterruptedException e) {
128+
Thread.currentThread().interrupt();
129+
break;
130+
} catch (Exception e) {
131+
log.error("Error polling messages", e);
132+
break;
133+
}
134+
}
135+
}
136+
137+
private static void handleMessage(Message message, BigInteger offset) {
138+
String payload = new String(message.payload(), StandardCharsets.UTF_8);
139+
log.info("Handling message at offset {}, payload: {}...", offset, payload);
140+
}
141+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iggy.examples.tcptls.producer;
21+
22+
import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
23+
import org.apache.iggy.identifier.StreamId;
24+
import org.apache.iggy.identifier.TopicId;
25+
import org.apache.iggy.message.Message;
26+
import org.apache.iggy.message.Partitioning;
27+
import org.apache.iggy.stream.StreamDetails;
28+
import org.apache.iggy.topic.CompressionAlgorithm;
29+
import org.apache.iggy.topic.TopicDetails;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import java.math.BigInteger;
34+
import java.util.ArrayList;
35+
import java.util.List;
36+
import java.util.Optional;
37+
38+
import static java.util.Optional.empty;
39+
40+
/**
41+
* TCP/TLS Producer Example
42+
*
43+
* <p>Demonstrates producing messages over a TLS-encrypted TCP connection
44+
* using custom certificates from core/certs/.
45+
*
46+
* <p>Prerequisites: Start the Iggy server with TLS enabled:
47+
* <pre>
48+
* IGGY_TCP_TLS_ENABLED=true \
49+
* IGGY_TCP_TLS_CERT_FILE=core/certs/iggy_cert.pem \
50+
* IGGY_TCP_TLS_KEY_FILE=core/certs/iggy_key.pem \
51+
* cargo r --bin iggy-server
52+
* </pre>
53+
*/
54+
public final class TcpTlsProducer {
55+
56+
private static final String STREAM_NAME = "tls-stream";
57+
private static final StreamId STREAM_ID = StreamId.of(STREAM_NAME);
58+
59+
private static final String TOPIC_NAME = "tls-topic";
60+
private static final TopicId TOPIC_ID = TopicId.of(TOPIC_NAME);
61+
62+
private static final long PARTITION_ID = 0L;
63+
64+
private static final int BATCHES_LIMIT = 5;
65+
66+
private static final int MESSAGES_PER_BATCH = 10;
67+
private static final long INTERVAL_MS = 500;
68+
69+
private static final Logger log = LoggerFactory.getLogger(TcpTlsProducer.class);
70+
71+
private TcpTlsProducer() {}
72+
73+
public static void main(String[] args) {
74+
// Build a TCP client with TLS enabled.
75+
// enableTls() activates TLS on the TCP transport
76+
// tlsCertificate(...) points to the CA certificate used to verify the server cert
77+
var client = IggyTcpClient.builder()
78+
.host("localhost")
79+
.port(8090)
80+
.enableTls()
81+
.tlsCertificate("../../core/certs/iggy_ca_cert.pem")
82+
.credentials("iggy", "iggy")
83+
.buildAndLogin();
84+
85+
createStream(client);
86+
createTopic(client);
87+
produceMessages(client);
88+
}
89+
90+
private static void produceMessages(IggyTcpClient client) {
91+
log.info(
92+
"Messages will be sent to stream: {}, topic: {}, partition: {} with interval {}ms.",
93+
STREAM_NAME,
94+
TOPIC_NAME,
95+
PARTITION_ID,
96+
INTERVAL_MS);
97+
98+
int currentId = 0;
99+
int sentBatches = 0;
100+
101+
Partitioning partitioning = Partitioning.partitionId(PARTITION_ID);
102+
103+
while (sentBatches < BATCHES_LIMIT) {
104+
try {
105+
Thread.sleep(INTERVAL_MS);
106+
} catch (InterruptedException e) {
107+
Thread.currentThread().interrupt();
108+
break;
109+
}
110+
111+
List<Message> messages = new ArrayList<>();
112+
for (int i = 0; i < MESSAGES_PER_BATCH; i++) {
113+
currentId++;
114+
String payload = "message-" + currentId;
115+
messages.add(Message.of(payload));
116+
}
117+
118+
client.messages().sendMessages(STREAM_ID, TOPIC_ID, partitioning, messages);
119+
sentBatches++;
120+
log.info("Sent {} message(s).", MESSAGES_PER_BATCH);
121+
}
122+
123+
log.info("Sent {} batches of messages, exiting.", sentBatches);
124+
}
125+
126+
private static void createStream(IggyTcpClient client) {
127+
Optional<StreamDetails> stream = client.streams().getStream(STREAM_ID);
128+
if (stream.isPresent()) {
129+
return;
130+
}
131+
client.streams().createStream(STREAM_NAME);
132+
log.info("Stream {} was created.", STREAM_NAME);
133+
}
134+
135+
private static void createTopic(IggyTcpClient client) {
136+
Optional<TopicDetails> topic = client.topics().getTopic(STREAM_ID, TOPIC_ID);
137+
if (topic.isPresent()) {
138+
log.warn("Topic already exists and will not be created again.");
139+
return;
140+
}
141+
client.topics()
142+
.createTopic(
143+
STREAM_ID,
144+
1L,
145+
CompressionAlgorithm.None,
146+
BigInteger.ZERO,
147+
BigInteger.ZERO,
148+
empty(),
149+
TOPIC_NAME);
150+
log.info("Topic {} was created.", TOPIC_NAME);
151+
}
152+
}

0 commit comments

Comments
 (0)