当前位置: 首页 > 知识库问答 >
问题:

为什么可选的水槽通道会导致非可选的水槽通道出现问题?

耿俊
2023-03-14

我有一个看似简单的水槽配置,却给我带来了很多问题。让我先描述一下问题,然后列出配置文件。

我有 3 台服务器:服务器 1、服务器 2、服务器 3。

Server1:Netcat源代码/S

Server2,3:Avro源内存通道Kafka接收器

在我的模拟中,服务器2模拟“生产”,因此不会出现任何数据丢失,而服务器3模拟“开发”,数据丢失是正常的。我的假设是,使用2个通道和2个源将使两个服务器相互解耦,如果服务器3宕机,它不会影响服务器2(尤其是使用可选的配置选项!).然而,事实并非如此。当我运行模拟并使用CTRL-C终止Server3时,我在Server2上体验到速度变慢,从Server2到Kafka sink的输出变成了爬行。当我在Server3上恢复Flume代理时,一切都恢复正常。

我没想到会有这种行为。我所期望的是,因为我有两个频道和两个接收器,如果一个频道和/或接收器坏了,另一个频道或接收器应该不会有问题。这是Flume的限制吗?这是源、汇或通道的限制吗?有没有一种方法可以让Flume在我使用一个代理与多个彼此解耦的通道和接收器的情况下运行?我真的不想为每个“环境”(生产和开发)在一台机器上有多个Flume代理。附件是我的配置文件,所以你可以用更技术的方式看到我做了什么:

服务器 1(第一层代理)

#Describe the top level configuration    
agent.sources = mySource
agent.channels = defaultChannel1 defaultChannel2
agent.sinks = mySink1 mySink2

#Describe/configure the source
agent.sources.mySource.type = netcat
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.ack-every-event = false
#agent.sources.mySource.type = syslogtcp
#agent.sources.mySource.host = 0.0.0.0
#agent.sources.mySource.port = 7103
#agent.sources.mySource.eventSize = 150000
agent.sources.mySource.channels = defaultChannel1 defaultChannel2
agent.sources.mySource.selector.type = replicating
agent.sources.mySource.selector.optional = defaultChannel2

#Describe/configure the channel
agent.channels.defaultChannel1.type = memory
agent.channels.defaultChannel1.capacity = 5000
agent.channels.defaultChannel1.transactionCapacity = 200

agent.channels.defaultChannel2.type = memory
agent.channels.defaultChannel2.capacity = 5000
agent.channels.defaultChannel2.transactionCapacity = 200

#Avro Sink
agent.sinks.mySink1.channel = defaultChannel1
agent.sinks.mySink1.type = avro
agent.sinks.mySink1.hostname = Server2
agent.sinks.mySink1.port = 6666

agent.sinks.mySink2.channel = defaultChannel2
agent.sinks.mySink2.type = avro
agent.sinks.mySink2.hostname = Server3
agent.sinks.mySink2.port = 6666

服务器2“生产”水槽代理

#Describe the top level configuration
agent.sources = mySource
agent.channels = defaultChannel
agent.sinks = mySink

#Describe/configure the source
agent.sources.mySource.type = avro
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.channels = defaultChannel

#Describe/configure the interceptor
agent.sources.mySource.interceptors = myInterceptor
agent.sources.mySource.interceptors.myInterceptor.type = myInterceptor$Builder

#Describe/configure the channel
agent.channels.defaultChannel.type = memory
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200

#Describe/configure the sink
agent.sinks.mySink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.mySink.topic = Server2-topic
agent.sinks.mySink.brokerList = broker1:9092, broker2:9092
agent.sinks.mySink.requiredAcks = -1
agent.sinks.mySink.batchSize = 100
agent.sinks.mySink.channel = defaultChannel

SERVER3“开发”FLUME代理

#Describe the top level configuration
agent.sources = mySource
agent.channels = defaultChannel
agent.sinks = mySink

