当前位置: 首页 > 知识库问答 >
问题:

如何在Spark Streaming 2.3.1中将每条记录写入多个kafka主题?

云季同
2023-03-14

如何在Spark Streaming 2.3.1中把每条记录写到多个kafka主题?换句话说,我得到了5条记录和两个输出kafka主题,我希望这两个输出主题中都有5条记录。

这里的问题不谈论结构化流媒体案例。我正在寻找特定的结构化流媒体。

共有1个答案

易品
2023-03-14

不确定您使用的是java还是scala。下面是为两个不同主题生成消息的代码。你得打电话

dataset.foreachPartition(partionsrows => {
      val props = new util.HashMap[String, Object]()
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer)
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      partionsrows.foreach(row => {
        val offerId = row.get(0).toString.replace("[", "").replace("]", "")
        val message1 = new ProducerRecord[String, String]("topic1", "message")
        producer.send(message1)
        val message2 = new ProducerRecord[String, String]("topic2",  "message")
        producer.send(message2)
      })
    })
 类似资料:
  • 问题内容: 在多表查询中,每条记录只能获得一行? 我有这三个表: 苹果 疲劳风险管理系统 FARM_APPLES 使用此表,我需要以下结果: 非常感谢您的任何帮助,在此先感谢您。 编辑 感谢OMG Ponies和Bill,我将尽力尝试您的两种解决方案,这是最后一件事,它有可能获得以下结果: 问题答案: Firebird 2.0支持CASE表达式,因此您可以使用:

  • 事实上,直到现在我还没有成功。您能帮助我,请提供我的详细代码示例,以实现使用Kafka流DSL?

  • 给定:我在Kafka中有两个主题,假设主题A和主题B。Kafka流从主题A中读取一条记录,对其进行处理,并产生与所消耗记录相对应的多条记录(假设recordA和recordB)。现在的问题是我如何使用Kafka流来实现这一点。 在这里,读取的记录是消息;处理之后,它返回一个消息列表。如何将此列表划分为两个生产者流?任何帮助都将不胜感激。

  • 我的计划是 使用多线程步骤,以便每个线程读取一条记录-在处理器中生成多条记录-将生成的记录写入单独的excel文件。 使用同步读取器从进程表中读取。 在处理器中,使用读取器中返回的记录查询DB(涉及多个联接)并形成一个复合对象。 用自定义编写器将复合对象写入文件 就内存管理而言,上面的方法听起来不太好。 因为要写入的记录是在处理器中生成的(而不是从读取器那里获得的,读取器只是给出记录ID),所以只

  • 问题内容: 我想获得表中每个记录的最小日期,该记录具有一个主键的多个日期条目。看看我的桌子: 我想要这样的结果: 我想获取每个CaseNo的最短日期记录在我的桌子上。 我尝试了这段代码: 结果是这样的: 该代码删除没有最小日期的行。我想显示所有记录的最小日期为Min_date。 问题答案: 试试这个

  • 我一直在关注Kafka消费类课程。我可以将主题作为列表对象传递。我指的是下面的文章https://docs.confluent.io/current/clients/java.html,但我需要知道,一旦消费者类订阅了主题,我如何知道哪个主题中有记录。有没有办法找到答案?代码如下: