当前位置: 首页 > 知识库问答 >
问题:

将输入主题的每个传入记录与其单个precede记录进行比较

舒枫涟
2023-03-14

事实上,直到现在我还没有成功。您能帮助我,请提供我的详细代码示例,以实现使用Kafka流DSL?

共有1个答案

山阳辉
2023-03-14

您可以使用处理器API。

您必须实现Transformer接口方法转换:

>

  • 将查找键值,

    object SampleApp extends App {
    
      val storeName: String = ???
      val builder: StreamsBuilder = new StreamsBuilder()
      builder.stream("topicName")(Consumed.`with`(Serdes.String(), Serdes.String()))
        .transform[String, String](() => SampleTransformer(storeName))
        .to("outputTopic")(Produced.`with`(Serdes.String(), Serdes.String()))
    }
    
    
    case class SampleTransformer(storeName: String)
      extends Transformer[String, String, KeyValue[String, String]]
        with LazyLogging {
    
      var store: KeyValueStore[String, String] = _
    
      override def init(context: ProcessorContext): Unit = {
        super.init(context)
        store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, String]]
      }
    
      override def transform(key: String, newValue: String): String = {
        val valueToPass = Option(store.get(key)).map(oldValue => someComputation(oldValue, newValue))
        store.put(key, newValue)
        valueToPass.orNull
      }
    
      def someComputation(oldValue: String, newValue: String): String = ???
    
      override def close(): Unit = {
        // Close
      }
    }
    

  •  类似资料:
    • 如何在Spark Streaming 2.3.1中把每条记录写到多个kafka主题?换句话说,我得到了5条记录和两个输出kafka主题,我希望这两个输出主题中都有5条记录。 这里的问题不谈论结构化流媒体案例。我正在寻找特定的结构化流媒体。

    • 有没有办法从Kafka主题中删除单个记录?我知道有一个脚本kafka-delete-records.sh删除指定主题和分区上指定偏移量之前的记录,但是我希望能够删除我指定的偏移量上的记录。有办法做到吗? 这不是在Java而是在裸露的Kafka实例上。

    • ,日志记录将进入一个文件; (路径)/service_name/service_name.log 我想用logback复制这种行为,但在logback.xml配置中获取“logger”名称时遇到了真正的困难。它可以在log encoder.pattern中看到,即“%d%-5level%logger{35}-%msg%n”。

    • 我是Hadoop的新手,我的map-reduce代码可以工作,但它不产生任何输出。这里是Map-Reduce的信息: 下面是启动mapreduce作业的代码: 如果有任何帮助,我们将不胜感激。谢谢你

    • 更新:任何人都知道如何强迫另一个流到麦克风音频源。这需要原生android代码。请在这方面帮助我,请参考这个问题,以获得更多关于路由音频的详细信息

    • 问题内容: 有没有办法一次插入多个记录而不是一次插入? 我有一个非常丑陋的耙子任务,正在做以下事情… 这必须非常低效,并且必须有更好的方法… 问题答案: 该方法也将数组作为参数。 但是,它仍然对每个条目执行一个SQL查询,而不是单个SQL查询。它效率更高,因为它只需要在后台创建一个activerecord对象。 如果要同时从同一客户端插入许多行,请使用带有多个VALUES列表的INSERT语句一次