#Describe/configure the source
agent.sources.mySource.type = avro
agent.sources.mySource.port = 6666
agent.sources.mySource.bind = 0.0.0.0
agent.sources.mySource.max-line-length = 150000
agent.sources.mySource.channels = defaultChannel

#Describe/configure the interceptor
agent.sources.mySource.interceptors = myInterceptor
agent.sources.mySource.interceptors.myInterceptor.type = myInterceptor$Builder

#Describe/configure the channel
agent.channels.defaultChannel.type = memory
agent.channels.defaultChannel.capacity = 5000
agent.channels.defaultChannel.transactionCapacity = 200

#Describe/configure the sink
agent.sinks.mySink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.mySink.topic = Server3-topic
agent.sinks.mySink.brokerList = broker1:9092, broker2:9092
agent.sinks.mySink.requiredAcks = -1
agent.sinks.mySink.batchSize = 100
agent.sinks.mySink.channel = defaultChannel 

谢谢你的帮助!

共有1个答案

廖招
2023-03-14

我会考虑调整这个配置参数,因为它与内存通道有关:

agent.channels.defaultChannel.capacity=5000agent.channels.defaultChannel.transaction容量=200

可能先尝试加倍,然后再次执行测试,您应该会看到改进:

代理.通道.默认通道.容量 = 10000 代理.通道.默认通道.事务容量 = 400

在测试期间观察Apache Flume实例的JVM也很好

 类似资料:
  • 我试图建立flume,这样每个代理可以有多个接收器,最终有多个通道和源(现在只看多个通道)。我有一个类似这样的配置文件和一个ruby模板。我不知道如何将功能添加到模板文件中,以便每个代理可以将一个事件发送到多个通道

  • 我正在使用Spool Directory作为源,HDFS作为接收器,文件作为通道。当执行水槽作业时。我得到了以下问题。内存通道工作正常。但是我们需要使用文件通道实现相同的功能。使用文件通道我得到了以下问题。 我已经在flume.env中将JVM内存大小配置为3GB。sh文件。请让我知道我们需要做的任何其他设置。 2016年1月20日20:05:27099错误[SinkRunnerPollingRu

  • 我正在尝试使用hdfs水槽运行水槽。hdfs在不同的机器上正常运行,我甚至可以与水槽机器上的hdfs交互,但是当我运行水槽并向其发送事件时,我收到以下错误: 同样,一致性不是问题,因为我可以使用hadoop命令行与hdfs交互(水槽机不是datanode)。最奇怪的是,在杀死水槽后,我可以看到tmp文件是在hdfs中创建的,但它是空的(扩展名仍然是. tmp)。 关于为什么会发生这种情况的任何想法

  • 我试图配置水槽与HDFS作为汇。 这是我的flume.conf文件: 我的hadoop版本是: 水槽版本是: 我已将这两个jar文件放在flume/lib目录中 我将hadoop common jar放在那里,因为在启动flume代理时出现以下错误: 现在代理开始了。这是启动日志: 但是当一些事件发生时,下面的错误出现在水槽日志中,并且没有任何东西被写入hdfs。 我缺少一些配置或jar文件?

  • 我的要求是将数据发送到不同的ES接收器(基于数据)。例如:如果数据包含特定信息,则将其发送到sink1,否则将其发送到sink2等(基本上是根据数据动态发送到任何一个接收器)。我还想分别为ES sink1、ES sink2、ES sink3等设置并行度。 有什么简单的方法可以在flink中实现上述目标吗? 我的解决方案:(但并不满意) 我可以想出一个解决方案,但有中间Kafka主题,我写(topi

  • 我遇到了Flume的问题(Cloudera CDH 5.3上的1.5): 我想做的是:每5分钟,大约20个文件被推送到假脱机目录(从远程存储中抓取)。每个文件包含多行,每行是一个日志(在JSON中)。文件大小在10KB到1MB之间。 当我启动代理时,所有文件都被成功推送到HDFS。1分钟后(这是我在flume.conf中设置的),文件被滚动(删除. tmp后缀并关闭)。 但是,当在假脱机目录中找到