当前位置: 首页 > 知识库问答 >
问题:

将Storm-wordCount拓扑转换为使用Kafka喷口

樊宏义
2023-03-14

我是Storm和Kafka的新手,我可以在一段时间后在本地虚拟机上安装它们。我目前有一个有效的wordCount拓扑,从dropBox文本文件中提取句子:

public void nextTuple() {


 final String APP_KEY = "XXXX";
final String APP_SECRET = "XXXX";
DbxAppInfo appInfo = new DbxAppInfo(APP_KEY, APP_SECRET);
DbxRequestConfig config = new DbxRequestConfig("StormTopology/1.0", Locale.getDefault().toString());
String accessToken = "XXXXXXXX";
DbxClient client = new DbxClient(config, accessToken);
String sentence="";
try {FileOutputStream outputStream = new FileOutputStream("fromSpout.txt"); 
try {
    //client.delete("/*.txt");
   DbxEntry.File downloadedFile = client.getFile("/spout.txt", null,outputStream);

   sentence= readFile("fromSpout.txt");          
   if (sentence==null || sentence == "" || sentence == " " || sentence == "/t") {
           Utils.sleep(1000);
           return;
           }                    
        } 
catch (DbxException ex) {  } 
catch (IOException ex) { }       
        //return 1;
finally {
      outputStream.close();
         }
    }
catch (FileNotFoundException ex){}
catch (IOException ex) {}       


if (sentence.length()<2) {  Utils.sleep(10000);  return; }
try { client.delete("/spout.txt");}
 catch (DbxException ex) {  } 
_collector.emit(new Values(sentence)); 
Utils.sleep(1000);      

现在我想升级我的喷口,使用Kafka的文本,以便在拓扑结构中提交到我的下一个螺栓。我试图在git中遵循许多文章和代码,但没有任何成功。例如:这个Kafka喷口。谁能帮助我,给我一些方向,以便实现新的spout.java文件?谢谢你!

共有1个答案

拓拔嘉颖
2023-03-14

从Storm0.9.2发布,有一个外部StormKafka包可以做到这一点。实际上,这个包是从暴风-kafka-0.8-plus贡献回暴风社区的。并且有一个测试项目展示了它的使用情况。

在细节中,首先添加依赖到你的maven(或lein/gradle):

groupId: org.apache.storm
artifactId: storm-kafka
version: 0.9.2-incubating

然后像这样创建拓扑和喷口:

import storm.kafka

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
 类似资料:
  • 我对Apache Storm的性能有一个问题,主要是从喷口出来的。 我有一个从kestrel队列发出项目的拓扑。我获取大约2000个项目,每次在喷注中调用时,我都会发出一个。 我正在使用1个spout任务和1个spout执行器运行。我已将设置为10。 为什么每次调用之间有这么大的时间间隔?outputCollector在发出一个新元组之前是否正在等待听到每个元组的反馈? 我正在运行Java8和st

  • 我们有一个不想连续运行storm拓扑的用例。相反,有一组输入(10K+)应该在指定的时间被处理,Spout连续发射这些输入,并得到拓扑中其余螺栓的处理。处理完所有输入后,在我的喷注中就没有任何东西可以从nextTuple发出。 此时,我们希望拓扑进入Hibernate状态,并在每天晚上12:00重新启动进程。 在storm配置中是否有任何属性可以设置为每天运行一次拓扑并在处理完成后Hibernat

  • 在新版本的Storm(0.9.2)有一个内置的Kafka喷口基于https://github.com/apache/incubator-storm/tree/v0.9.2-incubating有很多例子如何使用这个Kafka喷口在网上,但看起来像是有什么改变未知的声明。 例如: 编译器无法识别“StaticHosts”,Kafkanconfig类不包含它。

  • 我创建了一个带有Spout的Storm拓扑,该Spout会发出许多元组用于基准测试。一旦所有的元组都从spout发出或者拓扑中不再有任何元组流动,我就想停止/终止我的拓扑。

  • 我正在尝试使用Eclipse在Linux中运行Storm启动示例。我收到以下错误和函数从未被调用。 错误: 我的拓扑类: 我正在虚拟机环境中工作,所以不知道这是否是由于安装了Zookeeper。有什么想法吗?

  • 8台机器一直在使用。每一个都有22个核心和512 GB的RAM。但是,我们的代码运行得真的很慢。传输600万个数据需要10分钟才能完成。 60个文件中的10 MB在一秒钟内传输到HDFS。我们正在努力优化我们的代码,但很明显我们做了一些非常错误的事情。 对于蜂巢表,我们有64个桶。 在HDFS喷口;.setmaxextending(50000); 在蜂巢喷口选项;.WithTxNsperBatch