Flume入门——Selector、Chanel等

董子航
2023-12-01

1、selector

http://blog.csdn.net/looklook5/article/details/40430965

http://blog.csdn.net/xiao_jun_0820/article/details/38116103#

选择器可以工作在复制 多路复用(路由) 模式下    

    复制模式
        属性说明:
            selector.type replicating 类型名称,必须是 replicating 
            selector.optional    –    标志通道为可选


    多路复用(路由)模式
            属性说明:
                selector.type    类型,必须是"multiplexing"
                selector.header    指定要监测的头的名称     
                selector.default    –     
                selector.mapping.*    –
            举例:
                a1.sources = r1
                a1.channels = c1 c2 c3 c4
                a1.sources.r1.selector.type = multiplexing
                a1.sources.r1.selector.header = state
                a1.sources.r1.selector.mapping.CZ = c1
                a1.sources.r1.selector.mapping.US = c2 c3
                a1.sources.r1.selector.default = c4

8.Processor
    1.概述
        Sink Group允许用户将多个Sink组合成一个实体。
        Flume Sink Processor 可以通过切换组内Sink用来实现负载均衡的效果,或在一个Sink故障时切换到另一个Sink。

        sinks    –    用空格分隔的Sink集合
        processor.type    default    类型名称,必须是 default、failover 或 load_balance

    2.Default Sink Processor
        Default Sink Processor 只接受一个 Sink。
        不要求用户为单一Sink创建processor

    3.Failover Sink Processor
        Failover Sink Processor 维护一个sink们的优先表。确保只要一个是可用的就事件就可以被处理。
        失败处理原理是,为失效的sink指定一个冷却时间,在冷却时间到达后再重新使用。
        sink们可以被配置一个优先级,数字越大优先级越高。
        如果sink发送事件失败,则下一个最高优先级的sink将会尝试接着发送事件。
        如果没有指定优先级,则优先级顺序取决于sink们的配置顺序,先配置的默认优先级高于后配置的。
        在配置的过程中,设置一个group processor ,并且为每个sink都指定一个优先级。
        优先级必须是唯一的。
        另外可以设置maxpenalty属性指定限定失败时间。

        sinks    –    Space-separated list of sinks that are participating in the group
        processor.type    default    The component type name, needs to be failover
        processor.priority.<sinkName>    –    Priority value. <sinkName> must be one of the sink instances associated with the current sink group A higher priority value Sink gets activated earlier. A larger absolute value indicates higher priority
        processor.maxpenalty    30000    The maximum backoff period for the failed Sink (in millis)

        Example for agent named a1:
        ------
        a1.sinkgroups = g1
        a1.sinkgroups.g1.sinks = k1 k2
        a1.sinkgroups.g1.processor.type = failover
        a1.sinkgroups.g1.processor.priority.k1 = 5
        a1.sinkgroups.g1.processor.priority.k2 = 10
        a1.sinkgroups.g1.processor.maxpenalty = 10000
        ------
    
    4.Load balancing Sink Processor
        Load balancing Sink processor 提供了在多个sink之间实现负载均衡的能力。
        它维护了一个活动sink的索引列表。
        它支持轮询 或 随机方式的负载均衡,默认值是轮询方式,可以通过配置指定。
        也可以通过实现AbstractSinkSelector接口实现自定义的选择机制。

        !processor.sinks    –    Space-separated list of sinks that are participating in the group
        !processor.type    default    The component type name, needs to be load_balance
        processor.backoff    false    Should failed sinks be backed off exponentially.
        processor.selector    round_robin    Selection mechanism. Must be either round_robin, random or FQCN of custom class that inherits from AbstractSinkSelector
        processor.selector.maxTimeOut    30000    Used by backoff selectors to limit exponential backoff (in milliseconds)

        ------
        a1.sinkgroups = g1
        a1.sinkgroups.g1.sinks = k1 k2
        a1.sinkgroups.g1.processor.type = load_balance
        a1.sinkgroups.g1.processor.backoff = true
        a1.sinkgroups.g1.processor.selector = random
        ------    

