Kafka
概述
定义
Kafka是一个分布式的发布/订阅模式的消息队列,主要应用于大数据实时处理领域。
消息队列
使用消息队列的好处
- 解耦
- 可恢复性:一部分组件失效后,不会影响整个系统,消息队列降低了进程之间的耦合性,加入消息队列的消息仍然可以在系统恢复后处理
- 缓冲:解决生产消息和消费消息的处理速度不一致
- 灵活性、峰值处理能力
- 异步通信
消息队列的两种模式
点对点:一对一,消费者拉取数据,消息收到后消息删除
发布/订阅:一对多,消费者消费数据之后不会清除消息,消息可以给多个消费者使用,Kafka是主动拉取消息
优点:可以控制获取消息的速度
缺点:不断去问订阅是否有新消息
Kafka基础架构
Producer :消息生产者,就是向 kafka broker 发消息的客户端;
Consumer :消息消费者,向 kafka broker 取消息的客户端;
消息队列:主题Topic(用于分类)、分区Partition(用于负载均衡)、Leader主(用于备份)
消费者:消费者组(一个主题可以被一组组多人消费,一个分区只能被一个组中一人消费)
Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
安装部署
集群规划
jar包下载
http://kafka.apache.org/downloads.html
集群部署
命令行操作
1)查看当前服务器中的所有 topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh –zookeeper hadoop102:2181 –list
2)创建 topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh –zookeeper hadoop102:2181 –create –replication-factor 3 –partitions 1 – topic first
选项说明:
–topic 定义 topic 名
–replication-factor 定义副本数
–partitions 定义分区数
3)删除 topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh –zookeeper hadoop102:2181 –delete –topic first
需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。
4)发送消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh –brokerlist hadoop102:9092 –topic first >hello world
5)消费消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \ –zookeeper hadoop102:2181 –topic first
Kafka架构深入
工作流程
Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。
每个 partition(分区) 对应于一个 log 文 件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset(偏移量,就是第几条数据的编号)。消费者组中的每个消费者,都会实时记录自己 消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。
根据偏移量先找到该偏移量在哪个索引中,再在索引中找到偏移量,再获取偏移量对应真实数据的位置,再找到Log文件中数据。
存储机制
由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位 效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment(分片)。每个 segment 对应两个文件——“.index”文件和“.log”文件。
index 和 log 文件以当前 segment 的第一条消息的 offset 命名。
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
“.index”文件存储大量的索引信息(左边为偏移量(第几条数据),右边为数据的具体位置),“.log”文件存储大量的数据,索引文件中的元 数据指向对应数据文件中 message 的物理偏移地址。
这些文件位于一个文件夹下,该文件夹的命名 规则为:topic 名称+分区序号。例如,first 这个 topic 有三个分区,则其对应的文件夹为 first0,first-1,first-2。
Kafka生产者
分区策略
分区原因
- 方便在集群中扩展:提高负载能力
- 可提高并发:可以以partition为单位读写
分区的原则
我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象
(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;
(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;
(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后 面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin(轮询) 算法。
数据可靠性保证(数据丢失、重复)
为保证 producer 能可靠的发送数据到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
问:何时发送ack?
- 半数以上完成同步,就发 送 ack
- 优点:延迟低
- 缺点:选举新leader时,需要2n+1台副本,确保n台故障仍可以超过半数
- 全部完成同步,就发送ack
- 优点:选举新leader时,需要n+1台副本,确保n台挂掉还有1个存活,也算全部完成
- 缺点:延迟高
ISR(优秀的follower集合、副本同步队列)
问题:采用全部完成同步的方案,如果所有follower开始同步数据,有一台因为故障,很长时间不能完成同步,leader需要一直等下去,为解决这个问题,就有了ISR。
- 原理:Leader维护了一个动态的 in-sync replica set (ISR),意思是和leader保持紧密同步的leader。当ISR中的follower完成数据同步之后,leader就会发送ack。如果长时间没有同步完成,该follower就会被踢出ISR。这个时间阈值由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。
- 以前版本是按两个条件进入ISR:同步时间差,同步数据量差。但之后版本移除了同步数据量差,因为在同步时同步数据量没有达到被踢出,同步时间又快被加入,反复进出会影响性能。
ack 应答机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失, 所以没必要等 ISR 中的 follower 全部接收成功。
所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡, 选择以下的配置。
acks参数配置
0:生产者不等待Kafka集群的ack,这个操作延迟最小,Kafka刚接收还没有写入磁盘就已经返回,但如果集群故障,就有可能丢失数据
1:生产者等待ack,分区中的leader写入成功之后就会返回ack。但如果follower同步完成之前,leader发生故障,就会丢失数据
-1:生产者等待ack,分区中ISR中的leader和follower全部写入同步成功才返回ack。但如果follower同步完成后,leader在没有发送ack之前发生故障,推选出新的leader,生产者没有收到ack就会再次发送消息给刚推选出来的leader,因为新leader已经同步成功,所以会出现数据重复
极端情况下:-1也是可能出现丢失数据。当partition中的follower效率都不高,ISR中只有一个leader,当自己写完之后也是满足了ISR全部完成的情况,直接返回ack,如果follower同步完成前leader发生故障,就会数据丢失,与1的情况相同。
数据一致性(副本间数据一致)
LEO:每个副本的最大偏移量offset,最后的数据
HW:消费者能见到的最大偏移量,ISR中最小的LEO
follower故障
会被临时踢出ISR,等恢复之后,先从磁盘中读取上次记录的LEO,再将高于HW的部分截取掉,从HW开始向leader同步,等LEO=该Partition的HW,追上leader之后,就重新加入ISR
leader故障
会在ISR中选举出一个新的leader,为保证多个副本数据之间的一致性,其他的follower会先将各个log文件中高于HW的部分截掉,再从新的leader中同步数据
Exactly Once(精准一次)
将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 **At Least Once **语义(最少发送一次请求)。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被 发送一次,即 At Most Once 语义(最多发送一次请求)。
对于一些非常重要的信息,比如说 交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义(精准一次)。
At Least Once + 幂等性 = Exactly Once
所谓的
工作原理:
要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。
开启幂等性的 Producer 在 初始化的时候会被分配一个 PID(生产者ID),发往同一 Partition 的消息会附带 Sequence Number(序列号)。而 Broker 端会对<PID,Partition,SeqNumber>消息的标识做缓存,当具有相同主键的消息提交时,Broker 只 会持久化一条。
总结
ack:可靠性{0(只发一次,不管确认),1(发一次,等写的确认),-1(发一次或多次,等写等同步的的确认)}
isr:HW(高水位,isr中最小的LEO,消费者可见的最大数据,保证一致性(消费、存储))、LEO(每个副本的最大偏移量)
消费者
消费方式
consumer 采用 pull(拉)模式从 broker 中读取数据。
- 缺点:如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。
- 解决:消费时传入时间参数,如果没有数据可以消费,就等待一段时间再返回
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。
分区分配策略
RoundRobin(轮询)按照组来分
将所有分区看作一个整体轮流分配给消费者
问题:如果消费者A订阅T1,B订阅T2,轮询会把T1,T2看成一个整体,然后分配,有可能将T1给了B,T2给了1
结论:轮询只能使用在消费者订阅相同主题
优点:分配均匀
缺点:当消费者组订阅的主题不一样时,会错分配
Range,按照主题来分(先看订阅后看组)
比如AB消费者都订阅了T1(012)T2(012),T1主题被拆成01,2分别给AB,T2也是如此
优点:不会错误分配
缺点:分配不均匀
总结:当消费者组中每个消费者都订阅相同的topic时,推荐使用轮询,而如果一个消费者组中的消费者订阅不同的topic,那么使用range
分配策略被调用的时机:消费者组中成员发生变化时,重新分配
offset 的维护
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故 障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
消费者是按组消费的,比如消费者组有消费者A,消息由3个分区,每个偏移量都是10,当B加入消费者组,重新分配时,B获得一个分区,继续在10之后消费。
offset=消费者组+主题+分区
Kafka 高效读写数据
- 顺序写磁盘
- 零复制技术:直接与操作系统交互,跳过用户空间
- 分布式(分区):并发读写
Zookeeper 在 Kafka 中的作用
Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等工作。
Controller 的管理工作都是依赖于 Zookeeper 的。
Kafka 事务
事务可以保证 Kafka 在 Exactly Once 语义的基 础上,生产和消费可以跨分区和会话的精准一次,要么全部成功,要么全部失败。
面试题
什么是ISR、OSR、AR
副本同步队列、同步速度慢会被踢出到OSR列表、所有副本
HW、LEO代表什么
高水位,消费者可观察最大offset、分区中每个副本最大的offset
怎么体现消息顺序性
区内有序
拦截器、序列化器、分区器
生产者客户端整体结构是什么样子,由几个线程来处理,分别是什么
2个线程,main线程(发数据、拦截器、序列化器、分区器), Sender 线程,一个线程共享变量——RecordAccumulator。
main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取 消息发送到 Kafka broker。
消费组中消费者个数不能超过topic的分区,因为会有消费者消费不到数据,导致资源浪费
消费者提交消费位移时提交的是当前消费到的最新消息的 offset 还是 offset+1?
+1
有哪些情形会造成重复消费?
先消费再提交
那些情景会造成消息漏消费?
先提交后消费
topic 的分区数可不可以增加/减少?如果可以怎么增加?如果不可以,那又是为什么?
增加可以,减少不行:数据无法处理
Kafka 有内部的 topic 吗?如果有是什么?有什么所用?
有,给普通的消费者存offset用
Kafka 分区分配的概念?
range(按主题分配,先看订阅后看组,不会分配错误,但分配不均匀),roundrobin:(按组分配,混合一起,分配均匀但没有订阅的可能也会得到消息)
日志目录结构?
.log 用于存放详细数据
.index 用于存放索引
每个Partition有一个log存放数据,当log过大1G,就会分片索引
指定一个offset,如何查找对应的消息
先通过二分查找法定位到index文件,根据offset对应的值,值就是Log里数据的详细位置
Kafka Controller的作用
通过zk在Kafka集群中选了一个broker当老大,负责管理集群 broker 的上下线,所有 topic 的分区副本分配和 leader 选举等工作。
哪些地方需要选举?
controller(抢资源)、leader(ISR,看同步时间,看同步条数)
失效副本是指什么?有哪些应对措施?
失效的leader、follower,leader失效(在isr中重新选举出新leader,新leader发通知 给所有follower,让其数据截取到HW高水位,再同步新leader),follower失效(先被踢出isr,恢复后同步之前保存的hw,再截取到现在hw,再同步leader,再回到isr)
哪些设计有如此高的性能?
分布式、分区、顺序写磁盘、零拷贝