To get exactly-once semantics you have to implement it yourself. The rough idea: when the driver starts, first fetch the consumer offsets from ZooKeeper. createDirectStream has two overloads, and one of them lets you start consuming from arbitrary offsets. Part of the code looks like this:
def createDirectStream(implicit streamingConfig: StreamingConfig, kc: KafkaCluster) = {
  val extractors = streamingConfig.getExtractors()
  // Read the consumer offsets from ZooKeeper and start consuming messages from there
  val messages = {
    val kafkaPartitionsE = kc.getPartitions(streamingConfig.topicSet)
    if (kafkaPartitionsE.isLeft) throw new SparkException("get kafka partition failed: " + kafkaPartitionsE.left.get)
    val kafkaPartitions = kafkaPartitionsE.right.get
    val consumerOffsetsE = kc.getConsumerOffsets(streamingConfig.group, kafkaPartitions)
    if (consumerOffsetsE.isLeft) throw new SparkException("get kafka consumer offsets failed: " + consumerOffsetsE.left.get)
    val consumerOffsets = consumerOffsetsE.right.get
    consumerOffsets.foreach {
      case (tp, n) => println("===================================" + tp.topic + "," + tp.partition + "," + n)
    }
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
  }
  messages
}
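To make the startup flow concrete, here is a minimal sketch of how a driver main could glue these pieces together. It only illustrates the wiring the snippets in this post imply: an ssc StreamingContext and a kafkaParams map created elsewhere, the setOrUpdateOffsets and processData helpers defined further below, and a hypothetical StreamingConfig constructor.
def main(args: Array[String]): Unit = {
  // hypothetical wiring: StreamingConfig, KafkaCluster, ssc and kafkaParams are assumed to be
  // set up elsewhere exactly as the snippets in this post imply
  implicit val streamingConfig = new StreamingConfig(args(0))
  implicit val kc = new KafkaCluster(kafkaParams)
  // 1. make sure the group's offsets in ZooKeeper exist and are not stale
  setOrUpdateOffsets
  // 2. build the direct stream starting from those offsets
  val messages = createDirectStream
  // 3. process each batch; offsets are committed back to ZooKeeper after processing
  processData(messages)
  ssc.start()
  ssc.awaitTermination()
}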
Reading offsets from ZooKeeper this way leaves a couple of issues. First, when the group is a brand-new consumer group, i.e. it is consuming for the first time, there is no group offsets node in ZooKeeper yet, so that node has to be initialized first. Second, the offsets recorded in ZooKeeper may already be stale: Kafka's retention policy periodically deletes old log segments, so consuming directly from the ZooKeeper offsets can throw kafka.common.OffsetOutOfRangeException because the log and index files those offsets point into no longer exist. Both cases are handled as follows:
def setOrUpdateOffsets(implicit streamingConfig: StreamingConfig, kc: KafkaCluster): Unit = {
  streamingConfig.topicSet.foreach(topic => {
    println("current topic:" + topic)
    var hasConsumed = true
    val kafkaPartitionsE = kc.getPartitions(Set(topic))
    if (kafkaPartitionsE.isLeft) throw new SparkException("get kafka partition failed: " + kafkaPartitionsE.left.get)
    val kafkaPartitions = kafkaPartitionsE.right.get
    val consumerOffsetsE = kc.getConsumerOffsets(streamingConfig.group, kafkaPartitions)
    if (consumerOffsetsE.isLeft) hasConsumed = false
    if (hasConsumed) {
      // The group has consumed before. If the streaming job hits kafka.common.OffsetOutOfRangeException,
      // the offsets saved in ZooKeeper are stale: Kafka's retention policy has already deleted the log
      // segments containing them.
      // To detect this, compare the ZooKeeper consumerOffsets with leaderEarliestOffsets. If the consumer
      // offsets are smaller than leaderEarliestOffsets, they are stale, so reset the consumer offsets in
      // ZooKeeper to leaderEarliestOffsets.
      val leaderEarliestOffsets = kc.getEarliestLeaderOffsets(kafkaPartitions).right.get
      println(leaderEarliestOffsets)
      val consumerOffsets = consumerOffsetsE.right.get
      val flag = consumerOffsets.forall {
        case (tp, n) => n < leaderEarliestOffsets(tp).offset
      }
      if (flag) {
        println("consumer group:" + streamingConfig.group + " offsets are stale, resetting to leaderEarliestOffsets")
        val offsets = leaderEarliestOffsets.map {
          case (tp, offset) => (tp, offset.offset)
        }
        kc.setConsumerOffsets(streamingConfig.group, offsets)
      }
      else {
        println("consumer group:" + streamingConfig.group + " offsets are valid, no update needed")
      }
    }
    else {
      // The group has never consumed this topic, so start from the latest offsets.
      val leaderLatestOffsets = kc.getLatestLeaderOffsets(kafkaPartitions).right.get
      println(leaderLatestOffsets)
      println("consumer group:" + streamingConfig.group + " has not consumed yet, initializing to leaderLatestOffsets")
      val offsets = leaderLatestOffsets.map {
        case (tp, offset) => (tp, offset.offset)
      }
      kc.setConsumerOffsets(streamingConfig.group, offsets)
    }
  })
}
def updateZKOffsets(rdd: RDD[(String, String)])(implicit streamingConfig: StreamingConfig, kc: KafkaCluster): Unit = {
  println("rdd not empty, update zk offsets")
  val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  for (offsets <- offsetsList) {
    val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
    val o = kc.setConsumerOffsets(streamingConfig.group, Map((topicAndPartition, offsets.untilOffset)))
    if (o.isLeft) {
      println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
  }
}
def processData(messages: InputDStream[(String, String)])(implicit streamingConfig: StreamingConfig, kc: KafkaCluster): Unit = {
  messages.foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      val datamodelRDD = streamingConfig.relation match {
        case "1" =>
          val (topic, _) = streamingConfig.topic_table_mapping
          val extractor = streamingConfig.getExtractor(topic)
          val topicsSet = Set(topic)
          val datamodel = rdd.filter(msg => {
            extractor.filter(msg)
          }).map(msg => extractor.msgToRow(msg))
          datamodel
        case "2" =>
          val (topics, _) = streamingConfig.topic_table_mapping
          val extractors = streamingConfig.getExtractors(topics)
          val topicsSet = topics.split(",").toSet
          // A Kafka message is a key-value pair and the key decides which partition the message goes to.
          // To spread messages across partitions, the collector builds the key as "<topic>|<random number>",
          // e.g. rd_e_pal|20. Splitting the key on "|" and taking element 0 recovers the topic name, so
          // messages from several topics unioned into one stream can still be told apart.
          // (A purely illustrative producer sketch of this key scheme follows this function.)
          val datamodel = rdd.filter(msg => {
            val keyValid = msg != null && msg._1 != null && msg._1.split("\\|").length == 2
            if (keyValid) {
              val topic = msg._1.split("\\|")(0)
              val (_, extractor) = extractors.find(p => {
                p._1.equalsIgnoreCase(topic)
              }).getOrElse(throw new RuntimeException("no extractor configured for topic: " + topic))
              // trim strips the trailing newline; otherwise the last field would carry a '\n'
              extractor.filter(msg._2.trim)
            }
            else {
              false
            }
          }).map {
            case (key, msgContent) =>
              val topic = key.split("\\|")(0)
              val (_, extractor) = extractors.find(p => {
                p._1.equalsIgnoreCase(topic)
              }).getOrElse(throw new RuntimeException("no extractor configured for topic: " + topic))
              extractor.msgToRow((key, msgContent))
          }
          datamodel
      }
      // process the messages first
      processRDD(datamodelRDD)
      // then commit the offsets to ZooKeeper
      updateZKOffsets(rdd)
    }
  })
}
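The collector that produces these keys is outside the scope of this post. Purely as an illustration of the "<topic>|<random number>" key convention described above, a producer could build its keys roughly like this; the KafkaProducer API and all names here are just an example, not the actual collector code:
import java.util.Properties
import scala.util.Random
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// illustrative only: spread messages across partitions via a random suffix in the key,
// while keeping the topic name recoverable by splitting the key on "|"
def sendWithTopicKey(brokers: String, topic: String, message: String): Unit = {
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)
  val key = topic + "|" + Random.nextInt(100) // e.g. rd_e_pal|20
  producer.send(new ProducerRecord[String, String](topic, key, message))
  producer.close()
}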
def processRDD(rdd: RDD[Row])(implicit streamingConfig: StreamingConfig) = {
  if (streamingConfig.targetType == "mongo") {
    val target = streamingConfig.getTarget().asInstanceOf[MongoTarget]
    if (!MongoDBClient.db.collectionExists(target.collection)) {
      println("create collection:" + target.collection)
      MongoDBClient.db.createCollection(target.collection, MongoDBObject("storageEngine" -> MongoDBObject("wiredTiger" -> MongoDBObject())))
      val coll = MongoDBClient.db(target.collection)
      // create the TTL index if one is configured
      if (target.ttlIndex) {
        val indexs = coll.getIndexInfo
        if (indexs.find(p => p.get("name") == "ttlIndex").isEmpty) {
          coll.createIndex(MongoDBObject(target.ttlColumn -> 1), MongoDBObject("expireAfterSeconds" -> target.ttlExpire, "name" -> "ttlIndex"))
        }
      }
    }
  }
  val (_, table) = streamingConfig.topic_table_mapping
  val schema = streamingConfig.getTableSchema(table)
  // Get the singleton instance of HiveContext
  val sqlContext = HIVEContextSingleton.getInstance(rdd.sparkContext)
  // Convert RDD[Row] plus the table schema into a DataFrame
  val dataFrame = sqlContext.createDataFrame(rdd, schema)
  // Register it as a temp table
  dataFrame.registerTempTable(table)
  // Run the configured SQL against the temp table
  val results = sqlContext.sql(streamingConfig.sql)
  // e.g. select dt,hh(vtm) as hr,app_key, collect_set(device_id) as deviceids from rd_e_app_header where dt=20150401 and hh(vtm)='01' group by dt,hh(vtm),app_key limit 100 ;
  // results.show()
  streamingConfig.targetType match {
    case "mongo" => saveToMongo(results)
    case "show"  => results.show()
  }
}
def saveToMongo(df: DataFrame)(implicit streamingConfig: StreamingConfig) = {
  val target = streamingConfig.getTarget().asInstanceOf[MongoTarget]
  val coll = MongoDBClient.db(target.collection)
  val result = df.collect()
  if (result.size > 0) {
    val bulkWrite = coll.initializeUnorderedBulkOperation
    result.foreach(row => {
      // the primary key comes from the configured pkIndex column
      val id = row(target.pkIndex)
      // split the configured columns by update operator
      val setFields = target.columns.filter(p => p.op == "set").map(f => (f.name, row(f.index))).toArray
      val incFields = target.columns.filter(p => p.op == "inc").map(f => {
        (f.name, row(f.index).asInstanceOf[Long])
      }).toArray
      // obj=obj.++($addToSet(MongoDBObject("test"->MongoDBObject("$each"->Array(3,4)),"test1"->MongoDBObject("$each"->Array(1,2)))))
      var obj = MongoDBObject()
      var addToSetObj = MongoDBObject()
      target.columns.filter(p => p.op == "addToSet").foreach(col => {
        col.mType match {
          case "Int" =>
            addToSetObj = addToSetObj.++(col.name -> MongoDBObject("$each" -> row(col.index).asInstanceOf[ArrayBuffer[Int]]))
          case "Long" =>
            addToSetObj = addToSetObj.++(col.name -> MongoDBObject("$each" -> row(col.index).asInstanceOf[ArrayBuffer[Long]]))
          case "String" =>
            addToSetObj = addToSetObj.++(col.name -> MongoDBObject("$each" -> row(col.index).asInstanceOf[ArrayBuffer[String]]))
        }
      })
      if (addToSetObj.size > 0) obj = obj.++($addToSet(addToSetObj))
      if (incFields.size > 0) obj = obj.++($inc(incFields: _*))
      if (setFields.size > 0) obj = obj.++($set(setFields: _*))
      // upsert by _id so each row updates (or creates) exactly one document
      bulkWrite.find(MongoDBObject("_id" -> id)).upsert().updateOne(obj)
    })
    bulkWrite.execute()
  }
}
Thinking it over, this still does not give exactly-once semantics. The write to MongoDB and the offset update in ZooKeeper are not one transaction: if the MongoDB update succeeds and the ZooKeeper update then fails, that batch will be recomputed the next time the job starts. For UV this is harmless, because addToSet deduplicates and is therefore idempotent, but PV uses inc, so the replayed batch would be counted twice. In practice, if the batch interval is short, this is usually an acceptable trade-off.
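If the double counting from inc ever became a problem, one commonly suggested direction (not implemented in this post) is to track, per topic-partition, the highest untilOffset that has already been written to Mongo, and skip batches whose offsets were already applied. A rough sketch, where the batch_progress collection and its field names are purely hypothetical:
// Sketch only: narrow the replay window by remembering which offsets have already been applied.
// "batch_progress" and its fields are hypothetical names, not part of the code above.
def alreadyApplied(offsets: OffsetRange): Boolean = {
  val progress = MongoDBClient.db("batch_progress")
  progress.findOne(MongoDBObject("_id" -> (offsets.topic + "-" + offsets.partition)))
    .exists(doc => doc.get("untilOffset").asInstanceOf[Long] >= offsets.untilOffset)
}

def markApplied(offsets: OffsetRange): Unit = {
  val progress = MongoDBClient.db("batch_progress")
  progress.update(
    MongoDBObject("_id" -> (offsets.topic + "-" + offsets.partition)),
    $set("untilOffset" -> offsets.untilOffset),
    upsert = true)
}
processData could call alreadyApplied before processRDD and markApplied right after the Mongo bulk write; since the mark and the data write are still two separate operations, this only narrows the window rather than turning it into a real transaction.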