当前位置: 首页 > 知识库问答 >
问题:

kafka流中的处理器节点

杨乐意
2023-03-14

我正在Kafka流中的处理器节点上工作。对于一个简单的代码,我编写如下代码只是为了过滤用户ID,这是在kafka流中处理处理器节点的正确方法吗?

但是,下面的代码没有编译,抛出了一个错误:the method filter(谓词

KStreamBuilder builder = new KStreamBuilder();

builder.stream(topic)
    .filter(new Predicate <String, String>() {
        //@Override
        public boolean test(String key, String value) {
            Hashtable<Object, Object> message;
            // put you processor logic here
            return message.get("UserID").equals("1");
        }
    })
    .to(streamouttopic);

    final KafkaStreams streams = new KafkaStreams(builder, props);
    final CountDownLatch latch = new CountDownLatch(1);

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
       streams.close();
       latch.countDown();
    }
});

try {
    streams.start();
    latch.await();
} catch (Throwable e) {
    System.exit(1);
}
System.exit(0);

有人能给我引路吗?


共有2个答案

吴才俊
2023-03-14

可能您正在使用另一个包中的Predicate类。您需要使用

import org.apache.kafka.streams.kstream.Predicate;
呼延光明
2023-03-14
匿名用户

<代码>生成器。流(主题)返回KStream

如果您知道,实际类型是KStream

builder.<Sting,String>stream(topic)
       .filter(...)

要回答您关于“处理器节点”的问题:是的,添加一个过滤器()将在内部添加一个处理器节点。注意,在DSL级别,通常不需要考虑处理器。

如果您想显式使用处理器,您可以使用处理器API而不是DSL。查看WordCount示例:https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java

注意,使用DSL,代码将在内部转换为处理器拓扑,即Kafka流的运行时模型。

 类似资料:
  • 曾发表过多篇文章,但大多数都与处理错误消息有关,而不是处理过程中的异常处理。 我想知道如何处理流应用程序接收到的消息,并且在处理消息时出现异常?异常可能是由于多种原因造成的,如网络故障、RuntimeException等。, 有人能提出正确的方法吗?我应该使用setUncaughtExceptionHandler吗?还是有更好的方法

  • 我知道这里之前有人问过这个问题:Kafka流并发? 但这对我来说很奇怪。根据文档(或者我可能遗漏了什么),每个分区都有一个任务,这意味着不同的处理器实例,每个任务由不同的线程执行。但是当我测试它的时候,我看到不同的线程可以得到不同的处理器实例。因此,如果你想在处理器中保持内存状态(老式的方式),你必须锁定? 线程ID:88 ID:c667e669-9023-494b-9345-236777e9df

  • 我正在尝试使用Spring boot编写一个Kafka流处理器,但当消息产生到主题中时,它不会被调用。 主题消息有不同的类型,并且是Avro格式的。在模式注册表中使用Avro UNION注册模式。 这些是主题 application.yml我正在使用cp-all-in-one-community作为docker-file 但现在我得到以下错误:

  • 问题内容: 曾经经历过多个帖子,但是其中大多数都是相关的处理错误消息,与处理它们时的异常处理无关。 我想知道如何处理流应用程序收到的消息,并且在处理消息时出现异常?该异常可能是由于多种原因造成的,例如网络故障,RuntimeException等, 有人可以建议正确的做法吗?我应该使用 吗?或者,还有更好的方法? 如何处理重试? 问题答案: 这取决于您要如何处理生产者方面的异常。如果将对生产者抛出异

  • 我用Spring云溪和Kafka溪。假设我有一个处理器,它的功能是将KStream字符串转换为KStream CityProgrammes。它调用一个API来根据名称查找城市,并调用另一个转换来查找该城市附近的任何事件。 现在的问题是,任何错误发生在转换期间,整个应用程序停止。我想把一个特定的消息发送给DLQ,然后继续前进。我已经读了几天了,每个人都建议在被调用的服务中处理错误,但在我看来这是一个

  • 我最近看到了这篇关于Apache Kafka文档的文章,内容涉及如何处理Kafka流中的无序消息 https://kafka.apache.org/21/documentation/streams/core-concepts#streams_out_of_ordering 有人能给我解释一下下面这句话背后的原因吗: 在主题分区中,记录的时间戳可能不会随着它们的偏移量单调地增加。由于Kafka流总是