Kafka-基础


Kafka-基础

1. 安装

1.1 Kafka运行环境前置要求

Kafka是由Scala语言编写而成的,Scala运行在Java虚拟机上,并且兼容现有的Java程序,因此部署Kafka的时候,需要先安装JDK环境。

  1. 下载JDK:

    https://www.oracle.com/java/technologies/downloads/#java17
    

    复制链接,来到Linux操作系统下载文件:

    wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
    

  2. 解压缩文件

    tar -zxvf jdk-17_linux-x64_bin.tar.gz
    

  3. 配置JDK环境变量

    这里配全局(针对所有用户)的就可以:

    vim /etc/profile
    

    编辑环境变量:

    export JAVA_HOME=/usr/local/src/JDK/jdk-17.0.11
    export PATH=$PATH:$JAVA_HOME/bin
    export CLASSPATH=.:$JAVA_HOME/lib/
    

    配置好环境变量之后,需要重新加载一下:

    source /etc/profile
    

1.2 Kafka下载与安装

Kafka官方下载地址:

https://kafka.apache.org/downloads

[!CAUTION]

这里的下载选中二进制下载。

wget https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz

解压文件:

tar -xvf kafka-3.7.0-src.tgz

[!CAUTION]

本地环境必须安装Java 8+。

Kafka环境启动的两种方式:Kafka可以使用Zookeeper或者KRaft启动,但是只能使用其中一种方式,不能同时使用KRaftApach Kafka的内置共识机制,用于取代Apach Zookeeper

  • 使用Zookeeper启动Kafka:在Kafka的解压的bin目录中,有一个zookeeper-server-start.sh的脚本文件;在在Kafka的解压的config目录中,有一个zookeeper.properties的配置文件;需要使用Zookeeper来启动Kafka的时候,要同时使用两个文件,回到bin目录中,使用命令:

    ./zookeeper-server-start.sh ../config/zookeeper.properties &
    

    上述命令中的: &符号,表示后台启动,如果我此时CTRL+C退出,也不会停止服务。

    zookeeper启动成功:

    启动成功

    注意:必须先启动zookeeper,再启动kafka。因为kafka启动是需要连接zookeeper的(对于以前的kafka老版本来说)。

    Kafka启动命令:

    ./kafka-server-start.sh ../config/server.properties &
    

    Kafka启动成功:

    Kafka启动成功

    最后查看端口的使用情况:

    进程端口查看

    Kafka关闭命令:

    ./kafka-server-stop.sh ../config/server.properties
    

    Zookeeper关闭命令:

    ./zookeeper-server-stop.sh ../config/zookeeper.properties
    

    [!CAUTION]

    这里的两个关闭命令必须按照上述顺序来执行:先关闭Kafka,再关闭Zookeeper,否则有可能导致kafka永远无法关闭。

  • 使用KRaft启动Kafka

    使用Kafka启动KRaft生成Cluster UUID:

    可见我们可以使用如下语句来生成Cluster UUID:

    ./kafka-storage.sh random-uuid
    

    现在我们来看看其他几个选项:

    比如info选项:

    info选项使用

    比如fomat选项:

    format选项使用

    [!CAUTION]

    format选项的-c选项只能够查看kraft的配置,否则就会出现这句话:

    The kafka configuration file appears to be for a legacy cluster. Formatting is only supported for clusters in KRaft mode.

    现在我们使用KRaft来启动Kafka:

    1. 先生成集群UUID:

      ./kafka-storage.sh random-uuid
      
    2. 其次,你可以使用info选项来查看生成的Cluster-ID:

      ./kafka-storage.sh info -c ../config/kraft/server.properties
      

    3. 格式化目录

       ./kafka-storage.sh format -t I4tS2U4CQxqJuZI3e2Rmvg -c ../config/kraft/server.properties 
      
    4. 后台启动Kafka:

      ./kafka-server-start.sh  ../config/kraft/server.properties &
      

      [!CAUTION]

      这里的配置目录要用Kraft的。

    关闭KRaft启动的Kafka:

    ./kafka-server-stop.sh  ../config/kraft/server.properties &
    

1.3 Zookeeper下载与安装

在上面,我们使用了Kafka中的内置的Zookeeper服务,其实我们也可以单独下一个Zookeeper服务。

下载地址:

https://zookeeper.apache.org/releases.html

官网

下载具体版本

下载命令:

wget https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.9.2/apache-zookeeper-3.9.2-bin.tar.gz

压缩文件:

tar -zxvf apache-zookeeper-3.9.2-bin.tar.gz

压缩好之后,先去配置文件的地方:

复制文件:

cp zoo_sample.cfg zoo.cfg

进去看看文件的内容:

不需要改动

启动文件:

./zkServer.sh start

启动成功:

查看进程号:

查看这个进程占用的端口:

查看进程使用端口

[!CAUTION]

上述42447进程占用了3个端口,其中2181、8080、33908;其中如果你本地如果启动了tomcat,那么就会导致端口冲突,因为都要用8080端口,所以这里zookeeper要改一下端口。

回到zookeeper配置文件中,加上如下配置:

admin.serverPort=你想要的端口

端口更改

之后重启即可:

 ./zkServer.sh restart

再次查看端口,你会发现解决了。


现在有了独立的Zookeeper了之后,你也可以使用独立的Zookeeper来启动Kafka。

[!CAUTION]

无论你是使用Kafka内置的Zookeeper架包启动,还是独立的Zookeeper启动,都要先启动Zookeeper,再启动Kafka

回到Kafka的bin目录下,使用命令启动:

./kafka-server-start.sh ../config/server.properties &

可见仍然能够启动成功:

1.4 Docker安装Kafka

[!NOTE]

如果docker命令相关知识不明白的读者,可自行去学习弥补,这里不过多阐述docker方面的知识。

查看kafka镜像列表:

docker search kafka

查看官方镜像:

可见,Kafka并没有官方镜像,我们再去Kafka的官网上看看:

https://kafka.apache.org/quickstart

官方使用

2. Kafka基本概念

Kafka整体架构图:

Kafka整体架构图

2.1 生产者

生产者简单来说就是发送消息的一方,生产者将消息发送到主题,主题再将消息分发到不同的分区。

2.1.1 关键配置参数

生产者的配置参数决定了其行为和性能。以下是一些关键配置参数

  • bootstrap.servers:指定 Kafka Broker 的地址列表,生产者用来连接 Kafka 集群。

  • key.serializervalue.serializer:指定用于序列化键和值的类,将 Java 对象转换为字节数组。

  • acks

    :决定生产者在接收到 Broker 的确认之前发送消息的策略。常见值包括:

    • 0: 不等待 Broker 确认。
    • 1: 等待领导者 Broker 确认。
    • all(或 -1):等待所有副本都确认。
  • retries:设置生产者在发送失败时重试的次数。

  • linger.ms:生产者等待更多消息加入批次的时间,以减少请求次数和提升吞吐量。

  • batch.size:每个批次的大小,达到这个大小时批次将被发送,无论 linger.ms 是否达到。

  • compression.type:设置消息的压缩类型(如 nonegzipsnappylz4zstd)。

2.1.2 分区策略

生产者将消息发送到特定的分区,分区策略决定了消息如何被路由到不同的分区

  • 轮询法(Round Robin):生产者以轮询的方式将消息发送到各个分区,确保负载均衡。
  • 键分区(Key-based Partitioning):如果消息有键,生产者会对键进行哈希并将消息发送到相应的分区。
  • 自定义分区器(Custom Partitioner):用户可以实现自定义的分区策略,控制消息的分配。

2.1.3 发送确认

生产者发送消息后,会根据配置的 acks 参数等待 Broker 的确认:

  • 无确认(acks=0):生产者发送消息后不等待任何确认,延迟最低,但可靠性较差。
  • 领导者确认(acks=1):生产者等待领导者分区确认消息已收到,可靠性和延迟之间平衡。
  • 所有副本确认(acks=all 或 -1):生产者等待所有副本确认消息已收到,可靠性最高,但延迟也较高。

2.1.4 幂等性生产者

Kafka 提供了幂等性生产者功能,确保即使发生重试,消息也不会被重复写入。启用幂等性生产者需要设置 enable.idempotencetrue。幂等性生产者生成一个唯一的 Producer ID(PID)来标识消息。

2.1.5 监控和度量

Kafka 生产者提供了丰富的度量信息,用于监控生产者的性能和状态。这些度量通常包括:

  • 请求率(Request Rate):每秒发送的请求数。
  • 错误率(Error Rate):每秒发送失败的请求数。
  • 批次大小(Batch Size):发送批次的平均大小。
  • 发送延迟(Send Latency):消息发送的平均延迟时间。

了解了上述的基本概念,现在我们来看看如何使用脚本来创建主题。

在你安装的Kafka的bin目录中,找到如下脚本: kafka-console-producer.sh

直接执行它,可以看见一些参数:

现在我们使用这个脚本来发送消息:

[!WARNING]

前提是要有可用的主题,才能够发送消息。

./kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092

当我执行这个命令之后,就进入了输入模式,这里你输入,然后回车,就表示你发送了一条消息。

那么消息到底是否发送成功了呢?我们新开一个会话,在那边操作消费者。

2.2 消费者

消费者简单来说就是接收消息的一方,消费者订阅一个或多个主题,并从这些主题的分区中读取消息。

2.2.1 消费者组(Consumer Group)

  • 消费者组(Consumer Group):是 Kafka 中消费者的逻辑分组。每个消费者组有一个唯一的组 ID。
  • 分区分配:同一消费者组中的消费者会共同消费主题中的消息,每个分区只能由组中的一个消费者消费,确保每条消息只被处理一次。
  • 负载均衡:如果消费者组中的消费者数量少于分区数量,那么一些消费者会消费多个分区。如果消费者数量多于分区数量,多余的消费者将处于空闲状态。

2.2.2 消息消费模式

  • 拉取模式(Pull):Kafka 消费者通过拉取(pull)方式从 Kafka 主题中获取消息。消费者定期向 Broker 发送请求,拉取新消息。
  • 自动提交(Auto Commit):消费者可以自动提交偏移量,这样在每次拉取消息后偏移量都会被自动更新。
  • 手动提交(Manual Commit):消费者可以手动提交偏移量,通常在消息处理完成后提交,确保消息不会重复消费。

