0%

消息队列之kafka 指令

[TOC]

实验

消费组(一个分区)

创建 topic

1
docker exec mykafka kafka-topics.sh --create --zookeeper myzookeeper:2181 --replication-factor 1 --partitions 1 --topic test

生产消息

1
2
# 生产消息
/opt/kafka/bin/kafka-console-producer.sh --broker-list mykafka:9092 --topic test

两个组名称不同

生产者每生产一份数据,两个消费组都会收到消息。对应 pulsar中 shared 并 subScriptonName 不同, 每一个组都能消费所有的数据。

1
2
3
4
5
6
7
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group1 --from-beginning

# 一个新的 group 加入会消费数据,这里指定的是 --from-beginning, 所以会消费所有的数据
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group2 --from-beginning

# 这里只会消费后面增加的数据,这里的分区指定为0,因为这里没有分区,所以就是0, 如果指定分区为1,那么无法消费数据
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0

两个组名称相同,两个随机分组,两个不指定分组

  • 两个组名称相同,则其中一个会阻塞,另一个能一直获取数据,如果活跃的那个连接挂了,那么阻塞的那个client会获取到连接,并处理接下来的数据。
  • 随机分组和不指定分组, 每一个client都能获取到所有的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 分组相同
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group --from-beginning

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group --from-beginning

# 两个随机分组
# kafka-console-consumer.sh 这个脚本如果不指定 consumer group, 会随机生成一个,类似这样 console-consumer-5089
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

# 两个不指定分组
python3 main.py

python3 main.py

# Consumer Group: 每个消费者都属于一个特定的Consumer Group,可通过group.id配置项指定,若不指定group name则默认为test-consumer-group, 这句话不对,但是不指定 group name 是什么还不知道, 因为通过下面的指令,看不到 test-consumer-group 这个 group
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

单消费(三个分区)

创建 topic

1
2
# 三个分区
docker exec mykafka kafka-topics.sh --create --zookeeper myzookeeper:2181 --replication-factor 1 --partitions 3 --topic test3

生产消息

1
2
# 生产消息
/opt/kafka/bin/kafka-console-producer.sh --broker-list mykafka:9092 --topic test3

两个消费者-不指定分区

两个消费者都能收到同一条消息

1
2
3
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --from-beginning

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --from-beginning

两个消费者-指定分组

两个消费者来消费所有的数据

1
2
3
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 -group test_group1 

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 -group test_group1

四个消费者-指定分组

分组和分区不能同时指定,指定了分组,那么同一个组里的 client 会一起去消费数据,一个消费者消费一个分区(为client 分配了的 分区id),如果消费者数量大于分区数量,那么有一些消费就会挂起,直到一些消费者crash, 这些挂起的消费者才可能获得连接,并开始接收消息。

1
2
3
4
5
6
7
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 -group test_group1 

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 -group test_group1

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 -group test_group1

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 -group test_group1

五个消费者-指定分区(不指定分组)

在分区3上无法收到消息。指定了相同分区的消费者,都能收到同一条消息。分区消费对应着 pulsar 中的 shared 并且 subScriptoinName 相同,所有的 worker 一起消费所有的数据。

1
2
3
4
5
6
7
8
9
10
11
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 0

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 1

# 指定了相同分区的消费者,都能收到同一条消息, 因为这里默认会使用不同的 cg
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 2

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 2

# 在分区3上无法收到消息
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 3

五个消费者-指定分区和不指定分区混合(不指定分组)

1
2
3
4
5
6
7
8
9
10
11
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 0

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 1

# 指定了相同分区的消费者,都能收到同一条消息
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 2

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --offset latest --partition 2

# 不指定分区,不指定分区的单独消费一份
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test3 --from-beginning

docker 启动

go-science/midd/kafka/standalone

指令

1
2
3
4
5
6
7
8
9
# 我们执行如下命令可以创建一个名为 test 的 Topic,该 Topic 包含一个分区和一个 Replica。
docker exec mykafka kafka-topics.sh --create --zookeeper myzookeeper:2181 --replication-factor 1 --partitions 1 --topic test

# 创建后可以执行如下命令查看当前的 Topics:
docker exec mykafka kafka-topics.sh --list --zookeeper myzookeeper:2181

# 进入容器内操作
docker exec -it mykafka bash
/opt/kafka/bin/kafka-topics.sh --list --zookeeper myzookeeper:2181

消息

注意:如果事先没有使用 kafka-topics 命令来手工创建 Topic,直接使用下面的内容进行消息创建时也会自动创建 Topics。

首先创建消息生产者。执行如下命令启动 Kafka 基于命令行的消息生产客户端,启动后可以直接在控制台中输入消息来发送,控制台中的每一行数据都会被视为一条消息来发送。

1
2
# 注意:此时我们可以尝试输入几行消息,由于此时并没有消费者,所以这些输入的消息都会被阻塞在名为 test 的 Topics 中,直到有消费者将其消费掉。
docker exec -it mykafka kafka-console-producer.sh --broker-list mykafka:9092 --topic test

接着创建消息消费者。我们打开另一个命令窗口执行如下执行命令启动 Kafka 基于命令行的消息消费客户端,启动之后,马上可以在控制台中看到输出了之前我们在消息生产客户端中发送的消息。

1
docker exec -it mykafka kafka-console-consumer.sh --bootstrap-server mykafka:9092 --topic test --from-beginning

指令总结

topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 创建topic, 参数 --topic 指定 Topic 名,--partitions 指定分区数,--replication-factor 指定备份数:
/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# 列出所有 Topic
/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

# 查看 Topic
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

# 增加 Topic 的 partition 数
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 5

# 查看 topic 指定分区 offset 的最大值或最小值
# time 为 -1 时表示最大值,为 -2 时表示最小值:
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list 127.0.0.1:9092 --partitions 0

# 删除 topic
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --topic test --delete

消息

1
2
# 生产消息
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

消费消息

1
2
3
4
5
6
7
8
9
10
11
# 从头开始
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

# 从尾部开始, 从尾部开始取数据,必需要指定分区:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0

# 指定分区
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0

# 取指定个数
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 0 --max-messages 1

消费者 Group

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 指定 Group
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test -group test_group --from-beginning

# 消费者 Group 列表
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 查看 Group 详情
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --describe

# 删除 Group 中 Topic
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --topic test --delete

# 删除 Group
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group --delete

# 平衡 leader
/opt/kafka/bin/kafka-preferred-replica-election.sh --bootstrap-server localhost:9092

# 自带压测工具
/opt/kafka/bin/kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092

消费者组

消费者组是属于集群的,不是属于 topic 的

1
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

参考

Docker - 通过容器部署Kafka环境教程(以及ZooKeeper)

kafka常用命令

docker compose 启动 kafka