当前位置: 首页 > 知识库问答 >
问题:

如何将列表中的数据发送到kafka作为记录

秋和雅
2023-03-14

我是新来的,我有一个这样的名单

newlist = List([Date,Open,High,Low,Close,Volume,Name], [2012-08-13T00:00:00.000Z,92.29,92.59,91.74,92.4,2075391.0,MMM], [2012-08-14T00:00:00.000Z,92.36,92.5,92.01,92.3,1843476.0,MMM], [2012-08-15T00:00:00.000Z,92.0,92.74,91.94,92.54,1983395.0,MMM], [2012-08-16T00:00:00.000Z,92.75,93.87,92.21,93.74,3395145.0,MMM], [2012-08-17T00:00:00.000Z,93.93,94.3,93.59,94.24,3069513.0,MMM], [2012-08-20T00:00:00.000Z,94.0,94.17,93.55,93.89,1640008.0,MMM])

我使用下面的代码将列表中的数据发送给Kafka

      val today = Calendar.getInstance.getTime
      val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val key = UUID.randomUUID().toString().split("-")(0)

      val value = formatter.format(today) + "," + newList

      val data = new ProducerRecord[String, String](topic, key, value)

      println(data.value())
      producer.send(data)

我的输出如下:

2020-05-12 14:56:41,List([Date,Open,High,Low,Close,Volume,Name], [2012-08-13T00:00:00.000Z,92.29,92.59,91.74,92.4,2075391.0,MMM], [2012-08-14T00:00:00.000Z,92.36,92.5,92.01,92.3,1843476.0,MMM], [2012-08-15T00:00:00.000Z,92.0,92.74,91.94,92.54,1983395.0,MMM], [2012-08-16T00:00:00.000Z,92.75,93.87,92.21,93.74,3395145.0,MMM], [2012-08-17T00:00:00.000Z,93.93,94.3,93.59,94.24,3069513.0,MMM], [2012-08-20T00:00:00.000Z,94.0,94.17,93.55,93.89,1640008.0,MMM])
2020-05-12 14:56:42,List([Date,Open,High,Low,Close,Volume,Name], [2012-08-13T00:00:00.000Z,92.29,92.59,91.74,92.4,2075391.0,MMM], [2012-08-14T00:00:00.000Z,92.36,92.5,92.01,92.3,1843476.0,MMM], [2012-08-15T00:00:00.000Z,92.0,92.74,91.94,92.54,1983395.0,MMM], [2012-08-16T00:00:00.000Z,92.75,93.87,92.21,93.74,3395145.0,MMM], [2012-08-17T00:00:00.000Z,93.93,94.3,93.59,94.24,3069513.0,MMM], [2012-08-20T00:00:00.000Z,94.0,94.17,93.55,93.89,1640008.0,MMM])

但我希望我的输出是这样的(列表中的每个值都是行):

2020-05-12 14:56:41,Date,Open,High,Low,Close,Volume,Name
2020-05-12 14:56:42,2012-08-13T00:00:00.000Z,92.29,92.59,91.74,92.4,2075391.0,MMM
2020-05-12 14:56:43,2012-08-14T00:00:00.000Z,92.36,92.5,92.01,92.3,1843476.0,MMM

我们怎么能那样做?请忽略我的错误

共有1个答案

司寇经亘
2023-03-14

ProducerRecord是单个数据的一个实例,如果您想发送数据列表,可以多次调用“发送”方法

newlist.map(row => 
    producer.send(new ProducerRecord[String, String](topic, key, row.mkString))
 类似资料:
  • 我正在使用Python语言。我有csv文件,我需要转换成json并发送到kafka,然后发送到ElasticSearch。 我能够将Csv转换为Json并发送给Kafka消费者。如何从Kafka Consumer向ElasticSearch获取数据

  • 我在sping-boot应用程序中使用sping-kafka发送数据主题。我需要从oracle表中获取数据并发送它。 我从oracle表中获取列表。如何将它们发送到主题? 即。 > 有没有办法将它们作为列表发送?如果是,如何发送?如果是,那么如何在消费者端反序列化它? 是否可以使用spring book和spring kafka以流式方式发送数据?如果是,请提供更多信息或样本/片段plz。。。 如

  • 如何使用Kotlin声明并调用以函数列表为参数的函数。我在单个函数的函数中使用了参数,但是如何在函数列表中使用参数呢? 这个问题展示了如何将一个函数发送给一个函数:Kotlin:如何将一个函数作为参数传递给另一个函数?对于一系列函数,最好的方法是什么?

  • 本文向大家介绍Java将CSV的数据发送到kafka的示例,包括了Java将CSV的数据发送到kafka的示例的使用技巧和注意事项,需要的朋友参考一下 为什么将CSV的数据发到kafka flink做流式计算时,选用kafka消息作为数据源是常用手段,因此在学习和开发flink过程中,也会将数据集文件中的记录发送到kafka,来模拟不间断数据; 整个流程如下: 您可能会觉得这样做多此一举:flin

  • 我如何发送表单数据作为‘文本’通过放心,请参阅截图。当我使用request.multipart(“key”.“value”)时,请求是作为文件发送的(参考屏幕截图)。蒂娅。

  • 如何执行上述动作? 我读过类似的文章 我将其映射到login servlet `