2.2.3 偏移量管理(Offset Management)

  • 偏移量(Offset):每条消息在分区内都有一个唯一的偏移量,用于标识消息在分区中的位置。
  • 自动提交(enable.auto.commit):设置为 true 时,消费者会自动提交偏移量。默认的提交间隔由 auto.commit.interval.ms 控制。
  • 手动提交:同步提交偏移量和异步提交偏移量。

2.2.4 再平衡机制(Rebalance)

  • 再平衡(Rebalance):当消费者加入或离开消费者组,或主题的分区数发生变化时,Kafka 会重新分配分区给消费者。再平衡过程中,部分消费者可能会暂时无法消费消息。
  • 再平衡监听器(Rebalance Listener):通过实现 ConsumerRebalanceListener 接口,可以在再平衡之前和之后执行特定操作,例如在再平衡之前提交偏移量,防止消息重复消费。

2.2.5 幂等性消费

  • 幂等性:确保同一消息即使被多次处理,也不会导致不一致的结果。可以通过使用消费者事务性功能或手动管理偏移量实现幂等性。

了解了上述的基本概念,现在我们来看看如何使用脚本来创建主题。

在你安装的Kafka的bin目录中,找到如下脚本:kafka-console-consumer.sh

直接执行它,可以看见一些参数:

从头开始消费消息:

./kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092

从头开始消费消息

当然,如果你不加--from-beginning选项,则每次都是消费的最新消息:

./kafka-console-consumer.sh --topic test-topic --bootstrap-server localhost:9092

现在我生产者发送消息:

生产者发送消息

去消费者处查看消息:

消费者接收消息

2.3 主题(Topic)|分区(Partition)|副本(Replica)

[!note]

如果你了解其他的MQ,比如RabbitMQ,这里的主题与分区简单来说就是RabbitMQ中的交换器与队列。

Kafka中的主题是逻辑上的数据分类或数据流。生产者将消息发布到主题,消费者从主题订阅和读取消息。每个主题都可以包含任意多的消息,这些消息以顺序追加的方式存储。

但是如果我们把所有消息都塞到一个”中心”中去,那么势必缺少可伸缩性,无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽系统的性能或存储,即是吞吐量不足。所以我们现在引入分区(Partition)的概念。

每个主题可以被分成一个或多个分区。分区是消息存储的基本单位,每个分区在物理上对应一个日志文件目录,消息在分区中按顺序存储并分配一个唯一的偏移量(Offset)

分区的作用:

  1. 提供并行处理能力:通过多个分区,允许多个消费者并行读取,提高处理吞吐量。
  2. 增加数据的可靠性和容错性:通过分区副本机制,可以实现高可用性。

每个分区可以有多个副本(Replica),为实现备份功能,保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且 Kafka仍然能够继续工作,Kafka提供了副本机制,一个topic的每个分区都有1个或多个副本;副本确保数据的高可用性和容错性。每个分区的副本中,一个是领导者(Leader),其他的是追随者(Follower)。生产者和消费者只与领导者交互。Kafka确保每个分区都有一个领导者来处理读写请求,追随者与领导者保持同步,以便在领导者失败时接管其角色。

Replica副本分为Leader ReplicaFollower Replica

  • Leader:每个分区多个副本中的**”主”副本**,生产者发送数据以及消费者消费数据,都是来自Leader副
  • Follower:每个分区多个副本中的**”从”副本**,实时从Leader副本中同步数据,保持和Leader副本数据的同步, Leader副本发生故障时,某个Follower副本会成为新的Leader副本;

[!caution]

设置副本个数不能为0,也不能大于节点个数,否则将不能创建Topic

Leader副本和Follower副本不可能同时出现在一个Broker中。

其实还有一种副本,称为ISR副本,也叫为同步中的副本(In-Sync-Replicas),包含了Leader副本和所有与Leader副本保持同步的Follower副本。

写请求首先由Leader副本处理,之后Follower副本会从Leader上拉取写入的消息,这个过程会有一定的诞延迟,导致Follower副本中保存的消息略少于Leader副本,但是只要没有超出阈值都可以容忍,但是如果一个Follower副本出现异常,比如宕机、网络断开等原因长时间没有同步到消息,那这个时候,Leader就会把它踢出去,Kafka通过ISR集合来维护一个“可用且消息量与Leader相差不多的副本集合,它是整个副本集合的一个子集”;

在Kafka中,一个副本要成为ISR(In-Sync Replicas)副本,需要满足一定条件:

  1. Leader副本本身就是一个ISR副本;
  2. Follower副本最后一条消息的offset与Leader副本的最后一条消息的offset之间的差值不能超过指定的阈值,超过阈值则该Follower副本将会从ISR列表中剔除
    • replica.lag.time.max.ms:默认是30秒;如果该Follower在此时间间隔内一直没有追上过Leader副本的所有消息,则该 Follower副本就会被剔除ISR列表
    • replica.lag.max.messages:落后了多少条消息时,该Follower副本就会被剔除ISR列表,该配置参数现在在新版本的 Kafka已经过时了

2.3.1 命令行操作主题(Topic)

了解了上述的基本概念,现在我们来看看如何使用脚本来创建主题。

在你安装的Kafka的bin目录中,找到如下脚本: kafka-topics.sh

直接执行它,可以看见需要的一些参数:

示例:

  • 创建主题:

    ./kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092
    

    创建名为test-topic的topic

    其中--bootstrap-server是必要选项,后面跟上Kafka服务IP地址和端口。

  • 删除主题:

    ./kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092
    
  • 查看当前所有主题列表:

    ./kafka-topics.sh --list --bootstrap-server localhost:9092
    
  • 描述主题的详细信息:

    ./kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
    

2.3.2 命令行操作分区(Partition)与副本(Replica)

在你安装的Kafka的bin目录中,找到如下脚本: kafka-topics.sh

执行命令:./kafka-topics.sh;之后找到与分区和副本相关的参数:

示例:

  • 创建主题并且指定分区与副本:

    ./kafka-topics.sh --create --topic "parAndRepTopic" --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
    

    创建结果:

    创建结果

    这是因为创建副本的时候,副本因子不能大于当前节点(broker)的个数,但是又不能等于0;由于我这个服务器上只有一个KafkaBroker,所以只有一个主副本,没有从副本,如果你这里写2,相当于有一个主副本,一个从副本,如果你写其他值, 主副本也只会有一个,其他都是从副本。所以这里要想创建成功,只能将副本因子写为1:

    ./kafka-topics.sh --create --topic "parAndRepTopic" --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
    

    执行结果:

2.4 偏移量(Offset)

  • 偏移量是Kafka分区中消息的唯一标识符。
  • 每个消息在其所在的分区中都有一个唯一的偏移量,这个偏移量是消息在分区中的位置。

详细讲解:

  • 唯一性:每个分区中的偏移量是唯一的。偏移量从0开始,随着每条新消息的到来递增。
  • 消息消费:消费者通过跟踪偏移量来记录自己已经处理到哪个位置。消费者读取消息时,可以指定从某个偏移量开始消费,或者从最新的消息开始消费。
  • 故障恢复:由于偏移量是消息在分区中的位置标识,消费者可以通过记录和提交偏移量来实现故障恢复。当消费者发生故障并重新启动时,可以从最后提交的偏移量位置继续消费,确保消息处理的准确性和连续性。
  • 分布式一致性:在有多个消费者的情况下,Kafka通过协调消费者组中的偏移量管理来确保每个消息只被一个消费者处理一次。

2.5 日志末端偏移量(LEO)

在Kafka中,LEO(Log End Offset)指的是一个分区日志中最新消息的偏移量。它代表了分区日志中下一个将要写入的消息的偏移量,即最后一个消息的偏移量加一。LEO是Kafka中用于确保数据一致性和高可用性的重要概念之一。

详细解释:

  1. 偏移量(Offset)

    • 在Kafka中,每个消息在分区中都有一个唯一的偏移量,这是一个从零开始递增的整数,用于标识消息在分区中的位置。
  2. LEO的定义

    • LEO是分区中当前最大的偏移量。它表示分区中最后一条消息的偏移量再加一。例如,如果最后一条消息的偏移量是100,那么LEO就是101。
  3. 副本(Replica)

    • Kafka中的每个分区都有多个副本,其中一个是主副本(Leader),其他的是跟随副本(Follower)。每个副本都会有自己的LEO
  4. 高水位标记(High Watermark, HW)

    • 高水位标记是指Kafka中所有副本都确认接收到的最高消息偏移量。消费者只能读取到高水位标记之前的消息。高水位标记确保了数据的一致性,因为它表示所有副本都同步到这个位置。
  5. LEO与高水位标记的关系

    • 每个副本的LEO可能不同,因为副本之间的数据同步是异步的。高水位标记通常由LEO最低的副本决定,因为这是所有副本都确认接收到的最大偏移量。

LEO的作用:

  1. 数据一致性

    • LEO用于跟踪分区中消息的写入进度,有助于确保分区在主副本和跟随副本之间的数据一致性。
  2. 故障恢复

    • 在主副本故障时,Kafka会选择一个LEO最高的跟随副本作为新的主副本,以最大限度减少数据丢失。
  3. 高可用性

    • 通过定期比较和同步各个副本的LEO,Kafka能确保副本之间的数据一致性和高可用性。

示例:假设一个Kafka分区有以下消息

偏移量: 0  1  2  3
消息:   A  B  C  D

在这种情况下:

  • LEO = 4,因为最后一条消息的偏移量是3,所以下一个消息的偏移量将是4。

如果该分区有两个副本,主副本(LEO=4)和跟随副本(LEO=3):

  • 高水位标记可能是3,因为这是所有副本都确认接收到的最高偏移量。

2.6 高水位值(HW)

在Kafka中,HW(High Watermark,高水位标记)是一个分区中所有副本(包括主副本和跟随副本)都确认接收到的最高偏移量。它是Kafka保证数据一致性和持久性的重要机制之一。消费者只能读取到高水位标记之前的消息,这样可以确保读取到的数据是已经被所有副本确认的,即一致的数据。

详细解释:

  1. 偏移量(Offset)

    • 在Kafka中,每个消息在分区中都有一个唯一的偏移量,这是一个从零开始递增的整数,用于标识消息在分区中的位置。
  2. 高水位标记(HW)的定义

    • 高水位标记是指分区中所有副本都确认接收到的最高消息偏移量。消费者只能读取到这个偏移量之前的消息。
  3. 副本(Replica)

    • Kafka中的每个分区都有多个副本,其中一个是主副本(Leader),其他的是跟随副本(Follower)。每个副本都会有自己的LEO。
  4. LEO(Log End Offset)

    • LEO指的是一个副本中最新消息的偏移量,即最后一条消息的偏移量加一。

高水位标记的作用:

  1. 数据一致性

    • 高水位标记确保了消费者读取的数据是一致且可靠的。只有所有副本都接收到的数据才能被消费者读取。
  2. 故障恢复

    • 在主副本故障时,新的主副本将从LEO最高的副本中选出,而高水位标记确保新的主副本不会丢失已经提交的数据。
  3. 数据可见性

    • 通过高水位标记,Kafka能够控制消费者只能读取到高水位标记之前的消息,确保消费者看到的是一致的数据视图。

示例:假设一个Kafka分区有以下消息

偏移量: 0  1  2  3  4
消息:   A  B  C  D  E

在这种情况下,假设分区有一个主副本和两个跟随副本:

  • 主副本(Leader)的LEO = 5
  • 第一个跟随副本的LEO = 4
  • 第二个跟随副本的LEO = 3

高水位标记(HW)将是所有副本中最小的LEO,即3。这意味着消费者只能读取到偏移量2之前的消息,即消息A、B、C。

当第一个跟随副本同步到偏移量5(LEO = 5)时,高水位标记将更新到4。消费者现在可以读取到消息D(偏移量3之前的所有消息)。

过程:

  1. 写入消息

    • 当一个消息写入主副本时,主副本的LEO增加。
    • 主副本将消息复制到所有跟随副本。
  2. 更新LEO和HW

    • 跟随副本接收到消息后,更新自己的LEO。
    • 高水位标记由所有副本中最小的LEO决定。
  3. 消费者读取消息

    • 消费者只能读取到高水位标记之前的消息,确保读取到的消息已被所有副本确认。

3. Kafka工具

如果我们使用命令行来查看主题、消费者,显然很麻烦,那有没有一些管理类的工具呢?其实是有的,我这里推荐Redisant这家的产品(收费)

网站:

https://www.redisant.cn/

工具下载好之后,就可以使用工具来连接Kafka。

如果你的Kafka无法通过外部工具来连接,则你需要修改Kafka的相关配置:

[!note]

这里的相关配置指的是你启动Kafka时所用的配置,例如:

如果我使用Kraft来启动Kafka,那么命令为:

./kafka-server-start.sh ../config/kraft/server.properties &

这里对应的配置文件就是:/config/kraft/server.properties这个文件。

如果你是使用zookeeper来启动Kafka,那么命令为:

./kafka-server-start.sh ../config/server.properties

这里对应的配置文件就是:/config/server.properties这个文件。

修改配置文件的如下内容为 :

修改为如下内容

将按照上述的内容修改了之后,你就可以使用工具来连接Kafka了。

[!important]

如果这里你还是无法连接,可以先关闭服务器的防火墙看看,看看是不是防火墙的问题。

如果关闭之后,能够访问,那么就是防火墙的问题,你开放对应服务的端口即可,比如Kafka的端口,zookeeper的端口。

firewall-cmd --zone=public --add-port=端口号/tcp --permanent

4. SpringBoot集成Kafka

配置依赖:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.4.RELEASE</version>
</dependency>

[!note]

这里的版本号选择适合自己的版本号即可。

基本的配置文件

spring:
  kafka:
    bootstrap-servers: 192.168.254.141:9092 #这里写kafka服务器的IP以及端口

[!note]

后面还会有一些配置文件内容,这里我们先给出最基本的。

4.1 Kafka发送消息

4.1.1 初次尝试首发消息

常量:

public class KafkaConstant {
    public static final String KAFKA_TEST_TOPIC = "test-topic";
    public static final String KAFKA_TEST_GROUP = "test-group";
}

生产者:

@Component
public class KafkaProducer {
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;


    public void sendMessage(){
        kafkaTemplate.send(KafkaConstant.KAFKA_TEST_TOPIC,"hello-test");
    }
}

消费者:

@Component
public class KafkaConsumer {
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    @KafkaListener(topics = {KafkaConstant.KAFKA_TEST_TOPIC},groupId = KafkaConstant.KAFKA_TEST_GROUP)
    public void receiveMessage(String msg){
        System.out.println("Kafka收到消息:"+msg);
    }
}

[!caution]

  1. 当你使用了spring-kafka的依赖之后,你就可以像使用RedisTemplate一样使用KafkaTemplate,spring都帮我们封装好了。
  2. 发送消息使用KafkaTemplatesend这个API
  3. 在消费者侧,必须使用监听(@KafkaListener),其中必须要指明Topic和消费者组ID。
  4. 发送消息之前,要确保外界可以连接Kafka(参考第三小节),否则无法发送消息。

API讲解:

kafkaTemplate.send()

这个send方法表示发送消息,我这里选择参数最多的一个重载方法:

public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, @Nullable V data);
  1. String topic:指定消息要发送到的目标主题。
  2. Integer partition:指定消息要发送到哪个分区。如果为null,则Kafka会根据配置或默认的分区器(partitioner)来选择一个分区。
  3. Long timestamp:指定消息的时间戳。如果为null,Kafka会使用当前时间作为时间戳。
  4. K key:用于消息的分区选择以及消费者在读取消息时的排序。如果为null,则消息键是空的。
  5. @Nullable V data:消息的实际内容,如果为null,则表示发送空消息。

测试:

@RestController
@RequestMapping("test")
public class TestController {
    @Resource
    private KafkaProducer kafkaProducer;
    @GetMapping("t1")
    public void t1(){
        kafkaProducer.sendMessage();
    }
}

[!important]

先启动消费者,因为消费者默认监听的是最新的消息,而非最早的消息,所以要先启动消费者,再启动生产者,这样生产者发送的消息就能够立即被消费者监听到。

启动程序,请求:http://localhost:8888/test/t1;查看结果:

4.1.2 读取最早的消息

默认情况下,当启动一个新的消费者组时,它会从每个分区的最新偏移量(即该分区中最后一条消息的下一个位置)开始消费。这就是我上一小节为什么说要先启动消费者的缘故。

如果希望从第一条消息开始消费,需要将消费者的auto.offset.reset设置为earliest;

[!caution]

如果之前已经用相同的消费者组ID消费过该主题,并且Kafk已经保存了该消费者组的偏移量,那么即使你设置了auto.offset.reset=earliest,该设置也不会生效,因为Kafka只会在找不到偏移量时使用这个配置。在这种情况下,你需要手动重置偏移量或使用一个新的消费者组ID

4.1.2.1 使用新的消费者组ID

我们可以来试试,之前我发送过一条消息到test-topic这个主题中,并且由test-group给消费了,现在我们新增消费者配置,让他能够从头开始消费:

spring:
  kafka:
    bootstrap-servers: 192.168.254.141:9092 #这里写kafka服务器的IP以及端口
    consumer: #配置消费者
      auto-offset-reset: earliest #加上这个消费者配置

[!tip]

消费者消费时,偏移量策略设置可选值:

  • earliest:自动将偏移量重置为最早的偏移量
  • latest:自动将偏移量重置为最新的偏移量
  • none:如果没有为消费者组找到以前的偏移量,则向消费者抛出异常
  • exception:向消费者抛出异常;**(spring-kafka不支持)**

现在重新启动程序,还是使用相同的主题和消费者组:

可见并没有从头开始消费,现在我们将消费者改一下,试试:

可见这次是从头开始消费的。

现在我们使用连接工具来看看两个消费者组对应的offset值:

可见两个消费者组都有自己的offset。

4.1.2.2 手动重置Kafka偏移量(Offset)

这里手动重置Kafka我们选用用命令行来完成,因为Kafka的bin中提供了脚本文件: kafka-consumer-groups.sh

查看可选参数:

更新Offset由三个维度决定:Topic的作用域、重置策略、执行方案。

  1. Topic的作用域

    --all-topics:为consumer group下所有topic的所有分区调整位移

    --topic t1 --topic t2:为指定的若干个topic的所有分区调整位移

    --topic t1:0,1,2:为指定的topic分区调整位移

  2. 重置策略

    --to-earliest:把位移调整到分区当前最小(旧)位移

    --to-latest:把位移调整到分区当前最新位移

    --to-current:把位移调整到分区当前位移

    --to-offset <offset>把位移调整到指定位移处

    --shift-by N: 把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动

    --to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000

    --by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S

    --from-file <file>:从CSV文件中读取调整策略

  3. 确定执行方案

    什么参数都不加:只是打印出位移调整方案,不具体执行

    --execute执行真正的位移调整

    --export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用

例如,这里我要重置offset到最早的地方:

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --topic "test-topic"  --group "test-group" --reset-offsets --to-earliest --execute

命令解释:

  • –topic:表示你要重置的offset再哪个主题
  • –group:表示你要重置的offset属于哪个消费组

命令执行结果:

可见有报错。

[!caution]

重置Offset的时候,需要保证某一个消费组是非激活状态,如果是一个稳定的状态,那么就不能够修改,所以,你首先要停止这个消费组的消费,然后再去重置offset,其实也很好想明白,因为如果你在重置Offset的时候,消费组又在消费,那么这个Offset一直都在变化,肯定就无法重置了。

所以我们先停掉这个消费组(停掉之前的Java程序),然后再去重置:

工具查看

可见重置到了0,现在我们再次使用Java程序来消费:

可见消息又重头开始消费了。

4.1.3 Kafka发送Message对象消息

在spring-kafka的依赖中,发送消息使用的是:kafkaTemplate.send()这个API;

send这个API有很多重载的方法,我们来看看:

这里主要讲解一下发送Message对象。

我们跟进send这个方法的源码看看:

由于这个Message是个接口,那么如何构建消息对象呢?

我们把Message接口的源码包下载下来看看:

看看他的类图:

可见他是提供了Message对象的构建的,所以我们可以使用它来构建Message对象:

public void sendMessageObject(){
    Message<String> message = MessageBuilder.withPayload("test-messageObject") //构建消息数据
            .setHeader(KafkaHeaders.TOPIC, "test-Object-Topic") //指定主题
            .build();
    kafkaTemplate.send(message);
}

这里的消息头中,可以可以指定topicgroupid等等信息。那这些,可以自行去看看KafkaHeaders这个类:


测试:

消费者:

@KafkaListener(topics = {"test-Object-Topic"},groupId = KafkaConstant.KAFKA_TEST_GROUP)
public void receiveMessageObject(String msg){
    System.out.println("Kafka收到Message对象消息:"+msg);
}

测试结果:

4.1.4 Kafka发送ProducerRecord对象消息

同上小节一样,我们先来看看源码:

可见ProducerRecord是一个类,可以直接构建对象了。

可见ProducerRecord类没有提供无参构造,必须要有参数,我这里直接使用最复杂的一个构造器来讲解:

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
    //省略...
}

这个构造器前面几个参数我们都见过,只是这最后一个不知道,我们进源码看看:

感觉有点和集合以及Map的结构类似,我们再进Header类中瞧瞧:

可见是一个Key-Value的键值对。

Headers的类结构图:

Headers的类结构图:

所以看到这大概明白了,ProducerRecord类构建器的最后一个参数,其实就可以理解为一个Map结构,里面存放一些具有标识性的信息就可以了。

但是不论是Headers还是Header都是结构,无法直接构建对象,他是否有实现类呢,我们来看看:

可见确有一个实现类:RecordHeaders;这样我们使用实现类来构建即可

详细代码:

public void sendProducerRecordObject() {
    Headers headers = new RecordHeaders();
    headers.add("phone", "10086".getBytes(StandardCharsets.UTF_8));

    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
            "test-Object-Topic", //主题
            0,//分区
            "messageKey",//消息的key
            "messageValue",//消息值
            headers//消息头
    );
    kafkaTemplate.send(producerRecord);
}

消费者代码:

@KafkaListener(topics = {"test-Object-Topic"},groupId = KafkaConstant.KAFKA_TEST_GROUP)
public void receiveProducerRecordObject(String msg){
    System.out.println("Kafka收到ProducerRecord对象消息:"+msg);
}

测试结果:

查看连接工具:

4.1.5 Kafka发送消息到指定分区

其实你只需调用send方法即可:

4.1.6 Kafka使用默认Topic发送消息

这里主要是使用sendDefault这个API方法:

不过再使用这个方法之前,你得先去配置文件中配置一下,否则就会出现如下问题:

public void sendMessageByDefault() {
    kafkaTemplate.sendDefault(0,System.currentTimeMillis(),"消息key","使用默认topic发送消息");
}

可见使用默认的发送方式,需要一个Topic,去配置文件中配置:

spring:
  kafka:
    bootstrap-servers: 192.168.254.141:9092 
    consumer: 
      auto-offset-reset: earliest 
    template:
      default-topic: default-topic #这里配置模板的默认主题

现在我们再来测测:

@KafkaListener(topics = {"default-topic"},groupId = KafkaConstant.KAFKA_TEST_GROUP)
public void receiveMessageDefault(String msg){
    System.out.println("Kafka收到消息:"+msg);
}

[!caution]

你配置了默认的Topic之后,消费者这里也要呼应上,否则是收不到消息的。

kafkaTemplate.sendkafkaTemplate.sendDefault的区别:主要的区别就是是否每次都需要指定主题(Topic),手动指定的方式更加灵活一点,但是稍微麻烦一点,如果应用中的所有消息都采用同一个主题,那么你可以使用默认主题方式,否则更加推荐使用手动指定的方式。

4.1.7 Kafka发送对象消息

在之前,我们发送的消息都是字符串:

private KafkaTemplate<String, String> kafkaTemplate;

现在本小节我们试试发送对象类型的消息:

User对象:

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class User {
    private Long id;
    private String name;
}

模板也要更改一下:

@Resource
private KafkaTemplate<String,Object> objectKafkaTemplate;

生产者:

public void sendObjectMessage() {
    User user = User.builder().id(2568L).name("念心卓").build();
    objectKafkaTemplate.send(KafkaConstant.KAFKA_TEST_TOPIC,user);
}

消费者:

@KafkaListener(topics = {KafkaConstant.KAFKA_TEST_TOPIC},groupId = KafkaConstant.KAFKA_TEST_GROUP)
public void receiveObjectMessage(Object object){
    System.out.println("Kafka收到对象类型的消息:"+object);
}

[!note]

这里的接收参数直接写Object。

执行结果:

类型转换异常

可见,报错说我的User类型不能够转为String类型,这是为啥呢,我的Kafka模板类不是指定的<String,Object>吗?继续往下看这个报错,进一步说明了是Serializer(序列化)为String时出现的错误。说明Value值的序列化是采用的String类型的序列化器,我们来看看到底是不是。

你启动程序的时候,会输出一段Kafka的配置信息:

或者查看配置文件:

生产者的消息值序列化

[!important]

这个配置表示生产者的消息值的序列化方式。

默认采用String序列化器

[!important]

可见消息的键值默认的序列化方式采用的都是String序列化。

现在我们将消息的值使用Json的序列化方式,再来看看结果:

spring:
  kafka:
    bootstrap-servers: 192.168.254.141:9092 #这里写kafka服务器的IP以及端口
    consumer: #配置消费者
      auto-offset-reset: earliest #加上这个消费者配置
    template:
      default-topic: default-topic
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

消费结果:

消息结果

[!note]

可见,我消费者出接收消息写的Object类型,其实它是一个ConsumerRecord对象,真正消息的内容是在ConsumerRecord对象中的value属性中的。

ConsumerRecord对象部分源码:

ConsumerRecord对象部分源码

这些配置大部分都已经见过了,这里就不在赘述了。

4.1.8 Kafka发送消息并指定分区和副本

之前我们发送消息的时候,无论是使用send还是sendDefault这个API,只能是选择发送消息到哪个分区中去,无法创建你想要的多个分区和副本。这是因为再使用发送消息的API的时候,**Kafka会默认帮我们自动完成Topic的创建工作,但是这种情况下创建的Topic默认只有一个分区,分区中只有一个主副本**,也就是它本身,没有其他额外的从副本。

我们知道,分区数量以及副本数量的指定只能够再创建主题的时候指定,那么如何使用代码来指定呢?这时候就要用到Kafka的配置类了:

@Configuration
public class KafkaConfig {
    /**
     * 创建一个新的Topic的时候指定分区和副本的数量
     * @return
     */
    @Bean
    public NewTopic newTopic(){
        return new NewTopic("kafka-name",5, (short) 1);
    }
}

NewTopic的部分源码:

NewTopic部分源码

重启SpringBoot服务,然后查看新创建的主题相关的信息:

可见有5个分区,1个副本。那么我现在给这个主题发送一下消息,然后再重启一下SpringBoot服务,因为他是一个配置类,看看每次重启的时候,之前发送的消息是否会丢失:

手动发送一条消息:

手动发送一条消息

重启SpringBoot服务,并且给分区扩容一下,查看分区是否扩容成功且消息是否丢失:

分区扩容

日志输出:

日志输出

工具查看

所以,虽然分区以及副本的指定是在配置类中,每次重启服务的时候都会重新执行一遍,但是如果主题存在了,那么不会重复创建主题,并且主题中的消息是不会丢失的,同时允许你扩容,但是不允许你缩小分区的数量,这个你可以自行验证。

4.1.9 Kafka发送消息时的分区策略

4.1.9.1 默认分区策略(DefaultPartitioner)发送消息

我们来追踪一下Kafka发送消息时的源码:

[!tip]

分区策略接口:

这里就是最后一步了,这里的keyBytes就是消息的key转为byte的,由于我发送消息时没有指定消息的key,所以这里直接采用的stickyPartitionCache的分区计算方法,否则,则采用如下方式获得分区数:

Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

之前我们创建了这么一个主题:

多个分区的主题

现在我们试试往这个主题中发送消息:

消费者:

public void sendMessageToMultiPartition() {
    User user = User.builder().id(2568L).name("念心卓").build();
    objectKafkaTemplate.send("kafka-name",user);
}

再次发送:

再发送一次看看:

最后一次发送看看:

可见默认的策略发送消息,始终是到达的那几个分区。


现在我给消息加上key,再来看看源码:

生产者:

public void sendMessageToMultiPartition() {
    User user = User.builder().id(2568L).name("念心卓").build();
    objectKafkaTemplate.send("kafka-name","user-1",user);
}

关键源码:

分区计算

可见,当我发送消息的时候,指定了消息key的时候,走的是如上图中的代码。

可见消息发送到了分区3中,现在我们再多次发送消息:

可见还是再分区3中,所以,当你指定了消息的key的时候,多次发送的消息key如果相同,那么就会送到同一分区中去

4.1.9.2 轮询分区策略(RoundRobinPartitioner)发送消息

轮询其实很好理解,这里就不作过多的解释,我们直接来看看轮询的关键代码部分:

从上一小节我们得知,Kafka发送消息默认使用的是默认的分区策略,那么是否能够改为轮询的分区策略呢?

当然是可以的,不过只能够Kafka的配置类中完成,不可在配置文件中、发送以及消费消息时指定

配置文件:

spring:
  kafka:
    bootstrap-servers: 192.168.254.141:9092 #这里写kafka服务器的IP以及端口
    consumer: #配置消费者
      auto-offset-reset: earliest #加上这个消费者配置
    template:
      default-topic: default-topic
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #新增key的序列化方式

配置类:

