当前位置: 首页 > 知识库问答 >
问题:

使用我自己的Cassandra驱动程序编写聚合结果

拓拔嘉运
2023-03-14

我正在尝试创建一个简单的应用程序,它将我站点上每个网页的页面视图写给Cassandra。我想写每5分钟的累积页面浏览量从一个逻辑小时开始。

我的代码如下所示:

KTable<Windowed<String>, Long> hourlyPageViewsCounts = keyedPageViews
            .groupByKey()
            .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)), "HourlyPageViewsAgg")

我只需要在聚合5分钟后写,而不是每次更新。有可能吗?阅读这里表明,不使用低级API,可能是不行的,我正试图避免使用低级API,因为这似乎是一个足够简单的任务,可以用更高级的API来完成。

共有1个答案

江洲
2023-03-14

提交和生成/编写输出在Kafka Streams API中是两个不同的概念。在Kafka Streams API中,输出是连续产生的,并且提交用于“标记进度”(即,提交消费者抵消,包括刷新所有存储和缓冲的生产者记录)。

您可能希望查看这篇博文以获取更多详细信息:https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/

1)要写入Casandra,建议将应用程序的结果写回主题(通过#To(“topic-name”)),并使用Kafka Connect将数据写入Casandra。

比较:Kafka流处理过程中的外部系统查询

2)如果您希望有严格的5分钟间隔,那么使用低级API是唯一的方法(正如您已经指出的)。请注意,下一个版本(Kafka 1.0)将包括挂钟时间标点符号,这将使您更容易实现目标。

 类似资料:
  • 我正在试着写我自己的期望条件。我需要的是。。。我有一个iframe。我也有一个图像在里面。我想在图像的scr发生变化时继续处理。我所做的: 在后面的代码中,我有: 此代码工作,但返回相同的url多次。问题是当我取消注释时,它与 它下面的行工作正常...我不明白。

  • 我想在Mongo 3.2中执行聚合,如下所述,但在Java中: https://docs.mongodb.org/master/reference/operator/aggregation/lookup/#pipe._S_lookup 目前,我的java查询对象非常简单: 除了按employeId进行筛选外,我还想加入公司的此集合(其中employee.company\u id=company.i

  • 在本指南中,我们将从头开始建立属于自己的项目,使用 Kafka Streams 编写一个流处理应用程序。如果你还没阅读过 quickstart (在Kafka流中运行一个流应用程序)章节,我们强烈建议你先去阅读一下。 建立一个Maven项目 使用以下命令来创建具有 Kafka Streams 项目架构的 Maven 原型: mvn archetype:generate \ -Darchety

  • 我正在使用php的Datastax Cassandra驱动程序,希望能够检查是否查询失败,在数据库中没有找到结果。现在,如果查询失败,日志报告如下 如果查询成功,它将返回预期的数据。下面是函数 根据发布的建议,我最后做了以下几点 谢谢你的建议

  • 问题内容: 在进行大量数据加载时,基于日志数据增加计数器,但是遇到超时异常。我正在使用Datastax 2.0-rc2 Java驱动程序。 这是服务器无法跟上问题的问题(即服务器端配置问题),还是客户端无聊的等待服务器响应的问题?无论哪种方式,我都可以进行简单的配置更改来解决此问题吗? 节点之一在大致发生时报告此情况: 问题答案: 虽然我不了解此问题的根本原因,但我可以通过增加conf / cas

  • 在进行大容量数据加载、根据日志数据递增计数器时,我遇到了超时异常。我使用DataStax2.0-RC2 java驱动程序。 这是服务器无法跟上的问题(即服务器端配置问题),还是客户端在等待服务器响应时感到厌烦的问题?不管怎样,我能做一个简单的配置更改来解决这个问题吗? 其中一个节点大致在它发生的时间报告这一点: