-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.sh
executable file
·97 lines (88 loc) · 2.39 KB
/
consumer.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/bin/bash
# Simple wrapper for (docked) message consumption from kafka
# Colour codes for the status banner. tput exits non-zero (and complains on
# stderr) when TERM is unset or unknown, e.g. under cron or CI, so fall back
# to empty strings, i.e. an uncoloured banner.
YELLOW=$(tput setaf 3 2>/dev/null || true)
RESET=$(tput sgr0 2>/dev/null || true)

# Docker image that provides kafka-console-consumer.sh.
IMAGE='wurstmeister/kafka:latest'

# Defaults; each can be overridden by its KAFKA_* environment variable and,
# later, by the matching command-line flag.
BROKERS="${KAFKA_BROKERS:-localhost:9092}"
OFFSET="${KAFKA_OFFSET:-earliest}"
ISOLATION_LEVEL="${KAFKA_ISOLATION_LEVEL:-read_committed}"
FROM_BEGINNING_FLAG=''

# The topic is deliberately NOT seeded from "$1": the first argument is
# normally a flag (e.g. --from-beginning), which would otherwise end up being
# used as the topic name whenever -t/--topic is not given.
TOPIC="${KAFKA_TOPIC:-$(whoami)-test}"
#######################################
# Print the command-line help text.
# Globals:   none
# Arguments: none
# Outputs:   the help text on stdout, one entry per option
#######################################
usage() {
  # The $KAFKA_* names below are printed literally on purpose, hence the
  # single quotes. Only the final line expands $(whoami).
  # shellcheck disable=SC2016
  printf '%s\n' \
    'Usage: consumer [OPTIONS]' \
    '' \
    'OPTIONS:' \
    '' \
    '-b, --brokers,' \
    '--bootstrap-server The kafka broker list to initially use to identify the cluster.' \
    ' You can also use the $KAFKA_BROKERS var.' \
    '' \
    '--from-beginning Pass the "from beginning" flag through to the console consumer' \
    '' \
    '-g, --group, The consumer group ID to use to consume' \
    '--consumer-group' \
    '' \
    '--help Show this help text and exit' \
    '' \
    '--isolation read_committed or read_uncommitted. Defaults to the former.' \
    ' See kafka docs. You can also use the $KAFKA_ISOLATION_LEVEL var.' \
    '' \
    '-o, --offset The kafka offset to use if none exists for the consumer group.' \
    ' Defaults to earliest. You can also use the $KAFKA_OFFSET var.' \
    '' \
    '-t, --topic TOPIC The kafka topic from which to read; you can also use the' \
    ' $KAFKA_TOPIC env var. If neither is set, defaults' \
    " to $(whoami)-test"
}
# Parse args

#######################################
# Abort if a value-taking flag is the last argument.
# Arguments: $1 - the flag being parsed (used in the error message)
#            $2 - number of arguments left, including the flag itself
# Outputs:   error message on stderr when the value is missing
# Returns:   exits the script with status 1 when the value is missing
#######################################
require_value() {
  if (( $2 < 2 )); then
    echo "Missing value for $1" >&2
    exit 1
  fi
}

#######################################
# Apply command-line flags on top of the environment-derived defaults.
# Globals:   BROKERS, FROM_BEGINNING_FLAG, GROUP, ISOLATION_LEVEL, OFFSET,
#            TOPIC (written)
# Arguments: the script's command-line arguments
# Outputs:   diagnostics on stderr; help text on stdout for --help
# Returns:   exits 0 after --help, 1 on an unknown flag or a flag with no
#            value, 2 on a stray positional argument
#######################################
parse_args() {
  # Loop on the argument count rather than on "$1" being non-empty, so an
  # empty-string argument cannot silently end parsing early.
  while (( $# > 0 )); do
    case "$1" in
      -b|--brokers|--bootstrap-server)
        require_value "$1" "$#"
        BROKERS="$2"
        shift 2
        ;;
      --from-beginning)
        FROM_BEGINNING_FLAG='--from-beginning'
        shift
        ;;
      -g|--group|--consumer-group)
        require_value "$1" "$#"
        GROUP="$2"
        shift 2
        ;;
      --isolation)
        require_value "$1" "$#"
        ISOLATION_LEVEL="$2"
        shift 2
        ;;
      -o|--offset)
        require_value "$1" "$#"
        OFFSET="$2"
        shift 2
        ;;
      -t|--topic)
        require_value "$1" "$#"
        TOPIC="$2"
        shift 2
        ;;
      --help)
        usage
        exit 0
        ;;
      -*)
        echo "Unrecognised flag $1" >&2
        exit 1
        ;;
      *)
        echo "Unexpected argument $1" >&2
        exit 2
        ;;
    esac
  done
}

parse_args "$@"
# Announce what is about to be consumed, then run the console consumer inside
# the kafka image on the host network.
echo "${YELLOW}Reading from $TOPIC at $BROKERS...$RESET"

# Build the consumer's argument list as an array so that optional flags can be
# added without relying on unquoted word-splitting.
consumer_args=(
  --topic "$TOPIC"
  --bootstrap-server "$BROKERS"
  --group "${GROUP:-"$(whoami)-docked-console-consumer-group"}"
)

# Forward the isolation level (--isolation / $KAFKA_ISOLATION_LEVEL), which was
# previously parsed but never handed to the consumer.
if [[ -n "$ISOLATION_LEVEL" ]]; then
  consumer_args+=(--isolation-level "$ISOLATION_LEVEL")
fi

# --from-beginning already means "start at the earliest offset", so the reset
# policy (-o / $KAFKA_OFFSET) is only forwarded when that flag is absent; this
# also avoids handing the consumer two contradictory settings.
if [[ -n "$FROM_BEGINNING_FLAG" ]]; then
  consumer_args+=("$FROM_BEGINNING_FLAG")
elif [[ -n "$OFFSET" ]]; then
  consumer_args+=(--consumer-property "auto.offset.reset=$OFFSET")
fi

consumer_args+=(--skip-message-on-error)

docker run --rm --entrypoint kafka-console-consumer.sh --net host "$IMAGE" \
  "${consumer_args[@]}"