我有一个非常简单的Java/Spring应用程序来演示KStream的功能,但不幸的是,我无法使KStream加载数据。想法是创建一个KStream对象,并使用controller GET方法简单地检索其内容。示例代码:
@RestController
@RequestMapping("/resources/")
public class StreamController {
private KafkaStreams streams;
private KStream<String, ResourceMessage> resourceStream;
StreamController() {
// configure streams/consumer
Properties props = new Properties();
// make sure stream starts from the beginning
props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.STATE_DIR_CONFIG, Path.of(System.getProperty("java.io.tmpdir")).toAbsolutePath().toString());
//create POJO serdes
StreamsBuilder builder = new StreamsBuilder();
Map<String, Object> serdeProps = new HashMap<>();
Serializer<ResourceMessage> resourceSerializer = new JsonPOJOSerializer<>();
serdeProps.put("JsonPOJOClass", ResourceMessage.class);
resourceSerializer.configure(serdeProps, false);
Deserializer<ResourceMessage> resourceDeserializer = new JsonPOJODeserializer<>();
serdeProps.put("JsonPOJOClass", Resource.class);
resourceDeserializer.configure(serdeProps, false);
Serde<ResourceMessage> resourceSerde = Serdes.serdeFrom(resourceSerializer, resourceDeserializer);
// create KStream with POJO serdes for value
resourceStream = builder.stream("Resources", Consumed.with(Serdes.String(), resourceSerde));
streams = new KafkaStreams(builder.build(), props);
streams.start();
}
// GET method that enumerates KStream and returns contents
@GetMapping(value = "/resource")
public List<Resource> getResources() {
List<ResourceMessage> messages = new LinkedList<ResourceMessage>();
// problem is here - there are messages in the topic but KStream returns no values in foreach(...)
resourceStream.foreach((key, value) -> messages.add(value));
return messages.stream().map(m -> Resource.builder()
.id(m.getResouceId()).resource(m.getResource()).build()).collect(Collectors.toList());
}
}
问题-主题中有消息,但foreach(...)中的KStream枚举没有从中检索任何结果。KStream对象状态为“RUNning”,日志中没有错误。
生成随机应用程序ID并将AUTO\u OFFSET\u RESET设置为“最早”并没有帮助。使用Kafka工具,我可以清楚地看到主题中的一些信息。在控制器运行时添加新消息也没有帮助。关于Kafka流媒体,我有什么遗漏或不理解的吗?
PS我在这里使用POJO序列化器和反序列化器示例。
Kafka Streams是用于实时流处理的Kafka客户端。在您的情况下,您不需要Kafka Streams客户端(它不会工作),您需要一个简单的Kafka消费者,该消费者轮询来自Kafka的记录,并使用Rest API将其发送回。例如:
@RestController
@RequestMapping("/resources/")
public class StreamController {
private KafkaStreams streams;
private Consumer<String, ResourceMessage> consumer;
StreamController() {
// configure consumer properties
Properties props = new Properties();
// make the right properties with your Serialize and deserialiser
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Create the consumer using props.
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
}
// GET method that enumerates KStream and returns contents
@GetMapping(value = "/resource")
public List<Resource> getResources() {
List<ResourceMessage> messages = new LinkedList<ResourceMessage>();
ConsumerRecords<String, ResourceMessage> consumerRecords =
consumer.poll(1000);
messages = consumerRecords... // Convert the records to your custom POJO
return messages.stream().map(m -> Resource.builder()
.id(m.getResouceId()).resource(m.getResource()).build()).collect(Collectors.toList());
}
}
您可以在这里找到完整的示例链接。
更新
此外,您应该知道RestControlled是请求作用域,因此为每个请求创建了一个控制器实例。最后,API响应将一无所获。如果您想使用Kafka Streams,可以在主方法中启动它,同时使用Spring Boot应用程序。你可以看到这个例子,链接。
我要用Kafka流计算平均值。所以我做了一个有状态的操作,聚合,需要创建一个状态存储,但是这种情况不会发生。 这里是平均值的函数: 以下是例外情况: 问题是基本目录不存在,但我希望kafka流在必要时创建目录。 编辑 ----- 我注意到,如果我有1个处理器,使一个变量的平均值没有问题,但如果我有2个处理器是。 1个处理器的配置文件: 2个处理器的配置文件: 现在我启动处理器: 类型元组包含配置文
只是关于Kafka的后续问题-未压缩主题与压缩主题 正如那里所说, 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从未压缩的主题构建的。 作为最佳实践,关于未压缩主题的语义,是否应禁用要在日志启用程序中取消压缩的主题,以便不会发生压缩(清理),其属性如下: 日志清洁工enable=false或log。清洁工启用=true(默认),清除策略为“delete”(默认)
我试过了,但这不起作用。我如何在一个带有流的映射中创建这个映射,同时计算更改?
我最近在一个streams应用程序中遇到了一个以前没有遇到过的问题,它很难跟踪与键控/连接相关的问题(以及更新后的分区问题)。 我有两个主题(raw_events和processed_users),这两个主题的密钥相同,但是当我试图对这两个主题执行连接时,尽管密钥相同,但只有一些连接是成功的。 为简洁起见,应用程序的基本工作流程如下: null 问题本身是在步骤5中产生的。由于主题和主题之间的连接
我有一个关于kafka流应用程序中的控制流的基本问题。如果有两个源主题 我做了一个非常初步的测试,当记录被消费时,我偷看了一下,然后用一个简单的速溶软件打印了它们被处理的瞬间。现在 这些是主题中记录的开始和结束时间戳 主题B记录在主题A之前提取。Sysout显示主题B中的所有记录。有人能帮助理解这一点吗?我希望在编写具有多个输入源的流式应用程序时使用这种理解。 提前感谢
我们从kafka向SparkStreaming发送了15张唱片,但是spark只收到了11张唱片。我用的是spark 2.1.0和kafka_2.12-0.10.2.0。 密码 bin/Kafka-console-producer . sh-broker-list localhost:9092-topic input data topic # 1 2 3 4 5 6 7 8 9 10 11 12