当前位置: 首页 > 知识库问答 >
问题:

在Flink kafka问题中如何利用字符串方法保持顺序将数据流推送到Kafka主题

杨乐意
2023-03-14

我正在尝试每500毫秒创建一个JSON数据集,并希望将其推送到Kafka,以便在下游设置一些窗口并执行计算。下面是我的代码:

package KafkaAsSource

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer}
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper


import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
import java.util.{Optional, Properties}

object PushingDataToKafka {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setMaxParallelism(256)
    env.enableCheckpointing(5000)
    val stream: DataStream[String] = env.fromElements(createData())

    stream.addSink(sendToTopic(stream))
  }

  def getProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")

    return properties
  }

  def createData(): String = {
    val minRange: Int = 0
    val maxRange: Int = 1000
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{\n  \"id\":\"" + a + "\",\n  \"Category\":\"Flink\",\n  \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n  \n}"
      println(jsonData)
      Thread.sleep(500)
    }
    return jsonData
  }

  def sendToTopic(): Properties = {
    val producer = new FlinkKafkaProducer[String](
      "topic"
      ,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema())
      ,
      getProperties(),
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE
    )
    return producer
  }
}

它给了我以下错误:

type mismatch;
 found   : Any
 required: org.apache.flink.streaming.api.functions.sink.SinkFunction[String]
    stream.addSink(sendToTopic())

修改代码:

object FlinkTest {

  def main(ars: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.setMaxParallelism(256)
    var stream = env.fromElements("")
    //env.enableCheckpointing(5000)
    //val stream: DataStream[String] = env.fromElements("hey mc", "1")

    val myProducer = new FlinkKafkaProducer[String](
      "maddy", // target topic
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
      getProperties(), // producer config
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    val minRange: Int = 0
    val maxRange: Int = 10
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{\n  \"id\":\"" + a + "\",\n  \"Category\":\"Flink\",\n  \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n  \n}"
      println(a)
      Thread.sleep(500)
      stream = env.fromElements(jsonData)
      println(jsonData)
      stream.addSink(myProducer)
    }

    env.execute("hey")
  }

  def getProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    return properties
  }
  /*
  def createData(): String = {
    val minRange: Int = 0
    val maxRange: Int = 10
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{\n  \"id\":\"" + a + "\",\n  \"Category\":\"Flink\",\n  \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n  \n}"
      Thread.sleep(500)
    }
    return jsonData
  }
  */

}

< code>Modified Code提供了Kafka主题中的数据,但它不保留顺序。我在循环中做错了什么?此外,必须将Flink的版本从< code>1.13.5更改为< code>1.12.2。

我最初使用的是< code > Flink < code > 1 . 13 . 5 、< code >连接器和< code>2.11的< code>Scala。我到底错过了什么?

共有1个答案

公西星海
2023-03-14

关于这个循环的几件事:

for (a <- minRange to maxRange) {
    jsonData = 
      "{\n  \"id\":\"" + a + "\",\n  \"Category\":\"Flink\",\n  \"eventTime\":\""
      + DateTimeFormatter
        .ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
        .format(LocalDateTime.now) + "\"\n  \n}"
    println(a)
    Thread.sleep(500)
    stream = env.fromElements(jsonData)
    println(jsonData)
    stream.addSink(myProducer)
}

>

  • 睡眠发生在Flink客户端,仅影响客户端在将作业图提交到集群之前组装作业图所需的时间。它对作业的运行方式没有影响。

    这个循环创建了10个独立的管道,这些管道将独立并行运行,所有管道都产生相同的Kafka主题。这些管道将相互竞争。

    若要获取所需的行为(跨单个管道的全局排序),您需要从单个源生成所有事件(当然,按顺序),并以 1 的并行度运行作业。像这样的东西会这样做:

    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
    
    object FlinkTest {
    
      def main(ars: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        env.setParallelism(1)
    
        val myProducer = ...
        val jsonData = (i: Long) => ...
    
        env.fromSequence(0, 9)
          .map(i => jsonData(i))
          .addSink(myProducer)
    
          env.execute()
      }
    }
    

    您可以将 maxParallelism 保留为 256(或其默认值 128);它在这里不是特别相关。maxParallelism 是 keyBy 将把密钥散列到的哈希桶的数量,它定义了作业可伸缩性的上限。

  •  类似资料:
    • 我试图使用pyspark将每日批次的数据发送到Kafka主题,但我当前收到以下错误: Traceback(最近的最后一次调用): File", line 5, in File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/readwriter.py", line 548, in保存自己。_jwrite.save()File"/usr

    • 假设我使用kafka streams(kafka-streams-scala库,版本2.2.0)。 我想出了一种可能的方法:创建流和可变映射,然后使用来跟踪每个键的N个最近值。 我的问题是是否有更好的方法来实现这一点--或者使用streams API,或者至少不使用可变映射。

    • 我在Kafka中配置了3个代理运行在不同的端口上。我用的是春云流Kafka 我正在创建一个获得连续数据流的数据管道。我在kafka topic中存储3个代理运行的数据流。到目前为止没有问题。我担心的是假设3个经纪人倒下了5分钟,然后在那个时候我无法获得关于kafka主题的数据。将会有5分钟的数据丢失。从Spring开机我会得到警告 有没有一种方法可以在所有代理都停机时临时存储数据,并在代理再次启动

    • 我使用的debezium带有一个重路由选项,它将所有表的更改发送到仅一个kafka主题。有了这样的配置,我确信我可以从spark中读到独特的Kafka主题。 但我的问题是:如果我使用debezium而不使用重路由选项,并且我在不同的Kafka主题中对每个表进行了更改,我如何保证我以正确的顺序阅读了所有主题的事件? 我知道我可以使用Spark来订购它,例如通过时间戳,但如果说,一个kafka主题离线

    • 我有一个问题,我做了一个apache kafka消费者在Spring Boot消费3个不同的主题。但是我需要先使用来自第一个主题的所有数据,然后使用来自以下主题的数据,有什么方法可以做到这一点吗?还是你总是用同样的方式读它们?

    • 我有一根绳子: 给定另一个字符串 我想做的是替换<code> bb_seq的顺序,在中添加一个字符,得到: <代码>的总长度?保证与< code>bb_seq相同。 我如何用R实现这一点? 我尝试了这个,但失败了: 我对非正则表达式解决方案持开放态度。