This tutorial will use Apache Kafka Connect to publish messages from a Cloudant database to an IBM Event Streams topic. The messages will then be consumed by Apache Kafka Connect's JDBC sink connector and written to a Db2 Warehouse on Cloud table.
Learn more about Event Streams key concepts
This tutorial will demonstrate how to:
- Set up Cloudant, Event Streams and Db2 Warehouse on Cloud.
- Set up Apache Kafka Connect to connect to Event Streams.
- Configure Apache Kafka Connect with `kafka-connect-cloudant` to produce messages for Cloudant documents.
- Configure Apache Kafka Connect with `kafka-connect-jdbc` to consume messages from a topic and write to Db2 Warehouse on Cloud.
- Run Apache Kafka Connect in a standalone worker configuration.
N.B. This is an example only, intended as a starting point for using IBM Event Streams and Apache Kafka Connect to extract data from Cloudant and insert it into Db2 Warehouse on Cloud. It is not intended as a drop-in replacement for the deprecated Cloudant warehouse integration. In particular, this example does not create or alter Db2 tables or handle document deletion.
- Create an IBM Event Streams service instance.

- Create Service Credentials for IBM Event Streams.

- Use the Event Streams `Manage` interface to create a topic with 1) a topic name, 2) a single partition and 3) one day retention.
Alternatively, use the `api_key` and the `kafka_admin_url` from your service credentials to create the topic from the command line, e.g.

```
curl -v -H 'Content-Type: application/json' -H 'Accept: */*' -H 'X-Auth-Token: ${api_key}' -d '{ "name": "exampleTopic", "partitions": 1 }' <kafka_admin_url>/admin/topics
```

- Create an IBM Db2 Warehouse on Cloud instance.
- Create a Db2 table to house the data. When using your own data, be sure to choose an appropriate table schema and consider the specifics of JSON to Struct conversion as documented by `kafka-connect-cloudant` (e.g. numbers represented as FLOAT64/double). A hedged sketch of creating a table for this tutorial's sample data is shown after this list.
- Download the Db2 JDBC driver and save the JDBC connection details.
Note: Keep the connection details for configuring the `kafka-connect-jdbc` sink connector.
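As a minimal sketch of the table creation step (not part of the original integration), the following uses the `db2` command line processor against the `BLUDB` database with the placeholder credentials used elsewhere in this tutorial. The column names and types are assumptions for the sample `animaldb` documents and for how the JDBC sink quotes identifiers; adjust them to match the struct fields `kafka-connect-cloudant` will emit for your own data.

```
# Hypothetical sketch only: create the ANIMALS table referenced later by table.name.format=ANIMALS.
# Column names/types are assumptions for the sample animaldb documents; the lowercase, quoted
# identifiers assume the JDBC sink writes the struct field names (including _id and _rev) as-is.
db2 connect to BLUDB user username using password
db2 'CREATE TABLE ANIMALS (
  "_id"        VARCHAR(128) NOT NULL PRIMARY KEY,
  "_rev"       VARCHAR(64),
  "class"      VARCHAR(64),
  "diet"       VARCHAR(64),
  "min_weight" DOUBLE,
  "max_weight" DOUBLE
)'
db2 connect reset
```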
- Download Apache Kafka. This tutorial uses version 1.0.0 for Scala 2.11 (`kafka_2.11-1.0.0.tgz`).
- Extract the Apache Kafka installation.
- Open the `connect-standalone.properties` file in the `config` directory of the extracted Apache Kafka installation.
- Add or edit these properties, substituting values from your IBM Event Streams service credentials where appropriate.
```
# Substitute your server endpoints as shown in your service credentials
bootstrap.servers=kafka-1.mh-lndsvjgsxyspcxffwfhydnwfrwggs.eu-gb.containers.appdomain.cloud:9093,\
kafka-2.mh-lndsvjgsxyspcxffwfhydnwfrwggs.eu-gb.containers.appdomain.cloud:9093,\
kafka-3.mh-lndsvjgsxyspcxffwfhydnwfrwggs.eu-gb.containers.appdomain.cloud:9093

# Message producer connection configuration, substitute your username and password
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.ssl.protocol=TLSv1.2
producer.ssl.enabled.protocols=TLSv1.2
producer.ssl.endpoint.identification.algorithm=HTTPS

# Message consumer connection configuration, substitute your username and password
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.ssl.protocol=TLSv1.2
consumer.ssl.enabled.protocols=TLSv1.2
consumer.ssl.endpoint.identification.algorithm=HTTPS

# Substitute with the path to the file you want to use to store offset values
offset.storage.file.filename=offsetFile

# Kafka Connect message configuration
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
```
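Before wiring up the connectors, it can be worth a quick optional check that one of the broker endpoints in `bootstrap.servers` is reachable over TLS 1.2. The hostname below is the placeholder from the example configuration; substitute one of your own endpoints.

```
# Optional reachability check against a single Event Streams broker endpoint.
openssl s_client -connect kafka-1.mh-lndsvjgsxyspcxffwfhydnwfrwggs.eu-gb.containers.appdomain.cloud:9093 -tls1_2 </dev/null
```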
The following configuration publishes messages from Cloudant to Event Streams, using Apache Kafka Connect with `kafka-connect-cloudant` as the source connector.
- Download the `kafka-connect-cloudant` version 0.100.1 jar for use with Kafka 1.0.0 APIs from Maven Central.
- Create a `connect-cloudant-source.properties` file.
- Add these properties, substituting your details where appropriate.
```
# Substitute with a name for your source connector instance
name=cdt-kafka-example
connector.class=com.ibm.cloudant.kafka.connect.CloudantSourceConnector
# Substitute with the name of your topic
topics=exampleTopic
# Substitute with your account and database
cloudant.db.url=https://account.cloudant.com/animaldb
# Substitute with your Cloudant credentials
cloudant.db.username=
cloudant.db.password=
cloudant.db.since=0
batch.size=50
# Configure kafka-connect-cloudant to generate structs from the JSON documents
cloudant.value.schema.struct=true
# Flattening is not needed for this tutorial, but if you have nested objects and
# arrays in your JSON documents consider enabling flattening for use with the
# kafka-connect-jdbc sink. This may not be necessary for other sink connectors
# that can handle nested structs.
cloudant.value.schema.struct.flatten=false
# Configure kafka-connect-cloudant to omit design documents
cloudant.omit.design.docs=true
```
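The `cloudant.db.since=0` setting makes the source connector read the database's changes feed from the beginning. If you would like to preview the kind of change records it will process, an optional check against the standard Cloudant `_changes` endpoint looks like the sketch below, using the placeholder account, database and credentials from the properties above.

```
# Optional: preview the first few entries of the changes feed the source connector reads.
# Substitute your own account, database and credentials.
curl -u username:password 'https://account.cloudant.com/animaldb/_changes?since=0&limit=5'
```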
The following configuration consumes messages from Event Streams with Apache Kafka Connect and writes them to Db2 Warehouse on Cloud, using `kafka-connect-jdbc` as the sink connector.
- Download the `kafka-connect-jdbc` jar from Confluent's Maven repository.
- Create a `connect-jdbc-sink.properties` file.
- Add these properties, substituting your details where appropriate.
```
# Substitute with a name for your sink connector instance
name=db2-kafka-example
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
# Substitute with the name of your topic
topics=exampleTopic
# Substitute with the JDBC connection URL from your Db2 Warehouse on Cloud copied earlier
connection.url=jdbc:db2://dashdb-entry-yp-lon02-01.services.eu-gb.bluemix.net:50001/BLUDB:user=username;password=password;sslConnection=true;
batch.size=1
table.name.format=ANIMALS
```
Note that these options are not currently supported by the JDBC sink connector when using the Db2 JDBC dialect:

- `auto.create` for creating SQL tables
- `auto.evolve` for adjusting SQL tables
- `insert.mode=upsert` or `insert.mode=update`
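Because `auto.create` is not supported with the Db2 dialect, the target table must already exist (see the table creation prerequisite earlier). If you want to check the JDBC URL and driver before starting Kafka Connect, one option is the connectivity test utility bundled with the Db2 JDBC driver; the `-url` invocation below is an assumption based on the driver's `DB2Jcc` test class, using the placeholder URL from the sink properties.

```
# Optional, hedged sketch: test the JDBC connection with the driver's DB2Jcc utility.
java -cp /sqllib/java/db2jcc4.jar com.ibm.db2.jcc.DB2Jcc \
  -url 'jdbc:db2://dashdb-entry-yp-lon02-01.services.eu-gb.bluemix.net:50001/BLUDB:user=username;password=password;sslConnection=true;'
```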
Finally, run Apache Kafka Connect in the standalone worker configuration to produce and consume messages using the source and sink configurations created earlier.
- Add the `kafka-connect-cloudant` connector, the `kafka-connect-jdbc` connector, and the Db2 JDBC 4 jar (installed as part of the Db2 driver package) to the Java `CLASSPATH` as appropriate for your platform, e.g.

```
export CLASSPATH=/path/to/downloaded/kafka-connect-cloudant-0.100.1-kafka-1.0.0.jar:/path/to/downloaded/kafka-connect-jdbc-4.0.0.jar:/sqllib/java/db2jcc4.jar
```

Note: this example uses the default Linux installation location (`sqllib/java`) for `db2jcc4.jar`; it may be different on your platform.
- Run the Kafka Connect standalone worker with the property files created and configured earlier.

```
/path/to/your/kafka_2.11-1.0.0/bin/connect-standalone.sh /path/to/your/kafka_2.11-1.0.0/config/connect-standalone.properties /path/to/your/connect-cloudant-source.properties /path/to/your/connect-jdbc-sink.properties
```
- The Kafka Connect worker will run:
  - Reading documents from the Cloudant database and publishing them to the Event Streams topic. On stdout you will see a message like `INFO Return 20 records with last offset "..."` as `kafka-connect-cloudant` publishes messages to the topic.
  - Consuming messages from the Event Streams topic and inserting them into the Db2 Warehouse on Cloud table.
- The worker will continue to run until it encounters a terminal error or is stopped.
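To confirm that documents are landing in the warehouse, one option (assuming the `db2` command line processor is available, and using the placeholder credentials and table name from this tutorial) is to count the rows in the target table while the worker runs.

```
# Check how many documents have been written to the sink table so far.
db2 connect to BLUDB user username using password
db2 'SELECT COUNT(*) FROM ANIMALS'
db2 connect reset
```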