一些Kafka术语和Kafka协议抓包分析
HW:High Water。所有Follower都写入才会往前移。例如:一个分区有3个副本,一个leader,2个follower。producer向leader写了10条消息,follower1从leader处拷贝了5条消息,follower2从leader处拷贝了3条消息,那么leader副本的LEO就是10,HW=3;follower1副本的LEO是5。
acks:生产者认为发送成功需要返回的ISR数量。当acks=all时,需要ISR所有副本都返回,非ISR不管。min.insync.replicas指定了最小的ISR,但不是最小的acks数。例如:一个分区有5个副本,Broker的min.insync.replicas设置为2,生产者设置acks=all。如果ISR有5个,必须要全部同步。
LEO:Log End Offset。当前高水位偏移量,也就是最近一个被读取消息的偏移量,同时也是最近一个被提交到集群的偏移量。每个Replica自己维护,写到哪算哪。
ISR:Insync Replica集合,Controller以replica.lag.time.max.ms的一半为周期(默认5000)维护ISR。Follower Replica在replica.lag.time.max.ms时间内向Leader请求最新(offset)的数据,则Follower会被纳入ISR。
Kafka副本机制优点:
- Read-your-writes。成功写入之后,因为follower仅用于备份,可以立即读到最新消息。
- Monotonic Reads(单调读)。对消费组来说,不会看到某条消息时在时不在。原因同上条。
Unclean 领导者选举(Unclean Leader Election)
A :ISR是可以动态调整的,所以会出现ISR为空的情况,由于Leader副本天然就在ISR中,如果ISR为空了,这说明Leader副本也挂掉了,Kafka需要重新选举一个新的Leader。
B :Kafka把所有不在ISR中的存活副本都会称为非同步副本。通常,非同步副本落后Leader太多,如果让这些副本做为新的Leader,就可能出现数据的丢失。在kafka中,选举这种副本的过程称为Unclean领导者选举。
C :Broker端参数unclean.leader.election.enable 控制是否允许Unclean领导者选举。开启Unclean领导者选举可能会造成数据丢失,但它使得分区Leader副本一直存在,不至于停止对外提供服务,因此提升了高可用性。禁止Unclean领导者选举的好处是在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
Producer、Consumer、Broker之间的请求:
用Wireshark抓Kafka协议的包来分析
每个请求和回复都有一个对应的Correlation ID,可以用于串起来。
返回都会带上字段:Throttle time(背压限流时间?)
Producer:
- 每300s向Broker发起Metadata v9 Request,
- 请求参数:Client ID、Auth
- 返回内容:
- Broker的Metadata:Node ID、Host、Port、Rack、Cluster ID
- Controller ID
- Topic MetaData:包含Topic列表,每个Topic的Partition信息
- 【发消息】主动向Broker发起Produce v8 Request,
- 请求参数:Client ID、acks、Topic、目标Partition、Message内容
- 返回内容:Topic名称、发往的Partition ID、offset(只在单个Partition内递增)
Consumer:
- 每3s向Broker发送Heartbeat v4 Request心跳,
- 请求参数:Client ID、Consumer Group、Generation ID(是啥?)、Consumer Group Member ID(消费者ID)
- 返回内容:
- 每5s向Broker发送OffsetCommit v8 Request,
- 请求参数:
- Client ID、Consumer Group、Generation ID(是啥?)、Consumer Group Member ID(消费者ID)
- Topic列表、Partition列表(每个Partition的Offset、Leader Epoch)
- 返回内容:消费的Topic列表、Partition列表
- 请求参数:
- 每300s向Broker发起Metadata v9 Request,
- 请求参数:Client ID、Auth、Topic列表
- 返回内容:
- Broker的Metadata:Node ID、Host、Port、Rack、Cluster ID
- Controller ID
- Topic MetaData:包含Topic列表,每个Topic的Partition信息
- 【消费消息】向Broker发起轮询(Fetch v11 Request),
- 如果有待消费的消息,Broker立即返回。否则过500ms后返回。
- 请求参数:
- Client ID、Replica ID(默认-1)、
- Min/Max Bytes、Max Wait Time(默认500ms)、Isolation Level(默认读未提交)
- Fetch Session ID(建立会话后不会变)、Fetch Session Epoch(递增)
- Rack
- 目标Topic信息(如果有消息的话)
- Topic名称、Partition
- Offset、Log Start Offset、Max Bytes
- 返回内容:
- 共有字段:Fetch Session ID(建立会话后不会变)
- 有消息时带上字段Topic:
- Partition列表、Offset(已更新)、Last Stable Offset、Log Start Offset
- 消息内容
参考资料
- 《Kafka权威指南》