当前位置: 首页 > 知识库问答 >
问题:

如何使用水槽将身体中的数据拆分为不同的通道?

孙斌
2023-03-14

我想使用水槽代理并根据定义的函数拆分身体的数据,并将身体数据的一部分发送到一个接收器,另一部分发送到另一个接收器。

我是否需要为此实现自定义拦截器,或者是否有我错过阅读Flume用户指南的默认解决方案

共有1个答案

包谭三
2023-03-14

是的,你必须编写一个自定义拦截器。

你可以使用拦截器的拦截方法,并根据你的功能拆分每个事件的主体。稍后,您可以将一个有意义的标头分配给身体的特定部分,该部分可以用作水槽扇出流的重定向或通道选择器参数。

准备好代码后,您可以在 flume conf 文件中添加以下属性并实现多路复用 -

多路选择器的映射:

Agent.sources.Source1.selector.type = multiplexing

Agent.sources.Source1.selector.header = someHeader
Agent.sources.Source1.selector.mapping.Value = Channel1
Agent.sources.Source1.selector.mapping.Value2 = Channel1 Channel2
Agent.sources.Source1.selector.mapping.Value3 = Channel2
 类似资料:
  • 我的项目有一个要求。我必须使用水槽收集日志数据,并且必须将数据输入到hive表中。 在这里,我需要将放置在文件夹中的文件收集到hdfs中,我正在使用Spooldir进行。在此之后,我需要处理这些文件并将输出放在hive文件夹中,以便立即查询数据。 我是否可以使用 sink 处理源文件,使放置在 hdfs 中的数据已经处理为所需的格式。? 谢了,萨希

  • 当hdfs不可用时,是否有方法确保数据安全?场景是:kafka源,flume内存通道,hdfs接收器。如果水槽服务关闭了,它是否可以存储主题分区的偏移量,并在恢复后从正确的位置消费?

  • 我试图配置水槽与HDFS作为汇。 这是我的flume.conf文件: 我的hadoop版本是: 水槽版本是: 我已将这两个jar文件放在flume/lib目录中 我将hadoop common jar放在那里,因为在启动flume代理时出现以下错误: 现在代理开始了。这是启动日志: 但是当一些事件发生时,下面的错误出现在水槽日志中,并且没有任何东西被写入hdfs。 我缺少一些配置或jar文件?

  • 这是这个问题的延续Java8流比较两个对象并在它们上运行一个函数 我的问题是根据一些动态参数将集合(使用Java8流)拆分成更小的组件。动态参数可以根据传递给方法splitParts的不同条件(这里,我给出的示例是getPartId()相同)而改变,该方法的返回类型是一组部件的列表 我尝试了以下方法: 如何根据动态参数拆分流(可能通过调用函数接口而不是减少方法),然后在其上运行方法? 我的函数签名

  • 问题内容: 我想将它们分成几个新列。假设我有一个看起来像这样的数据框: 我知道使用: 我可以分割一个字符串。但是,下一步,我想像这样有效地将拆分后的字符串放入新列中: 我可以例如这样做: 但是,如何才能更优雅地达到相同的结果呢? 问题答案: 该方法有一个参数: 带有列名: Python> = 3.6 f字符串的情况更加整洁: