Kafka3 入门指南
Kafka简介
定义
Kafka 是一个分布式流式处理平台。
流平台具有三个关键功能:
- 消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
- 容错的持久方式存储记录消息流: Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。
- 流式处理平台: 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。
应用场景
Kafka 主要有两大应用场景:
- 消息队列 :建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
- 数据处理: 构建实时的流数据处理程序来转换或处理数据流。
Kafka3.x版本的区别
Kafka 3.x 版本与之前的版本最大的区别是,Kafka 3.0 版本开始支持了 KIP-500(Kafka Improvement Proposal #500),这是一项重大的协议变更,允许 Kafka3.x 版本在不依赖 ZooKeeper 的情况下独立运行。
在 Kafka 3.0 之前,Kafka 使用 ZooKeeper 来进行元数据存储和管理。但是,随着 Kafka 的使用场景越来越广泛,单独使用 ZooKeeper 创建和维护集群变得越来越复杂和不可靠。因此,Kafka 3.0 引入了一个新的元数据管理协议,称为 KRaft。KRaft 是一个内置的元数据存储和管理系统,可以独立于 ZooKeeper 运行。
除此之外,Kafka 3.x 还引入了新的功能和改进,例如 Broker 的热迁移、优化的控制器选举、提高网络吞吐量的增量改进、支持多种身份验证方案、更快的分区重平衡等等。同时,Kafka 还针对安全性和可靠性进行了一些改进,例如使用更安全的 TLS 版本、增加了对集群升级的支持、提供了更细粒度的监控和指标等等。
编写语言
Kafka的核心服务端部分是用Scala语言编写的,主要包括Broker、Controller等组件,而Producer、Consumer等客户端API则提供了多种编程语言的实现,比如Java、Python、Go、Ruby等,方便用户进行开发和集成。由于Kafka提供了多种编程语言的支持,因此可以很方便地与不同的语言和框架进行集成和使用。
扩展
- 十分钟了解Kafka :What is Apache Kafka? - YouTube
- Apache ZooKeeper
Kafka基础架构
名词
在解释Kafka的基础架构,之前我们得先对一些了解kafka架构的名词有一个基本的了解,下面的一张名词释义表
名词 | 解释 |
---|---|
Producer(生产者) | Kafka 中发送消息的客户端,可以将消息发送到特定的 Kafka 主题并指定消息所属的分区。 |
Broker(代理) | Kafka 集群中的消息传输和存储组件。每个 Broker 负责处理消息的读取、写入和复制,并彼此保持同步,以提供卓越的可靠性,性能和可伸缩性。 |
Cluster(集群) | Kafka Broker 的集合 |
消费者(Consumer) | Kafka 中接收消息的客户端,可以从特定的主题和分区中接收消息,并将这些消息传递给消费者的应用程序进行处理。 |
Consumer Group(消费者组) | 消费者组由一个或多个消费者组成,并用于协调消费者之间的消费进度。所有消费者组成员订阅的主题和分区中只有一个消费者可以读取消息,而其他消费者则处于空闲状态。当活跃的消费者读取完消息后,Kafka 会自动将下一条消息转移到其他消费者。 |
Application(应用程序 | 客户端使用 Kafka APIs 与 Kafka 集群进行交互的应用程序,将处理生产者发送的消息,并从消费者接收消息以进行处理。 |
Topic(主题) | Kafka 中可分配给消息的特定主题或类别。生产者可以将消息写入特定主题的分区,而消费者可以从订阅主题和分区中读取消息。 |
Partition(分区) | 在 Kafka 中,每个主题被分为一个或多个分区,并分配给 Kafka 集群中的各个 Broker。每个分区可以作为一个独立的线性日志进行处理,并被多个生产者和消费者使用。 |
Partition Rebalance(分区再平衡) | 在消费者组成员变化(即增加/减少组成员)时,Kafka 可能会执行一次分区重新平衡,以重新分配主题分区给相应的消费者组成员。分区重新平衡可以确保组中的所有消费者均衡地读取主题分区的所有消息,以提高系统的效率和可伸缩性。 |
Replica(副本) | 在 Kafka 中,每个分区有一个作为 Leader 的副本和多个作为 Follower 的副本,以保证消息传输的可靠性。 Leader 副本负责所有的读取和写入请求,并在其他副本上复制写入操作的日志,以确保副本集中的数据完全一致。 |
ZooKeeper | Kafka 使用 ZooKeeper 来实现 Kafka Broker 对等节点和集群元数据的协调和管理。 ZooKeeper可用于识别集群中存活的 Broker,并跟踪它们之间的网络分区和故障转移。 |
Kafka Raft Metadata Mode | 在 Kafka Raft Metadata Mode 中,Kafka Controller 负责存储和管理集群的元数据,而不是 ZooKeeper。这个 Controller 使用 Raft 一致性算法来确保集群状态的一致性,而无需依赖于外部服务。同时,Kafka Raft Metadata Mode 还简化了集群的管理和部署,并提高了集群的可伸缩性和安全性。 |
Kafka Controller | Kafka 集群中的一种 Broker,负责管理 Broker 节点的状态和监控,并执行自动的 Leader 选举和分区重新平衡。 |
Coordinator(消费者组协调器) | 用于跟踪和管理消费者组的状态和进度的特殊 Kafka Broker。协调器负责管理组成员的加入和离开,并通知消费者哪些消息已经被读取和哪些消息仍然可被读取。消费者通过与协调器保持联系来实现对组中其他成员的发现和保持同步。 |
Base
graph LR P(Producer) -- 生产消息 --> B(Kafka Broker) B -- 存储消息并服务消费者 --> C(Kafka Consumer) C -- 消费消息 --> E(应用程序) subgraph Kafka Cluster B --> F(Kafka Controller) F -- 管理Broker工作状态 --> B B --> G(Kafka Consumer Group Coordinator) G -- 管理Consumer组状态 --> C end
Partion
graph LR P(Producer) -- 发布消息 --> T(Kafka Topic) T -- 分配到不同的 partition 中 --> KP1(Kafka Partition 1) T -- 分配到不同的 partition 中 --> KP2(Kafka Partition 2) KP1 -- 存储消息并服务消费者 --> C1(Kafka Consumer) KP2 -- 存储消息并服务消费者 --> C2(Kafka Consumer) subgraph Kafka Cluster KP1 -.-> B1(Kafka Broker 1) KP2 -.-> B2(Kafka Broker 2) B1 -- 移动 partition --> B2 B2 -- 移动 partition --> B1 end C1 -- 消费消息 --> E1(应用程序) C2 -- 消费消息 --> E2(应用程序) subgraph Kafka Topic T --> TP1(Kafka Topic Partition 1) T --> TP2(Kafka Topic Partition 2) end subgraph Kafka Partition KP1 --> PData1(Kafka Partition Data) KP2 --> PData2(Kafka Partition Data) end
Kafka的基础架构主要由以下几个组件构成:
- Broker:Kafka集群中的每个服务器节点被称为Broker。每个Broker都有一个唯一的ID,可以通过这个ID来唯一标识一个Broker。每个Broker都可以用来存储和处理消息。
- Topic:消息发布者将消息发布到一个指定的Topic中,同时可以在该Topic上订阅消息。每个Topic都有一个唯一的名称,并且可以有多个订阅者订阅该Topic。Kafka将消息存储在Topic中,每个Topic可以有多个Partition,每个Partition中存储着一部分消息。
- Partition:为了实现更高的并发和吞吐量,Kafka将每个Topic分为多个Partition。每个Partition都是有序的消息队列,并且可以在不同的Broker中存储。一条消息只会被存储在同一个Partition中的一台Broker上。
- Producer:消息发布者将消息发布到指定的Topic中,同时可以指定该消息归属的Partition。Kafka的Producer不会等待Broker的响应,而是将消息缓存在本地,异步发送到Broker。
- Consumer:消息订阅者可以订阅一个或多个Topic中的消息,并且可以将自己组合成一个Consumer Group,以便于多个消费者可以共同组成一个消费者组,来消费一个或多个Topic中的消息。每个消息只能被同一个Consumer Group中的一个消费者消费一次。
Kafka 集群中的消息以 Topic 为单位进行管理和存储,Topic 可以被分成多个 Partition 来提高消息的处理能力和可用性。每个 Partition 的多个副本被分配在不同的 Broker 上,以保证数据的高可靠性和可用性。当一个 Broker 节点发生故障时,副本会自动选举新的 Partition Leader,并继续服务。Broker 节点之间也相互复制数据,以保证数据的冗余和数据的高可用性。Kafka 集群还提供了 Consumer Group 的概念,多个 Consumer 可以组成一个 Consumer Group 并协作消费消息,以满足不同的场景需求。
Zookeeper
ZooKeeper:使用 ZooKeeper 来管理和控制集群状态,包括存储 Broker 的元数据、负责 Leader 选举、协调 Producer 和 Consumer 等
graph LR P(Producer) -->|生产消息| B(Kafka Broker) B -->|存储消息,服务消费者| C(Kafka Consumer) C -->|消费消息| E(应用程序) subgraph Kafka 集群 B -->|多副本复制| R1((副本1)) B -->|多副本复制| R2((副本2)) B -->|多副本复制| R3((副本3)) B --> F(Kafka Controller) F -- 管理 Broker 工作状态 --> B B --> G(Kafka Consumer Group Coordinator) G -- 管理 Consumer 组状态 --> C end Z(ZooKeeper) -->|管理Broker元数据| B
KRaft mode
Kafka 3.x 引入了 Kafka Raft Metadata Mode,使得 Kafka 集群管理和存储集群的元数据无需依赖于 ZooKeeper。下面是基于 Kafka Raft Metadata Mode 的 Kafka 3.x 的基础架构:
graph LR P(Producer) -->|生产消息| B(Kafka Broker) B -->|存储消息,服务消费者| C(Kafka Consumer) C -->|消费消息| E(应用程序) subgraph Kafka 集群 B -->|多副本复制| R1((副本1)) B -->|多副本复制| R2((副本2)) B -->|多副本复制| R3((副本3)) B --> F(Kafka Controller) F -- 管理 Broker 工作状态 --> B B --> G(Kafka Consumer Group Coordinator) G -- 管理 Consumer 组状态 --> C end B1(Kafka Broker1) --> D(Kafka Raft Metadata Mode) D -->|存储集群元数据| F D -->|Leader选举,状态复制| D1(Kafka Broker2) D -->|Leader选举,状态复制| D2(Kafka Broker3)
消息队列模型
Kafka 支持两种不同的消息队列模型,即点对点模型和发布-订阅模型,可以根据不同的业务场景选择最适合的模型。
点对点模型(队列模型)
阐述
“点对点模型”(Point-to-Point Model),也称为 “队列模型”(Queue Model)。
在点对点模型中,生产者(Producer)将消息发送到一个特定的队列,而消费者(Consumer)则从该队列中接收消息。与发布-订阅模型不同的是,在点对点模型中,生产者和消费者之间的关系是一对一的,即一个生产者生产的消息只能被一个消费者消费。 比如:我们生产者发送 100 条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)
应用场景
点对点模型通常适用于一些特定场景,比如需要严格控制消息处理顺序、需要完全消费所有消息或者需要使用非多播传输等场景。
假如我们存在这样一种情况:我们需要将生产者产生的消息分发给多个消费者,并且每个消费者都能接收到完整的消息内容。
这种情况,队列模型就不好解决了。很多比较杠精的人就说:我们可以为每个消费者创建一个单独的队列,让生产者发送多份。这是一种非常愚蠢的做法,浪费资源不说,还违背了使用消息队列的目的。
mermaid图解
graph LR; A(Producer) -- 写入消息到 --> B(Kafka Broker) B -- 存储消息并记录 offset --> C(Kafka Partition) C -- 读取消息 --> D(Comsumer) D -- 记录消费位置 offset --> C
队列模型存在的问题:
假如我们存在这样一种情况:我们需要将生产者产生的消息分发给多个消费者,并且每个消费者都能接收到完整的消息内容。
这种情况,队列模型就不好解决了。很多比较杠精的人就说:我们可以为每个消费者创建一个单独的队列,让生产者发送多份。这是一种非常愚蠢的做法,浪费资源不说,还违背了使用消息队列的目的。
发布-订阅模型
阐述
在发布-订阅模型中,消息生产者(Producer)将消息发布到一个或多个主题(Topic),每个主题可以分区成多个分区(Partition)。消息订阅者(Consumer)可以订阅一个或多个主题,并在消费者组(Consumer Group)中加入一个或多个消费者来消费主题中的消息。
具体来说,当消息生产者(Producer)将消息发布到一个主题中时,所有订阅该主题的消费者都可以接收到该消息,但不同的消费者可能会接收到不同的消息。这是因为 Kafka 会将每个分区中的消息均匀分布给所有订阅该主题的消费者,从而实现负载均衡和并行处理。同时,同一个消费者组中只有一个消费者能够消费一个特定的分区,这也保证了在同一个消费者组中不同消费者之间的不重复消费。
应用场景
发布-订阅模型通常适用于一些通知、广播等场景,例如电商网站中的订单、物流消息发布。
总的来说,发布-订阅模型是 Kafka 最常用的消息队列模型,具有高吞吐量、低延迟、可扩展等特点,适用于大规模数据流处理和传输的场景。
mermaid图解
基础
graph LR A(Producer) -- 发布消息到 --> B(Kafka Topic) B -- 存储消息 --> C(Kafka Broker) D(Consumer) -- 订阅主题 --> B C -- 读取消息并发送给订阅该主题的所有消费者 --> D
完整
graph LR P(Producer) -- 发布消息 --> T(Kafka Topic) T -- 分配到不同的partition中 --> KP1(Kafka Partition 1) T -- 分配到不同的partition中 --> KP2(Kafka Partition 2) KP1 -- 存储消息并服务消费者 --> C1(Kafka Consumer) KP2 -- 存储消息并服务消费者 --> C2(Kafka Consumer) C1 -- 消费消息 --> E1(应用程序) C2 -- 消费消息 --> E2(应用程序) subgraph Kafka Cluster KP1 -.-> B(Kafka Broker) KP2 -.-> B B -.-> F(Kafka Controller) F -.-> B B --> G(Kafka Consumer Group Coordinator) G -.-> C1 G -.-> C2 end
使用KRaft模式搭建集群
前期准备与解释
硬件
要搭建集群,至少需要两台可以运行Kafka的,拥有公网ip或者处理同一局域网下(可以进行网络通信)的Linux主机,当然集群的当然越多机器越好,可惜本文作者(我)太穷了
下文的所有操作都在两台Linux主机上执行
Why KRaft?
Kafka3.x版本的KRaft 模式可以在不使用Zookeeper的情况下搭建集群,脱离了Zookeeper的外部依赖,我相信未来肯定主流
Why Linux?
其实Kafka的文件夹下有一个window的脚本库,也可以的用window去跑Kafka的,但是截至发文的今天,我使用的Kafka版本是
kafka_2.12-3.4.0,这个版本下使用window和KRaft模式搭建会出现一个bug(花了很长时间也没解决,如果有人发现了解决方案,请在本文下留言),详情请看KAFKA-14273
所以下面的操作都将在Linux系统上进行
Why Apache Kafka?
Kafka有多个不同的发行版本,最常见的包括:
Apache Kafka:Apache Kafka是Kafka的开源版本,由Apache基金会托管和支持。它是最原始和最基本的Kafka版本,非常稳定和可靠。但是,使用它需要您自己进行配置和管理。
Confluent Kafka:Confluent是一家领先的Kafka技术公司,该公司创建了一系列Kafka工具和服务,包括Confluent Kafka。Confluent Kafka是建立在Apache Kafka之上的增强版,提供了添加功能,如Kafka Connect和Kafka Streams等,使其更易于管理和扩展。
Cloudera Kafka:Cloudera是另一家领先的Kafka技术公司,它的Kafka发行版是Cloudera Kafka。它是一个基于Apache Kafka的增强版,提供了一些管理工具和集成功能,如Cloudera Manager和Navigator。
不同发行版本之间的区别在于每个版本提供的额外功能、集成工具和管理选项。我们下面的内容都基于Apache Kafka
下载与解压
ps:在下面的教程中我将用$
表示prompt,复制代码到控制台时请自行删除
下载
下载地址:Apache Kafka,我下载的是scala2.12的版本:https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz
解压
将下载好的tgz文件复制到主机的新路径下,这边推荐是home/user/
路径下,可以减少一些关于root权限的问题,执行以下命令进行解压
1 | $ tar -zxvf kafka_2.13-3.4.0.tgz |
注意:这里是user
是用户目录,如果你想设置和我同一个位置,请自行将user
更改为自己的用户名
Config配置
上述步骤完成后,为了搭建集群,我们得进行一些config配置:
- 在
/kafka_2.12-3.4.0/config/kafka/
打开server.properties
文件:这里不要打开错误的文件,在/kafka_2.12-3.4.0/config/
也有一个server.properties
配置文件,但是那个是用zookeeper设置的,最明显的区别的是kafka
文件夹下只有三个properties
配置文件 - 分别设置不同
node.id
标识主机,例如:主机1:node.id=1
,主机2:node.id=2
- 共同设置
controller.quorum.voters=1@主机1地址:9093,2@主机2地址:9093
,这是Controller的选举队列 - 设置
log.dirs=/home/user/kafka-ouput/kraft-combined-logs
,我这里设置数据存储路径/home/user/kafka-ouput/kraft-combined-logs
。当然这里你也这里你可以设置任意文件路径来存放数据,但是千万不要使用相对路径(因为使用相对路径,文件在下面生成时,会相对于你命令执行的路径生成存储目录和相关文件,这一点特别不好)
一些没有用到但有用的参数:
advertised.listeners=PLAINTEXT://localhost:9092
这是broker对客户端发布的listener信息,包括linstener名称PLAINTEXT
,主机地址localhost
,端口号9092
inter.broker.listener.name=PLAINTEXT
,可以设置通信(普通)broker的名字controller.listener.names=CONTROLLER
可以设置Controller的名字
运行
首先我们要生成一个数据存储的文件夹,步骤如下
进入kafka文件目录
../kafka_2.13-3.4.0/
执行以下代码运行脚本,生成随机uuid,注意这个uuid是用来标识集群的集群id
cluster-id
,我们两台主机要搭建一个集群,使用的集群id应该是一致的,所以下面的命令只要在其中一台主机运行一次就好了1
$ ./bin/kafka-storage.sh random-uuid
在两台主机上分别执行以下代码运行脚本,用上面生成的(同一个)uuid生成文件存储文件夹
1
$ ./bin/kafka-storage.sh format --config ./config/kraft/server.properties --cluster-id 生成uuid
在两台主机上分别执行以下代码,运行Kafka
1
$ ./bin/kafka-server-start.sh ./config/kraft/server.properties
如果两台主机都没有
ERROR
,你基本就运行成功了,之后你可以Ctrl-C
来结束运行加上
-daemon
参数后kafka将在后台运行1
$ ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
此时你要结束kafka运行就要另一个kafka-stop-start.sh脚本了
1
$ ./bin/kafka-server-stop.sh
测试
在完成上述的配置和运行步骤后,两台主机的kafka服务都在后台运行起来后,为了测试一下集群是否真的成功搭建好了,我们可以创建两个Topic测试一下:
在上面的操作中,我们发现了每次都使用kafka的sh脚本都得加上路径十分麻烦,所以我们可以先把kafka的
bin
文件路径加入环境变量:a. 修改
/etc/enviroment
文件,追加以下内容并保存1
2export KAFKA_HOME=/path/to/kafka/directory
export PATH=$PATH:$KAFKA_HOME/binb. 执行以下指令,更新环境变量
1
$ source /etc/environment
“/path/to/kafka/directory”是你Kafka的文件路径
在两个主机分别创建一个Topic
a. 主机1输入以下指令,如果创建成功会出现
Created topic gallifreyOne
1
$ kafka-topics.sh --bootstrap-server localhost:9092 --topic gallifreyOne --create
b. 主机2输入以下指令,如果创建成功会出现
Created topic gallifreyTwo
1
$ kafka-topics.sh --bootstrap-server localhost:9092 --topic gallifreyTwo --create
在任意主机输入一下指令,查看所有的Topic,如果出现两个主机分别创建的Topic
gallifreyOne
gallifreyTwo
说明集群搭建成功1
$ kafka-topics.sh --bootstrap-server localhost:9092 --list
🚩下一站
至此,你已经了解Kafka的基础概念,基础架构,队列模型和集群搭建,下一步的学习将围绕Kafka的使用展开,你可以在各种视频网站和Apache Kafka官网上等途径找到相关教程,如果你对我写的教程还感兴趣的话,想进一步跟着学习Kafka的话,请看我另一篇Blog: