我们需要在Kafka主题上实现连接,同时考虑延迟数据或“不在连接中”,这意味着流中延迟或不在连接中的数据不会被丢弃/丢失,但会被标记为超时,
连接的结果被产生以输出Kafka主题(如果发生超时字段)。
(独立部署中的火花2.1.1,Kafka 10)
Kafka在主题:X,Y,...输出主题结果将如下所示:
{
"keyJoinFiled": 123456,
"xTopicData": {},
"yTopicData": {},
"isTimeOutFlag": true
}
我发现三个解决方案写在这里,1和2从火花流官方留档,但与我们不相关(数据不在加入Dtsream,到达商业时间晚,被丢弃/丢失),但我写了它们进行比较。
从我们看到的情况来看,Kafka连接主题的示例并不多,带有有状态操作,请在此处添加一些代码以供查看:
1) 根据火花流文档,
https://spark.apache.org/docs/2.1.1/streaming-programming-guide.html:
val stream1: DStream[String, String] =
val stream2: DStream[String, String] =
val joinedStream = stream1.join(stream2)
这将连接来自两个流批处理持续时间的数据,但是“业务时间”晚到达/不在连接中的数据将被丢弃/丢失。
2) 窗口连接:
val leftWindowDF = kafkaStreamLeft.window(Minutes(input_parameter_time))
val rightWindowDF = kafkaStreamRight.window(Minutes(input_parameter_time))
leftWindowDF.join(rightWindowDF).foreachRDD...
2.2)需要在内存/磁盘中保存大量数据,例如,30-60分钟窗口2.3)并且再次数据延迟到达/不在窗口中/不在连接中被丢弃/丢失。*由于支持火花2.3.1结构化流到流连接,但是我们遇到了不清理HDFS状态存储的错误,因此,在OOM上每隔几个小时就会有一个作业下降,在2.4、https://issues.apache.org/jira/browse/SPARK-23682中得到解决(使用Rocksdb,或客户状态存储)。
3)使用有状态的操作映射与状态加入Kafka主题与翻滚窗口和30分钟超时延迟数据的D流,输出主题的所有数据包含所有主题的加入消息,如果加入发生或部分主题数据,如果30分钟内没有加入(标记is_time_out标志)
3.1)为每个主题创建1..n个数据流,转换为键值/联合记录,将连接字段作为键并滚动窗口。创建一网打尽的方案。3.2)联合所有流3.3)使用函数在联合流mapWithState上运行-将实际执行连接/标记超时。
来自数据库的有状态连接的好例子(Spark 2.2.0):https://www.youtube.com/watch?time_continue=1858
添加正在运行/测试的示例代码。
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"session.timeout.ms" -> "30000"
)
//Kafka xTopic DStream
val kafkaStreamLeft = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](leftTopic.split(",").toSet, kafkaParams)
).map(record => {
val msg:xTopic = gson.fromJson(record.value(),classOf[xTopic])
Unioned(Some(msg),None,if (msg.sessionId!= null) msg.sessionId.toString else "")
}).window(Minutes(leftWindow),Minutes(leftWindow))
//Kafka yTopic DStream
val kafkaStreamRight = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](rightTopic.split(",").toSet, kafkaParams)
).map(record => {
val msg:yTopic = gson.fromJson(record.value(),classOf[yTopic])
Unioned(None,Some(msg),if (msg.sessionId!= null) msg.sessionId.toString else "")
}).window(Minutes(rightWindow),Minutes(rightWindow))
//convert stream to key, value pair and filter empty session id.
val unionStream = kafkaStreamLeft.union(kafkaStreamRight).map(record =>(record.sessionId,record))
.filter(record => !record._1.toString.isEmpty)
val stateSpec = StateSpec.function(stateUpdateF).timeout(Minutes(timeout.toInt))
unionStream.mapWithState(stateSpec).foreachRDD(rdd => {
try{
if(!rdd.isEmpty()) rdd.foreachPartition(partition =>{
val props = new util.HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
//send to kafka result JSON.
partition.foreach(record => {
if(record!=null && !"".equals(record) && !"()".equals(record.toString) && !"None".equals(record.toString) ){
producer.send(new ProducerRecord[String, String](outTopic, null, gson.toJson(record)))
}
})
producer.close()
})
}catch {
case e: Exception => {
logger.error(s""""error join topics :${leftTopic} ${rightTopic} to out topic ${outTopic}""")
logger.info(e.printStackTrace())
}
}})
//mapWithState function that will be called on each key occurrence with new items in newItemValues and state items if exits.
def stateUpdateF = (keySessionId:String,newItemValues:Option[Unioned],state:State[Unioned])=> {
val currentState = state.getOption().getOrElse(Unioned(None,None,keySessionId))
val newVal:Unioned = newItemValues match {
case Some(newItemValue) => {
if (newItemValue.yTopic.isDefined)
Unioned(if(newItemValue.xTopic.isDefined) newItemValue.xTopic else currentState.xTopic,newItemValue.yTopic,keySessionId)
else if (newItemValue.xTopic.isDefined)
Unioned(newItemValue.xTopic, if(currentState.yTopic.isDefined)currentState.yTopic else newItemValue.yTopic,keySessionId)
else newItemValue
}
case _ => currentState //if None = timeout => currentState
}
val processTs = LocalDateTime.now()
val processDate = dtf.format(processTs)
if(newVal.xTopic.isDefined && newVal.yTopic.isDefined){//if we have a join remove from state
state.remove()
JoinState(newVal.sessionId,newVal.xTopic,newVal.yTopic,false,processTs.toInstant(ZoneOffset.UTC).toEpochMilli,processDate)
}else if(state.isTimingOut()){//time out do no try to remove state manually ,it's removed automatically.
JoinState(newVal.sessionId, newVal.xTopic, newVal.yTopic,true,processTs.toInstant(ZoneOffset.UTC).toEpochMilli,processDate)
}else{
state.update(newVal)
}
}
//case class for kafka topics data.(x,y topics ) join will be on session id filed.
case class xTopic(sessionId:String,param1:String,param2:String,sessionCreationDate:String)
case class yTopic(sessionId:Long,clientTimestamp:String)
//catch all schema : object that contains both kafka input fileds topics and key valiue for join.
case class Unioned(xTopic:Option[xTopic],yTopic:Option[yTopic],sessionId:String)
//class for output result of join stateful function.
case class JoinState(sessionId:String, xTopic:Option[xTopic],yTopic:Option[yTopic],isTimeOut:Boolean,processTs:Long,processDate:String)
我很乐意接受一些评论。对不起,邮件太长了。
我有一些关于Kafka主题分区->spark流媒体资源利用的用例,我想更清楚地说明这些用例。 我使用spark独立模式,所以我只有“执行者总数”和“执行者内存”的设置。据我所知并根据文档,将并行性引入Spark streaming的方法是使用分区的Kafka主题->RDD将具有与Kafka相同数量的分区,当我使用spark-kafka直接流集成时。 因此,如果我在主题中有一个分区和一个执行器核心,
它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka
我已经在Ubuntu上设置了Kafka和Spark。我正在尝试阅读Kafka的主题通过火花流使用pyspark(Jupyter笔记本)。Spark既没有读取数据,也没有抛出任何错误。 null Kafka生产者:bin/kafka-console-producer.sh--broker-list localhost:9092--topic new_topic Kafka使用者:bin/kafka-
为什么以及何时会选择将Spark流媒体与Kafka结合使用? 假设我有一个系统通过Kafka每秒接收数千条消息。我需要对这些消息应用一些实时分析,并将结果存储在数据库中。 我有两个选择: > < li> 创建我自己的worker,该worker从Kafka读取消息,运行分析算法并将结果存储在DB中。在Docker时代,只需使用scale命令就可以轻松地在我的整个集群中扩展这个工作线程。我只需要确保
我最近在一个streams应用程序中遇到了一个以前没有遇到过的问题,它很难跟踪与键控/连接相关的问题(以及更新后的分区问题)。 我有两个主题(raw_events和processed_users),这两个主题的密钥相同,但是当我试图对这两个主题执行连接时,尽管密钥相同,但只有一些连接是成功的。 为简洁起见,应用程序的基本工作流程如下: null 问题本身是在步骤5中产生的。由于主题和主题之间的连接
当前设置:Spark流作业处理timeseries数据的Kafka主题。大约每秒就有不同传感器的新数据进来。另外,批处理间隔为1秒。通过,有状态数据被计算为一个新流。一旦这个有状态的数据穿过一个treshold,就会生成一个关于Kafka主题的事件。当该值后来降至treshhold以下时,再次触发该主题的事件。 问题:我该如何避免这种情况?最好不要切换框架。在我看来,我正在寻找一个真正的流式(一个