9.Interceptors - 拦截器
    1.概述
        Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的。
        拦截器需要实现org.apache.flume.interceptor.Interceptor接口。
        拦截器可以修改或删除事件基于开发者在选择器中选择的任何条件。
        拦截器采用了责任链模式,多个拦截器可以按指定顺序拦截。
        一个拦截器返回的事件列表被传递给链中的下一个拦截器。
        如果一个拦截器需要删除事件,它只需要在返回的事件集中不包含要删除的事件即可。
        如果要删除所有事件,只需返回一个空列表。
        
    2.Timestamp Interceptor
        这个拦截器在事件头中插入以毫秒为单位的当前处理时间。
        头的名字为timestamp,值为当前处理的时间戳。
        如果在之前已经有这个时间戳,则保留原有的时间戳。

        参数说明:
            !type    –    类型名称,必须是timestamp或自定义类的全路径名
            preserveExisting    false    如果时间戳已经存在是否保留

    3.Host Interceptor
        这个拦截器插入当前处理Agent的主机名或ip
        头的名字为host或配置的名称
        值是主机名或ip地址,基于配置。

        参数说明:
            !type    –    类型名称,必须是host
            preserveExisting    false    如果主机名已经存在是否保留
            useIP    true    如果配置为true则用IP,配置为false则用主机名
            hostHeader    host    加入头时使用的名称

    4.Static Interceptor
        此拦截器允许用户增加静态头信息使用静态的值到所有事件。
        目前的实现中不允许一次指定多个头。
        如果需要增加多个静态头可以指定多个Static interceptors
        属性说明:
            !type    –    类型,必须是static
            preserveExisting    true    如果配置头已经存在是否应该保留
            key    key    要增加的透明
            value    value    要增加的头值

    5.UUID Interceptor
        这个拦截器在所有事件头中增加一个全局一致性标志。
        其实就是UUID。

        属性说明:
            !type    –    类型名称,必须是org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
            headerName    id    头名称
            preserveExisting    true    如果头已经存在,是否保留
            prefix    “”    在UUID前拼接的字符串前缀
            

    6.Morphline  Interceptor


    7.Search and Replace Interceptor
        这个拦截器提供了简单的基于字符串的正则搜索和替换功能。

        属性说明:
            type    –    类型名称,必须是"search_replace"
            searchPattern    –    要搜索和替换的正则表达式
            replaceString    –    要替换为的字符串
            charset    UTF-8    字符集编码,默认utf-8

    8.Regex Filtering Interceptor
        此拦截器通过解析事件体去匹配给定正则表达式来筛选事件。
        所提供的正则表达式即可以用来包含或刨除事件。

        属性说明:
        !type    –    类型,必须设定为regex_filter
        regex    ”.*” 所要匹配的正则表达式
        excludeEvents    false    如果是true则刨除匹配的事件,false则包含匹配的事件。

    9.Regex Extractor Interceptor
        使用指定正则表达式匹配事件,并将匹配到的组作为头加入到事件中。
        它也支持插件化的序列化器用来格式化匹配到的组在加入他们作为头之前。

        属性说明:
            !type    –    类型,必须是regex_extractor
            !regex    –    要匹配的正则表达式
            !serializers    –    Space-separated list of serializers for mapping matches to header names and serializing their values. (See example below) Flume provides built-in support for the following serializers: org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
            serializers.<s1>.type    default    Must be default (org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer), org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer, or the FQCN of a custom class that implements org.apache.flume.interceptor.RegexExtractorInterceptorSerializer
            serializers.<s1>.name    –     
            serializers.*    –    Serializer-specific properties

        ----
        If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used

        a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
        a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
        a1.sources.r1.interceptors.i1.serializers.s1.name = one
        a1.sources.r1.interceptors.i1.serializers.s2.name = two
        a1.sources.r1.interceptors.i1.serializers.s3.name = three

        The extracted event will contain the same body but the following headers will have been added one=>1, two=>2, three=>3
        ----
        
