当前位置: 首页 > 知识库问答 >
问题:

在Flink Kafka生产者中发送密钥

董凡
2023-03-14

我是新的Flink流处理,并需要一些帮助与FlinkKafka生产者,因为不能找到很多相关的搜索后一段时间。我目前正在阅读一个Kafka主题的流,然后在执行一些计算后,我想把这个写到新的Kafka中的一个分离主题。但我面临的问题是,我无法发送Kafka主题的关键。我使用的是Flink Kafka连接器,它给了我FlinkKafkaConsumer和flinkkafkaProducer。更详细的查看下面是我的代码,我可以在我的代码中改变什么,它可以工作,目前在Kafka上,我正在产生我的消息是与null在关键字,作为值是什么,我需要:

Properties consumerProperties = new Properties();
    
    consumerProperties.setProperty("bootstrap.servers", serverURL);
    consumerProperties.setProperty("group.id", groupID);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(consumerTopicName,
            new SimpleStringSchema(), consumerProperties);

    kafkaConsumer.setStartFromEarliest();
    DataStream<String> kafkaConsumerStream = env.addSource(kafkaConsumer);
    final int[] tVoteCount = {0};
    
    DataStream<String> kafkaProducerStream = kafkaConsumerStream.map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws InterruptedException, IOException {
            JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
            Tcount = Tcount + jsonNode.get(key1).asInt();
            int nameCandidate = jsonNode.get(key2).asInt();
            System.out.println(Tcount);
            String tCountT = Integer.toString(Tcount);
            //tVoteCount = tVoteCount + voteCount;
             
            //waitForEventTime(timeStamp);
            return tCountT;
        }
    });
    kafkaConsumerStream.print();
    System.out.println("sdjknvksjdnv"+Tcount);
    Properties producerProperties = new Properties();
    producerProperties.setProperty("bootstrap.servers", serverURL);
    FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(producerTopicName,
            new SimpleStringSchema(), producerProperties);
    kafkaProducerStream.addSink(kafkaProducer);
    env.execute();

谢了。

共有1个答案

陆英毅
2023-03-14

在本博客中,您将找到一个如何将key和topic写入主题的示例

您需要将新flinkkafkaProducer的创建替换为以下内容:

FlinkKafkaProducer<KafkaRecord> kafkaProducer = 
  new FlinkKafkaProducer<KafkaRecord>(
    producerTopicName, 
    ((record, timestamp) -> new ProducerRecord<byte[], byte[]>(producerTopicName, record.key.getBytes(), record.value.getBytes())), 
    producerProperties
  );
 类似资料:
  • 前期回顾 其中channel.finishConnect()中完成建立连接,调用了 sender的run(),继续分析 其中步骤五和步骤七: 会把发往同个broker上面partition的数据组合成为一个请求,然后统一一次发送过去,这样子就减少了网络请求。调用send() 调用selector的send() 调用kafkachannel的setsend() 开始发送数据 sender里面的pol

  • 本文向大家介绍在生产者中,何时发生QueueFullException?相关面试题,主要包含被问及在生产者中,何时发生QueueFullException?时的应答技巧和注意事项,需要的朋友参考一下 答:每当Kafka生产者试图以代理的身份在当时无法处理的速度发送消息时,通常都会发生QueueFullException。但是,为了协作处理增加的负载,用户需要添加足够的代理,因为生产者不会阻止。

  • 如果我只在生产者端发送一条记录并等待,生产者何时将记录发送给经纪人?在Kafka文档中,我找到了名为“linger.ms”的配置,它说: 一旦我们得到 根据以上文件,我有两个问题。 > 如果生产者收到的数据达到batch.size,它会立即触发发送一个只包含一个批次的请求给代理?但是正如我们所知,一个请求可以包含许多批次,那么它是如何发生的呢? 这是否意味着即使是收到的数据也不足以批量处理。大小,

  • 大家好,我正在努力将一个简单的avro模式与模式注册表一起序列化。 设置: 两个用java编写的Flink jobs(一个消费者,一个生产者) 目标:生产者应该发送一条用ConfluentRegistryAvroSerializationSchema序列化的消息,其中包括更新和验证模式。 然后,使用者应将消息反序列化为具有接收到的模式的对象。使用。 到目前为止还不错:如果我将架构注册表上的主题配置

  • 这是关于让生产者知道消息是否已被消费者消费的用例。 我的想法是,每次消费消息时,消费者都会提交偏移量,生产者可以跟踪和读取当前偏移量,以查看是否消费了相应的消息。 此外,请不要犹豫,让我知道这是否是处理用例的正确方法,因为我没有Kafka的经验。我知道Kafka不是为这种方式设计的(处理上述用例),但我必须坚持使用Kafka。

  • 我无法将KafkaProducer使用java从Windows(主机操作系统)上的eclipse发送到运行在Hortonworks沙箱上的kafka主题。我的java代码如下所示 当我运行这个java代码时没有错误,它只是打印消息的索引,在本例中只有0,然后终止,我无法在hortonworks沙箱的cmd接口上的console-consumer中看到0。 这是pom.xml依赖项 我可以从制片人那