玩命加载中 . . .

Kafka笔记


Kafka

概述

定义

Kafka是一个分布式发布/订阅模式消息队列,主要应用于大数据实时处理领域。

消息队列

使用消息队列的好处

  1. 解耦
  2. 可恢复性:一部分组件失效后,不会影响整个系统,消息队列降低了进程之间的耦合性,加入消息队列的消息仍然可以在系统恢复后处理
  3. 缓冲:解决生产消息和消费消息的处理速度不一致
  4. 灵活性、峰值处理能力
  5. 异步通信

消息队列的两种模式

  1. 点对点:一对一,消费者拉取数据,消息收到后消息删除

  2. 发布/订阅:一对多,消费者消费数据之后不会清除消息,消息可以给多个消费者使用,Kafka是主动拉取消息

    优点:可以控制获取消息的速度

    缺点:不断去问订阅是否有新消息

Kafka基础架构

1

Producer :消息生产者,就是向 kafka broker 发消息的客户端;

Consumer :消息消费者,向 kafka broker 取消息的客户端;

消息队列:主题Topic(用于分类)、分区Partition(用于负载均衡)、Leader主(用于备份)

消费者:消费者组(一个主题可以被一组组多人消费,一个分区只能被一个组中一人消费)

Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。

安装部署

集群规划

jar包下载

http://kafka.apache.org/downloads.html

集群部署

命令行操作

1)查看当前服务器中的所有 topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh –zookeeper hadoop102:2181 –list

2)创建 topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh –zookeeper hadoop102:2181 –create –replication-factor 3 –partitions 1 – topic first

选项说明:

–topic 定义 topic 名

–replication-factor 定义副本数

–partitions 定义分区数

3)删除 topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh –zookeeper hadoop102:2181 –delete –topic first

需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。

4)发送消息

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh –brokerlist hadoop102:9092 –topic first >hello world

5)消费消息

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \ –zookeeper hadoop102:2181 –topic first

Kafka架构深入

工作流程

2

Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。

每个 partition(分区) 对应于一个 log 文 件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset(偏移量,就是第几条数据的编号)。消费者组中的每个消费者,都会实时记录自己 消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

根据偏移量先找到该偏移量在哪个索引中,再在索引中找到偏移量,再获取偏移量对应真实数据的位置,再找到Log文件中数据。

存储机制

3

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位 效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment(分片)。每个 segment 对应两个文件——“.index”文件和“.log”文件。

index 和 log 文件以当前 segment 的第一条消息的 offset 命名。

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

“.index”文件存储大量的索引信息(左边为偏移量(第几条数据),右边为数据的具体位置),“.log”文件存储大量的数据,索引文件中的元 数据指向对应数据文件中 message 的物理偏移地址。

4

这些文件位于一个文件夹下,该文件夹的命名 规则为:topic 名称+分区序号。例如,first 这个 topic 有三个分区,则其对应的文件夹为 first0,first-1,first-2。

Kafka生产者

分区策略

分区原因
  1. 方便在集群中扩展:提高负载能力
  2. 可提高并发:可以以partition为单位读写
分区的原则

我们需要将 producer 发送的数据封装成一个 ProducerRecord 对象

5

(1)指明 partition 的情况下,直接将指明的值直接作为 partiton 值;

(2)没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值;

(3)既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后 面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin(轮询) 算法。

数据可靠性保证(数据丢失、重复)

为保证 producer 能可靠的发送数据到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后,都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。

问:何时发送ack?

  1. 半数以上完成同步,就发 送 ack
    1. 优点:延迟低
    2. 缺点:选举新leader时,需要2n+1台副本,确保n台故障仍可以超过半数
  2. 全部完成同步,就发送ack
    1. 优点:选举新leader时,需要n+1台副本,确保n台挂掉还有1个存活,也算全部完成
    2. 缺点:延迟高
ISR(优秀的follower集合、副本同步队列)

