当前位置: 首页 > 知识库问答 >
问题:

Kafka流未填充

鲁靖
2023-03-14

我有一个非常简单的Java/Spring应用程序来演示KStream的功能,但不幸的是,我无法使KStream加载数据。想法是创建一个KStream对象,并使用controller GET方法简单地检索其内容。示例代码:

@RestController
@RequestMapping("/resources/")
public class StreamController {

   private KafkaStreams streams;
   private KStream<String, ResourceMessage> resourceStream;

   StreamController() {

      // configure streams/consumer
      Properties props = new Properties();

      // make sure stream starts from the beginning
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
      props.put(StreamsConfig.STATE_DIR_CONFIG, Path.of(System.getProperty("java.io.tmpdir")).toAbsolutePath().toString());

      //create POJO serdes
      StreamsBuilder builder = new StreamsBuilder();
      Map<String, Object> serdeProps = new HashMap<>();
      Serializer<ResourceMessage> resourceSerializer = new JsonPOJOSerializer<>();
      serdeProps.put("JsonPOJOClass", ResourceMessage.class);
      resourceSerializer.configure(serdeProps, false);
      Deserializer<ResourceMessage> resourceDeserializer = new JsonPOJODeserializer<>();
      serdeProps.put("JsonPOJOClass", Resource.class);
      resourceDeserializer.configure(serdeProps, false);
      Serde<ResourceMessage> resourceSerde = Serdes.serdeFrom(resourceSerializer, resourceDeserializer);

      // create KStream with POJO serdes for value
      resourceStream = builder.stream("Resources", Consumed.with(Serdes.String(), resourceSerde));
      streams = new KafkaStreams(builder.build(), props);
      streams.start();
     }

   // GET method that enumerates KStream and returns contents
   @GetMapping(value = "/resource")
   public List<Resource> getResources() {

       List<ResourceMessage> messages = new LinkedList<ResourceMessage>();

       // problem is here - there are messages in the topic but KStream returns no values in foreach(...)
       resourceStream.foreach((key, value) -> messages.add(value));

       return messages.stream().map(m -> Resource.builder()
            .id(m.getResouceId()).resource(m.getResource()).build()).collect(Collectors.toList());
   }
}

问题-主题中有消息,但foreach(...)中的KStream枚举没有从中检索任何结果。KStream对象状态为“RUNning”,日志中没有错误。

生成随机应用程序ID并将AUTO\u OFFSET\u RESET设置为“最早”并没有帮助。使用Kafka工具,我可以清楚地看到主题中的一些信息。在控制器运行时添加新消息也没有帮助。关于Kafka流媒体,我有什么遗漏或不理解的吗?

PS我在这里使用POJO序列化器和反序列化器示例。

共有1个答案

盖弘毅
2023-03-14

Kafka Streams是用于实时流处理的Kafka客户端。在您的情况下,您不需要Kafka Streams客户端(它不会工作),您需要一个简单的Kafka消费者,该消费者轮询来自Kafka的记录,并使用Rest API将其发送回。例如:

@RestController
@RequestMapping("/resources/")
public class StreamController {

   private KafkaStreams streams;
   private Consumer<String, ResourceMessage> consumer;

   StreamController() {

  // configure consumer properties
  Properties props = new Properties();

  // make the right properties with your Serialize and deserialiser

  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

  // Create the consumer using props.
  consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Collections.singletonList(TOPIC));
 }

   // GET method that enumerates KStream and returns contents
   @GetMapping(value = "/resource")
   public List<Resource> getResources() {

   List<ResourceMessage> messages = new LinkedList<ResourceMessage>();

   ConsumerRecords<String, ResourceMessage> consumerRecords =
                consumer.poll(1000);
   
  
   messages = consumerRecords... // Convert the records to your custom POJO

   return messages.stream().map(m -> Resource.builder()
        .id(m.getResouceId()).resource(m.getResource()).build()).collect(Collectors.toList());
   }
}

您可以在这里找到完整的示例链接。

更新

此外,您应该知道RestControlled是请求作用域,因此为每个请求创建了一个控制器实例。最后,API响应将一无所获。如果您想使用Kafka Streams,可以在主方法中启动它,同时使用Spring Boot应用程序。你可以看到这个例子,链接。

 类似资料:
  • 我要用Kafka流计算平均值。所以我做了一个有状态的操作,聚合,需要创建一个状态存储,但是这种情况不会发生。 这里是平均值的函数: 以下是例外情况: 问题是基本目录不存在,但我希望kafka流在必要时创建目录。 编辑 ----- 我注意到,如果我有1个处理器,使一个变量的平均值没有问题,但如果我有2个处理器是。 1个处理器的配置文件: 2个处理器的配置文件: 现在我启动处理器: 类型元组包含配置文

  • 只是关于Kafka的后续问题-未压缩主题与压缩主题 正如那里所说, 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从未压缩的主题构建的。 作为最佳实践,关于未压缩主题的语义,是否应禁用要在日志启用程序中取消压缩的主题,以便不会发生压缩(清理),其属性如下: 日志清洁工enable=false或log。清洁工启用=true(默认),清除策略为“delete”(默认)

  • 我试过了,但这不起作用。我如何在一个带有流的映射中创建这个映射,同时计算更改?

  • 我最近在一个streams应用程序中遇到了一个以前没有遇到过的问题,它很难跟踪与键控/连接相关的问题(以及更新后的分区问题)。 我有两个主题(raw_events和processed_users),这两个主题的密钥相同,但是当我试图对这两个主题执行连接时,尽管密钥相同,但只有一些连接是成功的。 为简洁起见,应用程序的基本工作流程如下: null 问题本身是在步骤5中产生的。由于主题和主题之间的连接

  • 我有一个关于kafka流应用程序中的控制流的基本问题。如果有两个源主题 我做了一个非常初步的测试,当记录被消费时,我偷看了一下,然后用一个简单的速溶软件打印了它们被处理的瞬间。现在 这些是主题中记录的开始和结束时间戳 主题B记录在主题A之前提取。Sysout显示主题B中的所有记录。有人能帮助理解这一点吗?我希望在编写具有多个输入源的流式应用程序时使用这种理解。 提前感谢

  • 我们从kafka向SparkStreaming发送了15张唱片,但是spark只收到了11张唱片。我用的是spark 2.1.0和kafka_2.12-0.10.2.0。 密码 bin/Kafka-console-producer . sh-broker-list localhost:9092-topic input data topic # 1 2 3 4 5 6 7 8 9 10 11 12