Kafka3 指令指南
基础概念
在 Kafka 目录下的 bin 目录中,我们可以看到很多 .sh
文件和一个 windows
目录,该目录包含了许多 .bat
文件。它们是什么?它们和 Kafka 的使用有何关系?
下面是一些基础概念的解释,我希望你可以通过下面的阅读找到答案。
Shell脚本
Shell 编程语言是一种解释型语言,它允许程序员使用灵活的脚本语法来控制计算机系统。在 Linux 或 Unix 系统上,我们可以使用 vi 或其他文本编辑器编写 Shell 脚本,然后使用 sh
命令来执行它们。Shell 脚本通常以 .sh 扩展名结尾。
bat 脚本
bat 脚本是一种在 Windows 系统上执行批处理命令的程序,用于处理各种 Windows 相关任务,例如编译程序、备份文件、管理系统设置、启动应用程序等等。在 Windows 上,我们可以使用记事本或其他文本编辑器编写 bat 脚本,然后使用 cmd
命令来执行它们。bat 脚本通常以 .bat 扩展名结尾。
Kafka 指令
Kafka 命令行工具提供了一组操作 Kafka 集群的命令。这些Kafka 命令行工具通常以 shell 脚本或 bat 脚本的形式提供,可以在 Linux、Unix 和 Windows 等操作系统上执行。这些脚本文件提供了一种简单而有效的方式,用于执行各种 Kafka 操作,包括但不限于:
- 创建、修改、删除主题
- 启动、停止、重启 Kafka
- 查看 Kafka 集群的状态、主题和消费者组的信息
- 查看 Broker、Controller 和 Producer 的状态
- 向主题发送消息、查看主题中的消息、消费主题中的消息
- 修改 Kafka 配置、修改 Topic 配置等
Kafka 指令非常方便,对于集群管理员和开发人员,是 kafka 配置管理和调试过程中必不可少的一部分。
Option
Kafka 命令行工具的另一个重要特性是它们提供了许多可配置的Option(选项或称标志Flag),这些选项用于自定义命令行工具的行为。选项可以通过命令行参数传递给 Kafka 命令行工具,并可以控制它们的行为。
比如,创建主题时可以使用 --partitions
选项来设置主题的分区数,也可以使用 --replication-factor
选项来设置主题中每个分区的副本数。Kafka 命令行工具的选项通常用 --option_name=value
的形式来指定。这些选项在 shell 脚本或 bat 脚本中使用,可以根据需要自定义 Kafka 命令行工具并指定选项。
下面是一个创建指定分区数和副本数的主题的完整指令样例,该指令将在 shell 脚本中执行:
1 | $ kafka-topics.sh --bootstrap-server localhost:9092 --topic gallifrey --create --partitions 1 -replication-factor 3 |
One Sentence
.sh
和 .bat
文件是分别在 Linux、Unix 和 Windows 系统上执行的脚本文件。它们用于执行 Kafka 的指令。option
是指命令行工具的选项或参数,用于控制工具的行为。
下面的Kafka具体指令说明。
kafka-topics
kafka-topics
主要用于管理 Kafka 的主题,包括创建、删除、列出、查看主题的详细信息等,同时还可以修改主题的配置。
Option | 说明 |
---|---|
--bootstrap-server <String: server to connect to> |
必需:要连接的Kafka服务器。 |
--create |
创建新主题。 |
--partitions <Integer: # of partitions> |
创建或更改主题的分区数量。警告:增加有键的主题的分区会影响消息排序。如果未提供该选项,使用群集默认值。 |
--replication-factor <Integer: replication factor> |
创建的每个分区的副本数量。使用群集默认值,如果未提供该选项。 |
--delete |
删除主题。 |
--list |
列出所有可用主题。 |
--alter |
更改主题的分区数量、副本分配和/或配置。修改分区只能增加不能减少,副本数量不可修改 |
--describe |
列出给定主题的详细信息。 |
--at-min-isr-partitions |
如果设置,仅显示ISR计数等于配置最小值的分区。 |
--command-config <String: command config property file> |
包含传递给Admin Client的配置的属性文件。仅与--bootstrap-server 选项一起使用以描述和更改代理配置。 |
--config <String: name=value> |
主题配置覆盖。包括常见配置项,如清除策略、压缩类型、保留时间等,详见Kafka文档。仅在--create 和--alter 时使用。 |
--delete-config <String: name> |
要删除的主题配置覆盖。不支持--bootstrap-server 选项。 |
--disable-rack-aware |
禁用机架感知的副本分配。 |
--exclude-internal |
运行list或describe命令时排除内部主题。默认情况下,内部主题将被列出。 |
--help |
打印用法信息。 |
--if-exists |
在指定主题存在时,执行操作。 |
--if-not-exists |
在指定主题不存在时,执行操作。 |
--replica-assignment <String: broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...> |
分区到Broker及其复制品的手动分配列表,用冒号(:)分隔Broker ID和replica ID,用逗号(,)分隔分区;仅在--create 和--alter 时使用。 |
--topic <String: topic> |
操作的主题名称或使用通配符。仅与--bootstrap-server 选项一起使用。也接受正则表达式,除了--create 选项外。将主题名称放在双引号中,并使用反斜杠( \)前缀转义正则表达式符号;例如 “test.topic”。 |
--topic-id <String: topic-id> |
要描述的主题ID。仅在--bootstrap-server 选项中使用。 |
--topics-with-overrides |
如果设置,仅显示已覆盖配置的主题。 |
--unavailable-partitions |
如果设置,仅显示leader不可用的分区。 |
--under-min-isr-partitions |
如果设置,仅显示ISR计数小于配置最小值的分区。 |
--under-replicated-partitions |
如果设置,仅显示未复制的分区。 |
--version |
显示Kafka版本。 |
kafka-console-producer
kafka-console-producer
主要用于向 Kafka topic 写入数据。它可以将数据输入到指定的 topic 中,并支持从文件中读取数据。通过使用不同的配置选项,用户可以设置生产者的属性,例如指定 broker 地址、序列化方式和写入分区等。
Option | 说明 |
---|---|
--bootstrap-server <String: server to connect to> |
必需:要连接的Kafka服务器。 |
--topic <String: topic> |
记录发送的主题。 |
--batch-size <Integer: size> |
如果消息未同步发送,则在单个批次中要发送的消息数。请注意,如果还设置了max-partition-memory-bytes,则此选项将被替换。默认值为16384 (16k) 。 |
--timeout-ms <Long: timeout> |
将记录发送到代理的超时时间,以毫秒为单位。 |
--compression-codec [String: compression-codec] |
压缩编解码方式:可以是无 ,gzip ,snappy ,lz4 或zstd 。如果未指定值,则默认为gzip 。 |
--help |
打印用法信息。 |
--line-reader <String: reader_class> |
用于从标准输入读取行的类的类名。默认情况下,每行都作为一个单独的消息进行读取。(默认值:kafka.tools.ConsoleProducer$LineMessageReader ) |
--max-block-ms <Long: max block on send> |
生产者在发送请求期间阻塞的最长时间。(默认值:60000) |
--max-memory-bytes <Long: total memory in bytes> |
生产者用于缓冲等待发送到服务器的记录的总内存。这是控制生产者配置中的buffer.memory 的选项。默认值是33554432。 |
--max-partition-memory-bytes <Integer: memory in bytes per partition> |
每个分区分配的缓冲区大小。 当接收到的记录小于此大小时,生产者会尝试将它们乐观地组合在一起,直到达到此大小。 这是控制生产者配置中的batch.size 的选项。默认值为16384。 |
--message-send-max-retries <Integer> |
代理可能因多种原因无法接收消息(如暂时不可用),达到最大重试次数时,生产者会丢弃该消息。这是控制生产者配置中的重试 选项的选项。(默认值:3) |
--metadata-expiry-ms <Long: metadata expiration interval> |
强制刷新元数据的时间段,以毫秒为单位,即使没有看到任何领导者更改也是如此。这是控制生产者配置中的metadata.max.age.ms 的选项。(默认值:300000) |
--producer-property <String: producer_prop> |
以key=value 形式传递自定义属性以传递给生产者。 |
--producer.config <String: config file> |
生产者配置属性文件。请注意,[producer-property]优先于此配置。 |
--property <String: prop> |
以key=value 形式传递自定义属性以传递给生产者或使用者。 |
--request-timeout-ms <Integer: request timeout in milliseconds> |
生产者等待代理答复请求的时间。这是控制生产者配置中的请求超时 选项。(默认值:30000) |
--security-protocol <String: security protocol> |
用于连接代理的安全协议,例如SSL或SASL。 |
--value-serializer <String: serializer class> |
用于序列化记录值的类。默认是org.apache.kafka.common.serialization.StringSerializer 。 |
--version |
显示Kafka版本。 |
--broker-list <String: broker-list> |
已弃用,使用--bootstrap-server 选项代替;如果已指定了--bootstrap-server ,则将被忽略。 以HOST1:PORT1,HOST2:PORT2的形式给出经纪人列表字符串。 |
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! | 🚀🚀🚀🚀🚀🚀🚀🚀 |
📌补充:--batch-size
选项控制的是单批次中未同步发送的消息数,这是通过将多条消息乐观地组合成一批消息来实现的。如果未设置此选项,Kafka 生产者将使用默认值 16384。另一个非常重要的选项是 --bootstrap-server
,它指定了连接到 Kafka 服务器所使用的 URL。其他选项,如 --topic
和 --value-serializer
,则会影响消息的目标主题和序列化方式,需根据具体需求设置。
kafka-console-consumer
kafka-console-consumer
主要用于从 Kafka topic 中读取数据。它支持从最早的可用偏移量开始消费数据或从最新的消息开始消费数据,还可以消费一段时间内发布的消息。kafka-console-consumer
提供了多种选项,例如可以指定消费的 topic 和分区,也可以将消费者输出到控制台或文件。此外,kafka-console-consumer
还支持消费者组,因此多个消费者可以共享处理相同主题的数据。
Option | 说明 |
---|---|
--bootstrap-server <String: server to connect to> |
必需:要连接的Kafka服务器。 |
--topic <String: topic> |
必需:要生产消息的主题 ID。 |
--consumer-property <String: consumer_prop> |
以key=value 形式传递自定义属性以传递给使用者。 |
--consumer.config <String: config file> |
使用者配置属性文件。请注意,[consumer-property]优先于此配置。 |
--enable-systest-events |
记录使用者的生命周期事件,以及日志中记录的已使用的消息。(仅供系统测试使用。) |
--formatter <String: class> |
用于格式化 Kafka 消息以供显示的类的名称。(默认值:kafka.tools.DefaultMessageFormatter ) |
--formatter-config <String: config file> |
初始化消息格式化程序的配置属性文件。请注意,[property]优先于此配置。 |
--from-beginning |
如果使用者没有已经建立的偏移量以从中消耗,则从日志中最早出现的消息开始,而不是最新的消息。 |
--group <String: consumer group id> |
使用者的使用者组 ID。 |
--help |
打印用法信息。 |
--include <String: Java regex (String)> |
正则表达式,指定包含在消费中的主题列表。 |
--isolation-level <String> |
使用read_committed以过滤未提交的事务消息,使用read_uncommitted以读取所有消息。(默认值:read_uncommitted ) |
--key-deserializer <String: deserializer for key> |
用于反序列化记录键的类。 |
--max-messages <Integer: num_messages> |
在退出之前要消耗的最大消息数。如果未设置,则继续消费。 |
--offset <String: consume offset> |
要消费的偏移量(非负数),或earliest 表示从开头开始,或latest 表示从结尾开始(默认值:latest)。 |
--partition <Integer: partition> |
要消费的分区。除非指定了--offset ,否则消费从分区结尾开始。 |
--property <String: prop> |
以key=value 形式传递属性以初始化消息格式化程序。默认属性请查看Kafka官方文档,用户还可以传递自定义的属性以配置他们的反序列化程序,具体而言,用户可以传递以key.deserializer. 、value.deserializer. 和headers.deserializer. 前缀为键的属性,以配置他们的反序列化程序。 |
--skip-message-on-error |
如果处理消息时发生错误,则跳过它而不是停止。 |
--timeout-ms <Integer: timeout_ms> |
如果指定,则在指定时间间隔内没有可用于消费的消息时退出。 |
--value-deserializer <String: deserializer for values> |
用于反序列化记录值的类。 |
--version |
显示Kafka版本。 |
--whitelist <String: Java regex (String)> |
已弃用,请改用--include ;如果指定了--inclide ,则被忽略。指定包含在消费中的主题列表的正则表达式。 |
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! | 🚀🚀🚀🚀🚀🚀🚀🚀 |
📌补充:--from-beginning
选项表示如果消费者没有建立偏移量,则从日志最早的消息开始消费;--offset
选项表示从指定的偏移量开始消费;--partition
选项表示消费者要从哪个分区开始消费。另外,--group
选项指定消费者的消费者组 ID,这对于协调多个消费者来消费一个主题是非常关键的。同时,--value-deserializer
和 --key-deserializer
选项确定反序列化记录键和记录值的类。其他选项如 --max-messages
和 --timeout-ms
则控制消费者消费的最大数量和等待时间。