10.channel
    !!!1.Memory Channel 内存通道
        事件将被存储在内存中的具有指定大小的队列中。
        非常适合那些需要高吞吐量但是失败是会丢失数据的场景下。

        属性说明:
            !type    –    类型,必须是“memory”
            capacity    100    事件存储在信道中的最大数量
            transactionCapacity    100    每个事务中的最大事件数
            keep-alive    3    添加或删除操作的超时时间
            byteCapacityBufferPercentage    20    Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
            byteCapacity    see description    Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.

        案例:参看入门案例
    2.JDBC Channel
        事件被持久存储在可靠的数据库中。目前支持嵌入式的Derby数据库。如果可恢复性非常的重要可以使用这种方式。

    !!!3.File Channel
        性能会比较低下,但是即使程序出错数据不会丢失
        属性说明:
            !type    –    类型,必须是“file”
            checkpointDir    ~/.flume/file-channel/checkpoint    检查点文件存放的位置
            useDualCheckpoints    false    Backup the checkpoint. If this is set to true, backupCheckpointDir must be set
            backupCheckpointDir    –    The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory
            dataDirs    ~/.flume/file-channel/data    逗号分隔的目录列表,用以存放日志文件。使用单独的磁盘上的多个目录可以提高文件通道效率。
            transactionCapacity    10000    The maximum size of transaction supported by the channel
            checkpointInterval    30000    Amount of time (in millis) between checkpoints
            maxFileSize    2146435071    一个日志文件的最大尺寸
            minimumRequiredSpace    524288000    Minimum Required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value
            capacity    1000000    Maximum capacity of the channel
            keep-alive    3    Amount of time (in sec) to wait for a put operation
            use-log-replay-v1    false    Expert: Use old replay logic
            use-fast-replay    false    Expert: Replay without using queue
            checkpointOnClose    true    Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
            encryption.activeKey    –    Key name used to encrypt new data
            encryption.cipherProvider    –    Cipher provider type, supported types: AESCTRNOPADDING
            encryption.keyProvider    –    Key provider type, supported types: JCEKSFILE
            encryption.keyProvider.keyStoreFile    –    Path to the keystore file
            encrpytion.keyProvider.keyStorePasswordFile    –    Path to the keystore password file
            encryption.keyProvider.keys    –    List of all keys (e.g. history of the activeKey setting)
            encyption.keyProvider.keys.*.passwordFile    –    Path to the optional key password file
    !!!4.Spillable Memory Channel -- 内存溢出通道
        事件被存储在内存队列和磁盘中。
        内存队列作为主存储,而磁盘作为溢出内容的存储。
        内存存储通过embedded File channel来进行管理。
        当内存队列已满时,后续的事件将被存储在文件通道中。 
        这个通道适用于正常操作期间适用内存通道已期实现高效吞吐,而在高峰期间适用文件通道实现高耐受性。通过降低吞吐效率提高系统可耐受性。
        如果Agent崩溃,则只有存储在文件系统中的事件可以被恢复。
        此通道处于试验阶段,不建议在生产环境中使用。 

        属性说明:
        !type    –    类型,必须是"SPILLABLEMEMORY"
        memoryCapacity    10000    内存中存储事件的最大值,如果想要禁用内存缓冲区将此值设置为0。
        overflowCapacity    100000000    可以存储在磁盘中的事件数量最大值。设置为0可以禁用磁盘存储。 
        overflowTimeout    3    The number of seconds to wait before enabling disk overflow when memory fills up.
        byteCapacityBufferPercentage    20    Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. See below.
        byteCapacity    see description    Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). Note that if you have multiple memory channels on a single JVM, and they happen to hold the same physical events (i.e. if you are using a replicating channel selector from a single source) then those event sizes may be double-counted for channel byteCapacity purposes. Setting this value to 0 will cause this value to fall back to a hard internal limit of about 200 GB.
        avgEventSize    500    Estimated average size of events, in bytes, going into the channel
        <file channel properties>    see file channel    Any file channel property with the exception of ‘keep-alive’ and ‘capacity’ can be used. The keep-alive of file channel is managed by Spillable Memory Channel. Use ‘overflowCapacity’ to set the File channel’s capacity.

    5.自定义通道
        自定义渠道需要自己实现Channel接口。
        自定义Channle类及其依赖类必须在Flume启动前放置到类加载的目录下。

        参数说明:
            type - 自己实现的Channle类的全路径名称

 

 类似资料: