学习极客时间 《Kafka核心技术与实战》笔记-客户端实践及原理剖析21-22
原极客时间连接:https://time.geekbang.org/column/intro/191?utm_campaign=guanwang&utm_source=baidu-ad&utm_medium=ppzq-pc&utm_content=title&utm_term=baidu-ad-ppzq-title
消费者TCP连接管理
创建连接
消费者端主要的程序入口是KafkaConsumer类,new KafkaConsumer(properties)后是不会创建socket连接,这一点 和 JAVA生产者是有区别的。生产者KafkaProducer在构建实例时,后天会默认启动一个Sender线程,也就建立起了Socket连接。
消费者TCP连接是在调用KafkaConsumer.poll方法时创建的:
- 发起findCoordinator请求时,确定协调者和获取集群元数据
FindCoordinator 请求集群中当前负载最小的Broker(谁的待发送请求最少),此时会创建一个socket连接。 - 连接协调者时,执行组成员管理操作
通过FindCoordinaor请求后,消费者会得到应该连接的Broker,那么下一步就会创建连接此Broker的socket连接。 - 消费数据时
同样,进行消息消费时,也必须想各分区创建socket连接来请求获取消息。
关闭连接
和平常socket连接一样,在作完业务处理后连接会被释放关闭,消费者关闭连接也是如此,关闭了解时分为主动关闭(手动)和Kafka自动关闭。
- 手动调用KafkaConsumer.close() 或者 执行kill 命令
- Kafka自动关闭,设置参数 connection.max.idel.ms 时间控制自动关闭时间,默认9分钟,超过9分钟以上的连接会被Kafka自动关闭。设置为-1 ,禁用定时关闭的案例,此时TCP连接将不会定期清除,将会永久成为僵尸连接,需要特别注意。
消费者组消费进度监控
消费进度,即指滞后程度,消费这当前落后与生产者的程度。
Lag 单位是消息数,Lag=20 ,即表示落后于生产者20条消息;Kafka监控Lag的层级是在分区上的,即其的值是每个分区的Lag的和。
Lag值趋于0,表示生产速度和消费速度相当;
Lag值较小,表示滞后程度很小;
Lag值非常大,生产速度和消费速度无法匹配,可能造成消费消息时已经不在内存了,需要从硬盘获取消息。所以Lag的值需要特表关注,监控Lag 的三种方法:
- 1.Kafka-consumer-groups脚本
Kafka自带的命令工具,bin/kafka-consumer-groups.sh(bat)
$ bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker连接信息> --describe --group <group名称>
执行命令后一般如下图所示:
第二种情况,没有consumer-id 、host和client-id,还有输出一行“consumer group ‘testgroup’ has no active members”,是因为执行脚本时,没有起订消费者程序。
如果执行完命令后什么也不输出,可能是版本过低不支持查询非active消费者组,需要升级下kafka版本。
- 2.Kafka Java Consumer API编程
社区提供的Kafka Java Consumer API 分别提供查询当前分区最新消息位移和消费者组最新消费位移两组方法,可以通过他们计算出Lag。
public static Map<TopicPartit