Question:

Problem with Twitter streaming

姬烨磊
2023-03-14

I am trying to use Apache Flume to fetch some data from Twitter and store it in HDFS, but I have run into some problems.

Here is my flume-env.sh:

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
$JAVA_OPTS="-Xms500m -Xmx1000m -Dcom.sun.management.jmxremote"
FLUME_CLASSPATH="/home/vineasouza/apache-flume-1.9.0-bin/lib/flume-sources-1.0-SNAPSHOT.jar" 

Here is my twitter.conf:

# Naming the components on the current agent. 
TwitterAgent.sources = Twitter 
TwitterAgent.channels = MemChannel 
TwitterAgent.sinks = HDFS

# Describing/Configuring the source 
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = #
TwitterAgent.sources.Twitter.consumerSecret = # 
TwitterAgent.sources.Twitter.accessToken = #
TwitterAgent.sources.Twitter.accessTokenSecret = #
TwitterAgent.sources.Twitter.keywords = brasil

# Describing/Configuring the sink 
TwitterAgent.sinks.HDFS.channel=MemChannel
TwitterAgent.sinks.HDFS.type = hdfs 
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://0.0.0.0:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream 
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text 
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000 
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600

# Describing/Configuring the channel 
TwitterAgent.channels.MemChannel.type = memory 
TwitterAgent.channels.MemChannel.capacity = 10000 
TwitterAgent.channels.MemChannel.transactionCapacity = 10000

# Binding the source and sink to the channel 
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel

I am running this command:

$FLUME_HOME/bin/flume-ng agent --conf $FLUME_HOME/conf/ -f $FLUME_HOME/conf/twitter.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent
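Because bin/flume-ng sources conf/flume-env.sh before it launches the JVM, a mistake in that file can silently leave the agent with the launcher's default heap. One way to see which JAVA_OPTS the agent will actually get is to source the file in a subshell and print the variable. This is a minimal sketch, assuming a POSIX shell; the function name show_java_opts is hypothetical (not part of Flume), and seeding JAVA_OPTS with -Xmx20m imitates, in simplified form, the default set by the Flume 1.9.0 launcher script.

```shell
# Hypothetical helper (not part of Flume): print the JAVA_OPTS value that a
# flume-env.sh file leaves behind. The function body is a subshell, so the
# caller's environment is never modified.
show_java_opts() (
  JAVA_OPTS="-Xmx20m"          # assumed launcher default (Flume 1.9.0 bin/flume-ng)
  . "$1" 2>/dev/null           # source the file; hide errors like "command not found"
  printf '%s\n' "$JAVA_OPTS"   # whatever the file left in JAVA_OPTS
)

# Example (use an absolute path):
#   show_java_opts "$FLUME_HOME/conf/flume-env.sh"
```

If the file's JAVA_OPTS line is not a valid assignment, the helper prints only the seeded default instead of the intended -Xms/-Xmx values, which means those settings never reach the JVM.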

But I get this exception:

2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] User-Agent: twitter4j http://twitter4j.org/ /3.0.3
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Connection: close
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client-Version: 3.0.3
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client-URL: http://twitter4j.org/en/twitter4j-3.0.3.xml
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Accept-Encoding: gzip
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client: Twitter4J
Exception in thread "Twitter Stream consumer-1[Establishing connection]" java.lang.OutOfMemoryError: Java heap space
    at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:239)
    at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292)
    at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
    at java.base/java.io.DataInputStream.readFully(DataInputStream.java:200)
    at java.base/java.io.DataInputStream.readUTF(DataInputStream.java:614)
    at java.base/java.io.DataInputStream.readUTF(DataInputStream.java:569)
    at java.base/sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:740)
    at java.base/sun.security.util.KeyStoreDelegator.engineLoad(KeyStoreDelegator.java:222)
    at java.base/java.security.KeyStore.load(KeyStore.java:1479)
    at java.base/sun.security.util.AnchorCertificates$1.run(AnchorCertificates.java:62)
    at java.base/sun.security.util.AnchorCertificates$1.run(AnchorCertificates.java:53)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/sun.security.util.AnchorCertificates.<clinit>(AnchorCertificates.java:53)
    at java.base/sun.security.provider.certpath.AlgorithmChecker.checkFingerprint(AlgorithmChecker.java:214)
    at java.base/sun.security.provider.certpath.AlgorithmChecker.<init>(AlgorithmChecker.java:164)
    at java.base/sun.security.provider.certpath.PKIXCertPathValidator.validate(PKIXCertPathValidator.java:181)
    at java.base/sun.security.provider.certpath.PKIXCertPathValidator.validate(PKIXCertPathValidator.java:145)
    at java.base/sun.security.provider.certpath.PKIXCertPathValidator.engineValidate(PKIXCertPathValidator.java:84)
    at java.base/java.security.cert.CertPathValidator.validate(CertPathValidator.java:309)
    at java.base/sun.security.validator.PKIXValidator.doValidate(PKIXValidator.java:364)
    at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:275)
    at java.base/sun.security.validator.Validator.validate(Validator.java:264)
    at java.base/sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:222)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:129)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:629)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:464)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:360)
    at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:422)
    at java.base/sun.security.ssl.TransportContext.dispatch(TransportContext.java:183)

Can anyone help me? I have searched for a solution, but nothing I found solved my problem.

1 answer

融宏伟
2023-03-14

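This was solved by editing conf/flume-env.sh in the Flume directory. In the original file the JAVA_OPTS line begins with a $ ($JAVA_OPTS="..."), so the shell does not treat it as a variable assignment: it expands $JAVA_OPTS, tries to run the resulting word as a command, and JAVA_OPTS is never changed. The agent therefore keeps the small default heap set by the bin/flume-ng launcher script (-Xmx20m in Flume 1.9.0), which is evidently too small: the stack trace shows the OutOfMemoryError occurring while the JVM loads its trusted-certificate store (KeyStore.load) during the TLS handshake with Twitter. Write the line as a real assignment: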

export JAVA_OPTS="-Xms512m -Xmx1024m -Dcom.sun.management.jmxremote"
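Putting that line together with the other settings from the question, a corrected conf/flume-env.sh could look like the sketch below. The paths are copied from the question and are specific to that machine, and the heap sizes are the ones from this answer rather than tuned recommendations; the only substantive change is that the JAVA_OPTS line is a plain assignment with no leading $.

```shell
# conf/flume-env.sh: sourced by bin/flume-ng before it starts the JVM.
# Paths are the ones from the question; adjust them for your machine.
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64

# Plain assignment (no leading '$'), so it replaces the launcher's default heap.
export JAVA_OPTS="-Xms512m -Xmx1024m -Dcom.sun.management.jmxremote"

FLUME_CLASSPATH="/home/vineasouza/apache-flume-1.9.0-bin/lib/flume-sources-1.0-SNAPSHOT.jar"
```

With the file fixed, the same flume-ng command from the question can be rerun unchanged.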