当前位置: 首页 > 知识库问答 >
问题:

如何将Protobuf数据从Flink转发到Kafka和stdout?

淳于兴朝
2023-03-14

我想在这里添加一些代码,并对来自Flink的protobuf数据进行stdout。

我正在使用Flink的Apache Kafka连接器将Flink连接到Kafka。

这是我的Flink密码。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092").
val producer = FlinkKafkaProducer011(topic, new myProtobufSchema, props)
env.addSink(producer)
env.execute("To Kafka")

这是我的Kafka代码。

  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "protobuf-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p
  }

  val builder: StreamsBuilder = new StreamsBuilder
  // TODO: implement here to stdout 

  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()

  sys.ShutdownHookThread {
     streams.close(Duration.ofSeconds(10))
  }

共有1个答案

戚升
2023-03-14

您需要设置StreamsBuilder以从主题中使用

val builder: StreamsBuilder = new StreamsBuilder()
    .stream(topic)
    .print(Printed.toSysOut());
 类似资料:
  • 我有一个生产者,它正在为一个主题生成protobuf消息。我有一个消费者应用程序,它反序列化protobuf消息。但hdfs接收器连接器直接从Kafka主题接收消息。中的键和值转换器将设置为什么?做这件事最好的方法是什么?提前道谢!

  • 我正在使用Python语言。我有csv文件,我需要转换成json并发送到kafka,然后发送到ElasticSearch。 我能够将Csv转换为Json并发送给Kafka消费者。如何从Kafka Consumer向ElasticSearch获取数据

  • 尝试使用Apache Flink从Cassandra获取数据,引用本文,我可以读取数据,但我不知道如何将其加载到DataStream对象中。代码如下: 我试过了 将变量中的数据加载到数据流中

  • 我试图通过套接字将protobuf从C++应用程序发送到Java应用程序。我正在使用一个简单的套接字在muc++程序发送protobuf。在通过网络发送之前,我已经将其序列化到char缓冲区。在我的Java(服务器)程序中,我使用ServerSocket来接收数据。 我有麻烦反序列化的原Buf在Java那边。它一直给我错误: 解析协议消息时,输入在字段中间意外结束。这可能意味着输入被截断,或者嵌入

  • 我正试图将数据从Kafka传递到火花流。 这就是我到现在所做的: null

  • 问题内容: 我是Rails和Web开发的新手。 我正在Matlab中生成一堆对象,我想将这些对象发送到我的Rails应用程序中的数据库中。谁能建议我该怎么做? 到目前为止,在Rails端,我已经为数据生成了基本的支架。我可以使用“ / myobjects / new”中的表单将对象添加到数据库中。 在Matlab端,我一直在尝试使用HTTP POST请求添加对象,如下所示: 这将失败,并将以下内容