当前位置: 首页 > 知识库问答 >
问题:

Flume Twitter流媒体问题

邢璞
2023-03-14

我正在使用Flume 1.6.0-cdh5.9.1使用Twitter源流式传输推文。

配置文件如下所示:

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = xxxxxxxxxx
TwitterAgent.sources.Twitter.consumerSecret = xxxxxxxxxx
TwitterAgent.sources.Twitter.accessToken = xxxxxxxxxx
TwitterAgent.sources.Twitter.accessTokenSecret = xxxxxxxxxx
TwitterAgent.sources.Twitter.keywords = hadoop, cloudera

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:8020/user/cloudera/tweets/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 1000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

对于Cloudera. jar依赖项,我使用Maven使用以下依赖项构建了flume-sources-1.0-SNAPSHOT.jar

<dependencies>
    <!-- For the Twitter API -->
    <dependency>
        <groupId>org.twitter4j</groupId>
        <artifactId>twitter4j-stream</artifactId>
        <version>4.0.6</version>
    </dependency>

    <!-- Hadoop Dependencies -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.6.0-cdh5.9.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-sdk</artifactId>
        <version>1.6.0-cdh5.9.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.9.1</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

现在,当我运行Flume Agent时,它成功启动,连接到Twitter,但在最后一行(接收状态流)后停止:

2017-02-08 21:55:12,556 (Twitter Stream consumer-1[initializing]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Establishing connection.
2017-02-08 21:55:46,474 (Twitter Stream consumer-1[Establishing connection]) [INFO -    twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Connection established.
2017-02-08 21:55:46,474 (Twitter Stream consumer-1[Establishing connection]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Receiving status stream.

在最后一行之后什么都没有发生。它不会终止,不会流式传输任何东西。我看了一下HDFS位置,那里没有创建任何东西。

有人能在这里帮我吗?

共有1个答案

牛经赋
2023-03-14

该问题存在于配置< code > Twitter agent . sources . Twitter . keywords 中

Twitter Source将正常工作,只要它在Firehose中找到数据,它就会不断拉推文。我尝试了其他一些最近流行的关键字,它工作得非常好。

 类似资料:
  • 问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp

  • 它没有任何错误,我得到以下错误时,我运行火花提交,任何帮助都非常感谢。谢谢你抽出时间。 线程“main”java.lang.noClassDeffounderror:org/apache/spark/streaming/kafka/kafkautils在kafkasparkstreaming.sparkstreamingtest(kafkasparkstreaming.java:40)在kafka

  • Streaming API用于通过令牌读取JSON令牌。 它将JSON内容读写为离散事件。 JsonReader和JsonWriter将数据读/写为令牌,称为JsonToken 。 它是处理JSON的三种方法中最强大的方法。 它具有最低的开销,并且在读/写操作中非常快。 它类似于XML的Stax解析器。 在本章中,我们将展示使用GSON流API来读取JSON数据。 Streaming API与to

  • 我试图使用Apache Flume从Twitter获取一些数据,然后存储在HDFS,但是我遇到了一些问题 这是我的< code>flume-env.sh 这是我的 我正在运行这个命令 但我有这个例外: 有人可以帮助我吗?我尝试了搜索解决方案,但没有任何解决我的问题

  • 收听电台广播的流媒体直播,还可以录制广播。 作者说:有问题欢迎和我QQ信箱交流:10040142@qq.com [Code4App.com]

  • 我有一个Java应用程序午餐一个flink工作来处理Kafka流。