我正在运行一个简单的Kafka streams应用程序,它将使用Node JS记录的信息带到一个Kafka主题。 还需要注意的是,时间戳只是一个数字,表示自1970年6月以来的秒数。 我使用scala中的Kafka流来使用这些数据。 例如。 然而,我不确定如何将时间戳(我从nodeJS发送的)提取到这个流中。 例如,如果我尝试做这样的事情 这会导致错误“无法解析符号流”。我在想我该怎么解决这个问题
我正在运行一个Kafka Streams应用程序,它有三个子拓扑。活动的阶段大致如下: 主题A 主题A、B和C都是物化的,这意味着如果每个主题有40个分区,我的最大并行度是120。 起初,我运行5个流应用程序,每个线程8个。在这种设置下,我遇到了不一致的性能。似乎某些共享同一线程的子拓扑比其他子拓扑更渴望CPU,过了一会儿,我会得到这个错误:组[consumer_group]中的中删除。一切都会重
我正在评估Apache Kafka Streams的事件源,看看它在复杂场景中的可行性。与关系数据库一样,我也遇到过一些情况,原子性/事务性至关重要: 具有两项服务的购物应用程序: OrderService:有一个带有订单的Kafka流商店(OrdersStore) ProductService:有一家Kafka流商店(ProductStockStore),里面有产品及其库存 流量: > Orde
我有下面的Kafka流代码 现在我们的一个客户端正在发送关于kafka标头的版本信息,如下所示。 基于这个标题,我需要为我的消息选择解析器,如何使用KStream操作符读取这个标题?我看过流的所有API,但没有方法给出头 我不能改成普通的kakfa消费者,因为我的应用程序已经依赖于少数KStream API。。
我有一个带有状态存储的转换器,它使用标点符号来操作所述状态存储。重复几次标点符号之后,操作可能已经完成,所以我想取消标点符号——但只针对实际完成分区各自状态存储上操作的任务。尚未完成的任务的标点操作应继续运行。为此,我的transformer保留了对schedule()返回的可取消项的引用。 据我所知,每个任务总是有自己的隔离Transex实例,每个任务都有自己的隔离计划标点符号()在该实例中(?
但是如果serviceworkList列表为空,则变量“validate”为false。我知道allMatch的规范,如果list为空,则返回为true。 有什么建议,我可以如何重建流,如果列表列表为空,我会得到假?
如果我在Java8中有一个并行流,并且我以anyMatch终止,并且我的集合有一个与谓词匹配的元素,那么我将试图弄清楚当一个线程处理这个元素时会发生什么。 我知道anyMatch是短路的,这样我就不会期望一旦匹配元素被处理,就会有更多的元素被处理。我的困惑是其他线程会发生什么,这些线程大概处于处理元素的中间。我可以想到3种可能的场景:a)它们是否被中断?b)它们是否继续处理它们正在处理的元素,然后
我使用的是Java8,以及从包中映射、还原、收集API。我有一个用例,其中我需要垂直求和列表列表。我的列表的结构是我只是想不出有什么方法可以使用映射压缩内部列表,并减少或收集流上可用的操作。请考虑以下代码。假设我的列表名为。此事务存储的是针对内部列表中的每个项目在每个事务中售出的项目数量。内部列表的长度对于所有交易都是相同的,因为它包含库存中每个项目的一个条目。如果事务中没有涉及该项目,则只存储0
给定一个将字符串映射到,有没有一种方法可以使用Java流返回一个布尔值,其中TRUE表示一个或多个list had元素?如果映射中的所有列表都为空,则返回false。 流的使用可以取代这种传统的代码吗? 注意,我们可以在第一次发现后跳出检查。不需要检查所有的映射值(所有列表)。为了提高效率,如果流可以执行相同的“先工作后查找”(一个“短路”操作),那就好了。
在我的Kafka Streams应用程序中,我有一个任务来设置一个预定的(按墙时间)标点符号。标点符号遍历商店的条目并对它们做一些事情。像这样: 由于我在这里使用单个存储(可能是分区的),我假设标点符号的每一次执行都绑定到该存储的单个分区。 有可能找出标点器在哪个分区上运行吗?用于声明此方法在标点符号中返回。 我读过《Kafka流:标点与过程》及其答案。我可以理解,一般来说,任务与特定分区无关。但
我有一个地图,上面有键和值和打印。我需要为这些EMPID添加标签。但当我使用stream API时,最终的结果是很糟糕的。
我刚刚开始玩弄《Spring-Cloud-Stream》中的Kafka活页夹。 我配置了一个简单的消费者: 但当我启动应用程序时,我看到在启动日志中创建了三个独立的消费者配置: 我发现这些配置之间唯一不同的是客户机。id。 除此之外,我不知道为什么只有一个消费者有三种配置。 是因为我也在运行吗? 这是我的:
我想利用Kafka 0.11中引入的幂等生产者。根据这篇融合的博客文章,添加了一个新属性来支持这一点: 幂等性:每个分区仅一次有序语义 要启用此功能并在每个分区中准确获取一次语义,即没有重复,没有数据丢失,并且为了语义,请将生产者配置为设置“enable.idemponence=true”。 这一点既不是Spring Cloud Stream,也不是Spring Kafka文档对该属性的使用。我们
我试图在Kafka上发送消息,当在特定表记录中插入时。我认为这部分应用程序将被视为供应商。 我有以下代码。 我实际上想发送数据。所以,我正在寻找更像以下内容的东西: 这可能吗?我如何配置它?例如,如果 我声明为“供应商”,那么如何向其传递数据 如果我声明一个“函数”,那么输入似乎将从主题接收并转发。我没有收到来自主题的数据。我正在从RESTAPI数据库接收数据 我不确定以下几点是否能有所帮助:但只