我想在这里添加一些代码,并对来自Flink的protobuf数据进行stdout。
我正在使用Flink的Apache Kafka连接器将Flink连接到Kafka。
这是我的Flink密码。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092").
val producer = FlinkKafkaProducer011(topic, new myProtobufSchema, props)
env.addSink(producer)
env.execute("To Kafka")
这是我的Kafka代码。
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "protobuf-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p
}
val builder: StreamsBuilder = new StreamsBuilder
// TODO: implement here to stdout
val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()
sys.ShutdownHookThread {
streams.close(Duration.ofSeconds(10))
}
您需要设置StreamsBuilder
以从主题中使用
val builder: StreamsBuilder = new StreamsBuilder()
.stream(topic)
.print(Printed.toSysOut());
我有一个生产者,它正在为一个主题生成protobuf消息。我有一个消费者应用程序,它反序列化protobuf消息。但hdfs接收器连接器直接从Kafka主题接收消息。中的键和值转换器将设置为什么?做这件事最好的方法是什么?提前道谢!
我正在使用Python语言。我有csv文件,我需要转换成json并发送到kafka,然后发送到ElasticSearch。 我能够将Csv转换为Json并发送给Kafka消费者。如何从Kafka Consumer向ElasticSearch获取数据
尝试使用Apache Flink从Cassandra获取数据,引用本文,我可以读取数据,但我不知道如何将其加载到DataStream对象中。代码如下: 我试过了 将变量中的数据加载到数据流中
我试图通过套接字将protobuf从C++应用程序发送到Java应用程序。我正在使用一个简单的套接字在muc++程序发送protobuf。在通过网络发送之前,我已经将其序列化到char缓冲区。在我的Java(服务器)程序中,我使用ServerSocket来接收数据。 我有麻烦反序列化的原Buf在Java那边。它一直给我错误: 解析协议消息时,输入在字段中间意外结束。这可能意味着输入被截断,或者嵌入
我正试图将数据从Kafka传递到火花流。 这就是我到现在所做的: null
问题内容: 我是Rails和Web开发的新手。 我正在Matlab中生成一堆对象,我想将这些对象发送到我的Rails应用程序中的数据库中。谁能建议我该怎么做? 到目前为止,在Rails端,我已经为数据生成了基本的支架。我可以使用“ / myobjects / new”中的表单将对象添加到数据库中。 在Matlab端,我一直在尝试使用HTTP POST请求添加对象,如下所示: 这将失败,并将以下内容