当前位置: 首页 > 知识库问答 >
问题:

如何在水槽中同时使用regex_extractor选择器和多路复用拦截器?

林德辉
2023-03-14

由于源和汇之间的速度差距,我正在测试水槽将数据加载到hHase中,并考虑使用水槽的选择器和控制器并行加载数据。

所以,我想用水槽做的是

>

  • 使用拦截器的regex_extractor类型创建事件的标头

    使用选择器的复用类型将带有标头的事件复用到两个以上的通道

    在一个源-通道-接收器中。

    并尝试如下配置。

    
        agent.sources = tailsrc
        agent.channels = mem1 mem2
        agent.sinks = std1 std2
        agent.sources.tailsrc.type = exec
        agent.sources.tailsrc.command = tail -F /home/flumeuser/test/in.txt
        agent.sources.tailsrc.batchSize = 1
        
        agent.sources.tailsrc.interceptors = i1
        agent.sources.tailsrc.interceptors.i1.type = regex_extractor
        agent.sources.tailsrc.interceptors.i1.regex = ^(\\d)
        agent.sources.tailsrc.interceptors.i1.serializers = t1
        agent.sources.tailsrc.interceptors.i1.serializers.t1.name = type
        
        agent.sources.tailsrc.selector.type = multiplexing
        agent.sources.tailsrc.selector.header = type
        agent.sources.tailsrc.selector.mapping.1 = mem1
        agent.sources.tailsrc.selector.mapping.2 = mem2
        
        agent.sinks.std1.type = file_roll
        agent.sinks.std1.channel = mem1
        agent.sinks.std1.batchSize = 1
        agent.sinks.std1.sink.directory = /var/log/flumeout/1
        agent.sinks.std1.rollInterval = 0
        
        agent.sinks.std2.type = file_roll
        agent.sinks.std2.channel = mem2
        agent.sinks.std2.batchSize = 1
        agent.sinks.std2.sink.directory = /var/log/flumeout/2
        agent.sinks.std2.rollInterval = 0
        
        agent.channels.mem1.type = memory
        agent.channels.mem1.capacity = 100
        
        agent.channels.mem2.type = memory
        agent.channels.mem2.capacity = 100
    
    

    但是,它不起作用!

    当选择器部分被删除时,flume的日志中会出现一些拦截器调试消息。但是当选择器和拦截器在一起时,什么都没有。

    有没有什么表达错误或者我漏掉了什么?

    感谢您的阅读。:)

  • 共有1个答案

    法子昂
    2023-03-14

    我找到了。

    在水槽日志中,有如下警告信息。

    
        2013-10-10 16:34:20,514 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSources(FlumeConfiguration.java:571)] Removed tailsrc due to Failed to configure component!
    
    

    所以我附上了下面的线

    
        agent.sources.tailsrc.channels = mem1 mem2
    
    

    然后它起作用了!!!!

     类似资料:
    • 我有一个要求,我想运行以假脱机目录作为源的 Flume 代理。将假脱机目录中的所有文件复制到 HDFS(sink) 后,我希望代理停止,因为我知道所有文件都被推送到通道。我还想每次为不同的假脱机目录运行此步骤,并在目录中的所有文件都标记为 .完成。有没有办法停止水槽剂?

    • 问题内容: 使用笑话进行测试时,我具有基本的测试服语法: 问题是我的代码中有拦截器,当使用jest命令输出运行测试时,拦截器会: TypeError:无法读取未定义的属性“拦截器” 并指向拦截器对象 是存储以下项的返回值的变量 在SO上引用了该axios线程。我如何以开玩笑的方式测试axios,但是它不涉及任何拦截器,因此并没有真正的帮助。 问题答案: 最后就足够了

    • 我正在使用jshell,希望截断jshell控制台上显示的非常大的消息。 为此,我得到了 /set截断命令: 如果值太长,则在显示时将其截断。使用 /settruncation命令设置显示值的最大长度。如果命令中没有输入任何设置,则显示当前设置。 下面是截断的相关选择器类型。 有人能推荐案例选择器或动作选择器的使用案例吗??

    • 我正在开发一个struts2(2.1.8)web应用程序。我想使用执行等待拦截器,以便在处理上传的文件时显示等待页面。 在解决了线程问题(execAndWait拦截器在等待后没有重定向到success页面)之后,我没有一个NPE访问getText()方法,我继续遇到一些问题。 如果调试动作执行所有的jsp表单变量(包括上传的文件)都可以(与调试检查员我可以看到文件路径,内容类型,文件名),但文件不

    • 问题内容: 我知道如何拦截所有请求,但是我只想拦截来自我资源的请求。 有谁知道如何做到这一点? 问题答案: 如果只想拦截来自特定资源的请求,则可以使用可选的action 属性。Angular的文档请参见此处(用法>操作) 的JavaScript Plunker:http ://plnkr.co/edit/xjJH1rdJyB6vvpDACJOT?p=preview

    • 我正在创建使用改装的Android应用程序。我用Spring作为Restapi。我使用了JWT的身份验证。我在这里使用了两个拦截器。使用到期JWT调用api的场景如下 > 使用 服务器检查令牌和响应代码401/403 客户端检查响应代码并使用头中带有refreshtoken的调用/刷新 服务器检查refreshtoken并使用新accesstoken进行响应 现在的问题是如何再次打电话/你好。每个