forked from DataDog/datadog-process-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathverify-or-create-topics.sh
More file actions
executable file
·89 lines (72 loc) · 2.27 KB
/
verify-or-create-topics.sh
File metadata and controls
executable file
·89 lines (72 loc) · 2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/bin/bash
log() {
echo "$(date +"%D %T") [INFO] $1" >> /usr/local/bin/verify-or-create-topics.log
}
log "Running verify or create topics script"
if [[ -z "$KAFKA_CREATE_TOPICS" ]]; then
log "KAFKA_CREATE_TOPICS env variable not found"
exit 0
fi
if [[ -z "$START_TIMEOUT" ]]; then
START_TIMEOUT=600
fi
start_timeout_exceeded=false
count=0
step=10
while true; do
kafka-topics.sh --bootstrap-server localhost:$KAFKA_PORT --version
# netstat -lnt | grep -q $KAFKA_PORT
if [ $? -eq 0 ]; then
break
fi
log "Waiting for Kafka to be ready"
sleep $step;
count=$((count + step))
if [ $count -gt $START_TIMEOUT ]; then
start_timeout_exceeded=true
break
fi
done
if $start_timeout_exceeded; then
log "Not able to auto-create topic (waited for $START_TIMEOUT sec)"
exit 1
fi
log "Kafka is now ready"
# Retrieve and split the defined $KAFKA_CREATE_TOPICS string
IFS="${KAFKA_CREATE_TOPICS_SEPARATOR-,}" read -ra DEFINED_TOPICS <<< "$KAFKA_CREATE_TOPICS"
# Retrieve the existing kafka topics
ACTIVE_TOPICS="$($KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper zookeeper | grep -v __consumer_offsets | wc -l)"
log "Active Topic Count: ${ACTIVE_TOPICS}"
log "Defined Topic Count: ${#DEFINED_TOPICS[@]}"
if [[ ${ACTIVE_TOPICS} -ge ${#DEFINED_TOPICS[@]} ]]
then
# Healthy
log "Healthy"
log "Exit Code 0"
exit 0
else
# UnHealthy
log "UnHealthy"
# Expected format:
# name:partitions:replicas:cleanup.policy
IFS="${KAFKA_CREATE_TOPICS_SEPARATOR-,}"; for topicToCreate in $KAFKA_CREATE_TOPICS; do
log "Creating topics: $topicToCreate ..."
IFS=':' read -r -a topicConfig <<< "$topicToCreate"
config=
if [ -n "${topicConfig[3]}" ]; then
config="--config=cleanup.policy=${topicConfig[3]}"
fi
COMMAND="JMX_PORT='' ${KAFKA_HOME}/bin/kafka-topics.sh \\
--create \\
--zookeeper ${KAFKA_ZOOKEEPER_CONNECT} \\
--topic ${topicConfig[0]} \\
--partitions ${topicConfig[1]} \\
--replication-factor ${topicConfig[2]} \\
${config} \\
--if-not-exists"
eval "${COMMAND}"
done
log "Exit Code 1"
# Force unhealthy exit to allow the health check to rerun
exit 1
fi