当前位置: 首页 > 知识库问答 >
问题:

使用拦截器过滤Flume中的日志文件

苏畅
2023-03-14

我有一个超文本传输协议服务器写入日志文件,然后使用Flume将其加载到HDFS。首先,我想根据标头或正文中的数据过滤数据。我读到我可以使用正则表达式的拦截器来做到这一点,有人能解释一下我需要做什么吗?我需要编写Java代码来覆盖Flume代码吗?

我还想获取数据并根据标头将其发送到不同的接收器(即 source=1 转到 sink1,source=2 转到 sink2)这是如何完成的?

谢谢

希蒙

共有2个答案

胡修伟
2023-03-14

您可以使用水槽通道选择器将事件简单地路由到不同的目的地。或者您可以将几个水槽代理链接在一起以实现复杂的路由功能。但是链接的水槽代理将变得有点难以维护(资源使用和水槽拓扑)。你可以看看水槽路由器接收器,它可能会提供一些你想要的功能。

首先,通过水槽拦截器在事件标头中添加特定字段

a1.sources = r1 r2
a1.channels = c1 c2
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
a1.sources.r2.channels =  c2
a1.sources.r2.type = seq
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = datacenter
a1.sources.r2.interceptors.i2.value = BERKELEY

然后,你可以像这样设置你的水槽通道选择器:

a2.sources = r2
a2.sources.channels = c1 c2 c3 c4
a2.sources.r2.selector.type = multiplexing
a2.sources.r2.selector.header = datacenter
a2.sources.r2.selector.mapping.NEW_YORK = c1
a2.sources.r2.selector.mapping.BERKELEY= c2 c3
a2.sources.r2.selector.default = c4

或者,您可以设置avro路由器接收器,如:

agent.sinks.routerSink.type = com.datums.stream.AvroRouterSink
agent.sinks.routerSink.hostname = test_host
agent.sinks.routerSink.port = 34541
agent.sinks.routerSink.channel = memoryChannel

# Set sink name
agent.sinks.routerSink.component.name = AvroRouterSink

# Set header name for routing
agent.sinks.routerSink.condition = datacenter

# Set routing conditions
agent.sinks.routerSink.conditions = east,west
agent.sinks.routerSink.conditions.east.if = ^NEW_YORK
agent.sinks.routerSink.conditions.east.then.hostname = east_host
agent.sinks.routerSink.conditions.east.then.port = 34542
agent.sinks.routerSink.conditions.west.if = ^BERKELEY
agent.sinks.routerSink.conditions.west.then.hostname = west_host
agent.sinks.routerSink.conditions.west.then.port = 34543
农雅畅
2023-03-14

您无需编写 Java 代码来筛选事件。使用正则表达式过滤拦截器过滤正文文本与某些正则表达式匹配的事件:

agent.sources.logs_source.interceptors = regex_filter_interceptor
agent.sources.logs_source.interceptors.regex_filter_interceptor.type = regex_filter
agent.sources.logs_source.interceptors.regex_filter_interceptor.regex = <your regex>
agent.sources.logs_source.interceptors.regex_filter_interceptor.excludeEvents = true

要根据标头路由事件,请使用多路复用通道选择器:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

这里带有标头“State”=“CZ”的事件转到通道“c1”,带有“State”=“US”-到“c2”和“c3”,所有其他-到“c4”。

这样,您还可以按标头筛选事件 - 只需将特定的标头值路由到通道,该通道指向 Null 接收器。

 类似资料:
  • 主要内容:实现,步骤 1,Filter.java,步骤 2,AuthenticationFilter.java,DebugFilter.java,步骤 3,Target.java,步骤 4,FilterChain.java,步骤 5,FilterManager.java,步骤 6,Client.java,步骤 7,InterceptingFilterDemo.java,步骤 8拦截过滤器模式(Intercepting Filter Pattern)用于对应用程序的请求或响应做一些预处理/后处理。

  • 拦截过滤器模式(Intercepting Filter Pattern)用于对应用程序的请求或响应做一些预处理/后处理。定义过滤器,并在把请求传给实际目标应用程序之前应用在请求上。过滤器可以做认证/授权/记录日志,或者跟踪请求,然后把请求传给相应的处理程序。以下是这种设计模式的实体。 过滤器(Filter) - 过滤器在请求处理程序执行请求之前或之后,执行某些任务。 过滤器链(Filter Cha

  • 拦截过滤器模式(Intercepting Filter Pattern)用于对应用程序的请求或响应做一些预处理/后处理。定义过滤器,并在把请求传给实际目标应用程序之前应用在请求上。过滤器可以做认证/授权/记录日志,或者跟踪请求,然后把请求传给相应的处理程序。以下是这种设计模式的实体。 过滤器(Filter) - 过滤器在请求处理程序执行请求之前或之后,执行某些任务。 过滤器链(Filter Cha

  • 主要内容:1.maven仓库,2.过滤器,3.拦截器,4.监听器,5.实例化,6.测试,7.拦截器与过滤器的区别1.maven仓库 2.过滤器 过滤器的英文名称为 Filter, 是 Servlet 技术中最实用的技术。 如同它的名字一样,过滤器是处于客户端和服务器资源文件之间的一道过滤网,帮助我们过滤掉一些不符合要求的请求,通常用作 Session 校验,判断用户权限,如果不符合设定条件,则会被拦截到特殊的地址或者基于特殊的响应。 3.拦截器 Java中的拦截器是动态拦截 action 调用的

  • 问题内容: 我发现了很多关于如何创建使用像Spring框架登录定制方面的例子这还是这个,但没有发现这种情况和问题,standard/common Spring实现。是否有来自Spring的日志记录方面的任何标准实现? 问题答案: 签出CustomizableTraceInterceptor API,你可以定义带有几个占位符的单独的enter / exit / exception消息: -替换为被调