
Using IBM Event Streams and Apache Kafka to copy documents from Cloudant to Db2 Warehouse on Cloud

This tutorial uses Apache Kafka Connect to publish messages from a Cloudant database to an Event Streams topic. The messages are then consumed by Apache Kafka Connect's JDBC sink connector and written to a Db2 Warehouse on Cloud table.

Learn more about Event Streams key concepts

This tutorial will demonstrate how to:

  1. Set up Cloudant, Event Streams and Db2 Warehouse on Cloud.
  2. Set up Apache Kafka Connect to connect to Event Streams.
  3. Configure Apache Kafka Connect with kafka-connect-cloudant to produce messages for Cloudant documents.
  4. Configure Apache Kafka Connect with kafka-connect-jdbc to consume messages from a topic and write to Db2 Warehouse on Cloud.
  5. Run Apache Kafka Connect in a standalone worker configuration.

N.B. This is only an example, intended as a starting point for using IBM Event Streams and Apache Kafka Connect to extract data from Cloudant and insert it into Db2 Warehouse on Cloud. It is not intended as a drop-in replacement for the deprecated Cloudant warehouse integration; in particular, this example does not create or alter Db2 tables or handle document deletions.

Setting up IBM Cloud services

Cloudant

  1. Create an IBM Cloudant instance.
  2. Replicate the tutorial data.
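
The replication can be triggered with a single request to Cloudant's _replicate endpoint. The following is a minimal sketch, assuming the tutorial data is the public animaldb example database; the source URL and the $ACCOUNT, $USERNAME and $PASSWORD placeholders are assumptions to substitute with your own values.

    # Replicate the public animaldb example database into a database called animaldb
    # in your own Cloudant account (created if it does not already exist).
    curl -X POST "https://$ACCOUNT.cloudant.com/_replicate" \
         -u "$USERNAME:$PASSWORD" \
         -H 'Content-Type: application/json' \
         -d "{
               \"source\": \"https://examples.cloudant.com/animaldb\",
               \"target\": \"https://$USERNAME:$PASSWORD@$ACCOUNT.cloudant.com/animaldb\",
               \"create_target\": true
             }"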

IBM Event Streams

  1. Create an IBM Event Streams service instance.
  2. Create Service Credentials for the IBM Event Streams instance.
  3. Use the Event Streams Manage interface to create a topic with a single partition and one day retention. Alternatively, use the api_key and kafka_admin_url from your service credentials to create the topic from the command line, e.g.
    curl -v -H 'Content-Type: application/json' -H 'Accept: */*' -H "X-Auth-Token: ${api_key}" -d '{ "name": "exampleTopic", "partitions": 1 }' <kafka_admin_url>/admin/topics
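
To confirm the topic was created, you can list the topics with the same admin API. The GET form shown here is an assumption based on the create call above; check the Event Streams documentation for your service if it does not match.

    # List existing topics to confirm exampleTopic was created (endpoint form assumed
    # from the create call above).
    curl -H 'Accept: application/json' -H "X-Auth-Token: ${api_key}" <kafka_admin_url>/admin/topics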

Db2 Warehouse on Cloud

  1. Create an IBM Db2 Warehouse on Cloud instance.
  2. Create a Db2 table to house the data; an example schema sketch for the tutorial data follows this list. When using your own data, be sure to choose an appropriate table schema and consider the specifics of JSON to Struct conversion as documented by kafka-connect-cloudant (e.g. numbers represented as FLOAT64/double).
  3. Download the Db2 JDBC driver and save the JDBC connection details.
    Note: Keep the connection details for configuring the kafka-connect-jdbc sink connector.
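
For the tutorial data, a possible schema is sketched below. The column names and types are assumptions based on the fields of the animaldb documents and the FLOAT64/double conversion noted above, not a schema required by the connectors. Run the generated SQL from the Db2 Warehouse on Cloud console or any SQL client connected to your instance, and adjust it for your own data.

    # Write out a candidate DDL file for the tutorial data. Quoted, lower-case column
    # names are an assumption so that columns match the Cloudant document field names
    # exactly; adjust names, types and sizes to suit your documents.
    cat > create_animals_table.sql <<'SQL'
    CREATE TABLE ANIMALS (
      "_id"        VARCHAR(128) NOT NULL,
      "_rev"       VARCHAR(64),
      "class"      VARCHAR(32),
      "diet"       VARCHAR(32),
      "min_weight" DOUBLE,
      "max_weight" DOUBLE,
      "min_length" DOUBLE,
      "max_length" DOUBLE,
      "latin_name" VARCHAR(128),
      "wiki_page"  VARCHAR(256)
    );
    SQL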

Setting up Apache Kafka Connect

  1. Download Apache Kafka. This tutorial uses version 1.0.0 for Scala 2.11 (kafka_2.11-1.0.0.tgz); example commands for this step and the next are shown after the configuration below.

  2. Extract the Apache Kafka installation.

  3. Open the connect-standalone.properties file in the config directory of the extracted Apache Kafka installation.

  4. Add or edit these properties, substituting values from your IBM Event Streams service credentials where appropriate.

    # Substitute your server endpoints as shown in your service credentials
    bootstrap.servers=kafka-1.mh-lndsvjgsxyspcxffwfhydnwfrwggs.eu-gb.containers.appdomain.cloud:9093,\
    kafka-2.mh-lndsvjgsxyspcxffwfhydnwfrwggs.eu-gb.containers.appdomain.cloud:9093,\
    kafka-3.mh-lndsvjgsxyspcxffwfhydnwfrwggs.eu-gb.containers.appdomain.cloud:9093
    
    # Message producer connection configuration, substitute your username and password
    producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
    producer.security.protocol=SASL_SSL
    producer.sasl.mechanism=PLAIN
    producer.ssl.protocol=TLSv1.2
    producer.ssl.enabled.protocols=TLSv1.2
    producer.ssl.endpoint.identification.algorithm=HTTPS
    
    # Message consumer connection configuration, substitute your username and password
    consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
    consumer.security.protocol=SASL_SSL
    consumer.sasl.mechanism=PLAIN
    consumer.ssl.protocol=TLSv1.2
    consumer.ssl.enabled.protocols=TLSv1.2
    consumer.ssl.endpoint.identification.algorithm=HTTPS
    
    # Substitute with the path to the file you want to use to store offset values
    offset.storage.file.filename=offsetFile
    
    # Kafka connect message configuration
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
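
For reference, the download and extract in steps 1 and 2 above can also be done from the command line; the URL below assumes the release is still available from the Apache release archive.

    # Download and unpack Kafka 1.0.0 built for Scala 2.11.
    curl -LO https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
    tar -xzf kafka_2.11-1.0.0.tgz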

Configure kafka-connect-cloudant as a source connector

Configuration for publishing messages from Cloudant to Event Streams using Apache Kafka Connect and kafka-connect-cloudant as a source connector.

  1. Download the kafka-connect-cloudant version 0.100.1 jar for use with the Kafka 1.0.0 APIs from Maven Central.

  2. Create a connect-cloudant-source.properties file.

  3. Add these properties, substituting your details where appropriate.

    # Substitute with a name for your source connector instance
    name=cdt-kafka-example
    connector.class=com.ibm.cloudant.kafka.connect.CloudantSourceConnector
    # Substitute with the name of your topic
    topics=exampleTopic
    # Substitute with your account and database
    cloudant.db.url=https://account.cloudant.com/animaldb
    # Substitute with your Cloudant credentials
    cloudant.db.username=
    cloudant.db.password=
    cloudant.db.since=0
    batch.size=50
    # Configure kafka-connect-cloudant to generate structs from the JSON documents
    cloudant.value.schema.struct=true
    # Flattening is not needed for this tutorial, but if you have nested objects and
    # arrays in your JSON documents consider enabling flattening for use with the
    # kafka-connect-jdbc sink. This may not be necessary for other sink connectors
    # that can handle nested structs.
    cloudant.value.schema.struct.flatten=false
    # Configure kafka-connect-cloudant to omit design documents
    cloudant.omit.design.docs=true

Configure kafka-connect-jdbc as a sink connector

Configuration for consuming messages from Event Streams with Apache Kafka Connect and writing them to a Db2 Warehouse on Cloud table using kafka-connect-jdbc as a sink connector.

  1. Download the kafka-connect-jdbc jar from Confluent's Maven repository.

  2. Create a connect-jdbc-sink.properties file.

  3. Add these properties, substituting your details where appropriate.

    # Substitute with a name for your sink connector instance
    name=db2-kafka-example
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    # Substitute with the name of your topic
    topics=exampleTopic
    # Substitute with the JDBC connection URL from your Db2 Warehouse on Cloud copied earlier
    connection.url=jdbc:db2://dashdb-entry-yp-lon02-01.services.eu-gb.bluemix.net:50001/BLUDB:user=username;password=password;sslConnection=true;
    batch.size=1
    table.name.format=ANIMALS

Note that these options are not currently supported by the JDBC sink connector when using the Db2 JDBC dialect:

  • auto.create for creating SQL tables
  • auto.evolve for adjusting SQL tables
  • insert.mode=upsert or insert.mode=update

Running Apache Kafka Connect

Running Apache Kafka Connect in the standalone worker configuration to produce and consume messages using the source and sink configurations created earlier.

  1. Add the kafka-connect-cloudant connector, the kafka-connect-jdbc connector, and the Db2 JDBC 4 jar (installed as part of the Db2 driver package) to the Java CLASSPATH as appropriate for your platform, e.g.

    export CLASSPATH=/path/to/downloaded/kafka-connect-cloudant-0.100.1-kafka-1.0.0.jar:/path/to/downloaded/kafka-connect-jdbc-4.0.0.jar:/sqllib/java/db2jcc4.jar

    Note this example uses the Linux default installation location (sqllib/java) for the db2jcc4.jar; it may be different on your system.

  2. Run the Kafka Connect standalone worker with the property files created and configured earlier.

    /path/to/your/kafka_2.11-1.0.0/bin/connect-standalone.sh /path/to/your/kafka_2.11-1.0.0/config/connect-standalone.properties /path/to/your/connect-cloudant-source.properties /path/to/your/connect-jdbc-sink.properties
  3. The Kafka Connect worker will run:

    1. Reading documents from the Cloudant database and publishing them to the Event Streams topic.
      • On stdout you will see a message like INFO Return 20 records with last offset "..." as kafka-connect-cloudant publishes messages to the topic.
    2. Consuming messages from the Event Streams topic and inserting them into the Db2 Warehouse on Cloud table.
  4. The worker will continue to run until it encounters a terminal error or is stopped.

  5. View the data imported into the Db2 Warehouse on Cloud table.
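
If rows do not appear in Db2, it can help to confirm that messages are reaching the Event Streams topic. The following is a minimal sketch using Kafka's console consumer; the consumer.properties file name is a placeholder, and the broker endpoint shown is the example endpoint from the worker configuration, so substitute your own values.

    # Create a consumer properties file with the same SASL/SSL settings as the
    # worker configuration, but without the "consumer." prefix.
    cat > consumer.properties <<'EOF'
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
    ssl.protocol=TLSv1.2
    ssl.enabled.protocols=TLSv1.2
    ssl.endpoint.identification.algorithm=HTTPS
    EOF

    # Tail the topic from the beginning; substitute one of your own bootstrap servers.
    /path/to/your/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh \
      --bootstrap-server kafka-1.mh-lndsvjgsxyspcxffwfhydnwfrwggs.eu-gb.containers.appdomain.cloud:9093 \
      --consumer.config consumer.properties \
      --topic exampleTopic \
      --from-beginning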

References