I can't pull Twitter data into HDFS with Flume because of an error I can't get rid of.
Command:
bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent
Console:
2020-12-14 11:38:08,662 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:154)] Unhandled error
java.lang.NoSuchMethodError: 'boolean twitter4j.conf.Configuration.isStallWarningsEnabled()'
at twitter4j.TwitterStreamImpl.<init>(TwitterStreamImpl.java:60)
at twitter4j.TwitterStreamFactory.<clinit>(TwitterStreamFactory.java:40)
at org.apache.flume.source.twitter.TwitterSource.configure(TwitterSource.java:110)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:325)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:105)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
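A NoSuchMethodError at runtime usually means code compiled against one version of a library is running against another. Here twitter4j.TwitterStreamImpl calls Configuration.isStallWarningsEnabled(), and the twitter4j.conf.Configuration class that was actually loaded does not have that method. One way to look for the conflicting copy is to list every jar in Flume's lib directory that contains that class. Below is a minimal Bash sketch. jars_with_class is a hypothetical helper name, and it relies on ZIP (and therefore JAR) entry names being stored uncompressed, so a plain grep can find them without unpacking the archive.

```shell
#!/usr/bin/env bash
# Sketch (hypothetical helper): print every jar in a directory that
# contains a given class entry, e.g. twitter4j/conf/Configuration.class.
# JAR files are ZIP archives, and ZIP stores entry names uncompressed,
# so grep can find the name without extracting anything.
jars_with_class() {
  local dir="$1" entry="$2" jar
  for jar in "$dir"/*.jar; do
    [ -e "$jar" ] || continue            # no jars: the glob stays literal
    if grep -qaF "$entry" "$jar"; then   # -a: search binary data, -F: fixed string
      echo "$jar"
    fi
  done
}
```

For example: jars_with_class ~/flume/lib twitter4j/conf/Configuration.class. If this prints more than one jar (say, a twitter4j-core jar plus another jar that bundles its own copy of twitter4j), whichever copy the class loader finds first is used, which can produce exactly this kind of error.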
flume-env.sh (I manually added flume-sources-1.0-SNAPSHOT.jar to flume/lib):
export JAVA_HOME=/usr/lib/jvm/default-java
export JAVA_OPTS="-Xms500m -Xmx2000m -Dcom.sun.management.jmxremote"
# export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "
FLUME_CLASSPATH="/home/jb/flume/lib/flume-sources-1.0-SNAPSHOT.jar"
twitter.conf:
# Naming the components on the current agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
# Describing/Configuring the source
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = xxx
TwitterAgent.sources.Twitter.consumerSecret = xxx
TwitterAgent.sources.Twitter.accessToken = xxx
TwitterAgent.sources.Twitter.accessTokenSecret = xxx
TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql
# Describing/Configuring the sink
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.sinks.HDFS.hdfs.minBlockReplicas = 1
# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 100
TwitterAgent.channels.MemChannel.transactionCapacity = 100
# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel
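Independently of the classpath error, the sizing in this file may be worth a check. The Flume User Guide describes the memory channel's transactionCapacity as the maximum number of events the channel will take from a source or give to a sink per transaction, and the HDFS sink takes up to hdfs.batchSize events per transaction. Here hdfs.batchSize = 1000 is larger than transactionCapacity = 100, so a common rule of thumb (keep batchSize no larger than transactionCapacity) is violated. The Bash/awk sketch below flags that; prop and check_batch_size are hypothetical helper names, and the property keys are hard-coded to the names used in this file (TwitterAgent, HDFS, MemChannel).

```shell
#!/usr/bin/env bash
# Sketch: warn when the HDFS sink's batch size exceeds the memory channel's
# transaction capacity. Keys match twitter.conf (agent TwitterAgent,
# sink HDFS, channel MemChannel); adjust them for other configs.

# Print the value of property $2 from properties file $1.
# Comment lines are skipped; if a key repeats, the last value wins.
prop() {
  awk -v key="$2" '
    /^[ \t]*#/ { next }
    {
      i = index($0, "=")
      if (i == 0) next
      k = substr($0, 1, i - 1); v = substr($0, i + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", k)
      gsub(/^[ \t]+|[ \t]+$/, "", v)
      if (k == key) val = v
    }
    END { print val }
  ' "$1"
}

check_batch_size() {
  local conf="$1" batch txn
  batch=$(prop "$conf" TwitterAgent.sinks.HDFS.hdfs.batchSize)
  txn=$(prop "$conf" TwitterAgent.channels.MemChannel.transactionCapacity)
  batch=${batch:-100}   # documented default for hdfs.batchSize
  txn=${txn:-100}       # documented default for memory channel transactionCapacity
  if [ "$batch" -gt "$txn" ]; then
    echo "WARN: hdfs.batchSize=$batch > transactionCapacity=$txn"
    return 1
  fi
  echo "OK: hdfs.batchSize=$batch <= transactionCapacity=$txn"
}
```

Run it as check_batch_size conf/twitter.conf. If it warns, either lower hdfs.batchSize or raise transactionCapacity; if you raise transactionCapacity, keep capacity at least as large, since the memory channel rejects a transactionCapacity above its capacity.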
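OS: Ubuntu; Flume: v1.9.0; Hadoop: v3.3.0
I managed to get it working. For anyone wondering how, read on.
First, change the Flume version. I now use Flume 1.7.0 (https://flume.apache.org/releases/1.7.0.html). A newer version might work too, but I didn't want to break anything :)
Second, clone this repository: https://github.com/cloudera/cdh-twitter-example. Inside it there is a flume.conf file. I configured it like this: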
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = xx
TwitterAgent.sources.Twitter.consumerSecret = xx
TwitterAgent.sources.Twitter.accessToken = xx
TwitterAgent.sources.Twitter.accessTokenSecret = xx
TwitterAgent.sources.Twitter.keywords = hadoop, bigdata
TwitterAgent.sources.Twitter.locations = -54.5247541978, 2.05338918702, 9.56001631027, 51.1485061713
TwitterAgent.sources.Twitter.language = fr
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/%Y/%m/%d/%H/
#It specifies the File format. File formats that are currently supported are SequenceFile, DataStream or CompressedStream.
#The DataStream will not compress the output file and please don’t set codeC. The CompressedStream requires set hdfs.codeC with an available codeC
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
# It specifies the suffix to append to the file, e.g. .avro
TwitterAgent.sinks.HDFS.hdfs.fileSuffix = .json
#It specifies the number of events written to file before it is flushed to HDFS.
TwitterAgent.sinks.HDFS.hdfs.batchSize = 10000
# It specifies the file size to trigger roll, in bytes. If it is equal to 0 then it means never roll based on file size.
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
#It specifies the number of events written to the file before it rolled. If it is equal to 0 then it means never roll based on the number of events.
TwitterAgent.sinks.HDFS.hdfs.rollCount = 0
#It specifies the number of seconds to wait before rolling the current file. If it is equal to 0 then it means never roll based on the time interval.
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 60
TwitterAgent.sinks.HDFS.hdfs.callTimeout = 180000
TwitterAgent.sinks.HDFS.hdfs.useLocalTimeStamp = true
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 1000
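The hdfs.path above uses time escape sequences (%Y/%m/%d/%H). To expand them, the HDFS sink needs a timestamp, taken either from the event's timestamp header or, when hdfs.useLocalTimeStamp = true as set here, from the local clock. The Bash/awk sketch below flags a config whose path uses such escapes without useLocalTimeStamp. prop and check_path_timestamp are hypothetical helper names; the keys are hard-coded to this flume.conf, and the check cannot see whether the source or an interceptor already adds a timestamp header, so treat a warning as a prompt to verify rather than a definite error.

```shell
#!/usr/bin/env bash
# Sketch: flag an HDFS sink whose hdfs.path uses time escapes (%Y, %m, ...)
# while hdfs.useLocalTimeStamp is not true. A timestamp header set by the
# source or an interceptor would also work; this check cannot detect that.

# Print the value of property $2 from properties file $1 (last value wins).
prop() {
  awk -v key="$2" '
    /^[ \t]*#/ { next }
    {
      i = index($0, "=")
      if (i == 0) next
      k = substr($0, 1, i - 1); v = substr($0, i + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", k)
      gsub(/^[ \t]+|[ \t]+$/, "", v)
      if (k == key) val = v
    }
    END { print val }
  ' "$1"
}

check_path_timestamp() {
  local conf="$1" path local_ts
  path=$(prop "$conf" TwitterAgent.sinks.HDFS.hdfs.path)
  # lower-case the flag so "True" and "true" are treated alike
  local_ts=$(prop "$conf" TwitterAgent.sinks.HDFS.hdfs.useLocalTimeStamp | tr '[:upper:]' '[:lower:]')
  case "$path" in
    *%[[:alpha:]]*)   # a %-letter escape such as %Y or %H
      if [ "$local_ts" != "true" ]; then
        echo "WARN: hdfs.path uses time escapes but useLocalTimeStamp is not true"
        return 1
      fi ;;
  esac
  echo "OK"
}
```

For the flume.conf above, check_path_timestamp conf/flume.conf should print OK, because it sets useLocalTimeStamp = true.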
Then edit pom.xml, changing the twitter4j-stream version:
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>3.0.3</version>
</dependency>
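Before building, it can help to confirm which version the pom actually declares for a given artifact. The following is a rough Bash/awk sketch; pom_version is a hypothetical helper, and it assumes the one-tag-per-line layout shown above. It is not an XML parser and will not resolve a ${...} property; running mvn dependency:tree in the flume-sources directory gives the fully resolved answer.

```shell
#!/usr/bin/env bash
# Sketch: print the <version> that follows <artifactId>$2</artifactId>
# inside the same <dependency> block of pom file $1.
# Assumes one tag per line; not a real XML parser.
pom_version() {
  awk -v id="$2" '
    index($0, "<artifactId>" id "</artifactId>") { found = 1; next }
    found && match($0, /<version>[^<]*<\/version>/) {
      # strip "<version>" (9 chars) and "</version>" (10 chars)
      print substr($0, RSTART + 9, RLENGTH - 19); exit
    }
    /<\/dependency>/ { found = 0 }   # do not leak into the next dependency
  ' "$1"
}
```

For example, pom_version flume-sources/pom.xml twitter4j-stream should print 3.0.3 after the edit.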
Package it with Maven:
cd flume-sources
mvn package
This creates a target/flume-sources-1.0-SNAPSHOT.jar file. Copy it into your Flume lib directory:
cp ./target/flume-sources-1.0-SNAPSHOT.jar ~/flume/lib
I changed the classpath in the file I showed earlier (flume-env.sh):
FLUME_CLASSPATH="/home/jb/flume/lib/flume-sources-1.0-SNAPSHOT.jar"
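Copy the flume.conf we just wrote to conf/flume.conf in your Flume directory.
Third, check that the twitter4j-core, twitter4j-media-support and twitter4j-stream jars in lib/ are at version 3.0.3. If not, download them.
Finally: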
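The version check in the third step can be scripted. This Bash sketch assumes the jars follow the usual artifact-version.jar naming (for example twitter4j-core-3.0.3.jar) and reads the version from the file name only; check_twitter4j_jars is a hypothetical helper name. It will not notice twitter4j classes bundled inside some other jar.

```shell
#!/usr/bin/env bash
# Sketch: list twitter4j jars in a lib directory whose version, taken from
# the <artifact>-<version>.jar file name, differs from the expected one.
# Returns non-zero if any mismatch is found.
check_twitter4j_jars() {
  local dir="$1" want="$2" jar base ver status=0
  for jar in "$dir"/twitter4j-*.jar; do
    [ -e "$jar" ] || continue       # no matches: the glob stays literal
    base=$(basename "$jar" .jar)    # e.g. twitter4j-media-support-3.0.3
    ver=${base##*-}                 # text after the last '-': 3.0.3
    if [ "$ver" != "$want" ]; then
      echo "MISMATCH: $(basename "$jar") (want $want)"
      status=1
    fi
  done
  return "$status"
}
```

Usage: check_twitter4j_jars "$FLUME_HOME/lib" 3.0.3. Any jar it reports should be replaced by the 3.0.3 artifact (group org.twitter4j on Maven Central).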
cd $FLUME_HOME
bin/flume-ng agent --conf ./conf/ -f ./conf/flume.conf -Dflume.root.logger=INFO,console -n TwitterAgent
Hallelujah:
2020-12-18 02:48:38,805 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:173)] Processed 100 docs
2020-12-18 02:48:40,777 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:173)] Processed 200 docs
2020-12-18 02:48:42,017 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:173)] Processed 300 docs
2020-12-18 02:48:44,772 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:173)] Processed 400 docs
2020-12-18 02:48:46,779 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:173)] Processed 500 docs
2020-12-18 02:48:47,875 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:173)] Processed 600 docs
2020-12-18 02:48:49,852 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:173)] Processed 700 docs
2020-12-18 02:48:52,789 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:173)] Processed 800 docs
2020-12-18 02:48:54,791 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:173)] Processed 900 docs
2020-12-18 02:48:56,805 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.onStatus(TwitterSource.java:173)] Processed 1 000 docs
2020-12-18 02:48:56,805 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:295)] Total docs indexed: 1 000, total skipped docs: 0
2020-12-18 02:48:56,805 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:297)] 47 docs/second
2020-12-18 02:48:56,805 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:299)] Run took 21 seconds and processed:
2020-12-18 02:48:56,806 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:301)] 0,013 MB/sec sent to index
2020-12-18 02:48:56,807 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:303)] 0,266 MB text sent to index
2020-12-18 02:48:56,807 (Twitter4J Async Dispatcher[0]) [INFO - org.apache.flume.source.twitter.TwitterSource.logStats(TwitterSource.java:305)] There were 0 exceptions ignored: