我有一个由byte[]键控的主题,我想对它进行重新分区,并通过消息正文中字段中的另一个键来处理该主题。
我发现有< code>KGroupedStream和< code>groupby函数。但是它需要一个聚合函数来转换成KTable/KStream。我不需要总数。我只想重新划分和处理输出。
KStream接口上有一个方法repartition(),它允许您基于Serdes和StreamPartitioner对主题进行重新分区,而不是映射/selectingKey()加上应用through或repartition。
(Kafka Streams 2.5.x 或更早版本)
不确定这是否完全正确,但它是有效的,并且重新分区主题是自动创建的,并带有< code>stream的正确分区数。
KTable emptyTable = someTable.filter((k, v) -> false);
KStream stream = ...
KStream repartionedStream = stream.selectKey(...)
.leftJoin(emptyTable, (v, Null) -> v, ...);
编辑
在2020年8月,当Kafka Streams 2.6.0推出并KStream.重新分区()出现时,这种方法显然变成了一种复杂的可憎之物,值得大量的反对票和鞭笞。
因此,对于流媒体版本2.6.x,您必须使用
KStream stream = ...
KStream repartionedStream = stream.selectKey(...)
.repartition();
是的,你可以。您设置了一个新的关键字,然后通过另一个主题传输数据。
// repartition() will create the required topic automatically for your,
// with the same number of partitions as your input topic;
//
// it's also possible to set the number of partitions explicitly to scale in/out
// via `repartitioned(Repartitioned.numberOfPartitions(...))`
KStream stream = ...
KStream repartionedStream = stream.selectKey(...)
.repartition();
// older versions:
//
// using `through()` you need to create the use topic manually,
// before you start your application
KStream stream = ...
KStream repartionedStream = stream.selectKey(...)
.through("topic-name");
请注意,在使用所需数量的分区启动应用程序之前,您需要通过()创建在中使用的主题。
我在本地计算机上将一条太大的消息推送到Kafka消息主题中,现在我收到一个错误: 增加在这里并不理想,因为我实际上不想接受那么大的消息。
我想在Apache Flink中实现以下场景: 给定一个具有4个分区的Kafka主题,我想使用不同的逻辑在Flink中独立处理分区内数据,具体取决于事件的类型。 特别是,假设输入Kafka主题包含前面图像中描述的事件。每个事件具有不同的结构:分区1具有字段“a”作为关键字,分区2具有字段“b”作为关键字,等等。在Flink中,我希望根据事件应用不同的业务逻辑,所以我认为我应该以某种方式分割流。为了
Spring HATEOAS提供了方便的ControllerLinkBuilder来创建指向控制器方法的链接,这些方法将作为HREF添加到返回给客户端的JSON/XML中。例如: ... 可能会生成类似以下内容的JSON: 然而... 我倾向于通过反向代理访问我的服务。我想大多数人可能会这样做。这让我可以在不同的端口上运行多个服务,但让我可以通过相同的基本URL访问它们。不幸的是,通过代理访问意味
问题内容: 有没有办法用TypeScript语言进行方法重载? 我想实现以下目标: 这是我不想做的一个例子(我真的很讨厌JS中重载hack的那一部分): 问题答案: 根据规范,TypeScript确实支持方法重载,但是它很笨拙,并且包含许多手动检查参数类型的工作。我认为这主要是因为在纯JavaScript中最接近方法重载的地方还包括检查,并且TypeScript尝试不修改实际的方法主体,以避免任何
本文向大家介绍ubuntu下没有中文输入法的解决办法,包括了ubuntu下没有中文输入法的解决办法的使用技巧和注意事项,需要的朋友参考一下 ubuntu下没有中文输入法的解决办法! 我们在安装虚拟机或者ubuntu系统的时候,常见的是尽管是中文版本的但却没有中文输入法,确实是一件很恼火的事情! 我自己也亲身经历过,特此给大家提供一种方法,很好用的! 我们在安装ubuntu系统的时候,最常见也是我们
问题内容: 有没有办法在Java中嵌入浏览器? 问题答案: http://docs.oracle.com/javafx/2.0/webview/jfxpub-webview.htm
我有来自 3 个 mysql 表、1 个主表和两个子表的原始流。我尝试加入三个原始流并转换为单个输出流。如果父流上有任何更新,但如果子流发生任何变化,则不触发输出,它就可以工作。 父流上的任何新添加或更新都由处理器拾取,并将其与其他KTable连接,并在输出流上返回。但对child1stream或child2stream的任何添加或更新都不会触发输出流。 我认为将所有输入流设为 KTable,它们
我目前正在使用Web音频API。我设法“读懂”了一个麦克风,并将它播放给我的扬声器,这非常无缝。 使用Web Audio API,我现在想重新取样传入的音频流(又名麦克风)从44.1kHz到16kHz。16kHz,因为我正在使用一些需要16kHz的工具。由于44.1kHz除以16kHz不是整数,我相信我不能简单地使用低通滤波器和“跳过样本”,对吗? 我还看到一些人建议使用,但由于它已被弃用,我觉得