Kafka data flow hands-on with public data. Three EC2 instances were created to simulate the message-broker setup: one each for the producer, the Kafka cluster, and the consumer.
-
Producers:
- Fetch public real estate transaction data from S3 storage.
- Create the Kafka topic apartinfo.
- Send the data to the queue via Logstash.
-
Kafka Cluster:
- Central message broker that manages topics and queues.
- Contains topics (apartinfo in this case).
-
Consumers:
- Subscribes to topics in the Kafka cluster.
- Receives and processes data from the cluster.
- Runs a Python script that polls the topic every second and displays the received data.
Producer --> Kafka Cluster(Queue) --> Consumer
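The flow above can be modeled in a few lines of Python to give a rough mental picture. This is a toy sketch, not Kafka. A standard-library queue.Queue stands in for the apartinfo topic, and the record dicts are made-up placeholders. There is one real difference to keep in mind: Kafka keeps messages in the topic after they are read, until retention expires. That is what lets the console consumer replay them with --from-beginning. A queue.Queue discards each item once get() returns it.

```python
import queue
import threading

# Toy model only: queue.Queue plays the role of the "apartinfo" topic.
SENTINEL = None  # tells the consumer that the producer has finished


def producer(topic: queue.Queue, records: list) -> None:
    """Put every record on the 'topic', then signal completion."""
    for record in records:
        topic.put(record)
    topic.put(SENTINEL)


def consumer(topic: queue.Queue, received: list) -> None:
    """Take records off the 'topic' until the completion signal arrives."""
    while True:
        record = topic.get()
        if record is SENTINEL:
            break
        received.append(record)


def run_pipeline(records: list) -> list:
    """Run producer and consumer in separate threads and return what was consumed."""
    topic = queue.Queue()
    received = []
    workers = [
        threading.Thread(target=producer, args=(topic, records)),
        threading.Thread(target=consumer, args=(topic, received)),
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return received


if __name__ == "__main__":
    sample = [{"id": 1}, {"id": 2}]  # placeholder records, not the real data
    print(run_pipeline(sample))
```

There is a single producer and a single FIFO queue, so records come out in the order they were sent. This mirrors a one-partition topic, where Kafka also preserves order.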
$ ssh -i "dataEng-seoul.pem" ec2-user@ec2-<public-ip-with-dashes>.ap-northeast-1.compute.amazonaws.com
- Use medium-size instances.
-
Download and Extract Kafka
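$ wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
$ tar xvf kafka_2.13-3.6.1.tgz
$ ln -s kafka_2.13-3.6.1 kafka
(Apache moves superseded releases from downloads.apache.org to https://archive.apache.org/dist/kafka/. If the URL above returns 404, use https://archive.apache.org/dist/kafka/3.6.1/kafka_2.13-3.6.1.tgz instead.)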
-
Start Kafka Services
$ cd kafka
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties &
$ ./bin/kafka-server-start.sh config/server.properties &
-
Verify Services
$ sudo netstat -anp | egrep "9092|2181"
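netstat only works on the broker host itself. From the producer or consumer instance, a rough equivalent is to attempt a TCP connection to each port. The sketch below assumes the default ports from this guide (2181 for ZooKeeper, 9092 for Kafka). The function names are hypothetical helpers, not part of Kafka.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


# Default ports used in this guide.
SERVICES = {"zookeeper": 2181, "kafka": 9092}


def check_services(host: str = "localhost") -> dict:
    """Map each service name to whether its port accepts connections on `host`."""
    return {name: is_listening(host, port) for name, port in SERVICES.items()}


if __name__ == "__main__":
    for name, ok in check_services().items():
        print(f"{name}: {'up' if ok else 'down'}")
```

When running this from another instance, pass the broker's private IP (172.31.6.238 in the configs of this guide) instead of localhost. A "down" result there can also mean that the instance's security group does not allow inbound traffic on that port.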
-
Create a Kafka Topic
$ bin/kafka-topics.sh --create --topic apartinfo --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
-
List Topics
$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092
-
Consume Messages
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic apartinfo --from-beginning
-
Add Logstash Repository
$ sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
$ sudo vi /etc/yum.repos.d/logstash.repo
Add the following:
[logstash-8.x]
name=Elastic repository for 8.x packages
baseurl=https://artifacts.elastic.co/packages/8.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
-
Install and Configure Logstash
$ sudo yum install logstash -y
- Open the initialization file:
$ vi ~/.bash_profile
Add the following lines:
export LS_HOME=/usr/share/logstash
export PATH=$PATH:$LS_HOME/bin
Apply the changes:
$ source ~/.bash_profile
Check that the logstash command is now on the PATH:
$ logstash --version
-
Create Logstash Config File
$ vi apartinfo.conf
Add the S3 input settings (where the source data is fetched from) and the Kafka output settings:
input {
  s3 {
    access_key_id => "accesskey"
    secret_access_key => "security_key"
    region => "ap-northeast-2"
    bucket => "fc-storydata"
    prefix => "ods/danji_master.json/"   # directory inside the bucket to read from
    additional_settings => {
      force_path_style => true
      follow_redirects => false
    }
  }
}
output {
  stdout { }
  kafka {
    codec => json
    topic_id => "apartinfo"
    bootstrap_servers => "172.31.6.238:9092"   # replace with your Kafka server's private IP
  }
}
-
Run Logstash
$ logstash -f apartinfo.conf
-
Create a configuration file for the Kafka consumer:
$ vi consumer_ls.conf
-
Add the following configuration:
input {
  kafka {
    bootstrap_servers => "172.31.6.238:9092"
    group_id => "apart_info"
    topics => ["apartinfo"]   # Topic name
    consumer_threads => 1     # Number of consumer threads
  }
}
output {
  stdout { codec => rubydebug }
}
Run Logstash with the configuration file:
$ logstash -f /home/ec2-user/consumer_ls.conf
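apartinfo was created with --partitions 1, and within one consumer group each partition is read by at most one consumer at a time. Each Logstash consumer thread joins group apart_info as a separate consumer. Raising consumer_threads above the partition count would therefore leave the extra threads idle.

The sketch below illustrates this with a simplified version of range-style assignment, one of Kafka's built-in strategies, for a single topic. The function and consumer names are hypothetical, and this is not the client library's actual code.

```python
def range_assign(num_partitions: int, consumers: list) -> dict:
    """Simplified range-style assignment of one topic's partitions.

    Consumers are sorted by name. Each gets num_partitions // len(consumers)
    consecutive partitions, and the first num_partitions % len(consumers)
    consumers get one extra. Consumers beyond the partition count get none.
    """
    if not consumers:
        return {}
    members = sorted(consumers)
    base, extra = divmod(num_partitions, len(members))
    assignment = {}
    start = 0
    for i, member in enumerate(members):
        count = base + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment
```

For example, range_assign(1, ["thread-1", "thread-2"]) gives thread-1 partition 0 and leaves thread-2 with nothing. Five partitions across two consumers split as [0, 1, 2] and [3, 4].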
-
Install the required dependencies:
$ sudo yum install python3-pip -y
$ pip3 install confluent-kafka
-
Create a Python script for the consumer:
$ vi consumer_ph.py
Run the script:
$ python3 consumer_ph.py
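The guide does not include the contents of consumer_ph.py. The following is one possible sketch using confluent_kafka that matches the description of polling every second; it is not the author's script.

- The broker address 172.31.6.238:9092 and the topic apartinfo come from the configs above.
- The group id apart_info_py is a hypothetical name. It keeps the script from sharing the single partition with the Logstash consumer group apart_info.
- The confluent_kafka import sits inside build_consumer(), so the polling and decoding helpers can be exercised without a broker.

```python
import json

# Broker and topic mirror the Logstash configs above; the group id is hypothetical.
CONFIG = {
    "bootstrap.servers": "172.31.6.238:9092",
    "group.id": "apart_info_py",
    "auto.offset.reset": "earliest",  # a new group starts from the oldest retained message
}
TOPIC = "apartinfo"


def decode(value):
    """Logstash wrote events with codec => json, so try JSON first, else return raw text."""
    if value is None:
        return None
    text = value.decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def poll_loop(consumer, max_polls=None, timeout=1.0):
    """Poll every `timeout` seconds, print each record, and return the records.

    `consumer` is anything with poll() and close(), e.g. a confluent_kafka Consumer.
    max_polls=None means poll until interrupted with Ctrl+C.
    """
    received = []
    polls = 0
    try:
        while max_polls is None or polls < max_polls:
            polls += 1
            msg = consumer.poll(timeout)
            if msg is None:  # nothing arrived within `timeout`
                continue
            if msg.error():  # errors come back on the message instead of being raised
                print(f"error: {msg.error()}")
                continue
            record = decode(msg.value())
            print(record)
            received.append(record)
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()  # commits offsets (with auto-commit on) and leaves the group
    return received


def build_consumer():
    """Create a confluent_kafka Consumer subscribed to the apartinfo topic."""
    from confluent_kafka import Consumer  # imported here so the helpers work without it

    consumer = Consumer(CONFIG)
    consumer.subscribe([TOPIC])
    return consumer
```

To use this as consumer_ph.py, append `poll_loop(build_consumer())` to the end of the file. It keeps polling until interrupted with Ctrl+C. Because auto.offset.reset is earliest, the first run of a new group reads from the start of the topic, much like --from-beginning in the console consumer.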
