当前位置: 首页 > 知识库问答 >
问题:

火花流加入Kafka主题比较

羊慈
2023-03-14

我们需要在Kafka主题上实现连接,同时考虑延迟数据或“不在连接中”,这意味着流中延迟或不在连接中的数据不会被丢弃/丢失,但会被标记为超时,

连接的结果被产生以输出Kafka主题(如果发生超时字段)。

(独立部署中的火花2.1.1,Kafka 10)

Kafka在主题:X,Y,...输出主题结果将如下所示:

{
    "keyJoinFiled": 123456,
    "xTopicData": {},
    "yTopicData": {},
    "isTimeOutFlag": true
}

我发现三个解决方案写在这里,1和2从火花流官方留档,但与我们不相关(数据不在加入Dtsream,到达商业时间晚,被丢弃/丢失),但我写了它们进行比较。

从我们看到的情况来看,Kafka连接主题的示例并不多,带有有状态操作,请在此处添加一些代码以供查看:

1) 根据火花流文档,

https://spark.apache.org/docs/2.1.1/streaming-programming-guide.html:   
 val stream1: DStream[String, String] = 
 val stream2: DStream[String, String] = 
 val joinedStream = stream1.join(stream2)

这将连接来自两个流批处理持续时间的数据,但是“业务时间”晚到达/不在连接中的数据将被丢弃/丢失。

2) 窗口连接:

val leftWindowDF = kafkaStreamLeft.window(Minutes(input_parameter_time))
val rightWindowDF = kafkaStreamRight.window(Minutes(input_parameter_time))
leftWindowDF.join(rightWindowDF).foreachRDD...

2.2)需要在内存/磁盘中保存大量数据,例如,30-60分钟窗口2.3)并且再次数据延迟到达/不在窗口中/不在连接中被丢弃/丢失。*由于支持火花2.3.1结构化流到流连接,但是我们遇到了不清理HDFS状态存储的错误,因此,在OOM上每隔几个小时就会有一个作业下降,在2.4、https://issues.apache.org/jira/browse/SPARK-23682中得到解决(使用Rocksdb,或客户状态存储)。

3)使用有状态的操作映射与状态加入Kafka主题与翻滚窗口和30分钟超时延迟数据的D流,输出主题的所有数据包含所有主题的加入消息,如果加入发生或部分主题数据,如果30分钟内没有加入(标记is_time_out标志)

3.1)为每个主题创建1..n个数据流,转换为键值/联合记录,将连接字段作为键并滚动窗口。创建一网打尽的方案。3.2)联合所有流3.3)使用函数在联合流mapWithState上运行-将实际执行连接/标记超时。

来自数据库的有状态连接的好例子(Spark 2.2.0):https://www.youtube.com/watch?time_continue=1858

添加正在运行/测试的示例代码。

 val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "session.timeout.ms" -> "30000"
  )

  //Kafka xTopic DStream
  val kafkaStreamLeft = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](leftTopic.split(",").toSet, kafkaParams)
  ).map(record => {
    val msg:xTopic = gson.fromJson(record.value(),classOf[xTopic])
    Unioned(Some(msg),None,if (msg.sessionId!= null) msg.sessionId.toString else "")
  }).window(Minutes(leftWindow),Minutes(leftWindow))

  //Kafka yTopic DStream
  val kafkaStreamRight = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](rightTopic.split(",").toSet, kafkaParams)
  ).map(record => {
    val msg:yTopic = gson.fromJson(record.value(),classOf[yTopic])
    Unioned(None,Some(msg),if (msg.sessionId!= null) msg.sessionId.toString else "")
  }).window(Minutes(rightWindow),Minutes(rightWindow))

  //convert stream to key, value pair and filter empty session id.
  val unionStream = kafkaStreamLeft.union(kafkaStreamRight).map(record =>(record.sessionId,record))
    .filter(record => !record._1.toString.isEmpty)
  val stateSpec = StateSpec.function(stateUpdateF).timeout(Minutes(timeout.toInt))

  unionStream.mapWithState(stateSpec).foreachRDD(rdd => {
    try{
      if(!rdd.isEmpty()) rdd.foreachPartition(partition =>{
        val props = new util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        //send to kafka result JSON.
        partition.foreach(record => {
          if(record!=null && !"".equals(record) && !"()".equals(record.toString) && !"None".equals(record.toString) ){
            producer.send(new ProducerRecord[String, String](outTopic, null, gson.toJson(record)))
          }
        })
        producer.close()
      })
    }catch {
      case e: Exception  => {
        logger.error(s""""error join topics :${leftTopic} ${rightTopic} to out topic ${outTopic}""")
        logger.info(e.printStackTrace())
      }
    }})

