Broker broker = Broker.fromString("localhost:9092")
GlobalPartitionInformation info = new GlobalPartitionInformation()
if(args[4]){
int partitionCount = args[4].toInteger()
for(int i =0;i<partitionCount;i++){
info.addPartition(i, broker)
}
}
StaticHosts hosts = new StaticHosts(info)
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts,"test")
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig)
TridentTopology topology = new TridentTopology()
Stream st = topology.newStream("spout1", kafkaSpout).parallelismHint(args[2].toInteger())
.each(kafkaSpout.getOutputFields(), new NEO4JTridentFunction(), new Fields("status"))
.parallelismHint(args[1].toInteger())
Map conf = new HashMap()
conf.put(Config.TOPOLOGY_WORKERS, args[3].toInteger())
conf.put(Config.TOPOLOGY_DEBUG, false)
if (args[0] == "local") {
LocalCluster cluster = new LocalCluster()
cluster.submitTopology("mytopology", conf, topology.build())
} else {
StormSubmitter.submitTopology("mytopology", conf, topology.build())
NEO4JTridentFunction.getGraphDatabaseService().shutdown()
}
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
- "localhost"
# - "server2"
#
storm.zookeeper.port : 2999
storm.local.dir: "/opt/mphrx/neo4j/stormdatadir"
nimbus.childopts: "-Xms2048m"
ui.childopts: "-Xms1024m"
logviewer.childopts: "-Xmx512m"
supervisor.childopts: "-Xms1024m"
worker.childopts: "-Xms2600m -Xss256k -XX:MaxPermSize=128m -XX:PermSize=96m
-XX:NewSize=1000m -XX:MaxNewSize=1000m -XX:MaxTenuringThreshold=1 -XX:SurvivorRatio=6
-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled
-XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
-server -XX:+AggressiveOpts -XX:+UseCompressedOops -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true
-Xloggc:logs/gc-worker-%ID%.log -verbose:gc
-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1m
-XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps -XX:+PrintClassHistogram
-XX:+PrintTenuringDistribution -XX:-PrintGCApplicationStoppedTime -XX:-PrintGCApplicationConcurrentTime
-XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal"
java.library.path: "/usr/lib/jvm/jdk1.7.0_25"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
topology.trident.batch.emit.interval.millis: 100
topology.message.timeout.secs: 300
#topology.max.spout.pending: 10000
谢谢,
您应该为提到的主题设置与分区计数相同的spout并行度。默认情况下,trident每次执行都接受一个批处理,您应该通过更改topology.max.spout.pending
属性来增加这个计数。由于Trident强制有序事务管理,因此您的执行方法(NEO4JTridentFunction)必须快速以达到所需的解决方案。
此外,您还可以使用“TridentConfig.FetchSizeBytes”
,通过更改它,您可以为喷点中的每个新发射调用摄取更多数据。
此外,您必须检查您的垃圾收集日志,它将给您的线索关于real point。
null 既然我问了这个问题,我的公司决定先买三叉戟。我们只会在性能出现问题时使用纯Storm。可悲的是,这不是一个积极的决定,它只是成为默认的行为(我当时不在)。 他们的假设是,在大多数用例中,我们需要状态处理或只需一次处理,或者我们将在不久的将来需要它。我理解他们的推理,因为从Storm到Trident或返回并不是一个容易的转换,但在我个人看来,没有状态的流处理的概念并不被所有人理解,这是使用
我无法找到正确集成Kafka和Apache Storm Trident的好文档。我试图查看相关的问题之前张贴在这里,但没有充分的信息。 这样,我就可以为我的拓扑生成流,如下面的代码所示 虽然我提供了并行性和我的分区,但是只有一个Kafka Spout的执行器在运行,因此我无法很好地扩展它。 有谁能指导我更好地将Apache Storm Trident(2.0.0)与Apache Kafka(1.0
有人能指导我在这种情况下如何使用三叉戟吗?或者使用storm功能的任何其他适用方式?
嗨,我是新来的斯托姆和Kafka。我使用的是storm 1.0.1和kafka 0.10.0,我们有一个kafkaspout可以接收来自kafka主题的java bean。我花了几个小时来寻找正确的方法。发现很少文章是有用的,但没有一个方法为我工作到目前为止。 KafKaProducer: } Kyro串行器:
对于我的测试,我在队列中发布了700万条消息。我创建了一个包含30个消费者线程消费者组,每个分区一个。我最初的印象是,与通过SQS获得的相比,这将大大加快处理能力。不幸的是,情况并非如此。在我的例子中,数据处理是复杂的,平均需要1-2分钟才能完成,这导致了一系列分区重新平衡,因为线程不能按时运行。我在日志里看到一堆消息 组FULL_GROUP的自动偏移量提交失败:无法完成提交,因为该组已重新平衡并
问题内容: 我在Java2D方面表现有些古怪。我知道sun.java2d.opengl VM参数可以为2D启用3D加速,但是即使使用该参数也有一些奇怪的问题。 这是我运行的测试结果: 在JComponent上绘制具有32x32像素图块的25x18地图, 图像1 = .bmp格式,图像2 = .png格式 没有-Dsun.java2d.opengl = true 使用.BMP图像1的120 FPS使