当前位置: 首页 > 知识库问答 >
问题:

使用ApacheFlink仅生成Kafka主题的消息

百里飞捷
2023-03-14

从示例中,我看到了下面的代码片段,它运行良好。但问题是:我并不总是需要处理输入流并将其生成到接收器。

如果我有一个应用程序,根据某些事件,我必须只发布到kafka主题,以便下游应用程序可以做出某些决定。这意味着,我实际上没有输入流,但我只知道当我的应用程序中发生某些事情时,我需要向kafka的特定主题发布消息。也就是说,我只需要一个接收器。

我查看了示例,但没有找到符合我要求的任何内容。有没有一种方法可以只配置一个KafkaSink,该KafkaSink公开一个方法()来发布消息到一个主题。

提前表示感谢!!

String inputTopic = "flink_input";
    String outputTopic = "flink_output";
    String consumerGroup = "baeldung";
    String address = "localhost:9092";
    StreamExecutionEnvironment environment = StreamExecutionEnvironment
      .getExecutionEnvironment();
    FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
      inputTopic, address, consumerGroup);
    DataStream<String> stringInputStream = environment
      .addSource(flinkKafkaConsumer);

    FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(
      outputTopic, address);

    stringInputStream
      .map(new WordsCapitalizer())
      .addSink(flinkKafkaProducer);

共有1个答案

宗乐池
2023-03-14

您必须有一个来源。您可能希望实现自定义源,或者可以使用类似 NumberSequenceSource 的东西,后跟一个运算符(如 process 函数),该函数发出您知道要写入接收器的任何内容,然后是接收器。

例如,该流程函数可以将传入事件转换为您想要写入Kafka的任何内容,或者它可以忽略其输入并使用计时器生成要发送给Kafka(Kafka)的事件。

或者,根据您的需求,您可能会发现异步i/o是比流程函数更好的构建块。

 类似资料:
  • 我已经在本地安装了kafka(目前没有集群/模式注册),并尝试生成一个Avro主题,下面是与该主题相关的模式。 我想创建一个简单的来根据上述模式创建一些数据并将其发布到kafka。考虑创建转换为的示例数据,然后将其更改为然后发布。 然后,如下所示: 现在可以< code >手动将模式附加到此avro主题。 这可以通过使用而不是使用来实现吗?这是用于批处理,而不是。

  • 我试图使用ConsumerSeeKaware,阅读kafka主题中可用的最后一条消息。消息类型是Avro对象列表。我能成功地做到这一点。但在反序列化过程中会失败。该消息使用spring-cloud-stream-kafka框架生成。消息具有contentType。 我知道avro消息可以像下面这样反序列化。 但不管用。可能是因为两件事。 > 消息是avro对象的列表。但我正在尝试使用Avro模式创

  • 这里是源连接器状态的输出: 这里是接收器连接器配置的输出: 这里是接收器连接器状态的输出:

  • 我想产生一个Kafka主题的信息。该消息应该具有以下模式: 我知道这是一个json模式,那么如何将json转换成字符串呢?

  • 我想在生成一条发送给Kafka主题的消息后,获取偏移量和分区信息。我通读了spring cloud stream kafka绑定文档,发现这可以通过fecting RECORD\u元数据kafka头来实现。 来自Spring文档:(https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.R

  • 我正在开发一个模块,它使用来自Kafka主题的消息并发布到下游系统。在下游系统不可用的情况下,消费者不确认Kakfa消息。因此,当我的消费者收到消息时,当下游系统不可用时,kakfa的偏移量将不会被提交。但是如果我在下游系统启动后收到新消息,并且当我确认该消息时,最新的偏移量将被提交,并且消费者永远不会收到主题中没有偏移量提交的那些消息。