问题:采用全部完成同步的方案,如果所有follower开始同步数据,有一台因为故障,很长时间不能完成同步,leader需要一直等下去,为解决这个问题,就有了ISR。

  1. 原理:Leader维护了一个动态的 in-sync replica set (ISR),意思是和leader保持紧密同步的leader。当ISR中的follower完成数据同步之后,leader就会发送ack。如果长时间没有同步完成,该follower就会被踢出ISR。这个时间阈值由replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。
  2. 以前版本是按两个条件进入ISR:同步时间差,同步数据量差。但之后版本移除了同步数据量差,因为在同步时同步数据量没有达到被踢出,同步时间又快被加入,反复进出会影响性能。
ack 应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失, 所以没必要等 ISR 中的 follower 全部接收成功。

所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡, 选择以下的配置。

acks参数配置

0:生产者不等待Kafka集群的ack,这个操作延迟最小,Kafka刚接收还没有写入磁盘就已经返回,但如果集群故障,就有可能丢失数据

1:生产者等待ack,分区中的leader写入成功之后就会返回ack。但如果follower同步完成之前,leader发生故障,就会丢失数据

-1:生产者等待ack,分区中ISR中的leader和follower全部写入同步成功才返回ack。但如果follower同步完成后,leader在没有发送ack之前发生故障,推选出新的leader,生产者没有收到ack就会再次发送消息给刚推选出来的leader,因为新leader已经同步成功,所以会出现数据重复

-1

​ 极端情况下:-1也是可能出现丢失数据。当partition中的follower效率都不高,ISR中只有一个leader,当自己写完之后也是满足了ISR全部完成的情况,直接返回ack,如果follower同步完成前leader发生故障,就会数据丢失,与1的情况相同。

数据一致性(副本间数据一致)

7

LEO:每个副本的最大偏移量offset,最后的数据

HW:消费者能见到的最大偏移量,ISR中最小的LEO

  1. follower故障

    会被临时踢出ISR,等恢复之后,先从磁盘中读取上次记录的LEO,再将高于HW的部分截取掉,从HW开始向leader同步,等LEO=该Partition的HW,追上leader之后,就重新加入ISR

  2. leader故障

    会在ISR中选举出一个新的leader,为保证多个副本数据之间的一致性,其他的follower会先将各个log文件中高于HW的部分截掉,再从新的leader中同步数据

Exactly Once(精准一次)

将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 **At Least Once **语义(最少发送一次请求)。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被 发送一次,即 At Most Once 语义(最多发送一次请求)。

对于一些非常重要的信息,比如说 交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once 语义(精准一次)。

At Least Once + 幂等性 = Exactly Once

所谓的

工作原理:

要启用幂等性,只需要将 Producer 的参数中 enable.idompotence 设置为 true 即可。

开启幂等性的 Producer 在 初始化的时候会被分配一个 PID(生产者ID),发往同一 Partition 的消息会附带 Sequence Number(序列号)。而 Broker 端会对<PID,Partition,SeqNumber>消息的标识做缓存,当具有相同主键的消息提交时,Broker 只 会持久化一条。

总结

ack:可靠性{0(只发一次,不管确认),1(发一次,等写的确认),-1(发一次或多次,等写等同步的的确认)}

isr:HW(高水位,isr中最小的LEO,消费者可见的最大数据,保证一致性(消费、存储))、LEO(每个副本的最大偏移量)

消费者

消费方式

  1. consumer 采用 pull(拉)模式从 broker 中读取数据。

    • 缺点:如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。
    • 解决:消费时传入时间参数,如果没有数据可以消费,就等待一段时间再返回
  2. push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。

分区分配策略

  1. RoundRobin(轮询)按照组来分

    将所有分区看作一个整体轮流分配给消费者

    问题:如果消费者A订阅T1,B订阅T2,轮询会把T1,T2看成一个整体,然后分配,有可能将T1给了B,T2给了1

    结论:轮询只能使用在消费者订阅相同主题

    优点:分配均匀

    缺点:当消费者组订阅的主题不一样时,会错分配

  2. Range,按照主题来分(先看订阅后看组)

    比如AB消费者都订阅了T1(012)T2(012),T1主题被拆成01,2分别给AB,T2也是如此

    优点:不会错误分配

    缺点:分配不均匀

