[TOC]
实验
消费组(一个分区)
创建 topic
1
| docker exec mykafka kafka-topics.sh --create --zookeeper myzookeeper:2181 --replication-factor 1 --partitions 1 --topic test
|
生产消息
1 2
| /opt/kafka/bin/kafka-console-producer.sh --broker-list mykafka:9092 --topic test
|
两个组名称不同
生产者每生产一份数据,两个消费组都会收到消息。对应 pulsar中 shared 并 subScriptonName 不同, 每一个组都能消费所有的数据。
1 2 3 4 5 6 7
| /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group1 --from-beginning
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group2 --from-beginning
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0
|
两个组名称相同,两个随机分组,两个不指定分组
- 两个组名称相同,则其中一个会阻塞,另一个能一直获取数据,如果活跃的那个连接挂了,那么阻塞的那个client会获取到连接,并处理接下来的数据。
- 随机分组和不指定分组, 每一个client都能获取到所有的消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group --from-beginning
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group --from-beginning
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
python3 main.py
python3 main.py
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
|
单消费(三个分区)
创建 topic
1 2
| docker exec mykafka kafka-topics.sh --create --zookeeper myzookeeper:2181 --replication-factor 1 --partitions 3 --topic test3
|
生产消息
1 2
| /opt/kafka/bin/kafka-console-producer.sh --broker-list mykafka:9092 --topic test3
|
两个消费者-不指定分区
两个消费者都能收到同一条消息
1 2 3
| /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --from-beginning
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --from-beginning
|
两个消费者-指定分组
两个消费者来消费所有的数据
1 2 3
| /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 -group test_group1
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 -group test_group1
|
四个消费者-指定分组
分组和分区不能同时指定,指定了分组,那么同一个组里的 client 会一起去消费数据,一个消费者消费一个分区(为client 分配了的 分区id),如果消费者数量大于分区数量,那么有一些消费就会挂起,直到一些消费者crash, 这些挂起的消费者才可能获得连接,并开始接收消息。
1 2 3 4 5 6 7
| /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 -group test_group1
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 -group test_group1
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 -group test_group1
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 -group test_group1
|
五个消费者-指定分区(不指定分组)
在分区3上无法收到消息。指定了相同分区的消费者,都能收到同一条消息。分区消费对应着 pulsar 中的 shared 并且 subScriptoinName 相同,所有的 worker 一起消费所有的数据。
1 2 3 4 5 6 7 8 9 10 11
| /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 0
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 1
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 2
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 2
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 3
|
五个消费者-指定分区和不指定分区混合(不指定分组)
1 2 3 4 5 6 7 8 9 10 11
| /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 0
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 1
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 2
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 2
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --from-beginning
|
docker 启动
go-science/midd/kafka/standalone
指令
1 2 3 4 5 6 7 8 9
| docker exec mykafka kafka-topics.sh --create --zookeeper myzookeeper:2181 --replication-factor 1 --partitions 1 --topic test
docker exec mykafka kafka-topics.sh --list --zookeeper myzookeeper:2181
docker exec -it mykafka bash /opt/kafka/bin/kafka-topics.sh --list --zookeeper myzookeeper:2181
|
消息
注意:如果事先没有使用 kafka-topics 命令来手工创建 Topic,直接使用下面的内容进行消息创建时也会自动创建 Topics。
首先创建消息生产者。执行如下命令启动 Kafka 基于命令行的消息生产客户端,启动后可以直接在控制台中输入消息来发送,控制台中的每一行数据都会被视为一条消息来发送。
1 2
| docker exec -it mykafka kafka-console-producer.sh --broker-list mykafka:9092 --topic test
|
接着创建消息消费者。我们打开另一个命令窗口执行如下执行命令启动 Kafka 基于命令行的消息消费客户端,启动之后,马上可以在控制台中看到输出了之前我们在消息生产客户端中发送的消息。
1
| docker exec -it mykafka kafka-console-consumer.sh --bootstrap-server mykafka:9092 --topic test --from-beginning
|
指令总结
topic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 5
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list 127.0.0.1:9092 --partitions 0
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --delete
|
消息
1 2
| /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
|
消费消息
1 2 3 4 5 6 7 8 9 10 11
| /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0 --max-messages 1
|
消费者 Group
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group --from-beginning
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --describe
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test --delete
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --delete
/opt/kafka/bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092
/opt/kafka/bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092
|
消费者组
消费者组是属于集群的,不是属于 topic 的
1
| /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
|
参考
Docker - 通过容器部署Kafka环境教程(以及ZooKeeper)
kafka常用命令
docker compose 启动 kafka