@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapServers;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

      //...省略之前创建Topic的相关配置

    /**
     * 生产者相关配置
     *
     * @return
     */
    public Map<String, Object> producerConfigs() {
        HashMap<String, Object> props = new HashMap<>();
        //配置Kafka服务器地址以及端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        //配置生产者的value的序列化
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        //配置生产者的key的序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        //配置分区策略
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());
        return props;
    }

    /**
     * 生产者创建工厂
     *
     * @return
     */
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * kafkaTemplate覆盖默认配置类中的KafkaTemplate
     *
     * @return
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

生产者:

public void sendMessageToMultiPartition() {
    User user = User.builder().id(2568L).name("念心卓").build();
    objectKafkaTemplate.send("kafka-name",user);
}

发送消息,然后调试看看,看它是否走的是轮询的分区策略:

轮询分区策略

现在我们来实实在在发送消息试试,看看是不是轮询:

发送5条消息再看看:

可见确实是轮询着来的。

4.1.9.3 自定义分区策略

参考之前的分区策略:

可见分区策略都实现了Partition接口,那么我们自定义分区策略,显然也是需要实现这个接口的。

自定义分区策略:

public class CustomerPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //这里面就是分区的具体的计算
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

配置类修改:

//配置分区策略
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomerPartitioner.class.getName());

4.1.10 生产者消息拦截器

从之前发送消息的源码,我们可以知道大体的流程如下:

拦截器:

[!note]

这里其实是ProducerInterceptor这个类。

相关源码:

序列化器:

分区器:


这里我们主要是讲解一下生产者发送消息的拦截器。

自定义拦截器,你要去实现ProducerInterceptor<K, V>接口:

public class CustomerProducerInterceptor implements ProducerInterceptor<String, Object> {

