【消息中间件】Kafka的工作原理与使用教程

TangLu 未命名 2024-04-01 10437 2

一、消息中间件与Kafka介绍

程序在进行数据交互时,如果是线程和线程之间通信,只需要把数据放到共享堆内存就可以完成数据的共享。如果是进程和进程之间需要进行数据交互,则无法使用共享内存,必须通过类似socket的网络通信流来完成。上述两种情况其实都不需要第三方软件的介入就可以完成,但是他们都有个问题,如果发送的数据量大于接受数据量就会产生数据积压,当积压的数据量达到一定规模后系统就不可用了。除此以外如果进程和进程、线程和线程是直接交互的形式的话,无法动态的从缓冲区中获取需要的数据。为了解决这种问题就需要依赖消息中间件了,而Kafka就是其中的佼佼者。

Kafka是Apache开发的一款分布式消息中间件,支持百亿级吞吐量,相比RabbitMQ、RocketMQ都更有优势,更适合在大数据环境下使用。消息中间件的出现让生产者和消费者不直接进行通信,实现“削峰填谷”(缓冲上下游瞬时突发流量,使其更平滑)。尤其是一些发送能力很强的上游系统,如果没有消息引擎的保护,下游系统可能会直接被压垮导致全链路服务崩溃。而有了消息队列就能够有效地承受上游的流量冲击,让整个链路都趋于平缓。通过多个生产者产生的数据存放在消息队列中,再由后端多个消费者进行处理的方式,解决了生产速度和消费速度不匹配的问题,还可以避免生产者或者消费者任意一方出现问题时发生数据丢失的情况。

为了避免Kafka数据丢失或者数据重复带来的数据不一致性问题,还需要服务端、客户端都做好相应配置以及采取合适的补偿方案,比如

· 生产端:要求不能少生产消息。比如使用带回调方法的API来确认消息发送是否成功、生产消息有重试机制等

· 服务端:要求不能丢失消息。比如对副本做好相关配置

· 消费端:要求不能少消费消息


二、Kafka相关名词解释

· broker(节点):kafka集群是由多个节点构成,而每个节点上的实例就叫做broker,broker又分为了Leader(主)和Follower(从)

