当前位置: 首页 > 知识库问答 >
问题:

使用水槽读取IBM MQ数据

蒯慈
2023-03-14

我想从IBM MQ中读取数据,并将其放入HDFs。

查看了 JMS 的水槽源,似乎它可以连接到 IBM MQ,但我不明白所需属性列表中的“destinationType”和“destinationName”是什么意思。有人可以解释一下吗?

还有,我应该如何配置我的水槽代理

flumeAgent1(在与MQ相同的机器上运行)读取MQ数据——flumeAgent2(在Hadoop集群上运行)写入Hdfs,或者在Hadoop集群上只有一个代理就足够了

有人可以帮助我了解如何将 MQ 与水槽集成吗

参考

https://flume.apache.org/FlumeUserGuide.html

谢谢,查亚

共有1个答案

麹权
2023-03-14

关于Flume代理体系结构,它是由一个负责接收或轮询事件的源以最简单的形式组成的,并将事件转换成放在通道中的Flume事件。然后,接收器接收这些事件,以便将数据保存在某个地方,或者将数据发送到另一个代理。所有这些组件(源、信道、接收器,即代理)都在同一台机器上运行。相反,可以分布不同的代理。

话虽如此,您的场景似乎需要基于JMS源、通道(通常是内存通道)和HDFS接收器的单个代理。

如文档中所述,JMS 源代码仅针对 ActiveMQ 进行了测试,但 shoukd 适用于任何其他队列系统。该文档还提供了一个示例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE

< code>a1是单个代理的名称。< code>c1是信道的名称,其配置仍必须完成;并且完全没有水槽配置。可以通过添加以下内容轻松完成:

a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = ...
a1.sinks.k1...
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1...

r1 是 JMS 源代码,可以看出,destinationName 只是要求一个字符串名称。destinationType 只能接受两个值:队列主题。我认为重要的参数是providerURLinitialContextFactoryconnectionFactory,它们必须适应IBM MQ。

 类似资料:
  • 我的项目有一个要求。我必须使用水槽收集日志数据,并且必须将数据输入到hive表中。 在这里,我需要将放置在文件夹中的文件收集到hdfs中,我正在使用Spooldir进行。在此之后,我需要处理这些文件并将输出放在hive文件夹中,以便立即查询数据。 我是否可以使用 sink 处理源文件,使放置在 hdfs 中的数据已经处理为所需的格式。? 谢了,萨希

  • 由于我不允许在生产服务器上设置 Flume,因此我必须下载日志,将它们放入 Flume spoolDir 中,并有一个接收器可以从通道中使用并写入 Cassandra。一切正常。 然而,由于我在spolDir中有很多日志文件,并且当前设置一次只处理1个文件,这需要一段时间。我希望能够同时处理许多文件。我想到的一种方法是使用spolDir,但将文件分发到5-10个不同的目录中,并定义多个源/通道/接

  • 当hdfs不可用时,是否有方法确保数据安全?场景是:kafka源,flume内存通道,hdfs接收器。如果水槽服务关闭了,它是否可以存储主题分区的偏移量,并在恢复后从正确的位置消费?

  • 我有阅读持续增长的问题。txt文件。我知道我可以从网上读到一些东西,比如说 但是如何用文本文件做呢?我应该传递什么而不是netcat?

  • 我正在尝试使用hdfs水槽运行水槽。hdfs在不同的机器上正常运行,我甚至可以与水槽机器上的hdfs交互,但是当我运行水槽并向其发送事件时,我收到以下错误: 同样,一致性不是问题,因为我可以使用hadoop命令行与hdfs交互(水槽机不是datanode)。最奇怪的是,在杀死水槽后,我可以看到tmp文件是在hdfs中创建的,但它是空的(扩展名仍然是. tmp)。 关于为什么会发生这种情况的任何想法

  • 我可以用水槽获取推文,但是,流式传输的语言不是我想要的。下面是flume.conf文件 我收到的推文如下所示: 有人能建议我需要做的改变吗?