    /**
     * 发送消息时,会调用该方法,对消息进行拦截,可以在拦截中对消息做一些处理,比如日志记录等等...
     * 在该方法中最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
     *
     * @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.
     * @return 如果返回null, 则该消息不会发送
     */
    @Override
    public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
        System.out.println("拦截消息:" + record.toString());
        return record;
    }

    /**
     * 这个方法在消息被成功发送并得到确认(acknowledgement)或发送失败时被调用。
     * 可以在这里实现一些后处理逻辑,比如记录日志、统计数据、或者处理发送失败的消息。
     *
     * onAcknowledgement运行在producer的IO线程中,所以不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。
     *
     * @param metadata  The metadata for the record that was sent (i.e. the partition and offset).
     *                  If an error occurred, metadata will contain only valid topic and maybe
     *                  partition. If partition is not given in ProducerRecord and an error occurs
     *                  before partition gets assigned, then partition will be set to RecordMetadata.NO_PARTITION.
     *                  The metadata may be null if the client passed null record to
     *                  {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        //metadata:包含消息的元数据,比如主题、分区、偏移量等。如果消息发送失败,这个参数可能为null
        if (metadata != null) {
            System.out.println("服务器收到该消息:" + metadata.toString());
        } else {
            System.out.println("消息发送失败了,exception = " + exception.getMessage());
        }
    }

    /**
     * 关闭interceptor,主要用于执行一些资源清理工作
     */
    @Override
    public void close() {

    }

    /**
     * 获取配置信息和初始化数据时调用。
     *
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }
}

在配置类中添加上拦截器,覆盖掉默认的拦截器:

//覆盖默认拦截器
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomerProducerInterceptor.class.getName());

现在我们发送一条消息试试:

可见我们的自定义拦截器配置成功了。

[!caution]

如果有多个自定义拦截器,千万不要put多个拦截器,而是应该直接存放一个数组

例如:

  • 错误:

    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 拦截器1);
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 拦截器2);
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 拦截器3);
    
  • 正确:

    // 自定义拦截器
    List<String> interceptors = new ArrayList<>();
    interceptors.add(拦截器1);
    interceptors.add(拦截器1);
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);
    

4.2 Kafka接收消息

4.2.1 消息相关注解

  1. @Payload@Payload注解用于标注方法参数,表明这个参数应该绑定到消息的内容(即消息体)上,只要在参数中使用了注解,就表示用这个参数来接收消息体内容。

    @KafkaListener(topics = {"kafka-name"},groupId = KafkaConstant.KAFKA_TEST_GROUP)
    public void receiveObjectMessage(@Payload String msg){
        System.out.println("Kafka收到对象类型的消息:"+msg);
    }
    

    这里就表示msg就是消息体内容。

    收到消息

    如果你这里直接使用User对象,例如:

    @KafkaListener(topics = {"kafka-name"},groupId = KafkaConstant.KAFKA_TEST_GROUP)
    public void receiveObjectMessage(@Payload User user){
        System.out.println("Kafka收到对象类型的消息:"+user);
    }
    

    执行结果报错:

    执行报错

    相当于就是类型转换异常,因为我们发送消息时,序列化的时候使用的是JsonSerializer的序列化方式,这里你接收消息时,反序列化使用JsonDeserializer是否可以呢?

    其实你试过了之后,还是会发现不可以的。

    [!tip]

    **其实这里你一点都不必纠结,你完全可以将消息转为JSON字符串发送消息,然后你接收消息的时候,再将消息转为对应的实体类就可以了。 **

  2. @Header@Header注解用于标注方法参数,指示该参数应该绑定到特定的Kafka消息头部信息上。头部信息通常包含一些重要的元数据,使用@Header注解可以方便地在消息处理方法中获取这些数据。

     @KafkaListener(topics = {"kafka-name"}, groupId = KafkaConstant.KAFKA_TEST_GROUP)
    public void receiveObjectMessage(@Payload String msg,
                                     @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                                     @Header(value = KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
                                     @Header(value = KafkaHeaders.OFFSET) String offset) {
        System.out.println("Kafka收到对象类型的消息:" + msg + ";topic: " + topic + " ;partitionId: " + partitionId + " ;offset: " + offset);
    }
    

    执行结果:

    执行结果

4.2.2 接收消息所有内容

如果你仔细看过前面的内容你就会知道,之前我接收对象消息,使用的直接是Object对象:

public void receiveObjectMessage(Object object)

接收的消息结果:

消息结果

可见,接收到的对象其实是:ConsumerRecord这个对象;来看看他的一些源码:

ConsumerRecord部分源码

现在我们直接接收它看看:

@KafkaListener(topics = {"kafka-name"}, groupId = KafkaConstant.KAFKA_TEST_GROUP)
public void receiveObjectMessage(ConsumerRecord<String,Object> consumerRecord) {
    System.out.println("Kafka收到对象类型的消息:" + consumerRecord);
}

可见收到的内容其实是一样的

这里面包含了很多东西,我们需要什么就取什么就可以了,就可以不用注解的形式了。

@KafkaListener(topics = {"kafka-name"}, groupId = KafkaConstant.KAFKA_TEST_GROUP)
public void receiveObjectMessage(ConsumerRecord<String,Object> consumerRecord) {
    System.out.println("Kafka收到对象类型的消息:" + consumerRecord);
    System.out.println("主题:" + consumerRecord.topic());
    System.out.println("分区:" + consumerRecord.partition());
    System.out.println("偏移量:" + consumerRecord.offset());
    System.out.println("消息体:" + consumerRecord.value());
}

执行结果

4.2.3 自动确认和手动确认

在Kafka中,消息确认(acknowledgment)是确保消息从生产者到消费者的传递和处理可靠性的关键部分。Kafka提供了自动确认(automatic acknowledgment)和手动确认(manual acknowledgment)两种机制。

  • 自动确认(Automatic Acknowledgment)

    自动确认是Kafka默认的确认机制。在这种模式下,一旦消费者读取并处理了消息,Kafka会自动将偏移量(offset)提交给Kafka集群,以表示该消息已经被成功处理。

优点:

  1. 简单易用:开发者不需要显式地管理偏移量的提交。
  2. 低延迟:因为偏移量会自动提交,处理速度较快。

缺点:

  1. 可能导致消息丢失:如果消费者在处理完消息但尚未提交偏移量之前崩溃,那么这些消息可能会被再次消费,或者如果偏移量提交了但消费者还没真正处理完消息,那么这些消息就会丢失。
  2. 灵活性较差:开发者无法控制精确的提交时机。

  • 手动确认(Manual Acknowledgment)

    手动确认机制允许消费者在处理完消息后,手动提交偏移量。这提供了更高的控制权和可靠性,因为开发者可以选择在消息完全处理完毕之后再提交偏移量。

优点:

  1. 高可靠性:可以确保消息在被成功处理后才提交偏移量,避免了消息丢失。
  2. 灵活性高:开发者可以根据业务逻辑控制偏移量的提交时机。

缺点:

  1. 复杂度增加:需要开发者自己管理偏移量的提交逻辑。
  2. 潜在的延迟:如果消息处理时间较长,偏移量提交也会相应延迟,可能影响消费速度。

spring-kafka中,我们监听消息使用的是@KafkaListener这个注解,所以,在配置手动确认的时候,我们也是在监听器的配置项下配置:

上图种的7种确认方式其实都是在ContainerProperties.AckMode的枚举中:

Kafka的AckMode(确认模式)指的是消费者在处理消息后如何确认和提交消息的偏移量。Kafka的Spring框架提供了七种不同的AckMode,可以在配置Kafka消费者时选择,以控制消息的处理和提交策略。以下是这七种模式的详细讲解及其优缺点:

确认模式 解释 优点 缺点
AckMode.RECORD 每处理一条消息后立即提交偏移量 高可靠性,每条消息都被单独确认 提交频率高,可能影响性能
AckMode.BATCH 每次拉取到一批消息后提交偏移量 减少提交次数,提高性能 批次中的消息可能重复处理或丢失
AckMode.TIME 在指定的时间间隔内提交偏移量 定期提交,适中性能和可靠性 时间间隔过长或过短影响可靠性和性能
AckMode.COUNT 当处理的消息数量达到指定阈值时提交偏移量 控制提交频率,适中性能和可靠性 消息处理速度不均匀时,提交时机不准确
AckMode.COUNT_TIME 结合消息数量和时间间隔两种机制,当任意一个条件满足时提交偏移量 提供灵活提交机制,适合多种场景 配置和调优复杂
AckMode.MANUAL 完全由用户手动控制何时提交偏移量 最大灵活性,适合精细控制场景 增加代码复杂性和管理开销
AckMode.MANUAL_IMMEDIATE 用户手动控制提交偏移量,并立即提交 高可靠性和灵活性 需要显式管理提交逻辑,增加复杂性

MANUAL与MANUAL_IMMEDIATE的区别:

相同点:

  1. 手动控制:无论是MANUAL还是MANUAL_IMMEDIATE,消费者的偏移量都是手动提交的,这意味着应用程序需要显式地调用Acknowledgment.acknowledge()方法来提交偏移量。

  2. 需要显式调用:在这两种模式下,偏移量的提交都需要应用程序显式地调用Acknowledgment.acknowledge()方法。

不同点:

  1. 提交时机

    • **MANUAL**:当应用程序调用Acknowledgment.acknowledge()方法时,偏移量并不会立即提交,而是会在监听器方法返回后提交。这意味着如果在调用acknowledge()方法和监听器方法返回之间有其他消息被消费,那么这些消息的偏移量也会被一起提交。
    • **MANUAL_IMMEDIATE**:当应用程序调用Acknowledgment.acknowledge()方法时,偏移量会立即提交,不会等待监听器方法返回。这意味着在调用acknowledge()方法后消费的消息的偏移量不会被一起提交。
  2. 使用场景

    • **MANUAL**:适用于需要批量提交偏移量的场景,例如当消费者需要处理大量消息并且可以接受在一次提交中包含多个消息的偏移量时。
    • **MANUAL_IMMEDIATE**:适用于需要立即提交偏移量的场景,例如当消费者需要精确控制每个消息的偏移量提交时。

总结来说,MANUALMANUAL_IMMEDIATE都提供了手动控制偏移量提交的能力,但MANUAL_IMMEDIATE更强调立即提交偏移量。在实际使用中,开发者需要根据业务需求来选择合适的偏移量提交策略。


现在我们来看看代码。

如果我们没有配置如下配置项

spring:
  kafka:
    listener:
      ack-mode:

直接在消费者处使用:

@KafkaListener(topics = {"kafka-name"}, groupId = KafkaConstant.KAFKA_TEST_GROUP)
public void receiveObjectMessage(ConsumerRecord<String,Object> consumerRecord, Acknowledgment ack) {
    System.out.println("Kafka收到对象类型的消息:" + consumerRecord);
    System.out.println("主题:" + consumerRecord.topic());
    System.out.println("分区:" + consumerRecord.partition());
    System.out.println("偏移量:" + consumerRecord.offset());
    System.out.println("消息体:" + consumerRecord.value());

    ack.acknowledge();//手动提交
}

[!note]

上诉新增了参数:Acknowledgment ack,并且进行了手动提交:ack.acknowledge()

现在直接发消息看看:

方法参数不可用异常

可见,你没有配置listener相关的配置项,直接使用Acknowledgment参数对象是会报错的。

现在我们配上看看:

spring:
  kafka:
    bootstrap-servers: 192.168.254.141:9092 #这里写kafka服务器的IP以及端口
    consumer: #配置消费者
      auto-offset-reset: earliest #加上这个消费者配置
    template:
      default-topic: default-topic
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      ack-mode: manual_immediate #配置手动确认模式

执行结果:

执行结果

可见成功。

手动确认常用的一点就是,如果你的消费者出现了异常,只要你还没有提交offset,那么就会再次被消费到:

手动制造异常

工具查看

可见,分区0中的消息只被确认消费到了4,现在我把异常移除掉,重新启动消费者看看:

重新启动消费者

可见我重新启动消费者之后,之前因为异常而没有被消费的消息,里面就会再次被消费者监听到,然后重新消费。

4.2.4 指定分区与偏移量进行消费

之前我们消费消息都是直接从指定主题(Topic)读取的消息,但是我们知道,一个Topic中可以有多个分区,这样设计的好处就是为了提高消息的吞吐量。

现在我们再指定Topic中来指定分区与偏移量来消费。

消费者:

@KafkaListener(groupId = KafkaConstant.KAFKA_TEST_GROUP,
        topicPartitions = {
                @TopicPartition(
                        topic = "kafka-name",
                        partitions = {"0", "1", "2"},
                        partitionOffsets = {
                                @PartitionOffset(partition = "3", initialOffset = "2"),
                                @PartitionOffset(partition = "4", initialOffset = "3")
                        }
                )
        })
public void receiveObjectMessage(ConsumerRecord<String, Object> consumerRecord, Acknowledgment ack) {
    System.out.println("主题:" + consumerRecord.topic() + "分区:" + consumerRecord.partition() + "偏移量:" +
            consumerRecord.offset() + "消息体:" + consumerRecord.value());
    ack.acknowledge();//手动提交
}

属性解释:

  • groupId: 消费者组ID,多个消费者可以组成一个组,共享消费某些主题的消息。这里是KafkaConstant.KAFKA_TEST_GROUP
  • topicPartitions: 指定监听的主题及其分区。

注解解释:

  1. @TopicPartition

    @TopicPartition注解用于配置监听的具体主题及其分区。

    属性解析:

    • topic: 指定监听的主题名称,这里是"kafka-name"
    • partitions: 指定要监听的分区,这里是分区"0", "1", 和"2"
    • partitionOffsets: 指定分区及其初始偏移量。
  2. @PartitionOffset:

    @PartitionOffset注解用于配置特定分区的初始偏移量。

    属性解析

    • partition: 指定分区号,这里指定的分区为3,4。
    • initialOffset: 指定分区的初始偏移量,这里指定分区3的偏移量从2开始,分区4的偏移量从3开始。

整体解释:

Kafka消费者属于KafkaConstant.KAFKA_TEST_GROUP组,监听"kafka-name"主题的分区012,以及分区3从偏移量2开始,分区4从偏移量3开始。当有消息到达这些分区时,receiveObjectMessage方法会被调用处理消息并手动提交偏移量。

生产者:

public void sendMessageToMultiPartition() {
    for (int i = 0; i < 20; i++) {
        User user = User.builder().id((long) i).name("念心卓").build();
        objectKafkaTemplate.send("kafka-name",String.valueOf(i),user);
    }
}

查看工具:

可见将消息发送到了这几个分区上,现在我们启动消费者来消费看看:

总共收到17条消息:

总共收到17条消息

为什么只收到17条消息呢?因为我们指定监听的分区是0,1,2,3,4;其中0,1,2三个分区都是从偏移量0开始监听,所以收到15条消息;分区3上由于没有消息,所以这里为0;分区4上有5条消息,但是它的偏移量是从3开始监听,一直到5,所以收到2条消息;所以总和为17条消息。

4.2.5 批量消费消息

之前我们消费消息都是单条单条的消费消息,那么怎么一次性消费多条消息呢?

这里就要来配置批量消费消息:

  1. 配置文件:

    消费者处新增配置文件

    max-poll-records: 10 #批量消费配置,最大每10条消息为一个批次
    

    在监听器处新增配置文件:

    type: batch #表示开启批量消费
    

    新增配置

  2. 消费者代码处,使用List集合来接收消息:

    @KafkaListener(topics = {"batch-kafka"}, groupId = "batch-group-id")
    public void receiveBatchMessage(List<ConsumerRecord<String, Object>> consumerRecordList, Acknowledgment ack) {
        System.out.println("该批次收到的消息数量:" + consumerRecordList.size() + "; 消息的内容:" + consumerRecordList);
        ack.acknowledge();//手动提交
    }
    
  3. 生产者:

    public void sendBatchMessage() {
        for (int i = 0; i < 35; i++) {
            User user = User.builder().id((long) i).name("念心卓").build();
            objectKafkaTemplate.send("batch-kafka",String.valueOf(i),user);
        }
    }
    

执行结果:

执行结果

因为最大每批次是10条消息,所以每一批的数量都会小于等于10;

4.2.6 消费者消息拦截器

之前我们讲解了生产者消息拦截器(4.10小节),现在我们来看看消费者消息拦截器。

参考之前的生产者消息拦截器,我们推测消费者消息拦截器类为:ConsumerInterceptor

ConsumerInterceptor源码

[!tip]

从这个接口方法中的注释你大概就能明白这些方法什么时机被调用。

可见还真是。

继续参考生产者拦截器,我们来自定义消费者消息拦截器。

配置文件新增消费者的key和value的反序列化:

spring:
  kafka:
    bootstrap-servers: 192.168.254.141:9092 #这里写kafka服务器的IP以及端口
    consumer: #配置消费者
      auto-offset-reset: earliest #加上这个消费者配置
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #设置value的反序列化方式
      key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer #设置key的反序列化方式

消费者消息拦截器类

@Component
@Slf4j
public class CustomerConsumerInterceptor implements ConsumerInterceptor<String, Object> {
    /**
     * 消息消费前的拦截处理
     *
     * @param records records to be consumed by the client or records returned by the previous interceptors in the list.
     * @return
     */
    @Override
    public ConsumerRecords<String, Object> onConsume(ConsumerRecords<String, Object> records) {
        log.info("onConsume执行了....");
        return records;
    }

    /**
     * 消息(offset)提交前进行拦截处理
     *
     * @param offsets A map of offsets by partition with associated metadata
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        log.info("onCommit执行了....");
    }

    /**
     * 拦截器关闭前进行拦截处理(如果有的话)
     */
    @Override
    public void close() {
        log.info("close执行了....");
    }
    /**
     * 初始化配置(如果有的话)
     *
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }
}

kafka配置类

package com.example.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootStrapServers;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;
    
    //生产者以及topic相关的配置省略....

    /**
     * 消费者相关配置
     *
     * @return
     */
    public Map<String, Object> consumerConfigs() {
        HashMap<String, Object> props = new HashMap<>();
        //配置Kafka服务器地址以及端口
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        //配置消费者的value的反序列化
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        //配置消费者的key的反序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        //覆盖默认拦截器
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomerConsumerInterceptor.class.getName());
        return props;
    }

    /**
     * 消费者者创建工厂
     *
     * @return
     */
    @Bean
    public ConsumerFactory<String,Object> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    
    /**
     * Kafka监听容器工厂
     *
     * @param consumerFactory
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<?> myKafkaListenerContainerFactory(ConsumerFactory<String,Object> consumerFactory){
        ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory);
        return containerFactory;
    }

}

生产者与消费者:

public void sendObjectMessage() {
    User user = User.builder().id(2568L).name("念心卓").build();
    objectKafkaTemplate.send(KafkaConstant.KAFKA_TEST_TOPIC,user);
}
@KafkaListener(
        topics = {KafkaConstant.KAFKA_TEST_TOPIC},
        groupId = KafkaConstant.KAFKA_TEST_GROUP,
        containerFactory = "myKafkaListenerContainerFactory"
)
public void receiveMessage(Object msg) {
    System.out.println("Kafka收到消息:" + msg);
}

[!important]

这里需要手动自定消费者监听容器工厂containerFactory

因为Kafka默认是有一个消费者监听容器工厂的,就算你在配置类中自己声明了一个监听容器工厂,如果你在@KafkaListener中不指定,有可能他是使用的默认的监听容器工厂,而没有使用你自定义的监听容器工厂Bean

执行结果:

执行结果

可见消费者消息拦截器是配置成功了的。

4.2.7 消息转发

有时候,我们处理了消息,之后,希望将这个消息投递到另外一个topic中继续处理,这时候,你可能就会使用到消息转发。

在spring-kafka中,消息转发很简单,在消费者处使用一个注解@sendTo即可。

@SendTo: 用于指定@KafkaListener方法的返回值应该发送到的目标主题。

生产者:

public void sendToMessage() {
    User user = User.builder().id(2568L).name("念心卓").build();
    objectKafkaTemplate.send("topicA",user);
}

消费者:

@KafkaListener(topics = "topicA",groupId = "topicAGroupId",containerFactory = "myKafkaListenerContainerFactory")
@SendTo(value = "topicB")
public String receiveMessageA(ConsumerRecord<String,Object> record){
    Object value = record.value();
    System.out.println("收到topicA收到消息:"+value);
    return value+"sendTo topicB";
}


@KafkaListener(topics = "topicB",groupId = "topicBGroupId",containerFactory = "myKafkaListenerContainerFactory")
public void receiveMessageB(ConsumerRecord<String,Object> record){
    Object value = record.value();
    System.out.println("收到topicB收到消息:"+value);
}

执行结果:

启动程序报错

这是因为你的消费者,使用了@SendTo注解,相当于要将消息转发,而发送消息我们知道是要用到KafkaTemplate的,但是@SendTo注解转发消息,是他自己的行为,我们无法手动来使用KafkaTemplate呢,怎么解决呢?

参考:Forwarding Listener Results using @SendTo :: Spring Kafka

官方文档

通过查看官方文档可知,需要给ConcurrentKafkaListenerContainerFactory配置一个template

由于之前我们配置过这个Bean的,在加上ReplyTemplate的配置即可:

/**
 * Kafka监听容器工厂
 *
 * @param consumerFactory
 * @return
 */
