// Topology Class
public class SimpleTopology {
public static void main(String[] args) throws InterruptedException {
System.out.println("hiiiiiiiiiii");
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("SimpleSpout", new SimpleSpout());
topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 2).setNumTasks(4).shuffleGrouping("SimpleSpout");
Config config = new Config();
config.setDebug(true);
config.setNumWorkers(2);
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("SimpleTopology", config, topologyBuilder.createTopology());
Thread.sleep(2000);
}
// Simple Bolt
public class SimpleBolt implements IRichBolt{
private OutputCollector outputCollector;
public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
this.outputCollector = oc;
}
public void execute(Tuple tuple) {
this.outputCollector.ack(tuple);
}
public void cleanup() {
// TODO
}
public void declareOutputFields(OutputFieldsDeclarer ofd) {
// TODO
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
// Simple Spout
public class SimpleSpout implements IRichSpout{
private SpoutOutputCollector spoutOutputCollector;
private boolean completed = false;
private static int i = 0;
public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
this.spoutOutputCollector = soc;
}
public void close() {
// Todo
}
public void activate() {
// Todo
}
public void deactivate() {
// Todo
}
public void nextTuple() {
if(!completed)
{
if(i < 100000)
{
String item = "Tag" + Integer.toString(i++);
System.out.println(item);
this.spoutOutputCollector.emit(new Values(item), item);
}
else
{
completed = true;
}
}
else
{
try {
Thread.sleep(2000);
} catch (InterruptedException ex) {
Logger.getLogger(SimpleSpout.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
public void ack(Object o) {
System.out.println("\n\n OK : " + o);
}
public void fail(Object o) {
System.out.println("\n\n Fail : " + o);
}
public void declareOutputFields(OutputFieldsDeclarer ofd) {
ofd.declare(new Fields("word"));
}
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
更新:有没有可能使用shuffle分组,相同的元组将被处理多次?使用的配置(喷口=4.螺栓=4),现在的问题是,随着螺栓数量的增加,性能下降。
您应该找出这里的瓶颈是什么--RabbitMQ还是Cassandra。打开Storm UI并查看每个组件的延迟时间。
如果增加并行性没有帮助(通常是应该的),那么RabbitMQ或Cassandra肯定有问题,所以您应该关注它们。
我需要用Storm处理成批的元组。我的最后一个bolt必须等到拓扑接收到整个批处理之后才能进行一些处理。为了避免混淆--对我来说,批处理是一组N条消息,它们是实时的,这个术语不需要与批处理(Hadoop)联系在一起。即使2条消息也可以是一批。 阅读Storm的文档是否可以说Storm不支持这种批处理(实时的批处理=N条消息)? 所以我的问题是给你们,我亲爱的Storm大师们,这个拓扑是不是设计得很
一、Storm 1.1 简介 Storm 是一个开源的分布式实时计算框架,可以以简单、可靠的方式进行大数据流的处理。通常用于实时分析,在线机器学习、持续计算、分布式 RPC、ETL 等场景。Storm 具有以下特点: 支持水平横向扩展; 具有高容错性,通过 ACK 机制每个消息都不丢失; 处理速度非常快,每个节点每秒能处理超过一百万个 tuples ; 易于设置和操作,并可以与任何编程语言一起使用
数据处理 可将字段的值进行处理得到最终结果 html标签过滤 内容替换 批量替换 关键词过滤 条件判断 截取字符串 翻译 工具箱 将文本链接标记为图片链接:如果字段的值是完整的url链接(非<img>标签内的链接),可将链接识别为图片 使用函数 调用接口
我遇到了一些数据,我想用许多不同的方式对它进行排序,例如按购买最多的最便宜的产品进行排序。我想一行一行地对文档进行分组,因为每行包含另一个“项目”。我附上了一张图片供参考。我更喜欢使用Java,但如果有必要,我会学习R。我是否手动将每行编码为数组?有400个项目,如果这是唯一的方法,我可以将其分成几天。 样品
Data Preparation You must pre-process your raw data before you model your problem. The specific preparation may depend on the data that you have available and the machine learning algorithms you want
在输入的JSON数据中,v的值越高,粒子越亮,并且它们从出发国家到目的国家的运行越快。 (请查阅Michael Chang的文章来 了解他是如何提出这个想法的)。Gio.js库会自动缩放输入数据的范围以便于更好的数据可视化。作为开发人员,您还可以定义自己的预处理数据的方式。