首页天道酬勤kafka常用命令,消息队列作用

kafka常用命令,消息队列作用

张世龙 05-13 09:53 114次浏览

Kafka群集中有一个或多个broker,其中一个broker被选为“控制器”,用于管理整个群集中所有分区和副本的状态。 如果分区的leader副本出现故障,控制器将选择该分区的新leader副本。 当检测到分区的ISR集合发生更改时,控制器会通知所有中介更新元数据信息。 如果使用kafka-topics.sh脚本增加topic的分区数,则重新分配分区也由控制器负责。

Kafka的控制器选举工作依靠Zookeeper,竞选控制器的中介会在Zookeeper中创建名为/controller的临时(EPHEMERAL )节点。 此临时节点的内容请参考以下内容。

{'version':1,' brokerid':0,' timestamp ' : ' 1529210278988 ' }其中version在当前版本中固定为1,brokerid

在任意时刻,集群内都有控制器,只有一个。 每个broker启动时,都会尝试读取/controller节点的brokerid值。 如果brokerid的值不是-1,则当前broker将放弃选举,因为其他broker节点已经作为控制器成功竞选。 如果Zookeeper中没有名为/controller的节点,或者此节点中的数据异常,则尝试创建名为/controller的节点。 当前broker尝试创建节点时,其他broker可能也会尝试同时创建。 只有创建成功的broker是控制器,创建失败的broker是控制器。每个broker将当前控制器的brokerid值保存在内存中。 此值可标识为activeControllerId。

Zookeeper还具有与控制器相关联的/controller_epoch节点。 该节点是永久节点,其中包含整数controller_epoch值。 controller_epoch用于记录控制器更改的次数,即当前控制器是第几代控制器,也称为“控制器纪元”。 controller_epoch的初始值为1,即集群中第一个控制器的纪元为1,如果控制器发生更改,且未选择新控制器,则将此字段的值加1。 与控制器交互的每个请求都具有字段controller_epoch,如果请求的controller_epoch的值小于存储器中的controller_epoch的值,则该请求被发送到过期的控制器如果请求的controller_epoch值大于内存中的controller_epoch值,则表示已选择新控制器。 因此,Kafka通过controller_epoch保证控制器的唯一性,进而保证相关操作的一致性。

具有控制器角色的broker需要比其他常见broker多一个角色。 具体细节如下。

监听分区相关变化。 在Zookeeper的/admin/reassign_partitions节点上注册PartitionReassignmentListener,用于处理分区重新分配。 在Zookeeper的/isr_change_notification节点上注册IsrChangeNotificetionListener,以处理更改ISR集合的操作。 将首选复制副本选择监听器添加到Zookeeper的/admin/preferred-replica-election节点,以处理首选副本的选举操作。 拦截与主题相关的变化。 在Zookeeper的/brokers/topics节点中添加TopicChangeListener,以适应topic的增减变化; 将TopicDeletionListener添加到Zookeeper的/admin/delete_topics节点中,以处理topic删除。 拦截中介相关变化。 将BrokerChangeListener添加到Zookeeper的/brokers/ids/节点,以适应broker的变化。 从Zookeeper读取、检索和正确管理有关当前主题、分区和中介的所有信息。 将分区修改列表器添加到Zookeeper的/brokers/topics/[topic]节点(适用于所有topic ),以监视对topic分区分配的更改。 启动和管理分区状态机和复制副本状态机。 更新集群的元数据信息。 如果auto.leader.rebalance.enable参数设置为true,则还将打开名为“auto-leader-rebalance-task”的调度任务,以分散分区的首选副本

这个列表可能会让读者陷入混乱,甚至完全不知道发生了什么。 不需要~笔者多用于强调控制器的功能,这些功能的具体细节将在后面的文章中具体介绍。

控制器在选举成功后,需要读取Zookeeper内各节点的数据,初始化并管理上下文信息(ControllerContext )

这些上下文信息,比如为某个topic增加了若干个分区,控制器在负责创建这些分区的同时也要更新上下文信息,并且也需要将这些变更信息同步到其他普通的broker节点中。不管是监听器触发的事件,还是定时任务触发的事件,亦或者是其他事件(比如ControlledShutdown)都会读取或者更新控制器中的上下文信息,那么这样就会涉及到多线程间的同步,如果单纯的使用锁机制来实现,那么整体的性能也会大打折扣。针对这一现象,Kafka的控制器使用单线程基于事件队列的模型,将每个事件都做一层封装,然后按照事件发生的先后顺序暂存到LinkedBlockingQueue中,然后使用一个专用的线程(ControllerEventThread)按照FIFO(First Input First Output, 先入先出)的原则顺序处理各个事件,这样可以不需要锁机制就可以在多线程间维护线程安全。

 

在Kafka的早期版本中,并没有采用Kafka Controller这样一个概念来对分区和副本的状态进行管理,而是依赖于Zookeeper,每个broker都会在Zookeeper上为分区和副本注册大量的监听器(Watcher)。当分区或者副本状态变化时,会唤醒很多不必要的监听器,这种严重依赖于Zookeeper的设计会有脑裂、羊群效应以及造成Zookeeper过载的隐患。在目前的新版本的设计中,只有Kafka Controller在Zookeeper上注册相应的监听器,其他的broker极少需要再监听Zookeeper中的数据变化,这样省去了很多不必要的麻烦。不过每个broker还是会对/controller节点添加监听器的,以此来监听此节点的数据变化(参考ZkClient中的IZkDataListener)。

当/controller节点的数据发生变化时,每个broker都会更新自身内存中保存的activeControllerId。如果broker在数据变更前是控制器,那么如果在数据变更后自身的brokerid值与新的activeControllerId值不一致的话,那么就需要“退位”,关闭相应的资源,比如关闭状态机、注销相应的监听器等。有可能控制器由于异常而下线,造成/controller这个临时节点会被自动删除;也有可能是其他原因将此节点删除了。

当/controller节点被删除时,每个broker都会进行选举,如果broker在节点被删除前是控制器的话,在选举前还需要有一个“退位”的动作。如果有特殊需要,可以手动删除/controller节点来触发新一轮的选举。当然关闭控制器所对应的broker以及手动向/controller节点写入新的brokerid的所对应的数据同样可以触发新一轮的选举。

kafka的ack,kafka 卡住 Spring单元测试框架,springboot单元测试