我是新的Flink流处理,并需要一些帮助与FlinkKafka生产者,因为不能找到很多相关的搜索后一段时间。我目前正在阅读一个Kafka主题的流,然后在执行一些计算后,我想把这个写到新的Kafka中的一个分离主题。但我面临的问题是,我无法发送Kafka主题的关键。我使用的是Flink Kafka连接器,它给了我FlinkKafkaConsumer和flinkkafkaProducer。更详细的查看下面是我的代码,我可以在我的代码中改变什么,它可以工作,目前在Kafka上,我正在产生我的消息是与null在关键字,作为值是什么,我需要:
Properties consumerProperties = new Properties();
consumerProperties.setProperty("bootstrap.servers", serverURL);
consumerProperties.setProperty("group.id", groupID);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(consumerTopicName,
new SimpleStringSchema(), consumerProperties);
kafkaConsumer.setStartFromEarliest();
DataStream<String> kafkaConsumerStream = env.addSource(kafkaConsumer);
final int[] tVoteCount = {0};
DataStream<String> kafkaProducerStream = kafkaConsumerStream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws InterruptedException, IOException {
JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
Tcount = Tcount + jsonNode.get(key1).asInt();
int nameCandidate = jsonNode.get(key2).asInt();
System.out.println(Tcount);
String tCountT = Integer.toString(Tcount);
//tVoteCount = tVoteCount + voteCount;
//waitForEventTime(timeStamp);
return tCountT;
}
});
kafkaConsumerStream.print();
System.out.println("sdjknvksjdnv"+Tcount);
Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", serverURL);
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(producerTopicName,
new SimpleStringSchema(), producerProperties);
kafkaProducerStream.addSink(kafkaProducer);
env.execute();
谢了。
在本博客中,您将找到一个如何将key和topic写入主题的示例:
您需要将新flinkkafkaProducer
的创建替换为以下内容:
FlinkKafkaProducer<KafkaRecord> kafkaProducer =
new FlinkKafkaProducer<KafkaRecord>(
producerTopicName,
((record, timestamp) -> new ProducerRecord<byte[], byte[]>(producerTopicName, record.key.getBytes(), record.value.getBytes())),
producerProperties
);
前期回顾 其中channel.finishConnect()中完成建立连接,调用了 sender的run(),继续分析 其中步骤五和步骤七: 会把发往同个broker上面partition的数据组合成为一个请求,然后统一一次发送过去,这样子就减少了网络请求。调用send() 调用selector的send() 调用kafkachannel的setsend() 开始发送数据 sender里面的pol
本文向大家介绍在生产者中,何时发生QueueFullException?相关面试题,主要包含被问及在生产者中,何时发生QueueFullException?时的应答技巧和注意事项,需要的朋友参考一下 答:每当Kafka生产者试图以代理的身份在当时无法处理的速度发送消息时,通常都会发生QueueFullException。但是,为了协作处理增加的负载,用户需要添加足够的代理,因为生产者不会阻止。
如果我只在生产者端发送一条记录并等待,生产者何时将记录发送给经纪人?在Kafka文档中,我找到了名为“linger.ms”的配置,它说: 一旦我们得到 根据以上文件,我有两个问题。 > 如果生产者收到的数据达到batch.size,它会立即触发发送一个只包含一个批次的请求给代理?但是正如我们所知,一个请求可以包含许多批次,那么它是如何发生的呢? 这是否意味着即使是收到的数据也不足以批量处理。大小,
大家好,我正在努力将一个简单的avro模式与模式注册表一起序列化。 设置: 两个用java编写的Flink jobs(一个消费者,一个生产者) 目标:生产者应该发送一条用ConfluentRegistryAvroSerializationSchema序列化的消息,其中包括更新和验证模式。 然后,使用者应将消息反序列化为具有接收到的模式的对象。使用。 到目前为止还不错:如果我将架构注册表上的主题配置
这是关于让生产者知道消息是否已被消费者消费的用例。 我的想法是,每次消费消息时,消费者都会提交偏移量,生产者可以跟踪和读取当前偏移量,以查看是否消费了相应的消息。 此外,请不要犹豫,让我知道这是否是处理用例的正确方法,因为我没有Kafka的经验。我知道Kafka不是为这种方式设计的(处理上述用例),但我必须坚持使用Kafka。
我无法将KafkaProducer使用java从Windows(主机操作系统)上的eclipse发送到运行在Hortonworks沙箱上的kafka主题。我的java代码如下所示 当我运行这个java代码时没有错误,它只是打印消息的索引,在本例中只有0,然后终止,我无法在hortonworks沙箱的cmd接口上的console-consumer中看到0。 这是pom.xml依赖项 我可以从制片人那