我有一个主题,它接收可能包含部分数据的JSON记录。我想合并这些数据,所以我试图在最终的数据记录内收集尽可能多的信息。
t1: { id: '1234', attribute1: 'foo' }
t2: { id: '1234', attribute2: 'bar' }
合并记录值后所需的流:
t1: { id: '1234', attribute1: 'foo' }
t2: { id: '1234', attribute1: 'bar', attribute2: 'bar' }
//key of the topic is id
KStream<String, MyObject> input = ...
return input.groupByKey().reduce((current, newEvent) -> return newEvent.merge(current)).toStream();
编辑:流定义是正确的,默认情况下reduce不会向下游发送所有消息,而是在这样做之前缓存它们。要禁用此行为,配置属性:
cache.max.bytes.buffering: 0
必须设置。
public class MergeStreams {
public Topology buildTopology(Properties allProps) {
final StreamsBuilder builder = new StreamsBuilder();
final String rockTopic = allProps.getProperty("input.rock.topic.name");
final String classicalTopic = allProps.getProperty("input.classical.topic.name");
final String allGenresTopic = allProps.getProperty("output.topic.name");
KStream<String, SongEvent> rockSongs = builder.stream(rockTopic);
KStream<String, SongEvent> classicalSongs = builder.stream(classicalTopic);
KStream<String, SongEvent> allSongs = rockSongs.merge(classicalSongs);
allSongs.to(allGenresTopic);
return builder.build();
}
public void createTopics(Properties allProps) {
AdminClient client = AdminClient.create(allProps);
List<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(
allProps.getProperty("input.rock.topic.name"),
Integer.parseInt(allProps.getProperty("input.rock.topic.partitions")),
Short.parseShort(allProps.getProperty("input.rock.topic.replication.factor"))));
topics.add(new NewTopic(
allProps.getProperty("input.classical.topic.name"),
Integer.parseInt(allProps.getProperty("input.classical.topic.partitions")),
Short.parseShort(allProps.getProperty("input.classical.topic.replication.factor"))));
topics.add(new NewTopic(
allProps.getProperty("output.topic.name"),
Integer.parseInt(allProps.getProperty("output.topic.partitions")),
Short.parseShort(allProps.getProperty("output.topic.replication.factor"))));
client.createTopics(topics);
client.close();
}
public Properties loadEnvProperties(String fileName) throws IOException {
Properties allProps = new Properties();
FileInputStream input = new FileInputStream(fileName);
allProps.load(input);
input.close();
return allProps;
}
public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
}
MergeStreams ms = new MergeStreams();
Properties allProps = ms.loadEnvProperties(args[0]);
allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
allProps.put(SCHEMA_REGISTRY_URL_CONFIG, allProps.getProperty("schema.registry.url"));
Topology topology = ms.buildTopology(allProps);
ms.createTopics(allProps);
final KafkaStreams streams = new KafkaStreams(topology, allProps);
final CountDownLatch latch = new CountDownLatch(1);
// Attach shutdown handler to catch Control-C.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close(Duration.ofSeconds(5));
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
需要更多详细信息,请点击此处
给定:我在Kafka中有两个主题,假设主题A和主题B。Kafka流从主题A中读取一条记录,对其进行处理,并产生与所消耗记录相对应的多条记录(假设recordA和recordB)。现在的问题是我如何使用Kafka流来实现这一点。 在这里,读取的记录是消息;处理之后,它返回一个消息列表。如何将此列表划分为两个生产者流?任何帮助都将不胜感激。
我有2个Kafka的主题流完全相同的内容从不同的来源,所以我可以有高可用性的情况下,其中一个来源失败。我正在尝试使用Kafka Streams0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何关于失败的消息,并且当所有源都启动时没有重复的消息。 当使用KStream的方法时,其中一个主题可以毫无问题地关闭(次要主题),但是当主主题关闭时,将不会向输出主题发送任何内容。这似乎是因为,
有没有办法从Kafka主题中删除单个记录?我知道有一个脚本kafka-delete-records.sh删除指定主题和分区上指定偏移量之前的记录,但是我希望能够删除我指定的偏移量上的记录。有办法做到吗? 这不是在Java而是在裸露的Kafka实例上。
我用Flink的table API创建了一个表。 当运行SQL以查看记录时,我得到: 我知道有一些坏的avro记录被推送到Kafka主题中。在JSON格式中,有一个选项可以通过设置来跳过/过滤这些记录。当从合流avro格式读取时,我们可以跳过这些记录吗? 这并不理想,但不幸的是,尽管有一个模式注册表,但我无法控制要推送到Kafka的内容。
我有一个KStream KStream DSL如下所示: 阅读一些文章(例如Kafka流窗口) 但我想补充一点,这对我来说并不适用: Java编译器抛出以下错误: 老实说,我不明白。参数是正确的;虚拟现实类型是“历史”。 你知道我错过了什么吗? 这个windowedBy KTable的想法是让一个状态为一件“事情”保存所有事件一天。假设生成了一个新警报,我想将一天内“某物”的所有事件附加到警报上。
合并 在执行pull之后,进行下一次push之前,如果其他人进行了推送内容到远程数据库的话,那么你的push将被拒绝。 这种情况下,在读取别人push的变更并进行合并操作之前,你的push都将被拒绝。这是因为,如果不进行合并就试图覆盖已有的变更记录的话,其他人push的变更(图中的提交C)就会丢失。 合并的时候,Git会自动合并已有的变更点!不过,也存在不能自动合并的情况。在下一个页面,我们会为大