@Bean
public KafkaListenerContainerFactory<?> myKafkaListenerContainerFactory(ConsumerFactory<String, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory);
    containerFactory.setReplyTemplate(kafkaTemplate()); //配置上kafkaTemplate即可
    return containerFactory;
}

修改过后的执行结果:

执行结果

[!note]

你可能已经注意到了我的消费者中没有使用消费者消息确认,现在我加上,你在看看。

@KafkaListener(topics = "topicA", groupId = "topicAGroupId", containerFactory = "myKafkaListenerContainerFactory")
@SendTo(value = "topicB")
public String receiveMessageA(ConsumerRecord<String, Object> record, Acknowledgment ack) {
    //消费细节省略...
}


@KafkaListener(topics = "topicB", groupId = "topicBGroupId", containerFactory = "myKafkaListenerContainerFactory")
public void receiveMessageB(ConsumerRecord<String, Object> record, Acknowledgment ack) {
    //消费细节省略...
}

启动消费:

报错

报错是报的ACK报错,奇怪了,明明我的配置文件中配置了消息的确认模式,这里为什么ack不行了呢?

配置文件

原因就是,你的消费者监听容器已经是自定义了,但是你的自定义的消费者监听容器中,并没有指定消息的确认模式,同理,如果你想要批量消费消息,也要自定义批量消费相关的配置。

可见消息的确认模式是在listener下的配置,所以,这里配置类的方式也是写在消费者监听容器的配置中:

@Bean
public KafkaListenerContainerFactory<?> myKafkaListenerContainerFactory(ConsumerFactory<String, Object> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory);
    containerFactory.setReplyTemplate(kafkaTemplate()); //配置上kafkaTemplate即可
    //设置消费者监听容器消息的ACK模式
    containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); 
    return containerFactory;
}

再次启动程序:

执行结果

同理消费成功。

4.2.8 消费者分区策略

Kafka消费消息时的分区策略是指Kafka主题topic中哪些分区应该由哪个消费者来消费

消费者分区策略分配接口:ConsumerPartitionAssignor

Kafka提供了多种分区策略,包括RangeAssignorRoundRobinAssignorStickyAssignorCooperativeStickyAssignor。下面是这些分区策略的详细介绍:

  1. RangeAssignor (默认分区策略)

    工作原理:RangeAssignorspring-kafka中的默认的分区策略,它是根据消费者组内的消费者数量和主题数的分区数量,来均匀的为每一个消费者分配分区

    例如我现在有10个分区,一个消费组中有3个消费者。

    计算每个消费者应得的分区数量:分区总数(10) / 消费者数量(3) = 3…余1;

    每个消费者按照理论上都可以得到3个分区,但是由于有一个余数1,所以,按照顺序,这余下的一个分区应该分配给第一个消费者。

    所以,分配的结果为:

    • 消费者1分配到的分区:0,1,2,3
    • 消费者2分配到的分区:4,5,6
    • 消费者3分配到的分区:7,8,9

    优点

    • 简单直观。
    • 在分区数能够均匀分配的情况下,工作负载分配较为均衡。、

    缺点

    • 如果分区数不能均匀分配,某些消费者可能会比其他消费者负载更高。
    • 分区数量和消费者数量变化时,重新分配会导致较多的分区移动

  1. RoundRobinAssignor(轮询分区策略)

    工作原理:轮流将分区分配给消费者。

    例如,如果有6个分区和3个消费者,分配将是:消费者1 -> 分区0,消费者2 -> 分区1,消费者3 -> 分区2,消费者1 -> 分区3,消费者2 -> 分区4,消费者3 -> 分区5。

    优点:能够更好地均匀分配负载,尤其是在分区数和消费者数不均等的情况下。

    缺点:重新平衡时,分区分配的变化可能较大,导致更多的分区移动。


  1. StickyAssignor(粘性分区策略)

    工作原理:尽量保持现有的分区分配不变,仅对新加入的消费者或离开的消费者进行分区调整。这样,大多数消费者可以继续消费它们之前消费的分区,只有少数消费者需要处理额外的分区;所以叫“粘性”分配。

    优点

    • 减少分区重新分配的频率,保持消息处理的稳定性。
    • 保持负载均衡。

    缺点:需要计算的复杂度较高。


  1. CooperativeStickyAssignor(合作粘性分区策略)

    工作原理:是StickyAssignor的改进版,采用协作性的重新平衡机制。在重新平衡时,分两个阶段进行:首先执行一个“预重新分配”阶段,将分区从需要卸载的消费者转移到空闲消费者,然后执行一个“真正的重新分配”阶段,确保分区均匀分配。即消费者可以在它离开消费者组之前通知协调器,以便协调器可以预先计划分区迁移,而不是在消费者突然离开时立即进行分区重分配。

    优点

    • 更加稳定,减少分区频繁移动带来的开销。
    • 更好的负载均衡。
    • 降低分区重新分配对消费者的影响,提高处理稳定性。

    缺点:实现较复杂。


总结:

  • RangeAssignor适用于分区数可以均匀分配的情况,简单直观。
  • RoundRobinAssignor在负载均衡方面表现更好,但在重新平衡时分区变化较大。
  • StickyAssignor在负载均衡和分区稳定性之间取得平衡,但计算复杂度较高。
  • CooperativeStickyAssignor改进了StickyAssignor,提供了更稳定的分区重新分配机制,适合要求高稳定性的场景。

[!note]

这四种分区策略从上往下,个人认为最好的分区策略是CooperativeStickyAssignor

4.3 Kafka消息的存储

Kafka所有的消息(数据)都默认存储在/tmp/kafka-logs目录中,可以通过修改Kafka的配置项log.dirs配置。

如果你是使用zookeeper来启动的,那么log.dirs配置项存在于:

如果你是用Kraft方式启动的,那么log.dirs配置项存在于:

[!warning]

不清楚的可以去看1.2小节中,ZookeeperKraft两种启动方式。

这里我的启动方式是Kraft启动,所以我只需修改Kraft目录下的server.properties文件即可:

Kraft方式启动所用的配置文件

zookeeper方式启动所用的配置文件

这里我先不更改,反正只是做测试,但是一般生产环境下是需要更改的,一般不会放在临时目录下


Kafka所有的消息(数据)都是以日志的方式来保存的。

Kafka一般都是海量的数据,为了避免日志文件过大,日志文件被存放在多个日志目录下,日志目录的命名规则为:<topic_name>-<partition_id>;

比如创建一个名为firstTopictopic,其中3个partition,那么在Kafka的数据目录中就有3个目录,firstTopic-0firstTopic-1

firstTopic-2

我之前一直是使用Kraft方式启动的,所以我的日志文件存储在:/tmp/kraft-combined-logs

我们随便进入一个目录中看看:

基本上每一个目录下都有这么几个文件。

文件解读:

  1. .indx类型的文件:消息索引文件
  2. .log类型的文件:消息数据文件
  3. .timeindex类型的文件:消息的时间戳索引文件
  4. .snapshot类型的文件:快照文件,生产者发生故障或重启时能够恢复并继续之前的操作
  5. leader-epoch-checkpoint文件:记录每个分区当前领导者的epoch以及领导者开始写入消息时的起始偏移量
  6. partition.metadata文件:存储关于特定分区的元数据(metadata)信息

