
Storm Trident example: global

太叔俊侠
2023-12-01

The code below uses global() to repartition the stream: every tuple in the stream is sent to the same partition (the one with the smallest partition id).

Part of the code is omitted; for the omitted parts, see: https://blog.csdn.net/nickta/article/details/79666918

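FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
        new Values("nickt1", 4),
        new Values("nickt2", 7),
        new Values("nickt1", 8),
        new Values("nickt4", 9),
        new Values("nickt5", 7),
        new Values("nickt1", 11),
        new Values("nickt4", 5));
// Emit the tuples once instead of cycling through them repeatedly.
spout.setCycle(false);
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
        // Repartition: route every tuple to a single partition.
        .global()
        .each(new Fields("user"), new Debug("print:"))
        // Five partitions are requested for each(), yet the output shows only partition0 receiving tuples.
        .parallelismHint(5);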

Output:

<Fri Mar 23 15:41:17 CST 2018[partition0-Thread-62-b-0-executor[33 33]]> DEBUG(print:): [nickt1]
<Fri Mar 23 15:41:17 CST 2018[partition0-Thread-62-b-0-executor[33 33]]> DEBUG(print:): [nickt2]
<Fri Mar 23 15:41:17 CST 2018[partition0-Thread-62-b-0-executor[33 33]]> DEBUG(print:): [nickt1]
<Fri Mar 23 15:41:17 CST 2018[partition0-Thread-62-b-0-executor[33 33]]> DEBUG(print:): [nickt4]
<Fri Mar 23 15:41:17 CST 2018[partition0-Thread-62-b-0-executor[33 33]]> DEBUG(print:): [nickt5]
<Fri Mar 23 15:41:17 CST 2018[partition0-Thread-62-b-0-executor[33 33]]> DEBUG(print:): [nickt1]
<Fri Mar 23 15:41:17 CST 2018[partition0-Thread-62-b-0-executor[33 33]]> DEBUG(print:): [nickt4]
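
Every line of the output carries the same partition0 prefix, which matches the rule stated at the top: all tuples go to the partition with the smallest id. As a rough illustration of that rule only, here is a minimal plain-Java sketch. It does not use or reproduce Storm's classes; the class name GlobalPartitionSketch and the helpers chooseTarget and route are hypothetical names made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of "global" routing; not Storm's implementation.
class GlobalPartitionSketch {

    // Pick the single target: the smallest id among the candidate partitions.
    static int chooseTarget(List<Integer> partitionIds) {
        return Collections.min(partitionIds);
    }

    // Route every tuple to that one target and return the chosen id per tuple.
    static List<Integer> route(List<String> tuples, List<Integer> partitionIds) {
        int target = chooseTarget(partitionIds);
        List<Integer> assignments = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i++) {
            assignments.add(target);
        }
        return assignments;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList(
                "nickt1", "nickt2", "nickt1", "nickt4", "nickt5", "nickt1", "nickt4");
        // Five candidate partitions, as with parallelismHint(5) in the example above.
        List<Integer> partitions = Arrays.asList(0, 1, 2, 3, 4);
        List<Integer> assignments = route(users, partitions);
        for (int i = 0; i < users.size(); i++) {
            System.out.println(users.get(i) + " -> partition " + assignments.get(i));
        }
    }
}
```

The practical consequence is that raising the parallelism hint after global() adds idle partitions rather than throughput, so it tends to suit steps that genuinely need to see the whole stream in one place.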
