当前位置: 首页 > 知识库问答 >
问题:

如何在不指定来源的情况下使用Flume的Kafka Channel

辛龙野
2023-03-14

我有一个现有的 Kafka 主题和一个从那里读取并写入 HDFS 的 flume 代理。我想重新配置我的水槽代理,以便它脱离现有设置;一个 Kafka 源,文件 Channel 到 HDFS Sink,以使用 Kafka 通道。

我在cloudera文档中读到,仅使用Kafka通道和HDFS接收器(没有水槽源)就可以实现这一目标。(除非我弄错了棍子的一端。所以我尝试创建此配置,但它不起作用。它甚至没有在盒子上启动水槽过程。

# Test
test.channels = kafka-channel
test.sinks = hdfs-sink

test.channels.kafka-channel.type = 
org.apache.flume.channel.kafka.KafkaChannel
test.channels.kafka-channel.kafka.bootstrap.servers = localhost:9092
test.channels.kafka-channel.kafka.topic = test
test.channels.kafka-channel.parseAsFlumeEvent = false

test.sinks.hdfs-sink.channel = kafka-channel
test.sinks.hdfs-sink.type = hdfs
test.sinks.hdfs-sink.hdfs.path = hdfs://localhost:8082/data/test/

我正在使用:

  • HDP快速启动VM 2.6.3
  • 水槽版本1.5.2
  • HDFS目录不存在
  • ps-ef|grep-flume仅在我添加了一个kafka源之后返回一个进程,但这是不对的,因为这样做会为发布到主题上的任何消息创建一个无限循环

是否可以只使用Kafka频道和HDFS接收器,或者我是否需要使用Kafkasource但更改一些其他配置,以防止消息无限循环?

< code >Kafka-源 -

共有2个答案

顾乐心
2023-03-14

这并不能直接回答你关于Flume的问题,但总的来说,由于你已经在使用Apache Kafka,所以最好使用Kafka Connect(它是Apache Kafka的一部分)来解决这种模式。有一个Kafka Connect HDFS连接器,根据本指南,使用简单。

申屠喜
2023-03-14

挖掘了一下之后,我注意到Ambari没有为指定的代理创建任何flume conf文件。Ambari似乎只在我指定< code > test . sources = Kafka-source 时创建/更新flume配置。一旦我将它添加到flume配置中(通过ambari ),配置就在机器上创建了,flume代理成功启动。

最终的水槽配置如下所示:

test.sources=kafka-source
test.channels = kafka-channel
test.sinks = hdfs-sink

test.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
test.channels.kafka-channel.kafka.bootstrap.servers = localhost:9092
test.channels.kafka-channel.kafka.topic = test
test.channels.kafka-channel.parseAsFlumeEvent = false

test.sinks.hdfs-sink.channel = kafka-channel
test.sinks.hdfs-sink.type = hdfs
test.sinks.hdfs-sink.hdfs.path = hdfs:///data/test

请注意,我没有在源上设置任何属性(这将导致我在问题中提到的无限循环问题),它只是需要被提及,因此Ambari创建flume配置并启动代理。

 类似资料:
  • 问题内容: 在Mongoose的早期版本(针对node.js)中,可以选择使用它而不定义架构 但是在当前版本中,“ noSchema”功能已被删除。我的架构可能会经常更改,并且实际上不适合已定义的架构,因此,是否有一种新的方式在猫鼬中使用无架构的模型? 问题答案: 我想这就是您要寻找猫鼬严密的东西 选项:严格 严格选项(默认情况下启用)可确保未在架构中指定的添加到模型实例的值不会保存到数据库中。

  • 问题内容: 我有一个定义自己的枚举的类,如下所示: 如果指定MyEnum.E1,它可以正常工作,但我真的很想将其作为“ E1”。您知道如何实现此目的,还是必须在另一个文件中定义它才能起作用? 结论:我无法正确获取导入的语法。由于有几个答案表明这是可能的,因此我将选择一个为我提供所需语法的文件,并对其他文件进行投票。 顺便说一下,这是一个真正的STRANGE部分(在我执行静态导入之前),我编写的使用

  • 问题内容: 我想遍历具有以下结构的数据快照: 具体来说,我正在做的是获取Data / Images子级的快照,因此这将是具有自动生成的引用的三个条目,例如, 因此我无法访问特定条目来检索作者的地址,因此我做了我的应用程式 而且我不确定如何遍历此快照的每个条目并在不知道父引用的情况下检索条目的信息。()? 感谢克里斯,感谢他向我发送了正确的搜索路径,但仍然无法构建正确的语法来实现我的目标。 现在,我

  • 问题内容: 我有一个带有GET处理程序的简单控制器,该处理程序接受一个对象来绑定请求参数: 这是一个简单的POJO类: 一切正常,但我想摆脱设置者,使该对象不可变为公共使用。在Spring 5.0.2 之前的处理程序方法文档中,我们了解到可能的有效方法参数是: 命令或表单对象将请求参数绑定到Bean属性(通过设置器) 或直接 绑定 到字段 是否可以以某种方式覆盖默认的Spring Boot配置,以

  • 问题内容: 如果我要使用DefaultServeMux(我将其指定为ListenAndServe的第二个参数来指定),那么我可以访问,您可以在Go Wiki的以下示例中看到该: 在当前代码中,我无法使用DefaultServeMux,即我将自定义处理程序传递给ListenAndServe 因此,我没有内置的代码。但是,我必须将一些授权代码修改为需要类似的授权代码。例如,如果我一直在使用Defaul

  • 我有一个具有两个属性的dynamoDB表: A: 主分区键 B: 主排序键 我想使用属性B查询这个表,因为我不知道A的值。可以这样做吗? 是否可以将B设为GSI(全局二级索引),如何使用B查询表,因为B已经是排序键。