我想使用水槽代理并根据定义的函数拆分身体的数据,并将身体数据的一部分发送到一个接收器,另一部分发送到另一个接收器。
我是否需要为此实现自定义拦截器,或者是否有我错过阅读Flume用户指南的默认解决方案?
是的,你必须编写一个自定义拦截器。
你可以使用拦截器的拦截方法,并根据你的功能拆分每个事件的主体。稍后,您可以将一个有意义的标头分配给身体的特定部分,该部分可以用作水槽扇出流的重定向或通道选择器参数。
准备好代码后,您可以在 flume conf
文件中添加以下属性并实现多路复用 -
多路选择器的映射:
Agent.sources.Source1.selector.type = multiplexing
Agent.sources.Source1.selector.header = someHeader
Agent.sources.Source1.selector.mapping.Value = Channel1
Agent.sources.Source1.selector.mapping.Value2 = Channel1 Channel2
Agent.sources.Source1.selector.mapping.Value3 = Channel2
我的项目有一个要求。我必须使用水槽收集日志数据,并且必须将数据输入到hive表中。 在这里,我需要将放置在文件夹中的文件收集到hdfs中,我正在使用Spooldir进行。在此之后,我需要处理这些文件并将输出放在hive文件夹中,以便立即查询数据。 我是否可以使用 sink 处理源文件,使放置在 hdfs 中的数据已经处理为所需的格式。? 谢了,萨希
当hdfs不可用时,是否有方法确保数据安全?场景是:kafka源,flume内存通道,hdfs接收器。如果水槽服务关闭了,它是否可以存储主题分区的偏移量,并在恢复后从正确的位置消费?
这是这个问题的延续Java8流比较两个对象并在它们上运行一个函数 我的问题是根据一些动态参数将集合(使用Java8流)拆分成更小的组件。动态参数可以根据传递给方法splitParts的不同条件(这里,我给出的示例是getPartId()相同)而改变,该方法的返回类型是一组部件的列表 我尝试了以下方法: 如何根据动态参数拆分流(可能通过调用函数接口而不是减少方法),然后在其上运行方法? 我的函数签名
问题内容: 我想将它们分成几个新列。假设我有一个看起来像这样的数据框: 我知道使用: 我可以分割一个字符串。但是,下一步,我想像这样有效地将拆分后的字符串放入新列中: 我可以例如这样做: 但是,如何才能更优雅地达到相同的结果呢? 问题答案: 该方法有一个参数: 带有列名: Python> = 3.6 f字符串的情况更加整洁:
我有这样一份清单: 如何将此列表拆分为三个变量,每个变量分别保持不变