当前位置: 首页 > 知识库问答 >
问题:

如何提高水槽剂的加工率

刘兴修
2023-03-14

我有一个将数据摄取到elasticsearch中的水槽代理。该代理正在使用spolDir源代码。还有另一个代理将文件写入elasticsearch代理的spolDir。

随着时间的推移,文件会增加,已处理文件和未处理文件之间的差异也会增加。

我想增加水槽代理处理的事件数量,以加速摄取过程。

这是水槽代理的配置。

代理04.sources=s1

agent04.channels=ch1

agent04.channels = memoryChannel

代理04.通道.内存通道.类型 = 内存

agent04.channels.memoryChannel.capacity=100000

agent 04 . channels . memory channel . transaction capacity = 1000

agent04.sources.s1.channels

agent04.sources.s1.type

agent04.sources.s1.spool /DataCollection/Flume_Cleaner_Output/Json_Elastic

代理04.sources.s1.反序列化程序.maxLineLength = 100000

agent04.sinks=elasticsearch

agent04.sinks.elasticsearch.channel

agent04.sinks.elasticsearch.type=org.css.cssElasticsearchSink

agent04.sinks.elasticsearch.batch尺寸=400

agent04.sinks.elasticsearch.host名称=elastic-node01.css.org

agent04.sinks.elasticsearch.index名称=all_collections

agent04.sinks.elasticsearch.indexType = live_tweets

agent04.sinks.elasticsearch.indexNameBuilder= org.css.sa.flume.elasticsearch.sink.indexNameBuilder.HeaderValueBasedIndexNameBuilder

agent 04 . sinks . elastic search . cluster name = CSS _ rai _ social

agent04.sinks.elasticsearch.serializer=org.jai.flume.sinks.elasticsearch.serializer.ElasticSearchJsonBodyEventSeriezer

agent04.sinks.elasticsearch.cache_period_ms=90d

共有1个答案

潘翰藻
2023-03-14

为什么要使用 spooldir 链接两个 Flume 代理?这将非常慢,并且是一个令人惊讶的配置。在处理每个批次时,您会产生频繁 fsync 的成本。

我建议您使用Avro Sink和Avro Source将它们链接起来。我还会将批量大小增加到至少 1000。(计算机真的很喜欢批处理,Flume被设置为这样做)。

 类似资料:
  • 我有一个要求,我想运行以假脱机目录作为源的 Flume 代理。将假脱机目录中的所有文件复制到 HDFS(sink) 后,我希望代理停止,因为我知道所有文件都被推送到通道。我还想每次为不同的假脱机目录运行此步骤,并在目录中的所有文件都标记为 .完成。有没有办法停止水槽剂?

  • 下午好,我在增加Flume的堆大小时遇到了麻烦。结果,我得到: 我增加了“flume-env.sh”和Hadoop/Yarn中定义的堆。运气不好。 有一点要注意,在启动水槽时,Exec(进程构建器?)似乎将堆定义为20Mb。关于如何覆盖它有什么想法吗? 最终,我尝试将Heapsize设置为1512MB。

  • https://cwiki.apache.org/confluence/display/FLUME/Getting 开始的页面说 HDFS sink 支持追加,但我无法找到有关如何启用它的任何信息,每个示例都在滚动文件上。因此,如果可能的话,我将不胜感激有关如何将水槽附加到现有文件的任何信息) 使现代化 可以将所有滚动属性设置为0,这将使flume写入单个文件,但它不会关闭文件,新记录对其他进程不

  • 我正在构建一个 Spring 启动独立应用程序,该应用程序需要使用来自远程服务器的消息并将其写入 。我正在使用Flume嵌入式代理来可靠地记录消息。但是我收到以下错误。 引起:org.apache.flume.Flume异常:组件类型的com.security.flume.sink.Sy 根据文档仅支持。这是否意味着我们甚至不能编写自定义水槽? 我对Flume相当陌生。我真的很感谢你在这个问题上的

  • 我正在尝试将日志从单台机器上的不同目录收集到本地文件系统文件或 HDFS。 我已经注册了 2 个来源 r1、r2。两个源都指向单通道C1。有一个接收器连接到通道。K1 请找到下面的配置文件: 但是当我使用代理 a1 启动 Flume 时,只有一个源 (r2) 正在启动。水槽代理启动日志: 谢谢

  • 我正在尝试使用hdfs水槽运行水槽。hdfs在不同的机器上正常运行,我甚至可以与水槽机器上的hdfs交互,但是当我运行水槽并向其发送事件时,我收到以下错误: 同样,一致性不是问题,因为我可以使用hadoop命令行与hdfs交互(水槽机不是datanode)。最奇怪的是,在杀死水槽后,我可以看到tmp文件是在hdfs中创建的,但它是空的(扩展名仍然是. tmp)。 关于为什么会发生这种情况的任何想法