当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams:当kafka流将数据写入目标主题时如何捕获事件

司迪
2023-03-14

我正在开发kafka-stream api。基本上Kafka-stream从源主题获取数据,并在应用一些过滤器后将其写回目标kafka主题。

使用的依存关系:

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.1.0</version>
    </dependency>

下面是相同的代码。:

{ ...

       // create property

            Properties property =  new Properties();
            property.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
            property.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,"kafka_streams_app");
            property.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
            property.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

            //create topology
            StreamsBuilder streamsBuilder = new StreamsBuilder();

            //build topology
            KStream<String,String> inputTopic = streamsBuilder.stream("source_topic");

          //filtering data
            KStream<String,String> filteredStream = inputTopic.filter(
                    (k,val)-> filterData(val)>10000
            );
            filteredStream.to("target_topic");
            KafkaStreams streams = new KafkaStreams(
                    streamsBuilder.build(),
                    property
            );
            //start our stream app

            streams.start(); 
    ...
    }

这是我的应用程序架构:

生产者API(源主题中的生产者)=

我想要的是,当流将数据写入目标主题时,我想要捕获事件,无论它是否成功。

有没有办法捕捉到回调?谢谢

共有1个答案

彭正谊
2023-03-14

您可以在普通生产者API上发送时指定的生产者回调

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

未在Streams API中公开。

不过,可以配置拦截器:

  • https://kafka.apache.org/documentation/#producerconfigs_interceptor.classes
  • https://javadoc.io/static/org.apache.kafka/kafka-clients/3.0.0/org/apache/kafka/clients/producer/ProducerInterceptor.html
 类似资料:
  • 我在Scala中设置了Spark Kafka Consumer,它接收来自多个主题的消息: 我需要为每个主题的消息(将采用JSON格式)开发相应的操作代码。 我提到了以下问题,但其中的答案对我没有帮助: 从spark中的Kafka消息获取主题 那么,在接收到的DStream上是否有任何方法可用于获取主题名称以及消息以确定应该采取什么行动? 对此任何帮助都将不胜感激。谢谢你。

  • 我正在使用sdk version并尝试使用运行器将数据拉至bigtable。不幸的是,当我使用作为我的接收器时,我在执行我的数据流管道时得到了。已经检查了我的并且参数很好,根据我的需要。 基本上,我创建并在我的管道的某个点上完成了编写 ,但我甚至无法设置断点来调试正好是null的地方。对于如何解决这个问题,有什么建议吗? 谢谢。

  • 我的场景是我使用make很多共享前缀(例如house.door,house.room)的Kafka主题,并使用Kafka stream regex主题模式API消费所有主题。一切看起来都很好,我得到了数据的密钥和信息。 为了处理数据,我需要主题名,这样我就可以根据主题名进行连接,但我不知道如何在Kafka stream DSL中获得主题名。

  • 我一直在使用covid19api持有的数据实现Kafka生产者/消费者和流。 我试图从endpoint中提取每天的案例https://api.covid19api.com/all.然而,这个服务——以及这个API中的其他服务——拥有自疾病开始以来的所有数据(确诊、死亡和恢复病例),但积累了数据,而不是日常病例,这就是我最终要实现的。 使用transformValues和StoreBuilder(正

  • 我们正在尝试使用托管在Windows独立环境中的Kafka中的代理消息。消费者正在Kubernetes中运行。 Server.Properties: 请帮助我解决这个问题。

  • 问题内容: 我在Oracle数据库中有一个交易表。我正在尝试为涉及多种交易类型的交付系统收集一份报告。实际上,“请求”类型可以是四个子类型之一(在此示例中为“ A”,“ B”,“ C”和“ D”),而“传递”类型可以是四个不同子类型之一类型(“ PULL”,“ PICKUP”,“ MAIL”)。从“请求”到“交付”之间可以有1到5个事务,并且“交付”类型中的许多也是中间事务。 我需要的是像这样的报