当前位置: 首页 > 知识库问答 >
问题:

闪光配料槽

武博艺
2023-03-14

我正在尝试使用flink,以流式和批处理的方式,将大量数据添加到Accumulo中(每分钟几百万个)。我想在将记录发送到accumulo之前对其进行批处理。我从目录或通过kafka摄取数据,使用flatmap转换数据,然后传递给RichSinkFunction,它将数据添加到集合中。

对于流数据,批处理似乎可以,因为我可以将记录添加到固定大小的集合中,一旦达到批处理阈值,这些记录就被发送到accumulo。但是对于有限的批处理数据,我正在努力寻找一种好的批处理方法,因为如果在指定的时间内没有其他数据,则需要一个刷新超时。似乎没有一个Accumulo连接器不同于弹性搜索或其他替代的接收器。

共有1个答案

叶鹭洋
2023-03-14

您可以通过实现processingTimeCallback来访问接收器中的计时器。例如,看看bucketingsink-它的open和onProcessingTime方法应该可以让您开始工作。

 类似资料:
  • 我正在寻找安卓摄像头LED闪光灯的帮助,以改变它的强度,像这个应用程序。 我已经检查了以下链接,但我没有得到确切的结果从它。 可以更改Android设备的LED强度吗? 如何在Android中以编程方式打开相机闪光灯? http://code.google.com/p/droidled/source/checkout http://code.google.com/p/simpleed/source

  • 我正在使用眨眼计划程序。这是我的 sql test_table是Kafka桌 我设置了表.exec.state.ttl=10000 并运行我的程序,然后我继续发送消息。 由于我将状态ttl和cep interval都设置为10s,当我启动它时,状态的大小在10秒后应该是一个固定的数字。 但事实是,该州至少持续增长15分钟。此外,jvm触发了两次完整的gc。 是否有我尚未配置的配置

  • 我正在为使用match_recognize的Flink SQL语句编写单元测试。我正在这样设置测试数据 我有两个问题, 如何将event_time指定为水印字段?(表示行时间) 不太重要,给表创建一个有意义的名称? FLINK版本:1.11

  • 我正在尝试验证控制器中的值,并将闪存错误消息发送到Sailsjs框架中的EJS视图文件。 我收到以下错误消息: 我已经在config/policies.js中添加了闪存策略 这就是我重定向到错误视图的方式。 flash.js内容 帮助我解决此闪存不是视图文件中定义的错误。

  • 问题内容: 像手电筒应用程序一样,我只需要将闪光灯与API camera2(Android 5,API级别21)一起使用。但是我发现的所有示例都需要在视图中显示摄像机流 问题答案: https://github.com/pinguo- yuyidong/Camera2/blob/master/app/src/main/java/us/yydcdut/androidltest/otheractivi

  • 学习Akka Streams。我有一个记录流,每个时间单位很多,已经按时间排序(来自Slick),我想通过检测时间步长何时变化来将它们批处理为时间组。 实例 如果传入流是 我想把它转换成 到目前为止,我只发现了按固定数量的记录进行分组,或者拆分成许多子流,但从我的角度来看,我不需要多个子流。 更新:我发现了,但它看起来更关心背压,而不仅仅是一直批处理。