4.4 Kafka的_consumer_offsets主题数据查看

[!caution]

只有以Kraft方式启动的Kafka才能够看见_consumer_offsets主题,以zookeeper方式启动的Kafka是看不见的。

Kafka中的_consumer_offsets主题是一个内部主题,专门用来管理和存储消费者组的位移(offsets)。位移是消费者读取到的消息的偏移量,通过位移的管理,Kafka能够实现消费消息的准确性和可靠性。

_consumer_offsets主题的主要作用是记录每个消费者组对每个分区的消费进度。它使得消费者能够在重启、崩溃或重新平衡之后从上次停止的位置继续消费,而不是从头开始或错过消息。

4.4.1 数据结构

_consumer_offsets主题中的消息是键值对(key-value pair)形式,具体结构如下:

  • Key (键):

    • 消费者组(Consumer Group ID)
    • 主题(Topic)
    • 分区(Partition)
  • Value (值):

    • 位移(Offset):当前消费到的消息的偏移量
    • 元数据(Metadata):一些额外的信息,如消费者ID、提交时间戳等

4.4.2 分区

_consumer_offsets主题默认有50个分区。这是为了确保高并发情况下,位移提交操作能够分散到多个分区,提高性能。如果需要,可以通过修改Kafka配置文件中的offsets.topic.num.partitions参数来调整分区数量。

4.4.3 消费位移提交机制

消费者在消费消息时,会周期性地将当前消费的位移提交到_consumer_offsets主题中。提交位移有两种方式:

  • 自动提交(auto commit):由Kafka消费者客户端自动提交,频率由配置文件中的消费者处的auto-commit-interval控制。
  • 手动提交(manual commit):由应用程序代码显式调用提交。

4.4.4 数据保留策略

_consumer_offsets主题的数据有自己的保留策略,独立于其他普通主题。默认情况下,Kafka保留这些位移数据7天(168小时)。这个保留时间可以通过配置offsets.retention.minutes参数来修改。

4.5 消息的offset

之前我们简单讲解了一下偏移量(offset)的概念。这里我们再来详细看一下offset

4.5.1 生产者offset

生产者发送消息到KafkaBroker中的某一个Topic下的Partition中,Kafka内部会为每一条消息分配一个唯一的Offset,该Offset就是消息在Partition中的位置。

在单个Partition(分区)中,消息是按照Offset的顺序存储的,这就保证了消息的顺序性。

在多个Partition(分区)中,消息会按照生产者的分区策略分配到不同的Partition中去,多个消费者从不同的Partition中取消息消费,这无法保证消息消费的顺序性。

[!note]

生产者处的Offset不需要用户过多关心,Broker会为我们搞定。

4.5.2 消费者offset

消费者Offset是消费者需要知道自己已经读取到哪个位置了,接下来需要从哪个位置开始继续读取消息;

每个消费者组(Consumer Group)中的消费者都会独立地维护自己的Offset,当消费者从某个Partition读取消息时,它会记录当前读取到的Offset,这样,即使消费者崩溃或重启,它也可以从上次读取的位置继续读取,而不会重复读取或遗漏消息;

[!caution]

消费者Offset需要消费消息并提交后才记录Offset。

我现在创建一个新的主题来测试:

配置类修改:

@Bean
public NewTopic newTopic() {
    return new NewTopic("kafka-consumer-offset", 5, (short) 1);
}

配置文件:

spring:
  kafka:
    bootstrap-servers: 192.168.254.141:9092 #这里写kafka服务器的IP以及端口
    consumer: #配置消费者
      auto-offset-reset: earliest #加上这个消费者配置
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-commit-interval:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

生产者发送25条消息:

public void sendBatchMessageOffset() {
    for (int i = 0; i < 35; i++) {
        User user = User.builder().id((long) i).name("念心卓").build();
        objectKafkaTemplate.send("kafka-consumer-offset",JSONUtil.toJsonStr(user));
    }
}

创建3个消费者:

@KafkaListener(topics = "kafka-consumer-offset",groupId = "kafka-consumer-offset-groupId",concurrency = "3")
public void receiveMessageOffset(ConsumerRecord<String, Object> record) {
    Object value = record.value();
    System.out.println("收到topicB收到消息:" + value);
}

[!note]

@KafkaListener注解中,有一个属性:concurrency,它可以指定消费者的数量

这里我们先不启动消费者,只让生产者发送消息:

各个Partition接收消息情况

现在我们启动消费者来消费:

可见消息也是成功消费了。

现在我们调整一下消费者消费消息的确认模式:

spring:
  kafka:
    bootstrap-servers: 192.168.254.141:9092 #这里写kafka服务器的IP以及端口
    consumer: #配置消费者
      auto-offset-reset: earliest #加上这个消费者配置
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-commit-interval:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      ack-mode: manual_immediate #手动确认消息

现在我们消费者和生产者一起启动,在看看消费情况:

消费情况

可见,因为我的消费者消费了消息,并没有手动ACK,所以消息的Offset并没有更新,当你再次启动消费者来消费,又会消费到之前没有ACK的消息。

现在开启消费者ACK:

@KafkaListener(topics = "kafka-consumer-offset",groupId = "kafka-consumer-offset-groupId",concurrency = "3")
public void receiveMessageOffset(ConsumerRecord<String, Object> record,Acknowledgment ack) {
    Object value = record.value();
    System.out.println("收到topicB收到消息:" + value);
    ack.acknowledge(); //手动ACK
}

重启消费者,看看是否能够消费到之前的消息:

消费情况

消费情况

可见确实消费到了之前没有ACK的消息,并且这次我手动确认了,所以消息的Offset也更新了。

4.5.3 总结

  1. 生产者的Offset不用过多关心,Kafka服务会帮我去维护好
  2. 消费者的Offset,如果开启了手动确认模式,由于消费者因为未知原因宕机了,那么由于消息并没有被确认,Offset也并不会更新;当消费者服务恢复正常之后,会从宕机之前的offset继续消费消息,不会导致消息丢失。当成功消费之后,并且手动ACK消息之后,Offset才会更新。
  3. 消费者从什么位置开始消费,就看消费者的Offset是多少,消费者启动后它的Offset是多少。

5. Kafka集群搭建

Kafka中,由于启动方式有两种:Zookeeper、Kraft;所以,这里他的集群搭建也是这两种。

严格说来,Kafka的集群搭建应该使用不同的服务器,这里资源有限,我就在同一个服务器上搭建3份Kafka服务,来模拟集群的搭建。

5.1 Zookeeper方式搭建集群

如果使用Zookeeper方式启动,那么你集群的搭建正常来说有多份Kafka,就要保证多份Zookeeper;这里同样资源有限,我就用一个Zookeeper服务来对应3个Kafka服务。

  1. 首先复制3个Kafka服务

  2. 修改里面使用Zookeeper方式启动的配置文件

    • 修改broker.id,每一个Kafka服务要对应唯一一个broker.id,并且这个id只能是0~255的整数
    • 修改两个监听端口,一个listeners表示Kafka服务内部使用的监听IP与端口;一个advertised.listeners表示外网连接的IP与端口。
    • 修改数据存储位置,因为是集群,要写不同的数据存储位置,否则会导致覆盖。
    • 由于只用了一个Zookeeper服务,所以Zookeeper服务地址不用改变,如果有多个Zookeeper服务,那么多个服务使用逗号分割。

    配置修改

    zookeeper服务配置

    [!note]

    按照配置文件的修改方式,同理修改另外两个Kafka服务配置。

  3. 启动三个Kafka服务看看

    [!caution]

    因为是Zookeeper方式启动,所以千万别忘了先启动Zookeeper服务再去启动Kafka。

    Kafka服务1

    Kafka服务2

    Kafka服务3

    可见,这里有一个集群ID,3者都是一样的,表示集群搭建成功了。

    工具查看


现在我们使用SpringBoot来创建一个Topic并且发送消息试试:

@Configuration
public class KafkaConfig {
    //省略其他配置...

    /**
     * 创建一个新的Topic的时候指定分区和副本的数量
     *
     * @return
     */
    @Bean
    public NewTopic newTopic() {
        return new NewTopic("clusterTopic", 3, (short) 3);
    }
}

配置文件:

spring:
  kafka:
    bootstrap-servers: 192.168.254.141:9091,192.168.254.141:9092,192.168.254.141:9093 #如果是集群,这里直接写多个服务器的IP和端口
# 其他配置...

工具查看:

这里的[3,2,1]等就是broker.id,这里的Leader就是表示主副本再那个broker.id上,也就是再那个服务器上。

这样集群就搭建好了。

5.2 Kraft方式搭建集群

Kraft搭建集群的方式和Zookeeper方式大同小异,这里我们资源有限,还是在同一个机器上复制出2份Kafka来搭建集群。

  1. 复制2份Kafka出来

    之前在Zookeeper方式搭建集群小节中已经操作了,这里就省略了。

  2. 修改里面使用Kraft方式启动的配置文件

    修改细节

    同理,数据文件的存放地址也要修改

    第二个Kafka服务:

    第二个Kafka服务

    第三个Kafka服务:

    第三个Kafka服务

  3. 启动允许Kraft集群

    • 生成Cluster UUI(集群UUI)

      ./kafka-server-start.sh ../config/kraft/server.properties &
      
    • 格式化日志目录

      ./kafka-storage.sh format -t zs_R6XShRAi-6Yhcp_UFXg -c ../config/kraft/server.properties
      

    • 启动Kafka

      ./kafka-server-start.sh ../config/kraft/server.properties &
      

[!note]

由于你启动是一个一个启动的,所以,当你启动的时候,由于还有其他节点没有启动,就会有如下报错:

这是因为你配置文件中配置了如下内容:

当出现其他节点服务不可用或者还没来得及启动的时候,就会报warning,但是当你其他服务正常起来了之后,就不会在报错了。


文章作者: 念心卓
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 念心卓 !
  目录