· producer(生产者:用于产生数据到Kafka,还决定了消息的属性,比如partition和replicas的数量(kafka配置文件中虽然也可以指定这些属性,但只是默认值)

· consumer(消费者:用于处理生产出的数据并将消费进度(offset)保存在Kafka的topic中

· consumer Group(消费者组):每个consumer都属于一个特定的Group(如果没配置Group name的话则是默认的Group),将多个消费者集中到一个组中可以提升数据消费能力。在消费者组中会有一个协调者角色(Coordinator),它负责为 Group 执行 Rebalance 以及提供位移管理和组成员管理等

· topic(主题):类似表名,为不同的业务创建不同的主题来实现数据隔离。Kafka 在启动后还会自动创建 __consumer_offsets这样的主题来保存 Kafka 消费者的位移信息(默认50个分区,3个副本数)

· partition(分区、分片)每个主题都可以分割成多个分区生产者生产的消息只会被发送到1个分区中。比如有1个主题是2个分区,那么对该主题发送的消息要么在分区 0 中,要么在分区 1 中(Kafka 的分区编号是从 0 开始的)。合理的分区可以提高Kafka集群的负载能力,实现多分区同时写入,但是在需要严格保证消息消费顺序的场景下,需要将partition设置为1或者生产者通过 Key-ordering策略写入分区这些分区也分为Leader和Follower角色。当Leader失效后会从Follower中选举,用于保证服务的高可用

· replicas(副本副本是在分区的层级进行定义,可以为每个分区配置若干个副本,这些副本分散保存在不同的 Broker 上,从而确保部分 Broker 宕机后数据的安全性同一个分区下的所有副本保存有相同的消息序列,但是和传统数据库不同的是副本中会有一个Leader领导者角色负责数据的读写操作,而其他副本则属于跟随者Follower。Follower仅和Leader保持同步,不提供任何服务。只有当leader故障时,Kafka会从一个能动态调整的ISR副本集合中选取一个Follower提升为Leader(ISR副本集合中的副本都是经过“筛选”的,只有当Follower 副本落后 Leader 副本的最长时间间隔不超过Broker参数replica.lag.time.max.ms 的值才会加入,否则会被剔除,默认为10秒)。在ISR之外的副本都是非同步副本,除非开启了unclean.leader.election.enable的选项,否则这些副本不会被提升为Leader。在各个副本之间存在一个相同高水位(High Watermark),高水位的作用就是让消费者只能看到水位线之前的数据,这样可以确保角色切换后对于消费者来说数据是一致的,但是不能保证数据的完整或者不重复。数据的丢失或者重复问题由ACK机制负责,ACK级别分为0、1 和 -1(也可以表示成 all)。0 表示 Producer 不会等待 Broker 端对消息写入的应答。这个取值对应的 Producer 延迟最低,但是存在极大的丢数据的可能性。1 表示 Producer 等待 Leader 副本所在 Broker 对消息写入的应答。在这种情况下,只要 Leader 副本数据不丢失,消息就不会丢失,否则依然有丢失数据的可能。-1 表示 Producer 会等待 ISR 中所有副本所在 Broker 对消息写入的应答。这是最强的消息持久化保障。0、1均有丢失数据的问题,-1代表leader和follower全部落盘成功后才返回ack,数据不会丢失,但可能产生重复数据

kafka_replicate.png


· offset(偏移量):可以分区偏移量和消费者偏移量。分区偏移量是指生产者向分区写入消息后,消息在分区中的位置信息,也可以用于标识下次应该读取的消息位置。而消费者偏移量(Consumer Offset)消费者在消费消息的过程中记录当前消费到了分区的哪个位置

· lag(滞后值)消费者落后于生产者的程度,是非常重要的监控指标。Lag的单位通常是消息的数量。比如生产者向某主题生产了 100 万条消息,此时 Lag 等于 20 万,那么代表当前的消费者滞后了 20 万条消息 。虽然 Lag 一般是在主题级别上讨论的,但实际上Kafka 监控 Lag 时的层级是在分区上。如果要计算主题级别就需要累加所有主题分区的 Lag,合并成最终的 Lag 值

· zookeeper:Kafka所依赖的注册中心服务,由于kafka支持集群化,当Leader和Follower不在一个节点中就需要将leader的信息注册在Zookeeper中。对于Kafka而言,它将每个节点的metadata信息告诉zk,再由zk来配置和存储节点与主题队列信息。zk和kafka的topic有一样的leader\follower角色


三、部署集群前的资源规划

1、磁盘

在进行集群搭建前要做好磁盘容量的规划,比如说有业务每天需要向 Kafka 集群发送 1 亿条消息,每条消息保存两份以防止数据丢失,消息需要保存两周。假设消息的平均大小是 1KB,那么需要的磁盘空间就等于 1 亿 * 1KB * 2 / 1000 / 1000 * 14 = 2.8TB,然后加上索引等其他占用再预留出 10% 的磁盘空间,因此总的存储容量就是 3TB左右。由于Kafka 支持数据压缩,假设压缩比是 0.75,那么最后需要的存储空间就是 0.75 * 3 = 2.25TB

2、网络

假设业务目标是在 1 小时内处理 1TB 的业务数据,按照最常见的千兆网络进行规划,理论上每秒可处理的数据量大概是 1024GB / 3600s ≈ 0.284 GB/s ≈ 284MB/s(注意是MB)。如果 Kafka 最多只能用 70% 的带宽资源,那么单台 Kafka 服务器满载的情况下能使用到的带宽资源是 700Mb 。但是通常不会持续满负载运行,所以真实情况下可能是1/3的带宽占用,即 700Mb / 3  ≈  240Mbps(30MB)。那么约等于需要 10 台服务器才可以满足要求(284MB/30M)。如果消息还需要额外复制两份,那么总的服务器台数还要乘以 3。


四、部署Zookeeper+Kafka集群

1、由于Kafka是一个分布式系统,它依赖ZooKeeper来完成协调任务、状态与配置管理,所以在运行Kafka之前要先安装并启动ZK,具体可参考本站文章——《ZooKeeper集群配置教程》


2、通过官方网站kafka.apache.org/downloads下载Kafka二进制安装包,解压即可使用。这里需要注意版本号的问题,他是按照kafka.scala_version-kafka_version的方式命令的,只有最后一段数字才是kafka自身版本号。

tar zxf kafka_2.12-3.0.0.tgz
mv kafka_2.12-3.0.0 /usr/local/
ln -s /usr/local/kafka_2.12-3.0.0 /usr/local/kafka


3、Kafka配置文件server.properties主要参数介绍(属于全局配置,还可以通过命令行为指定的topic单独配置部分参数)

#通常只需要修改server.properties这个文件,虽然还有生产者、消费者的属性文件,但是一般都是由程序去控制
vi config/server.properties
broker.id=1             #集群中每个节点ID不能相同
log.dirs=/data/kafka/log       #Kafka消息存放路径,并不是日志,kafka的日志会自动生成到其它目录中
zookeeper.connect=192.168.1.100:2181,192.168.1.101:2181,192.168.1.102:2181  #zk服务地址与端口
listeners=PLAINTEST://192.168.1.100:9092  #指定客户端需要通过什么协议和地址来访问Kafka,PLAINTEXT表示明文传输、SSL表示加密传输,也可以自定义协议名字 
log.flush.interval.message=10000      #日志从内存刷到磁盘的阈值,达到这个数量就落盘,通常要减少落盘的频率
log.retention.hours=168           #日志落盘后保留多少小时,超时则删除,即便没有消费,避免磁盘写满
log.segment.bytes=1073741824        #单个日志的大小,超出则创建新的日志,删掉旧的日志
log.retention.check.interval.ms=300000   #检测日志是否达到删除条件的周期
auto.create.topics.enable=false       #是否允许自动创建Topic,建议false。由运维严格把控topic的创建
unclean.leader.election.enable=false    #是否允许Unclean Leader,建议false。代表落后进度太多的副本没资格进行leader选举,避免切换后数据丢失
auto.leader.rebalance.enable=false     #是否定期进行 Leader 选举,建议为false,减少不必要的选举行为

#数据存储策略
log.retention.{hours|minutes|ms}=168  #控制一条消息数据被保存多长时间,log.retention.hours=168表示默认保存 7 天的数据
log.retention.bytes=-1       #指定 Broker 为消息保存的总磁盘容量大小,-1表示不限制
message.max.bytes=10000000     #Broker 能够处理的最大消息大小,单位是字节

#开启压缩功能,可以极大的优化磁盘占用和网络传输开销。该参数在broker和producer都可以设置,设置为producer表示和生产者一致。避免两端设置不统一导致的broker按照自己的策略进行一次解压+重新压缩,产生额外的CPU开销
compression.type=producer

# 线程数,默认8,表示每台 Broker 启动后有8个线程用于处理请求。如果CPU资源充足可以调大该参数允许更多的并发请求被同时处理
num.io.threads=8


自定义协议时需要通过listener.security.protocol.map参数确定协议底层使用的安全协议

listeners=CONTROLLER: //localhost:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT  #CONTROLLER这个自定义协议底层使用明文传输数据


4、使用kafka-server-start.sh脚本后台启动kafka

nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
# bin/kafka-server-start.sh -daemon config/server.properties #另一种后台启动方式


5、登录ZooKeeper后可以看到kfaka的信息已经注册进来

./zkCli.sh 
ls /


kafka1.png


五、Kafka主题管理常用命令

1、创建topic

通过kafka-topics.sh脚本完成topic的创建,在创建topic时由于名称冲突限制,topic名称不能同时包含.和_

#创建topic
bin/kafka-topics.sh --bootstrap-server 172.20.1.171:9092 --create --replication-factor 1 --partitions 1 --topic test_topic
#--bootstrap-server:指定ZooKeeper集群中任一节点地址。该参数从Kafka2.2开始替代了--zookeeper,使用--bootstrap-server会受到安全认证体系的约束,对命令发起者进行权限验证,更安全
#--replication-factor:topic的总副本数,1代表主备节点总共只有一个副本,相当于没有备份,所以通常3节点的话设置为2
#--partitions:topic分区的数量,通常和节点数保持一致
#--topic:主题名字

#创建topic时还可以指定参数
bin/kafka-topics.sh --bootstrap-server 172.20.1.171:9092 --create --replication-factor 1 --partitions 1 --topic test_topic --config retention.ms=15552000000 --config max.message.bytes=5242880
# retention.ms:该 Topic 消息被保存的时长,默认是 7 天,
# retention.bytes:该 Topic 预留多大的磁盘空间。和全局参数作用相似,默认值是 -1表示可以无限使用磁盘空间


2、查看topic

#查看topic
bin/kafka-topics.sh --bootstrap-server 172.20.1.171:9092 --list 


#查看指定topic详细信息,包含分区、副本与Leader情况,如果不指定具体的主题名称会返回所有可见主题的详细数据
bin/kafka-topics.sh --bootstrap-server 172.20.1.171:9092 --topic test_topic --describe


3、删除topic

如果执行删除后发现主题的分区数据依然在硬盘中,没有成功被清除。通常是副本所在的 Broker 宕机了,一般重启 Broker 后就会自动恢复。还有一种情况是待删除主题的部分分区还在执行迁移过程。但是不管什么原因,一旦碰到主题无法删除的问题,都可以通过手动清理ZooKeeper数据来实现。首先是删除 ZooKeeper 节点 /admin/delete_topics 下以待删除主题为名的 znode。然后删除该主题在磁盘上的分区目录,最后一步是在 ZooKeeper 中执行 rmr /controller,触发 Controller 重选举,刷新 Controller 缓存。在执行最后一步时可能造成大面积的分区 Leader 重选举,通常执行前两步就可以生效,只是 Controller 缓存中没有清空待删除主题罢了,不影响使用

bin/kafka-topics.sh --bootstrap-server broker_host:port --delete  --topic <topic_name>


4、修改topic分区数(只能增加不能减少)

bin/kafka-topics.sh --bootstrap-server 172.20.1.171:9092 --alter --topic test_topic --partitions 3


5、修改topic参数

#也能指定--bootstrap-server参数,只不过--bootstrap-server是用来设置动态参数的
bin/kafka-configs.sh --zookeeper 172.20.1.171:2181 --entity-type topics --entity-name test_topic --alter --add-config max.message.bytes=10485760


6、生产消息

#指定生产者参数 acks 为 -1,同时启用了 LZ4 的压缩算法,然后就可以向test-topic发送消息
bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --request-required-acks -1 --producer-property compression.type=lz4


7、消费消息

#-from-beginning从队列头部开始消费数据,否则是从最新位移读取消息,如果此时没有新消息,则该命令输出为空
#--group指定了group信息,否则每次运行都会自动生成以console-consumer开头的消费者组
bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property enable.auto.commit=false


8、测试生产者性能

向指定主题发送 1 千万条消息,每条消息大小是 1KB,命令执行后会输出生产者的吞吐量 (MB/s)、消息发送延时以及各种分位数下的延时。需要关心的就是延时分布情况,通常关注到 99th 分位就可以了,这里 99th 值是 604ms,表明生产消息有 99% 的延时都在 604ms 以内,这个数据可以作为生产者对外承诺的 SLA

bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4

2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.


9、测试消费者性能

该脚本打印出消费者的吞吐量数据,比如本例中的 1723MB/s。不过它没有计算不同分位数下的分布情况。因此,该脚本的实际使用率要比生产者性能测试脚本的使用率低

bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-06-26 15:24:18:138, 2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225, 1769598.3012


10、查看消息文件数据

使用 kafka-dump-log 脚本可以查看消息体中的数据

$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration --print-data-log


11、重设消费者位移

通过 kafka-consumer-groups 脚本可以手动设置消费者位移信息( Kafka 0.11 之前只能使用 API 的方式来重设位移)

# Earliest 策略
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest --execute

# Latest 策略
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute

# Current 策略
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute

# Specified-Offset 策略
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute

# DateTime 策略
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute


五、Kafka数据安全性配置


如果要让 Kafka 做到消息不丢失的话,需要从以下几点优化:

1、Producer

· 生产者需要使用producer.send(msg, callback)这样带有回调通知的 send 方法

· 生产者需要设置 acks = all。这是最高等级的数据已提交定义,表明所有副本 Broker 都要接收到消息才算是数据已提交

· 生产者将 retries 设置为较大值。当消息发送失败时够自动重试消息发送,避免消息丢失

2、Broker

·  Broker 参数 unclean.leader.election.enable = false。避免消息落后的 Broker 成为新的 Leader,从而造成消息的丢失

·  Broker 参数 replication.factor >= 3,将消息多保存几份

·  Broker 参数 min.insync.replicas > 1。消息至少要被写入到多少个副本才算是已提交,设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1

·  确保 replication.factor > min.insync.replicas。推荐 replication.factor = min.insync.replicas + 1,如果两者相等,只要有一个副本挂了,整个分区就无法正常工作

3、Consumer 

·  确保消息者消费完再提交,最好将 enable.auto.commit 设置成 false,采用手动提交位移的方式。避免 Consumer 从 Kafka 获取到消息进行多线程异步处理时,某个线程运行失败导致消息没有被成功处理,但此时位移已经被更新了,所以其他线程获取到的偏移量就是错误的,对于 Consumer 而言数据就丢失了


六、Kafka 消费者组的Rebalance机制

如果某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。Kafka 会平均会为每个 Consumer 分配 5 个分区,这个分配的过程就叫 Rebalance

1、Kafka Rebalance 的触发条件

· 组成员数发生变更

比如有新的 Consumer 实例加入组或者离开组,或者其它导致组成员发生变化的情况发生时

· 订阅主题数发生变更

Consumer Group 可以使用正则表达式的方式订阅主题,如果 Consumer Group 运行过程中新建的主题和正则表达式匹配上了,那么 Group 就会发生 Rebalance

· 订阅主题的分区数发生变更

当分区数增加时会触发订阅该主题的所有 Group 开启 Rebalance


2、Kafka Rebalance 的影响

·  Rebalance 过程中所有 Consumer 实例都会停止消费,直至 Rebalance 完成,而 Rebalance 是一个缓慢的过程

·  Rebalance 过程中所有 Consumer 实例会全部重新分配分区,这不是一个高效的做法。例如实例 A 之前负责消费分区 1、2、3,在 Rebalance 发生后会被重新分配所有分区,而不是让实例 A 继续消费分区 1、2、3。这样实例 A 连接这些分区所在 Broker 的 TCP 连接就无法复用,需要重新创建连接其他 Broker 的 Socket 资源


3、如何减少Kafka Rebalance 的影响

Rebalance 的低效和对 TPS 的影响目前没有好的解决办法,只能尽量避免。比如订阅主题数和分区数的变化一般属于运维操作,可以尽量避免的


评论

精彩评论
2020-06-16 19:01:22

博主写的不错

2020-06-16 20:27:30

@爱乐 @爱乐:谢谢支持 一起学习