当前位置: 首页 > 知识库问答 >
问题:

自定义拦截器不适用于Apache Flume

华知
2023-03-14

我有一个Flume组件在监听Syslog流。我做了一个自定义的拦截器来修改调用,但它不起作用。我做错了什么?谢谢你,Andrea

拦截器是一个编译良好的JAR文件,位于@FLUME_HOME/bin目录中

package com.test.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.Configurable;
import org.apache.flume.interceptor.Interceptor;

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class SQLFlumeInterceptor implements Interceptor {

    private final String headerKey;

    private SQLFlumeInterceptor(Context ctx) {            
    }

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {            
        addPreposition(event);
        return event;
    }

    private void addPreposition(Event event) {            
        System.out.println("Event processed");
        event.setBody( "Modified Event".getBytes() );
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Iterator<Event> iterator = events.iterator(); iterator.hasNext(); ) {

            Event next = iterator.next();
            intercept(next);

            if(next == null) {
            iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class CounterInterceptorBuilder implements Interceptor.Builder {

        private Context ctx;

        @Override
        public Interceptor build() {
            return new SQLFlumeInterceptor(ctx);
        }

        @Override
        public void configure(Context context) {
        this.ctx = context;
        }

  }
# Name the components on this agent
a1.sources = r1
a1.sinks = file-sink
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 41414
a1.sources.r1.host = 192.168.1.2
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.test.flume.SQLFlumeInterceptor$CounterInterceptorBuilder

# Describe the FILE_ROLLsink
a1.sinks.file-sink.type = FILE_ROLL
a1.sinks.file-sink.sink.directory = /opt/apache-flume-1.5.2-bin/logs/pluto.log
a1.sinks.file-sink.sink.rollInterval = 0
ai.sinks.file-sink.batchSize = 100
ai.sinks.file-sink.fileHeader = true


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.file-sink.channel = c1

系统将事件记录在文件中而不修改它们,这是相关的DEBUG日志:

2015-04-27 21:39:17,625 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:313)] Starting validation of configuration for agent: a1, initial-configuration: AgentConfiguration[a1]
SOURCES: {r1={ parameters:{port=41414, host=192.168.1.2, interceptors=i1, interceptors.i1.type=com.test.flume.
SQLFlumeInterceptor$CounterInterceptorBuilder, channels=c1, type=syslogtcp} }}
CHANNELS: {c1={ parameters:{transactionCapacity=100, capacity=1000, type=memory} }}
SINKS: {file-sink={ parameters:{sink.rollInterval=0, type=FILE_ROLL, channel=c1, sink.directory=/opt/apache-flume-1.5.2-bin/logs/pluto.log} }}

共有1个答案

华坚成
2023-03-14

请将您的拦截器 JAR 文件放在 @FLUME_HOME/lib 目录中,而不是 @FLUME_HOME/bin 中。

否则水槽将不会加载JAR。

 类似资料:
  • 我正在使用Apache CXF开发REST服务。我正在使用Spring3.1注释来连接bean。我编写了一个拦截器,它截取我的REST方法以进行监视。要做到这一点,我必须自动连接作为库添加到项目中的Monitor类@自动连线在这种情况下似乎不起作用,导致NPE。我做错什么了吗? 应用上下文:

  • 我用Spring framework 4.0.2 Spring MVC JavaMelody构建了我的应用程序,用于监控应用程序性能。使用LoginInterceptor,除javamelody dashboard:mydomain外,没有登录会话的其他所有请求URL都将重定向到登录页面。com/监控;我需要用过滤器替换拦截器吗?因为RequestParameterFilter中设置的断点可以很好

  • null 我尝试将@priority(interceptor.priority.platform_beform)和@prematching也放入我的过滤器中,但即使是在OIDC启动后也会调用。 另外,是否有任何方法支持扩展quarkus oidc逻辑以包括自定义代码? 我无法获得oidc和keycloak-auth拦截器的优先级(知道这些可以帮助我决定过滤器的优先级)。请帮忙。

  • 问题内容: 我正在使用Java EE 6和Jboss AS7.1,并尝试使用拦截器绑定(来自jboss网站的示例)。 我有一个InterceptorBinding注解: 拦截器: 还有一个豆: 但是拦截器没有被称为。。。 在编写此代码时将调用拦截器: 谢谢你的帮助。 问题答案: 您是否按照参考示例中的说明启用了拦截器? 缺省情况下,bean档案没有通过拦截器绑定绑定的已启用拦截器。必须通过将侦听器

  • 我想找到一种使用JTA事务注释应用自定义拦截器的方法。 我有一个处理业务事务的方法。在这种方法中,我想: 执行一些数据库操作 使用云消息服务发布一些主题 如果其中任何一项失败,则不应执行这两项操作(即,应回滚)。 目前我使用Google Cloud pubsub作为消息服务,但该库似乎与JMS或JTA不兼容。因此,我想知道我是否可以为该库实现自定义拦截器(例如,在事务期间对消息进行排队,并在事务成

  • 我想通过一个自定义的泛型unapply函数压缩我的计算器,该函数计算参数并在成功时返回值。 但是这失败了,错误 有什么方法可以实现这一点吗?我已经研究了类型标签,不适用方法的隐式转换,但我不知道如何将它们集成到这个问题中。如何正确定义Eval?