我对Kafka和斯托姆有意见。我现在不确定是我正在设置的KafkaSpout配置有问题,还是我没有正确地进行处理。
我在我的Kafka主题上排了50个条目,但我的喷口发出了1300多个(而且还在计数)元组。此外,the Spout报道说,几乎所有人都“失败了”。拓扑实际上并没有失败,它正在成功地写入数据库,但我只是不知道为什么它明显地重播所有内容(如果它正在这样做的话)
最大的问题是:
下面是我如何设置拓扑和KafkaSpout
public static void main(String[] args) {
try {
String databaseServerIP = "";
String kafkaZookeepers = "";
String kafkaTopicName = "";
int numWorkers = 1;
int numAckers = 1;
int numSpouts = 1;
int numBolts = 1;
int messageTimeOut = 10;
String topologyName = "";
if (args == null || args[0].isEmpty()) {
System.out.println("Args cannot be null or empty. Exiting");
return;
} else {
if (args.length == 8) {
for (String arg : args) {
if (arg == null) {
System.out.println("Parameters cannot be null. Exiting");
return;
}
}
databaseServerIP = args[0];
kafkaZookeepers = args[1];
kafkaTopicName = args[2];
numWorkers = Integer.valueOf(args[3]);
numAckers = Integer.valueOf(args[4]);
numSpouts = Integer.valueOf(args[5]);
numBolts = Integer.valueOf(args[6]);
topologyName = args[7];
} else {
System.out.println("Bad parameters: found " + args.length + ", required = 8");
return;
}
}
Config conf = new Config();
conf.setNumWorkers(numWorkers);
conf.setNumAckers(numAckers);
conf.setMessageTimeoutSecs(messageTimeOut);
conf.put("databaseServerIP", databaseServerIP);
conf.put("kafkaZookeepers", kafkaZookeepers);
conf.put("kafkaTopicName", kafkaTopicName);
/**
* Now would put kafkaSpout instance below instead of TemplateSpout()
*/
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(topologyName + "-flatItems-from-kafka-spout", getKafkaSpout(kafkaZookeepers, kafkaTopicName), numSpouts);
builder.setBolt(topologyName + "-flatItem-Writer-Bolt", new ItemWriterBolt(), numBolts).shuffleGrouping(topologyName + "-flatItems-from-kafka-spout");
StormTopology topology = builder.createTopology();
StormSubmitter.submitTopology(topologyName, conf, topology);
} catch (Exception e) {
System.out.println("There was a problem starting the topology. Check parameters.");
e.printStackTrace();
}
}
private static KafkaSpout getKafkaSpout(String zkHosts, String topic) throws Exception {
//String topic = "FLAT-ITEMS";
String zkNode = "/" + topic + "-subscriber-pipeline";
String zkSpoutId = topic + "subscriberpipeline";
KafkaTopicInZkCreator.createTopic(topic, zkHosts);
SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zkHosts), topic, zkNode, zkSpoutId);
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
// spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
//spoutConfig.startOffsetTime = System.currentTimeMillis();
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
return new KafkaSpout(spoutConfig);
}
这里是主题的创建,以备不时之需
public static void createTopic(String topicName, String zookeeperHosts) throws Exception {
ZkClient zkClient = null;
ZkUtils zkUtils = null;
try {
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs
zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);
int noOfPartitions = 1;
int noOfReplication = 1;
Properties topicConfiguration = new Properties();
boolean topicExists = AdminUtils.topicExists(zkUtils, topicName);
if (!topicExists) {
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration, RackAwareMode.Disabled$.MODULE$);
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
}
您需要查看螺栓中的消息是否失败。
如果它们也都失败了,那么您可能没有在bolt中确认消息,或者bolt代码中有异常。
如果收到了bolt消息,则更有可能是超时。增加拓扑超时配置或paralisim应该可以解决此问题。
我注意到,当我使用条件断点进行调试时,执行速度会大大减慢。我知道这一点已经有一段时间了,现在想明白为什么。到底是什么原因导致执行如此缓慢?我知道正在添加一个条件,但是如果我自己添加条件,我不会减慢执行速度。 例如,假设我们有以下代码。假设我们添加了一个条件断点。让我们将条件设置为i==10000。 现在让我们自己写条件。 90秒完成击球(包括开始的9秒) 日食: ~9秒到达断点 第二个示例几乎是在
查看此链接的输出(向下滚动查看输出),以了解我正在尝试完成的内容 我希望这可以打印从0.0到0.9的值,但它在打印0.8后就停止了,知道为什么吗?
问题内容: 需要对以下代码进行澄清: 这将打印出来,以便证明和对象引用相同的内存引用。 这将打印出来,也证明是相同的。 显然,这将引发,因为我试图调用空引用。 所以这是我的问题,为什么最后一个代码示例没有抛出,因为我从前两个示例中看到并理解的是,如果两个对象都引用同一个对象,那么如果我们更改任何值,那么它也会反映给另一个对象,因为两个对象都指向相同的内存引用。那么,为什么该规则在这里不适用?如果我
本文向大家介绍kafka 为什么那么快?相关面试题,主要包含被问及kafka 为什么那么快?时的应答技巧和注意事项,需要的朋友参考一下 Cache Filesystem Cache PageCache缓存 顺序写 由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快。 Zero-copy 零拷技术减少拷贝次数 Batching of Messages 批量量处理。合并小
使用方法:进入动态-发现话题
问题内容: 这是所有编程语言所共有的吗?在进行多次打印后再执行println似乎更快,但是将所有内容移动到字符串中并仅进行打印似乎最快。为什么? 编辑:例如,Java可以在不到一秒钟的时间内找到所有高达100万的质数- 但要进行打印,然后在自己的println中将它们全部输出可能需要几分钟!最多可打印100亿小时! 例如: 问题答案: 速度并不慢,而是由主机操作系统提供的与控制台连接的基础。 您可