|
| 1 | +/** |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | + * or more contributor license agreements. See the NOTICE file |
| 4 | + * distributed with this work for additional information |
| 5 | + * regarding copyright ownership. The ASF licenses this file |
| 6 | + * to you under the Apache License, Version 2.0 (the |
| 7 | + * "License"); you may not use this file except in compliance |
| 8 | + * with the License. You may obtain a copy of the License at |
| 9 | + * |
| 10 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | + * |
| 12 | + * Unless required by applicable law or agreed to in writing, |
| 13 | + * software distributed under the License is distributed on an |
| 14 | + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | + * KIND, either express or implied. See the License for the |
| 16 | + * specific language governing permissions and limitations |
| 17 | + * under the License. |
| 18 | + */ |
| 19 | + |
| 20 | +// TCP/TLS Consumer Example |
| 21 | +// |
| 22 | +// Demonstrates consuming messages over a TLS-encrypted TCP connection |
| 23 | +// using custom certificates from core/certs/. |
| 24 | +// |
| 25 | +// Prerequisites: |
| 26 | +// Start the Iggy server with TLS enabled: |
| 27 | +// IGGY_TCP_TLS_ENABLED=true \ |
| 28 | +// IGGY_TCP_TLS_CERT_FILE=core/certs/iggy_cert.pem \ |
| 29 | +// IGGY_TCP_TLS_KEY_FILE=core/certs/iggy_key.pem \ |
| 30 | +// cargo r --bin iggy-server |
| 31 | +// |
| 32 | +// Run this example (from examples/node/): |
| 33 | +// DEBUG=iggy:* npx tsx src/tcp-tls/consumer.ts |
| 34 | + |
| 35 | +import { readFileSync } from 'node:fs'; |
| 36 | +import { Client, PollingStrategy, Consumer } from 'apache-iggy'; |
| 37 | +import { BATCHES_LIMIT, log, MESSAGES_PER_BATCH, PARTITION_ID, STREAM_ID, TOPIC_ID } from '../utils'; |
| 38 | + |
| 39 | +async function consumeMessages(client: Client): Promise<void> { |
| 40 | + const interval = 500; |
| 41 | + log( |
| 42 | + 'Messages will be consumed from stream: %d, topic: %d, partition: %d with interval %d ms.', |
| 43 | + STREAM_ID, |
| 44 | + TOPIC_ID, |
| 45 | + PARTITION_ID, |
| 46 | + interval, |
| 47 | + ); |
| 48 | + |
| 49 | + let offset = 0; |
| 50 | + let consumedBatches = 0; |
| 51 | + |
| 52 | + while (consumedBatches < BATCHES_LIMIT) { |
| 53 | + try { |
| 54 | + log('Polling for messages...'); |
| 55 | + const polledMessages = await client.message.poll({ |
| 56 | + streamId: STREAM_ID, |
| 57 | + topicId: TOPIC_ID, |
| 58 | + consumer: Consumer.Single, |
| 59 | + partitionId: PARTITION_ID, |
| 60 | + pollingStrategy: PollingStrategy.Offset(BigInt(offset)), |
| 61 | + count: MESSAGES_PER_BATCH, |
| 62 | + autocommit: false, |
| 63 | + }); |
| 64 | + |
| 65 | + if (!polledMessages || polledMessages.messages.length === 0) { |
| 66 | + log('No messages found.'); |
| 67 | + consumedBatches++; |
| 68 | + await new Promise(resolve => setTimeout(resolve, interval)); |
| 69 | + continue; |
| 70 | + } |
| 71 | + |
| 72 | + offset += polledMessages.messages.length; |
| 73 | + |
| 74 | + for (const message of polledMessages.messages) { |
| 75 | + handleMessage(message); |
| 76 | + } |
| 77 | + |
| 78 | + consumedBatches++; |
| 79 | + log('Consumed %d message(s) in batch %d.', polledMessages.messages.length, consumedBatches); |
| 80 | + |
| 81 | + await new Promise(resolve => setTimeout(resolve, interval)); |
| 82 | + } catch (error) { |
| 83 | + log('Error consuming messages: %o', error); |
| 84 | + throw error; |
| 85 | + } |
| 86 | + } |
| 87 | + |
| 88 | + log('Consumed %d batches of messages, exiting.', consumedBatches); |
| 89 | +} |
| 90 | + |
| 91 | +function handleMessage(message: any): void { |
| 92 | + const payload = message.payload.toString('utf8'); |
| 93 | + log( |
| 94 | + `Handling message at offset: ${message.headers.offset}, payload: %s...`, |
| 95 | + payload, |
| 96 | + ); |
| 97 | +} |
| 98 | + |
| 99 | +async function main(): Promise<void> { |
| 100 | + // Configure the client with TLS transport. |
| 101 | + // transport: 'TLS' activates TLS on the TCP connection |
| 102 | + // ca: readFileSync(...) provides the CA certificate to verify the server cert |
| 103 | + // host: 'localhost' must match the server certificate CN/SAN |
| 104 | + const client = new Client({ |
| 105 | + transport: 'TLS', |
| 106 | + options: { |
| 107 | + port: 8090, |
| 108 | + host: 'localhost', |
| 109 | + ca: readFileSync('../../core/certs/iggy_ca_cert.pem'), |
| 110 | + }, |
| 111 | + credentials: { username: 'iggy', password: 'iggy' }, |
| 112 | + }); |
| 113 | + |
| 114 | + try { |
| 115 | + log('TLS consumer has started, selected transport: TLS'); |
| 116 | + log('Connecting to Iggy server over TLS...'); |
| 117 | + |
| 118 | + log('Logging in user...'); |
| 119 | + await client.session.login({ username: 'iggy', password: 'iggy' }); |
| 120 | + log('Logged in successfully.'); |
| 121 | + |
| 122 | + await consumeMessages(client); |
| 123 | + } catch (error) { |
| 124 | + log('Error in main: %o', error); |
| 125 | + process.exitCode = 1; |
| 126 | + } finally { |
| 127 | + await client.destroy(); |
| 128 | + log('Disconnected from server.'); |
| 129 | + } |
| 130 | +} |
| 131 | + |
| 132 | +process.on('unhandledRejection', (reason, promise) => { |
| 133 | + log('Unhandled Rejection at: %o, reason: %o', promise, reason); |
| 134 | + process.exit(1); |
| 135 | +}); |
| 136 | + |
| 137 | +if (import.meta.url === `file://${process.argv[1]}`) { |
| 138 | + void (async () => { |
| 139 | + try { |
| 140 | + await main(); |
| 141 | + } catch (error) { |
| 142 | + log('Main function error: %o', error); |
| 143 | + process.exit(1); |
| 144 | + } |
| 145 | + })(); |
| 146 | +} |
0 commit comments