Deleting a topic in Kafka
Published: 2019-06-29


Preface

When you delete a topic from the shell with `kafka-topics --delete --topic xxxx --zookeeper xxx`, the tool reports that xxxx has been marked for deletion. Check the topic list much later, though, and the topic is still there, still marked for deletion: the delete clearly never actually ran. This post digs into how Kafka's topic deletion works.
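A typical session looks like this (topic name and ZooKeeper address are placeholders; the output lines match what TopicCommand prints, as shown in its source later in this post):

```sh
$ kafka-topics --delete --topic xxxx --zookeeper zk-host:2181
Topic xxxx is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
```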

Conclusions first

delete.topic.enable,配置默认是false,意思是 是否允许kafka集群删除topic,只有为true的情况,kafka才会删除那些已经被标记为删除的topic。否则topic将不会被删除,仅仅被标记,所谓标记,也就是在zk上记录那些delete的topic。注意修改完后需要重启集群。
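A minimal sketch of enabling it, assuming the broker config lives at config/server.properties and the stock start/stop scripts are used (adjust paths to your installation); repeat on every broker:

```sh
# config/server.properties on every broker
delete.topic.enable=true

# restart the broker so the setting takes effect
bin/kafka-server-stop.sh
bin/kafka-server-start.sh -daemon config/server.properties
```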

If you want to delete a topic by hand instead, you need to do two things (a shell sketch follows the list):

1. Delete the topic's data in ZooKeeper:

    /brokers/topics/xxx

    /config/topics/xxx

2. Delete the data of all the topic's partitions and replicas:

    The data lives under each broker's `log.dirs` directory, in folders named topic-partition; deleting the topic's entire folders is enough.
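A minimal sketch of the manual procedure, assuming zookeeper-shell from the Kafka distribution, a ZooKeeper at zk-host:2181, a topic named xxx, and log.dirs=/var/kafka-logs (all placeholders; this is destructive, so make sure nothing still uses the topic):

```sh
# 1. Remove the topic's metadata from ZooKeeper
zookeeper-shell zk-host:2181 rmr /brokers/topics/xxx
zookeeper-shell zk-host:2181 rmr /config/topics/xxx
# if the topic was already marked for deletion, clear the marker too
zookeeper-shell zk-host:2181 rmr /admin/delete_topics/xxx

# 2. On EVERY broker, remove the partition folders under log.dirs
rm -rf /var/kafka-logs/xxx-*
```

(On newer ZooKeeper versions `rmr` is replaced by `deleteall`.)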

Marking the topic for deletion

From the shell command we can trace to TopicCommand, the class behind topic operations. Its deletion logic does only three things: 1. check that the topic exists; 2. check that it is not a Kafka internal topic (those may not be deleted); 3. create a node in ZooKeeper (/admin/delete_topics/xxx) recording the topic to delete. Here is the code in detail:

```scala
def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  val topics = getTopics(zkUtils, opts)
  val ifExists = opts.options.has(opts.ifExistsOpt)
  // the topic does not exist: throw immediately
  if (topics.isEmpty && !ifExists) {
    throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(
      opts.options.valueOf(opts.topicOpt), opts.options.valueOf(opts.zkConnectOpt)))
  }
  topics.foreach { topic =>
    try {
      // internal topics such as __consumer_offsets may not be deleted
      if (Topic.isInternal(topic)) {
        throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
      } else {
        // create a node in ZooKeeper marking the topic for deletion
        zkUtils.createPersistentPath(getDeleteTopicPath(topic))
        println("Topic %s is marked for deletion.".format(topic))
        println("Note: This will have no impact if delete.topic.enable is not set to true.")
      }
    } catch {
      case _: ZkNodeExistsException =>
        println("Topic %s is already marked for deletion.".format(topic))
      case e: AdminOperationException =>
        throw e
      case _: Throwable =>
        throw new AdminOperationException("Error while deleting topic %s".format(topic))
    }
  }
}
```

That completes the marking flow. So when does a topic that has been marked for deletion actually get deleted? Let's keep going.

Deleting the topic

Everything again starts from the deletion marker in ZooKeeper. The KafkaController subscribes to changes on ZooKeeper's deletion node to learn about topics that need deleting, and registers a TopicDeletionListener to handle those events. Below is the handler, TopicDeletion. Its logic has four steps: 1. check whether the topics to be deleted exist; 2. check whether delete.topic.enable is on; 3. check whether any topic is currently being reassigned: reassignment moves a topic's partition data between brokers, so the controller cannot pin down exactly which brokers hold the topic, and a topic under reassignment cannot be deleted until the reassignment finishes; 4. if all conditions hold, run the deletion logic.

```scala
override def process(): Unit = {
  if (!isActive) return
  debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
  val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
  if (nonExistentTopics.nonEmpty) {
    warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
    nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
  }
  topicsToBeDeleted --= nonExistentTopics
  if (config.deleteTopicEnable) {
    if (topicsToBeDeleted.nonEmpty) {
      info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
      // mark topic ineligible for deletion if other state changes are in progress
      topicsToBeDeleted.foreach { topic =>
        val partitionReassignmentInProgress =
          controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
        if (partitionReassignmentInProgress)
          topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
      }
      // add topic to deletion list
      topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
    }
  } else {
    // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
    for (topic <- topicsToBeDeleted) {
      info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled")
      zkUtils.zkClient.delete(getDeleteTopicPath(topic))
    }
  }
}
```

Now for the main actor, TopicDeletionManager, the class that drives topic deletion. Coming back to `delete.topic.enable`: nearly every one of its methods states explicitly that topics are deleted only when this setting is true. The method names describe themselves, so let's go straight to onTopicDeletion(), which calls onPartitionDeletion(), which in turn calls startReplicaDeletion(). In other words, deleting a topic means deleting all of its partitions, and since a partition consists of replicas, that means deleting each partition's replicas. As mentioned above, a replica may be in the middle of a reassignment, or temporarily unavailable; topics owning such replicas are treated as ineligible and are not deleted until a later retry finds them eligible again.

```scala
/**
 * Invoked with the list of topics to be deleted
 * It invokes onPartitionDeletion for all partitions of a topic.
 * The updateMetadataRequest is also going to set the leader for the topics being deleted to
 * {@link LeaderAndIsr#LeaderDuringDelete}. This lets each broker know that this topic is being deleted and can be
 * removed from their caches.
 */
private def onTopicDeletion(topics: Set[String]) {
  info("Topic deletion callback for %s".format(topics.mkString(",")))
  // send update metadata so that brokers stop serving data for topics to be deleted
  val partitions = topics.flatMap(controllerContext.partitionsForTopic)
  controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
  val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
  topics.foreach { topic =>
    onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet)
  }
}

/**
 * Invoked by onTopicDeletion with the list of partitions for topics to be deleted
 * It does the following -
 * 1. Send UpdateMetadataRequest to all live brokers (that are not shutting down) for partitions that are being
 *    deleted. The brokers start rejecting all client requests with UnknownTopicOrPartitionException
 * 2. Move all replicas for the partitions to OfflineReplica state. This will send StopReplicaRequest to the replicas
 *    and LeaderAndIsrRequest to the leader with the shrunk ISR. When the leader replica itself is moved to OfflineReplica state,
 *    it will skip sending the LeaderAndIsrRequest since the leader will be updated to -1
 * 3. Move all replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with deletePartition=true. And
 *    will delete all persistent data from all replicas of the respective partitions
 */
private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
  info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
  val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
  startReplicaDeletion(replicasPerPartition)
}

/**
 * Invoked by onPartitionDeletion. It is the 2nd step of topic deletion, the first being sending
 * UpdateMetadata requests to all brokers to start rejecting requests for deleted topics. As part of starting deletion,
 * the topics are added to the in progress list. As long as a topic is in the in progress list, deletion for that topic
 * is never retried. A topic is removed from the in progress list when
 * 1. Either the topic is successfully deleted OR
 * 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionIneligible state
 * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
 * As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends
 * the replicas a StopReplicaRequest (delete=true)
 * This method does the following things -
 * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
 *    for deletion if some replicas are dead since it won't complete successfully anyway
 * 2. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
 * @param replicasForTopicsToBeDeleted
 */
private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
  replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
    val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
    val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
    val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
    val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
    // move dead replicas directly to failed state
    replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
    // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
    replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
    debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
    controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
      new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) =>
        eventManager.put(controller.TopicDeletionStopReplicaResult(stopReplicaResponseObj, replicaId))).build)
    if (deadReplicasForTopic.nonEmpty) {
      debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
      markTopicIneligibleForDeletion(Set(topic))
    }
  }
}
```

TopicDeletionManager sends the replicas to be deleted to each broker over RPC; it acts more as a coordinator of metadata deletion, because the actual data removal is done by the brokers themselves, since the data is distributed across them. On receiving the stopReplica RPC, ReplicaManager deletes the local data (see its stopReplica() method), and finally LogManager removes the corresponding folders. That call chain is fairly long, so the code is not reproduced here, but its effect on disk is shown below.
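A hypothetical session on a broker (log directory, topic name, and the random suffix are placeholders; from my reading of the 0.11 source, LogManager first renames each partition folder with a `-delete` suffix, then removes it asynchronously after `file.delete.delay.ms`, 60 seconds by default):

```sh
$ ls /var/kafka-logs | grep xxx
xxx-0.48a1c9b2d3e44f5a9b6c7d8e9f0a1b2c-delete   # renamed, pending async removal
# after file.delete.delay.ms elapses the folder disappears
```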

Once the data is gone, TopicDeletionManager removes the topic's metadata; see completeDeleteTopic(). It clears the topic's in-memory state on the controller and deletes the topic's ZooKeeper data in three places: /brokers/topics/, /config/topics/ and /admin/delete_topics.

```scala
private def completeDeleteTopic(topic: String) {
  // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
  // firing before the new topic listener when a deleted topic gets auto created
  controller.deregisterPartitionModificationsListener(topic)
  val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
  // controller will remove this replica from the state machine as well as its partition assignment cache
  replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
  val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
  // move respective partition to OfflinePartition and NonExistentPartition state
  partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
  partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
  topicsToBeDeleted -= topic
  partitionsToBeDeleted.retain(_.topic != topic)
  val zkUtils = controllerContext.zkUtils
  zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
  zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
  zkUtils.zkClient.delete(getDeleteTopicPath(topic))
  controllerContext.removeTopic(topic)
}
```
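To confirm that a deletion really completed, you can check that all three ZooKeeper paths are gone (zookeeper-shell ships with Kafka; host and topic name are placeholders):

```sh
zookeeper-shell zk-host:2181 ls /brokers/topics        # topic no longer listed
zookeeper-shell zk-host:2181 ls /config/topics         # config entry gone
zookeeper-shell zk-host:2181 ls /admin/delete_topics   # deletion marker gone
```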

Summary

Kafka keeps its metadata in ZooKeeper and manages all of it through its controller. Clients trigger Kafka events by modifying ZooKeeper nodes, and that is how operations get carried out. For topic deletion in particular, Kafka combines the partition and replica state machines with this event mechanism to delete topics reliably in a complex distributed environment.

References

Kafka 0.11.0.2 source code

Reprinted from: https://www.cnblogs.com/ulysses-you/p/10232746.html
