在使用KafkaSpout和几个Bolt开发并执行了我的Storm(1.0.1)拓扑之后,我注意到即使拓扑处于空闲状态,也会出现巨大的网络流量(Kafka上没有消息,Bolt中没有处理)。因此,我开始逐一评论我的拓扑结构,以便找到原因,现在我的主要内容中只有Kafka普特:
....
final SpoutConfig spoutConfig = new SpoutConfig(
new ZkHosts(zkHosts, "/brokers"),
"files-topic", // topic
"/kafka", // ZK chroot
"consumer-group-name");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
topologyBuilder.setSpout(
"kafka-spout-id,
new KafkaSpout(config),
1);
....
当这个(无用的)拓扑执行时,即使是在本地模式下,即使是第一次,流量总是会增长很多:我看到了(在我的活动监视器中)
(重要提示:Kafka未在群集中运行,该群集是在同一台计算机上运行的一个实例,具有一个主题和一个分区。我刚刚在我的计算机上下载了Kafka,启动它并创建了一个简单的主题。当我在主题中放置消息时,拓扑中的所有内容都正常工作,没有任何问题)
显然,原因就在《Kafka普特》中。nextTuple()method(如下),但我不明白,如果没有Kafka中的任何消息,我为什么会有这样的流量。有什么我没有考虑的吗?这是预期的行为吗?我查看了Kafka日志,ZK日志,什么都没有,我已经清理了Kafka和ZK数据,什么都没有,仍然是相同的行为。
@Override
public void nextTuple() {
List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) {
try {
// in case the number of managers decreased
_currPartitionIndex = _currPartitionIndex % managers.size();
EmitState state = managers.get(_currPartitionIndex).next(_collector);
if (state != EmitState.EMITTED_MORE_LEFT) {
_currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
}
if (state != EmitState.NO_EMITTED) {
break;
}
} catch (FailedFetchException e) {
LOG.warn("Fetch failed", e);
_coordinator.refresh();
}
}
long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;
/*
As far as the System.currentTimeMillis() is dependent on System clock,
additional check on negative value of diffWithNow in case of external changes.
*/
if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
commit();
}
}
你的螺栓收到信息了吗?你的螺栓继承BaseRichBolt吗?
注释掉Kafaspout中的m.fail(id.offset)行并检查它。如果您的插销没有确认,那么您的喷口将假定消息失败,并尝试重播相同的消息。
public void fail(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
//m.fail(id.offset);
}
也尝试停止nextTuple()几毫升,并检查出来。
如果有帮助就告诉我
例如,在nextTuple()方法中Hibernate一秒钟(1000毫秒),然后立即观察流量,
@Override
public void nextTuple() {
try {
Thread.sleep(1000);
} catch(Exception ex){
log.error("Ëxception while sleeping...",e);
}
List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) {
...
...
...
...
}
原因是,Kafka消费者基于拉动方法工作,这意味着消费者将从Kafka经纪人那里拉动数据。因此,从消费者的角度来看(Kafka Spoot)将持续向Kafka代理发出一个获取请求,这是一个TCP网络请求。因此,您面临着发送/接收数据包的大量统计数据<代码>虽然使用者不使用任何消息,但拉取请求和空响应也将计入网络数据包发送/接收统计数据中
巨人游戏 笔试 - 2024-09-06 受不了了,秋招投了一堆游戏公司,笔试是一家比一家费劲,太难绷了。40分不定项 + 60编程 + 20主观题。 1. 不定项难得一笔,各种指针满天飞,看的头晕,单选多选混在一起,夹杂点图形学的东西,鼠鼠投的是服务端啊喂。 2. 编程1,三数之和,编程只准使用C++。 3. 编程2,逆天的来了,手搓汇编语言编译器,题目万字长文,可以感觉到这是一家有趣的公司,可
编辑我试图找出我的代码有什么问题,我开始绘制简单的图形,看看箭头在较小的图形上会是什么样子。我听到以下命令: 我试图生成一个网络的情节,出于某种原因,我的箭头看起来像小矩形,而不是通常的三角形箭头。 以下是我用于绘图的代码: 下面是一个示例图: 当我为。 我的代码有什么问题?这可能与R的版本有关吗?我以前用非常类似的命令绘制了很多图,我从来没有遇到过问题。 这里是带有节点信息和边缘列表的文件。
1.自我介绍 2.为什么投游戏测试岗位? 3.对游戏测试工作有什么了解? 4.游戏测试都需要做什么? 5.平时玩什么游戏 6.一天会花多长时间在游戏 7.最近玩了什么游戏 8.端游呢? 9.对我们公司有什么了解? 10.对工作的计划? 11.linux操作命令 12.对数据库的增删改查了解吗?删除是什么? 13.sql插入语句 14.能接受加班吗? 15.反问
9.6笔试 第一题想破脑袋不知道哪种情况少了,A0.75 第二题整数之和为K,也是想破脑袋只过A0.5,这对不就是全排列加去重吗? 第三题应该最友好了,就是标准的求两个字符串的最长重复子串,A100
一面电话 自我介绍,项目介绍 关于你的项目进行的测试 压力测试的了解 为什么项目不进行性能测试 你最喜欢的游戏? 王者荣耀的一个英雄的技能点进行测试用例的书写 你在王者中遇到过哪些bug,进行下原因分析 为什么想做测试,自己性格为什么适合 自己为了测试进行过哪些学习 你了解SQL,Linux吗(了解,就没深问) 面完直接就说过了等二面通知,一面还是比较基础的 小哥哥说那我们的面试就到这里结束啦,谢
我目前正在使用自定义JWT身份验证进行SpringCloudGateway。身份验证后,我希望使用GlobalFilter将标头中的JWT令牌字符串传递到下游服务: JWT令牌字符串可以通过调用主体来获得。getName(); 我的问题是:我如何实现