//mapWithState function that will be called on each key occurrence with new items in newItemValues and state items if exits.

def stateUpdateF = (keySessionId:String,newItemValues:Option[Unioned],state:State[Unioned])=> {
    val currentState = state.getOption().getOrElse(Unioned(None,None,keySessionId))

    val newVal:Unioned = newItemValues match {
      case Some(newItemValue) => {
        if (newItemValue.yTopic.isDefined)
          Unioned(if(newItemValue.xTopic.isDefined) newItemValue.xTopic else currentState.xTopic,newItemValue.yTopic,keySessionId)
        else if (newItemValue.xTopic.isDefined)
          Unioned(newItemValue.xTopic, if(currentState.yTopic.isDefined)currentState.yTopic else newItemValue.yTopic,keySessionId)
        else newItemValue
      }
      case _ => currentState //if None = timeout => currentState
    }

    val processTs = LocalDateTime.now()
    val processDate = dtf.format(processTs)
    if(newVal.xTopic.isDefined && newVal.yTopic.isDefined){//if we have a join remove from state
      state.remove()
      JoinState(newVal.sessionId,newVal.xTopic,newVal.yTopic,false,processTs.toInstant(ZoneOffset.UTC).toEpochMilli,processDate)
    }else if(state.isTimingOut()){//time out do no try to remove state manually ,it's removed automatically.
        JoinState(newVal.sessionId, newVal.xTopic, newVal.yTopic,true,processTs.toInstant(ZoneOffset.UTC).toEpochMilli,processDate)
    }else{
      state.update(newVal)
    }
  }

  //case class for kafka topics data.(x,y topics ) join will be on session id filed.
  case class xTopic(sessionId:String,param1:String,param2:String,sessionCreationDate:String)
  case class yTopic(sessionId:Long,clientTimestamp:String)
  //catch all schema : object that contains both kafka input fileds topics and key valiue for join.
  case class Unioned(xTopic:Option[xTopic],yTopic:Option[yTopic],sessionId:String)
  //class for  output result of join stateful function.
  case class JoinState(sessionId:String, xTopic:Option[xTopic],yTopic:Option[yTopic],isTimeOut:Boolean,processTs:Long,processDate:String)

我很乐意接受一些评论。对不起,邮件太长了。

共有1个答案

张嘉佑
2023-03-14

我的印象是这个用例是由会话化API解决的?:

StructuredSessionization.scala

结构化流媒体中的有状态操作

还是我错过了什么?

 类似资料:
  • 我有一些关于Kafka主题分区->spark流媒体资源利用的用例,我想更清楚地说明这些用例。 我使用spark独立模式,所以我只有“执行者总数”和“执行者内存”的设置。据我所知并根据文档,将并行性引入Spark streaming的方法是使用分区的Kafka主题->RDD将具有与Kafka相同数量的分区,当我使用spark-kafka直接流集成时。 因此,如果我在主题中有一个分区和一个执行器核心,

  • 它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka

  • 我已经在Ubuntu上设置了Kafka和Spark。我正在尝试阅读Kafka的主题通过火花流使用pyspark(Jupyter笔记本)。Spark既没有读取数据,也没有抛出任何错误。 null Kafka生产者:bin/kafka-console-producer.sh--broker-list localhost:9092--topic new_topic Kafka使用者:bin/kafka-

  • 为什么以及何时会选择将Spark流媒体与Kafka结合使用? 假设我有一个系统通过Kafka每秒接收数千条消息。我需要对这些消息应用一些实时分析,并将结果存储在数据库中。 我有两个选择: > < li> 创建我自己的worker,该worker从Kafka读取消息,运行分析算法并将结果存储在DB中。在Docker时代,只需使用scale命令就可以轻松地在我的整个集群中扩展这个工作线程。我只需要确保

  • 我最近在一个streams应用程序中遇到了一个以前没有遇到过的问题,它很难跟踪与键控/连接相关的问题(以及更新后的分区问题)。 我有两个主题(raw_events和processed_users),这两个主题的密钥相同,但是当我试图对这两个主题执行连接时,尽管密钥相同,但只有一些连接是成功的。 为简洁起见,应用程序的基本工作流程如下: null 问题本身是在步骤5中产生的。由于主题和主题之间的连接

  • 当前设置:Spark流作业处理timeseries数据的Kafka主题。大约每秒就有不同传感器的新数据进来。另外,批处理间隔为1秒。通过,有状态数据被计算为一个新流。一旦这个有状态的数据穿过一个treshold,就会生成一个关于Kafka主题的事件。当该值后来降至treshhold以下时,再次触发该主题的事件。 问题:我该如何避免这种情况?最好不要切换框架。在我看来,我正在寻找一个真正的流式(一个