总结:当消费者组中每个消费者都订阅相同的topic时,推荐使用轮询,而如果一个消费者组中的消费者订阅不同的topic,那么使用range

分配策略被调用的时机:消费者组中成员发生变化时,重新分配

offset 的维护

由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故 障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。

消费者是按组消费的,比如消费者组有消费者A,消息由3个分区,每个偏移量都是10,当B加入消费者组,重新分配时,B获得一个分区,继续在10之后消费

offset=消费者组+主题+分区

Kafka 高效读写数据

  1. 顺序写磁盘
  2. 零复制技术:直接与操作系统交互,跳过用户空间
  3. 分布式(分区):并发读写

Zookeeper 在 Kafka 中的作用

Kafka 集群中有一个 broker 会被选举为 Controller,负责管理集群 broker 的上下线所有 topic 的分区副本分配leader 选举等工作

Controller 的管理工作都是依赖于 Zookeeper 的。

8

Kafka 事务

事务可以保证 Kafka 在 Exactly Once 语义的基 础上,生产和消费可以跨分区和会话的精准一次,要么全部成功,要么全部失败。

面试题

  1. 什么是ISR、OSR、AR

    副本同步队列、同步速度慢会被踢出到OSR列表、所有副本

  2. HW、LEO代表什么

    高水位,消费者可观察最大offset、分区中每个副本最大的offset

  3. 怎么体现消息顺序性

    区内有序

  4. 拦截器、序列化器、分区器

  5. 生产者客户端整体结构是什么样子,由几个线程来处理,分别是什么

    2个线程,main线程(发数据、拦截器、序列化器、分区器), Sender 线程,一个线程共享变量——RecordAccumulator。

    main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取 消息发送到 Kafka broker。

  6. 消费组中消费者个数不能超过topic的分区,因为会有消费者消费不到数据,导致资源浪费

  7. 消费者提交消费位移时提交的是当前消费到的最新消息的 offset 还是 offset+1?

    +1

  8. 有哪些情形会造成重复消费?

    先消费再提交

  9. 那些情景会造成消息漏消费?

    先提交后消费

  10. topic 的分区数可不可以增加/减少?如果可以怎么增加?如果不可以,那又是为什么?

    增加可以,减少不行:数据无法处理

  11. Kafka 有内部的 topic 吗?如果有是什么?有什么所用?

    有,给普通的消费者存offset用

  12. Kafka 分区分配的概念?

    range(按主题分配,先看订阅后看组,不会分配错误,但分配不均匀),roundrobin:(按组分配,混合一起,分配均匀但没有订阅的可能也会得到消息)

  13. 日志目录结构?

    .log 用于存放详细数据

    .index 用于存放索引

    每个Partition有一个log存放数据,当log过大1G,就会分片索引

  14. 指定一个offset,如何查找对应的消息

    先通过二分查找法定位到index文件,根据offset对应的值,值就是Log里数据的详细位置

  15. Kafka Controller的作用

    通过zk在Kafka集群中选了一个broker当老大,负责管理集群 broker 的上下线所有 topic 的分区副本分配leader 选举等工作

  16. 哪些地方需要选举?

    controller(抢资源)、leader(ISR,看同步时间,看同步条数)

  17. 失效副本是指什么?有哪些应对措施?

    失效的leader、follower,leader失效(在isr中重新选举出新leader,新leader发通知 给所有follower,让其数据截取到HW高水位,再同步新leader),follower失效(先被踢出isr,恢复后同步之前保存的hw,再截取到现在hw,再同步leader,再回到isr)

  18. 哪些设计有如此高的性能?

    分布式、分区、顺序写磁盘、零拷贝


文章作者: 小苏
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 小苏 !
评论
  目录