一般:我是一个想在Storm/Kafka/Flink/MS Azure SA/Spark上运行一些性能测试(WordCount)的学生。我想使用Kafka经纪人作为输入源。
我使用了Storm-Starter项目中的WordCount示例,并添加了Kafka作为喷口:
public class WordCountKafkaTopology {
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) {
String zkIp = "localhost";
String topicName = "perfTest";
List<String> nimbus_seeds = new ArrayList<String>();
nimbus_seeds.add("localhost");
String zookeeperHost = zkIp +":2181";
ZkHosts zkHosts = new ZkHosts(zookeeperHost);
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, "/" + topicName, topicName);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaPerfTestSpout", kafkaSpout, 8);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("kafkaPerfTestSpout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
Config config = new Config();
config.setMaxTaskParallelism(5);
config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);
config.put(Config.NIMBUS_SEEDS, nimbus_seeds);
config.put(Config.NIMBUS_THRIFT_PORT, 6627);
config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkIp));
try {
StormSubmitter.submitTopology("my-kafka-topology", config, builder.createTopology());
} catch (Exception e) {
throw new IllegalStateException("Couldn't initialize the topology", e);
}
}
}
我使用kafka-console-producer生成一些消息。我希望有人能帮助我。我是编程Storm的新手...
正在删除“config.put(config.topology_tick_tuple_freq_secs,2);”完成任务了!
在我的storm拓扑(有2个喷口和1个bolt)中,其中一个kafka喷口使用者的偏移量正在前进,但MSG没有通过kafka喷口发送到bolt。我可以在storm ui中看到,对于那个特定的喷口,发出和传送的消息是0。所以,我的问题是为什么消费者在前进,我可以看到消费者从zookeeper客户端的抵消逐渐增加。
我正在用Apache Storm 1.1.2和Kafka0.11在Java9中构建一个Spring应用程序 我注意到,在高负载(每秒2500条消息)下,Kafka喷口有一个非常高的滞后。Kafka喷口有一个平行性提示3。滞后几乎等于喷口提交的偏移。 这个滞后设置了拓扑每秒可以摄取的最大消息量的上限,这并不是很大。有人知道解决这个问题的办法吗? 更新:我还注意到,即使有10个工作者和4个并行性提示,
我正试图在按下某个按钮时弹出一个警报对话框。我首先使用了Android Developer的示例代码而不是'这不起作用,所以我根据在这个站点上发现的情况进行了更改,但是现在我的程序在按下按钮后被迫停止。 就你的知识而言,这是在第二个不同于主要的活动中完成的。不确定这是否重要.... ‘ 碰撞日志:“03-25 19:34:24.373:E/AndroidRuntime(18828):致命异常:ma
2,错误{org.apache.directory.server.LDAP.ldapserver}-ERR_171无法将LDAP服务(10,389)绑定到服务注册表。java.net.BindException:已在使用的地址 请帮忙谢谢 --------提示------------------- JAVA_HOME环境变量设置为/opt/java CARBON_HOME环境变量设置为/mnt/1
我正在做一个类似生存的游戏,我有两种类型的碰撞,一种是玩家的敌人,另一种是敌人身上的子弹。我也有一个健康栏,由于某些原因,在picbox被移除后,健康仍然下降,就像敌人与玩家互动一样。 这是子弹碰撞代码的一个块(所有8个方向的所有代码都是相同的) 这是敌方与玩家碰撞的暗号
我正在使用Storm 1.1.2和Kafka 0.11构建一个Java Spring应用程序,将在Docker容器中启动。 我的拓扑中的所有东西都按计划工作,但在Kafka的高负载下,Kafka滞后会随着时间的推移越来越大。 我的KafKaspoutConfig: 那么我的拓扑结构如下