网站首页 > 开源技术 正文
来源:大数据技术与架构作者:王知无
大数据技术与架构
点击右侧关注,大数据开发领域最强公众号!
暴走大数据
点击右侧关注,暴走大数据!
By 大数据技术与架构
场景描述:Kafka配合Spark Streaming是大数据领域常见的黄金搭档之一,主要是用于数据实时入库或分析。为了应对可能出现的引起Streaming程序崩溃的异常情况,我们一般都需要手动管理好Kafka的offset,而不是让它自动提交,即需要将enable.auto.commit设为false。只有管理好offset,才能使整个流式系统最大限度地接近exactly once语义。
关键词:offset Spark Streaming
Kafka+Spark Streaming主要用于实时流处理。到目前为止,在大数据领域中是一种非常常见的架构。Kafka在其中主要起着一个缓冲的作用,所有的实时数据都会经过kafka。所以对kafka offset的管理是其中至关重要的一环。
我们一般都需要手动管理好Kafka的offset,而不是让它自动提交,即需要将enable.auto.commit设为false。
一但管理不善,就会到导致数据丢失或重复消费。
offset的管理方式
一个简单的流程如下:
- 在Kafka DirectStream初始化时,取得当前所有partition的存量offset,以让DirectStream能够从正确的位置开始读取数据。
- 读取消息数据,处理并存储结果。
- 提交offset,并将其持久化在可靠的外部存储中。
- 图中的“process and store results”及“commit offsets”两项,都可以施加更强的限制,比如存储结果时保证幂等性,或者提交offset时采用原子操作。
保存offset的方式
Checkpoint:
Spark Streaming的checkpoints是最基本的存储状态信息的方式,一般是保存在HDFS中。但是最大的问题是如果streaming程序升级的话,checkpoints的数据无法使用,所以几乎没人使用。
offset的三种管理方式:
自动提交offset:
- enable.auto.commit=true。
- 一但consumer挂掉,就会导致数据丢失或重复消费。
- offset不可控。
Kafka自身的offset管理:
- (属于At-least-once语义,如果做好了幂等性,可以使用这种方式):
- 在Kafka 0.10+版本中,offset的默认存储由ZooKeeper移动到了一个自带的topic中,名为__consumer_offsets。
- Spark Streaming也专门提供了commitAsync() API用于提交offset。
- 需要将参数修改为enable.auto.commit=false。
- 在我实际测试中发现,这种offset的管理方式,不会丢失数据,但会出现重复消费。
- 停掉streaming应用程序再次启动后,会再次消费停掉前最后的一个批次数据,应该是由于offset是异步提交的方式导致,offset更新不及时引起的。
- 因此需要做好数据的幂等性。
- (修改源码将异步改为同步,应该是可以做到Exactly-once语义的)
自定义offset:
- (推荐,采用这种方式,可以做到At-least-once语义):
- 可以将offset存放在第三方储中,包括RDBMS、Redis、ZK、ES等。
- 若消费数据存储在带事务的组件上,则强烈推荐将offset存储在一起,借助事务实现 Exactly-once 语义。
示例
Kafka自身管理offset:
在Kafka 0.10+版本中,offset的默认存储由ZooKeeper移动到了一个自带的topic中,名为__consumer_offsets。所以我们读写offset的对象正是这个topic,Spark Streaming也专门提供了commitAsync() API用于提交offset。实际上,一切都已经封装好了,直接调用相关API即可。
stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 确保结果都已经正确且幂等地输出了 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) }
ZooKeeper
在Spark Streaming连接Kafka应用中使用Zookeeper来存储offsets也是一种比较可靠的方式。
在这个方案中,Spark Streaming任务在启动时会去Zookeeper中读取每个分区的offsets。如果有新的分区出现,那么他的offset将会设置在最开始的位置。在每批数据处理完之后,用户需要可以选择存储已处理数据的一个offset或者最后一个offset。此外,新消费者将使用跟旧的Kafka 消费者API一样的格式将offset保存在ZooKeeper中。因此,任何追踪或监控Zookeeper中Kafka Offset的工具仍然生效的。
一个典型的工具类:
class ZkKafkaOffsetManager(zkUrl: String) { private val logger = LoggerFactory.getLogger(classOf[ZkKafkaOffsetManager]) private val zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, 30000, 30000); private val zkUtils = new ZkUtils(zkClientAndConn._1, zkClientAndConn._2, false) def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = { val offsets = mutable.HashMap.empty[TopicPartition, Long] val partitionsForTopics = zkUtils.getPartitionsForTopics(topics) // /consumers/<groupId>/offsets/<topic>/<partition> partitionsForTopics.foreach(partitions => { val topic = partitions._1 val groupTopicDirs = new ZKGroupTopicDirs(groupId, topic) partitions._2.foreach(partition => { val path = groupTopicDirs.consumerOffsetDir + "/" + partition try { val data = zkUtils.readData(path) if (data != null) { offsets.put(new TopicPartition(topic, partition), data._1.toLong) logger.info( "Read offset - topic={}, partition={}, offset={}, path={}", Seq[AnyRef](topic, partition.toString, data._1, path) ) } } catch { case ex: Exception => offsets.put(new TopicPartition(topic, partition), 0L) logger.info( "Read offset - not exist: {}, topic={}, partition={}, path={}", Seq[AnyRef](ex.getMessage, topic, partition.toString, path) ) } }) }) offsets.toMap } def saveOffsets(offsetRanges: Seq[OffsetRange], groupId: String): Unit = { offsetRanges.foreach(range => { val groupTopicDirs = new ZKGroupTopicDirs(groupId, range.topic) val path = groupTopicDirs.consumerOffsetDir + "/" + range.partition zkUtils.updatePersistentPath(path, range.untilOffset.toString) logger.info( "Save offset - topic={}, partition={}, offset={}, path={}", Seq[AnyRef](range.topic, range.partition.toString, range.untilOffset.toString, path) ) }) } }
这样,offset就会被存储在ZK的/consumers/[groupId]/offsets/[topic]/[partition]路径下。当初始化DirectStream时,调用readOffsets()方法获得offset。当数据处理完成后,调用saveOffsets()方法来更新ZK中的值。
其他介质
Hbase、Redis甚至Mysql也经常被用作进行offset的存储。方式和上面类似,代码可以去网上搜一搜。
需要注意的点
特别需要注意,在转换过程中不能破坏RDD分区与Kafka分区之间的映射关系。亦即像map()/mapPartitions()这样的算子是安全的,而会引起shuffle或者repartition的算子,如reduceByKey()/join()/coalesce()等等都是不安全的。
对Dstream进行窗口操作后就不能手动提交offset。因为保存offset需要HasOffsetRanges这个类。而HasOffsetRanges是KafkaRDD的一个trait,而CanCommitOffsets是DirectKafkaInputDStream的一个trait。
Scala Trait(特征) 相当于 Java 的接口,实际上它比接口还功能强大。
与接口不同的是,它还可以定义属性和方法的实现。
如下:private[spark] class KafkaRDD[K, V]( sc: SparkContext, val kafkaParams: ju.Map[String, Object], val offsetRanges: Array[OffsetRange], val preferredHosts: ju.Map[TopicPartition, String], useConsumerCache: Boolean ) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges private[spark] class DirectKafkaInputDStream[K, V]( _ssc: StreamingContext, locationStrategy: LocationStrategy, consumerStrategy: ConsumerStrategy[K, V], ppc: PerPartitionConfig ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
不能对stream对象做transformation操作之后的结果进行强制转换(会直接报ClassCastException),因为RDD与DStream的类型都改变了。只有RDD或DStream的包含类型为ConsumerRecord才行。
欢迎点赞+收藏+转发朋友圈素质三连
文章不错?点个【在看】吧!
猜你喜欢
- 2024-10-19 Seata 中Resource Manager (RM) 本地事务管理
- 2024-10-19 kill-9导致Kakfa重启失败,说多了都是泪
- 2024-10-19 快速掌握Kafka系列《三》配置项总结
- 2024-10-19 Flink 参数配置和常见参数调优(flink配置详解)
- 2024-10-19 基于 Flink 实现的商品实时推荐系统(附源码)
- 2024-10-19 Kafka+Spark Streaming管理offset的两种方法
- 2024-10-19 0500-使用Python2访问Kerberos环境下的Kafka
- 2024-10-19 Kafka大厂高频面试题:在保证高性能、高吞吐的同时保证高可用性
- 2024-10-19 清华同方大数据岗位面试题(清华同方数据库官网)
- 2024-10-19 # Kafka_深入探秘者(10):kafka 监控--2
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- jdk (81)
- putty (66)
- rufus (78)
- 内网穿透 (89)
- okhttp (70)
- powertoys (74)
- windowsterminal (81)
- netcat (65)
- ghostscript (65)
- veracrypt (65)
- asp.netcore (70)
- wrk (67)
- aspose.words (80)
- itk (80)
- ajaxfileupload.js (66)
- sqlhelper (67)
- express.js (67)
- phpmailer (67)
- xjar (70)
- redisclient (78)
- wakeonlan (66)
- tinygo (85)
- startbbs (72)
- webftp (82)
- vsvim (79)
本文暂时没有评论,来添加一个吧(●'◡'●)