当前位置: 首页 > 知识库问答 >
问题:

过滤Kafka流

柯鸿振
2023-03-14

我一直在检查Kafka流。我一直在测试下面的Kafka流代码

生产者主题:(这是第一个生产者主题-发送以下json数据)

KafkaProducer<String, String> producer = new KafkaProducer<>(
                    properties);

    producer.send(new ProducerRecord<String,String>(topic, jsonobject.toString()));
                  producer.close();

JSON-主题的生产者:

{"UserID":"1","Address”:”XXX”,”AccountNo":"234234","MemberName”:”Stella”,”AccountType":"Savings"}

Stream Topic代码:(这是第二个Streaming代码和主题)

builder.<String,String>stream(topic)
           .filter(new Predicate <String, String>() {
               @Override

            public boolean test(String key, String value) {

                   // put you processor logic here
                   System.out.println("value : " + value);

                   return value.substring(0).equals(“1”);
               }
            }) 
           .to(streamouttopic);

         final KafkaStreams streams = new KafkaStreams(builder, props);
         final CountDownLatch latch = new CountDownLatch(1);

            // attach shutdown handler to catch control-c
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });

            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);

如果UserID值为“1”,我想对其进行归档,然后将该数据发送到目标流媒体主题。

当我使用“.filter”并打印System.out时。println(“value:”value);,它在执行时抛出以下错误。

Exception in thread "SampleStreamProducer-a6bb543e-bb92-48d0-8d9f-225046722d81-StreamThread-1" java.lang.ClassCastException: [B cannot be cast to java.lang.String

如果我不使用“. filter”并使用像这样的简单代码,builder.stream(主题). to(流话题);,它工作正常,但没有过滤。但是,我需要使用那个过滤器。

有人能帮我修一下吗?

共有1个答案

司马越
2023-03-14

默认情况下,Kafka Streams假定数据类型

当将主题读取为KStream时,需要指定正确的序列:

builder.<String,String>stream(topic, Consumed.with(Serdes.String(), Serdes.String())
       .filter(...)

请查看示例并阅读文档:

  • https://github.com/confluentinc/kafka-streams-examples
  • https://kafka.apache.org/11/documentation/streams/

 类似资料:
  • 你能就解决下列问题的方法给我一个建议吗。我有两个主题,一个是静态内容,另一个是数据流。任务是连接数据,这在正常情况下很容易。我将静态内容理解为GlobalKTable,动态内容理解为KStream,然后简单地将它们连接起来。问题在于查找数据存在于同一主题的多个版本中。“版本”由“validFrom”字段标识。因此,流的数据需要根据其时间戳与相应版本的查找数据连接。有没有办法过滤GlobalKTab

  • 我们的软件解决方案为每个客户收集数据(“事件”)<一些客户(一小部分约3%)要求将这些数据输入“他们的系统”(他们需要为此服务付费)<我们需要发送这些事件的目标系统可能是: AWS S3 Azure存储 Splunk 数据狗 未来会有更多的目标系统... 上面的所有目标系统都有众所周知的Kafka Connect接收器连接器,因此我们的想法是使用这些连接器来导出数据。 所有客户事件都转到一个“输入

  • 我正在尝试编写一个简单的Kafka Streams应用程序(目标是Kafka 2.2/Confluent 5.2),将一个至少有一次语义的输入主题转换为一个恰好只有一次的输出流。我想对以下逻辑进行编码: 对于具有给定密钥的每条消息: (这是基于我们从上游系统获得的订购保证来保证提供正确的结果;我不想在这里做任何神奇的事情。) 起初,我以为我可以用Kafka Streams操作符来实现这一点,它可以

  • 我正在使用Kafka Streams API (KTable,GlobalKTable..).我在用KStreams消费Kafka主题。我需要根据一些配置过滤出一些传入的Kafka事件,并在配置发生变化时处理它们。主题的持续时间限制至少为7天。以下是要求: 键值状态 K1 V1加工 K2 V2 未处理(基于某些业务逻辑) K3 V3 已处理 K4 V4加工 K1 V5加工 ------ 现在我想再

  • Filter(过滤器)是 Java 组件,允许运行过程中改变进入资源的请求和资源返回的响应中的有效负载和头信息。 Java Servlet API 类和方法提供了一种轻量级的框架用于过滤动态和静态内容。还描述了如何在 Web 应用配置 filter,以及它们实现的约定和语义。 网上提供了 servlet 过滤器的 API 文档。过滤器的配置语法在第14章的“部署描述符”中的部署描述符模式部分给出。

  • 过滤掉不在范围内的数值。 用法 Your browser does not support the video tag. 案例:小台灯 功能:15:00:00时灯自动亮起,15:30:00时自动熄灭 工作原理 在配置项中设定一组范围。如果输入落在范围内,则输出输入本身;否则,输出no。