当前位置: 首页 > 知识库问答 >
问题:

使用apache水槽将数据流式传输到hbase

查学文
2023-03-14

我正在尝试使用apache flume将数据加载到hbase中。当我使用flume将数据传输到hadoop时,它工作得很好。但是当我启动flume代理将数据加载到hbase时,我得到了NoClassDefFoundError。

14/05/12 23:14:10 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:agent4.conf
14/05/12 23:14:10 INFO conf.FlumeConfiguration: Processing:sink1
14/05/12 23:14:10 INFO conf.FlumeConfiguration: Processing:sink1
14/05/12 23:14:10 INFO conf.FlumeConfiguration: Processing:sink1
14/05/12 23:14:10 INFO conf.FlumeConfiguration: Processing:sink1
14/05/12 23:14:10 INFO conf.FlumeConfiguration: Added sinks: sink1 Agent: agent4
14/05/12 23:14:10 INFO conf.FlumeConfiguration: Processing:sink1
14/05/12 23:14:10 INFO conf.FlumeConfiguration: Processing:sink1
14/05/12 23:14:10 INFO conf.FlumeConfiguration: Processing:sink1
14/05/12 23:14:10 INFO conf.FlumeConfiguration: Processing:sink1
14/05/12 23:14:10 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent4]
14/05/12 23:14:10 INFO node.AbstractConfigurationProvider: Creating channels
14/05/12 23:14:10 INFO channel.DefaultChannelFactory: Creating instance of channel channel1 type FILE
14/05/12 23:14:10 INFO node.AbstractConfigurationProvider: Created channel channel1
14/05/12 23:14:10 INFO source.DefaultSourceFactory: Creating instance of source source1, type exec
14/05/12 23:14:10 INFO sink.DefaultSinkFactory: Creating instance of sink: sink1, type: org.apache.flume.sink.hbase.HBaseSink
14/05/12 23:14:10 ERROR node.PollingPropertiesFileConfigurationProvider: Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
    at org.apache.flume.sink.hbase.HBaseSink.<init>(HBaseSink.java:102)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:374)
    at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:43)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:415)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 17 more

这是我的水槽配置:

flume-env.sh

JAVA_HOME=/usr
FLUME_CLASSPATH=/home/alpha/apache-flume-1.4.0-bin/lib
HBASE_CLASSPATH=/home/alpha/hbase-0.98.1/lib
HBASE_HOME=/home/alpha/hbase-0.98.1
FLUME_HOME=/home/alpha/apache-flume-1.4.0-bin

代理4.conf

# Name the components on this agent
agent4.sources = source1
agent4.sinks = sink1
agent4.channels = channel1

# Describe/configure source1
agent4.sources.source1.type = exec
agent4.sources.source1.command = tail -f /tmp/testGenerate.csv

# Describe sink1
agent4.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
agent4.sinks.sink1.table = AdreamLumiHB
agent4.sinks.sink1.columnFamily =lumiCF
agent4.sinks.sink1.batchSize=5000
agent4.sinks.sink1.serializer.regex = ^(\d+),([^,]+),([^,]+),([^,]+),([^,]+),([^,]+),.*
agent4.sinks.sink1.serializer.regexIgnoreCase = true
agent4.sinks.sink1.serializer.colNames = id,nom,valeur,batiment,etage,piece

# Use a channel which buffers events to a file
agent4.channels.channel1.type = FILE 
agent4.channels.channel1.transactionCapacity = 1000000 
agent4.channels.channel1.checkpointInterval 30000
agent4.channels.channel1.maxFileSize = 2146435071
agent4.channels.channel1.capacity 10000000 

# Bind the source and sink to the channel
agent4.sources.source1.channels = channel1
agent4.sinks.sink1.channel = channel1

共有2个答案

何安宜
2023-03-14

我建议将hbase home的/lib文件夹中的所有jar复制到flume的/lib文件夹中。这帮助我解决了这个问题。

连志义
2023-03-14

当您将hbase类路径附加到水槽时会发生什么?

FLUME_CLASSPATH=/home/alpha/apache-flume-1.4.0-bin/lib/\*:/home/alpha/hbase-0.98.1/lib/\*

注意:星号*前不要包括反斜杠。我把它放在那里是因为星号不会出现在这个编辑器上。

 类似资料:
  • 我的项目有一个要求。我必须使用水槽收集日志数据,并且必须将数据输入到hive表中。 在这里,我需要将放置在文件夹中的文件收集到hdfs中,我正在使用Spooldir进行。在此之后,我需要处理这些文件并将输出放在hive文件夹中,以便立即查询数据。 我是否可以使用 sink 处理源文件,使放置在 hdfs 中的数据已经处理为所需的格式。? 谢了,萨希

  • 使用 Spark 2 连接器从 CosmosDB 流式传输可以使用 Changefeed 实现。 https://docs.microsoft.com/en-us/azure/cosmos-db/spark-connector#streaming-reads-from-cosmos-db 我们如何在Spark 3中做同样的事情?我正在使用Cosmos DB Apache Spark 3联机事务处理

  • 问题内容: 我正在尝试使用流使用Hapi将数据发送到浏览器,但无法确定我们的方式。具体来说,我正在使用请求模块。根据文档,该对象接受流,所以我尝试了: 引发错误。在文档中说流对象必须与stream2兼容,所以我尝试了: 现在,这不会引发服务器端错误,但是在浏览器中,请求永远不会加载(使用chrome)。 然后我尝试了这个: 并且在控制台 中 输出了数据,所以我知道流不是问题,而是Hapi。我如何在

  • 我第一次尝试Kafka,并使用AWS MSK设置Kafka群集。目标是将数据从MySQL服务器流式传输到Postgresql。我使用debezium MySQL连接器作为源,使用Confluent JDBC连接器作为接收器。 MySQL配置: 注册Mysql连接器后,其状态为“正在运行”,并捕获MySQL表中所做的更改,并以以下格式在消费者控制台中显示结果: 我的第一个问题:在表中“金额”列是“十

  • 问题内容: 我想使用elasticsearch-river-mysql以便将数据从MySQL数据库连续传输到ElasticSearch。我是ES和Rivers的初学者,所以希望您能为我的问题提供帮助。 据我所知,数据将从MySQL数据库流式传输到ES集群,后者将自动对其进行索引。那是对的吗?我需要了解任何超时或限制吗? 关系数据库表之间的外键关系将如何转换为ES?包含外键的表行是否将成为ES文档的

  • 我们摄取的数据可以使用Flume进行排序吗? 我设计了一个简单的多通道水槽代理,它将数据摄取到HDFS中的两个目录中。但我不知道的是,水槽是否支持在这两个通道之间进行排序。 到目前为止,我所假设的是,我的源将是一个假脱机目录。,每当我输入行(每行包含一个唯一的关键字),该行必须进入某个特定的通道。 有什么想法吗?