当前位置: 首页 > 知识库问答 >
问题:

如何统计一天内从Kafka话题中获取的消息数量?

祝俊雄
2023-03-14

我正在从Kafka主题中获取数据,并以Deltalake(拼花)格式存储它们。我希望找到在特定的一天提取的消息的数量。

我的思考过程:我想用spark读取以拼花格式存储数据的目录,并在特定的一天对带有“.parquet”的文件应用count。这将返回一个计数,但我不确定这是否是正确的方法。

这种方式正确吗?有没有其他方法来计算某一天(或持续时间)从Kafka主题获取的消息数?

共有1个答案

酆英达
2023-03-14

我们从主题中获取的消息不仅具有键值,而且还具有时间戳等其他信息

可用于跟踪消费者流量。

时间戳时间戳由代理或生产者根据主题配置更新。如果主题配置的时间戳类型为CREATE_TIME,则代理将使用producer记录中的时间戳,而如果主题配置为LOG_APPEND_TIME,则代理将在追加记录时用代理本地时间覆盖时间戳。

>

  • 因此,如果您存储任何位置,如果您保留时间戳,您可以很好地跟踪每天或每小时的消息率。

    另外,您可以使用一些Kafka仪表板,如汇流控制中心(许可价格)或Grafana(免费)或任何其他工具来跟踪消息流。

    在我们的例子中,在消费消息并存储或处理消息的同时,我们还将消息的元细节路由到弹性搜索,并且我们可以通过Kibana将其可视化。

  •  类似资料:
    • 我正在使用apache kafka进行消息传递。我已经用Java实现了生产者和消费者。如何才能得到一个主题中的留言数量?

    • 我的用例是,从生产者端,它将一行数据(大约100字节)作为一条消息发布到kafka topic,从消费者端,我希望一次消费5条消息,并将其提供给我的消费者逻辑。 我做了一个简单的例子,它总是得到一个消息并打印在控制台上。请建议我任何需要的配置更改,以实现这一点。 请在下面找到源代码。 使用以下命令启动生产者 /kafka生产者性能测试——num记录500——主题测试——吞吐量10——有效负载文件测

    • 我正在运行一个简单的Kafka streams应用程序,它将使用Node JS记录的信息带到一个Kafka主题。 还需要注意的是,时间戳只是一个数字,表示自1970年6月以来的秒数。 我使用scala中的Kafka流来使用这些数据。 例如。 然而,我不确定如何将时间戳(我从nodeJS发送的)提取到这个流中。 例如,如果我尝试做这样的事情 这会导致错误“无法解析符号流”。我在想我该怎么解决这个问题

    • 问题内容: 我正在使用apache kafka进行消息传递。我已经用Java实现了生产者和消费者。我们如何获取主题中的消息数量? 问题答案: 从消费者的角度来看,想到此的唯一方法是实际消费消息并计数。 Kafka代理公开了自启动以来收到的消息数量的JMX计数器,但是您不知道已经清除了其中的多少。 在最常见的情况下,最好将Kafka中的消息视为无限流,而获得当前磁盘上保留的离散值并不重要。此外,在与

    • 我想要从服务器的一个主题开始所有的消息。 当使用上面的控制台命令时,我希望能够从一开始就获得一个主题中的所有消息,但我不能从一开始就使用java代码消费一个主题中的所有消息。