当前位置: 首页 > 知识库问答 >
问题:

如何通过Spark从Kafka那里获得至少N个日志?

东郭臻
2023-03-14

在Spark streaming中,我会在日志到达时获取日志。但我希望在一次传递中获得至少N个日志。如何才能实现呢?

从这个答案来看,Kafka似乎有这样一种效用,但在Spark中似乎并不存在,使之成为可能。

共有1个答案

单于季
2023-03-14

没有选项允许您设置从Kafka接收的消息数的最小值。选项maxoffsetspertrigger允许您设置最大消息数。

如果您希望您的微批处理一次处理更多的消息,您可以考虑增加您的触发间隔。

还有(参考你提供的链接),这也是不可能在Kafaka本身设置的。您可以设置提取字节数的最小数量,但不能设置消息数的最小数量。

这样,您还可以使用使用者配置kafka.fetch.min.bytes。但是,在loval Kafka 2.5.0安装上用Spark 3.0.1测试这一点并没有任何影响。在添加配置kafka.fetch.max.wait.ms时,我的测试中的提取定时发生了变化,但变化的方式并不是可预测的(至少对我来说是这样)。

查看Spark的KafkaDataConsumer的源代码,与纯KafkaConsumer相比,fetch似乎不直接占任何最小/最大字节。

 类似资料:
  • 我有一个用户登录的函数。但它被暂停了。我试着获取它的返回值,但我做不到。这就是我想做的 代码 我称之为 我有错误 E/AndroidRuntime:致命异常:主进程:ru.gkomega.navigation,pid:11863java.lang.IllegalStateException:已在kotlin.coroutines.safeContinuation.ResumeWith(safeCo

  • 我正在建立一个社交网络,在那里,用户将拥有一个类似推特的流,记录他们关注的人的所有帖子。 什么是最好的方式来查询这个与Laravel雄辩? 我有三张桌子

  • 我正试图用tweepy从特定用户那里获得流行的推文。我需要twitter API中的result_type='popular'之类的东西(https://dev.twitter.com/rest/reference/get/search/tweets). 我该怎么做呢?

  • 我正在使用Javascript(ES6)/FacebookReact,并尝试获得大小不同的数组的前3个元素。我想做与Linq take(n)等价的操作。 在我的Jsx文件中,我有以下内容: 然后我尝试了前3个项目 这不起作用,因为地图没有一个设置的函数。 你能帮忙吗?

  • 我想做一个模板,在那里我可以输入一个索引,它会给我在那个索引的类型。我知道我可以用来实现,但我想自己实现它。例如,我想这样做, ...它会给出位置的类型(因为数组是从0开始索引的)。我怎么能这么做?多谢了。

  • 我正在使用spring boot Kafka向一个主题发送消息。我的要求是从表中增量读取数据,并根据日期时间字段将其发布到主题中,因为这是一个预定的过程,所以我需要在每次成功向Kafka发送消息后存储每条消息的日期时间字段。 有什么建议吗?最好的方法是什么?我相信我不能对这样的事情使用异步回调,因为我需要在每次调用生产者后更新变量。 此外,由于基础设施限制,我无法使用Kafka连接。