* Create connector
POST http://localhost:8083/connectors
{
  "name": "postgres-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/db_metrics",
    "connection.user": "user_metrics",
    "connection.password": "password_metrics",
    "topics": "CLICK_COUNTS_TABLE_OUTPUT",
    "insert.mode": "insert",
    "auto.create": "true",
    "auto.evolve": "true",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "ConvertStartTime,ConvertEndTime",
    "transforms.ConvertStartTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.ConvertStartTime.field": "START_TIME",
    "transforms.ConvertStartTime.target.type": "Timestamp",
    "transforms.ConvertEndTime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.ConvertEndTime.field": "END_TIME",
    "transforms.ConvertEndTime.target.type": "Timestamp"
  }
}
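For convenience, the request above can be sent with curl. A minimal sketch, assuming the JSON body is saved to a file (the filename postgres-sink-connector.json is illustrative):
# Sketch: the filename is illustrative; any file containing the JSON above works.
curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @postgres-sink-connector.json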
* Update connector
PUT http://localhost:8083/connectors/postgres-sink-connector/config
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:postgresql://postgres:5432/db_metrics",
  "connection.user": "user_metrics",
  "connection.password": "password_metrics",
  "topics": "CLICK_COUNTS_TABLE_AVRO_OUTPUT",
  "insert.mode": "insert",
  "auto.create": "true",
  "auto.evolve": "true",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
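The update call follows the same pattern; note that PUT takes the bare config object, without the "name" wrapper. A sketch, again with an illustrative filename:
# Sketch: the body is only the config object shown above.
curl -s -X PUT http://localhost:8083/connectors/postgres-sink-connector/config \
  -H "Content-Type: application/json" \
  -d @postgres-sink-config.json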
* Get connector status
GET http://localhost:8083/connectors/postgres-sink-connector/status
* Restart connector
POST http://localhost:8083/connectors/postgres-sink-connector/restart
* Delete connector
DELETE http://localhost:8083/connectors/postgres-sink-connector
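The three management calls above are plain Kafka Connect REST requests, e.g. with curl:
curl -s http://localhost:8083/connectors/postgres-sink-connector/status
curl -s -X POST http://localhost:8083/connectors/postgres-sink-connector/restart
curl -s -X DELETE http://localhost:8083/connectors/postgres-sink-connector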
* Create topic
docker exec clickstream-kafka-1 kafka-topics --create --topic click_events --bootstrap-server kafka:9092 --partitions 3 --replication-factor 1
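To verify the topic and its partition count after creation, describe it:
docker exec clickstream-kafka-1 kafka-topics --describe --topic click_events --bootstrap-server kafka:9092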
* Create the stream
docker exec clickstream-ksql-1 ksql -e "CREATE STREAM IF NOT EXISTS click_events_stream (itemId STRING, campaignId STRING, timestamp STRING) WITH (KAFKA_TOPIC='click_events',VALUE_FORMAT='JSON');"
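A test event can be piped into the topic to exercise the stream. A sketch: the field values are made up, and on older Kafka images --broker-list replaces --bootstrap-server:
# Sketch: sample payload matching the stream schema (itemId, campaignId, timestamp).
echo '{"itemId":"item-1","campaignId":"campaign-1","timestamp":"2024-01-01T00:00:00Z"}' | docker exec -i clickstream-kafka-1 kafka-console-producer --bootstrap-server kafka:9092 --topic click_events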
* Create the aggregation table
docker exec clickstream-ksql-1 ksql -e "CREATE TABLE IF NOT EXISTS CLICK_COUNTS_TABLE WITH (KAFKA_TOPIC='CLICK_COUNTS_TABLE_OUTPUT', VALUE_FORMAT='AVRO', PARTITIONS=3, REPLICAS=1) AS SELECT CLICK_EVENTS_STREAM.CAMPAIGNID AS CAMPAIGNID, AS_VALUE(CLICK_EVENTS_STREAM.CAMPAIGNID) AS ID, COUNT(*) AS CLICK_COUNT, WINDOWSTART AS START_TIME, WINDOWEND AS END_TIME FROM CLICK_EVENTS_STREAM CLICK_EVENTS_STREAM WINDOW TUMBLING (SIZE 2 MINUTES) GROUP BY CLICK_EVENTS_STREAM.CAMPAIGNID EMIT CHANGES;"
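To watch the windowed counts as they arrive, a push query can be run against the table (a sketch; the LIMIT makes the query exit instead of streaming forever):
docker exec clickstream-ksql-1 ksql -e "SELECT CAMPAIGNID, CLICK_COUNT, START_TIME, END_TIME FROM CLICK_COUNTS_TABLE EMIT CHANGES LIMIT 5;"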
* Consume data from a topic:
docker exec clickstream-kafka-1 kafka-console-consumer --bootstrap-server kafka:9092 --topic CLICK_COUNTS_TABLE_OUTPUT --from-beginning
docker exec clickstream-kafka-1 kafka-console-consumer --bootstrap-server kafka:9092 --topic click_events --from-beginning
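The output topic is Avro-encoded, so the plain console consumer prints mostly unreadable bytes. A readable alternative is kafka-avro-console-consumer; a sketch, assuming it is run from the schema-registry container (the container name clickstream-schema-registry-1 is a guess based on the project's naming pattern):
# Assumption: container name follows the clickstream-<service>-1 pattern.
docker exec clickstream-schema-registry-1 kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic CLICK_COUNTS_TABLE_OUTPUT --from-beginning --property schema.registry.url=http://schema-registry:8081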
* Connect to Postgres
psql -h localhost -p 5432 -U user_metrics -d db_metrics
-- List databases
\l
-- Connect to a database
\c mydatabase
-- List tables
\dt
-- View a table's structure
\d my_table
-- Run a query
SELECT * FROM my_table;
-- Exit psql
\q
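With auto.create enabled, the JDBC sink creates the destination table named after the topic and, by default, with quoted (case-sensitive) identifiers. A sanity-check query, assuming that default naming:
-- Assumption: table/column names come from the topic and Avro schema, quoted uppercase.
SELECT "CAMPAIGNID", "CLICK_COUNT", "START_TIME", "END_TIME" FROM "CLICK_COUNTS_TABLE_OUTPUT" LIMIT 10;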
* URLs
Connect UI: http://localhost:8084/
Items Store: http://localhost:8080/