当前位置: 首页 > 知识库问答 >
问题:

KafkaSpout(idle)产生了巨大的网络流量

卓宏达
2023-03-14

在使用KafkaSpout和几个Bolt开发并执行了我的Storm(1.0.1)拓扑之后,我注意到即使拓扑处于空闲状态,也会出现巨大的网络流量(Kafka上没有消息,Bolt中没有处理)。因此,我开始逐一评论我的拓扑结构,以便找到原因,现在我的主要内容中只有Kafka普特:

....
final SpoutConfig spoutConfig = new SpoutConfig(
                new ZkHosts(zkHosts, "/brokers"), 
                "files-topic", // topic
                "/kafka", // ZK chroot 
                "consumer-group-name");
     spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
     spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
     topologyBuilder.setSpout(
                        "kafka-spout-id, 
                        new KafkaSpout(config),
                        1); 
....

当这个(无用的)拓扑执行时,即使是在本地模式下,即使是第一次,流量总是会增长很多:我看到了(在我的活动监视器中)

  • 平均每秒接收432 KB的数据
  • 几小时后,拓扑正在运行(空闲),收到的数据为1.26GB,发送的数据为1GB

(重要提示:Kafka未在群集中运行,该群集是在同一台计算机上运行的一个实例,具有一个主题和一个分区。我刚刚在我的计算机上下载了Kafka,启动它并创建了一个简单的主题。当我在主题中放置消息时,拓扑中的所有内容都正常工作,没有任何问题)

显然,原因就在《Kafka普特》中。nextTuple()method(如下),但我不明白,如果没有Kafka中的任何消息,我为什么会有这样的流量。有什么我没有考虑的吗?这是预期的行为吗?我查看了Kafka日志,ZK日志,什么都没有,我已经清理了Kafka和ZK数据,什么都没有,仍然是相同的行为。

@Override
public void nextTuple() {
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {

        try {
            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size();
            EmitState state = managers.get(_currPartitionIndex).next(_collector);
            if (state != EmitState.EMITTED_MORE_LEFT) {
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break;
            }
        } catch (FailedFetchException e) {
            LOG.warn("Fetch failed", e);
            _coordinator.refresh();
        }
    }

    long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;

    /*
         As far as the System.currentTimeMillis() is dependent on System clock,
         additional check on negative value of diffWithNow in case of external changes.
     */
    if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
        commit();
    }
}

共有2个答案

鲜于致远
2023-03-14

你的螺栓收到信息了吗?你的螺栓继承BaseRichBolt吗?

注释掉Kafaspout中的m.fail(id.offset)行并检查它。如果您的插销没有确认,那么您的喷口将假定消息失败,并尝试重播相同的消息。

public void fail(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            //m.fail(id.offset);
        }

也尝试停止nextTuple()几毫升,并检查出来。

如果有帮助就告诉我

戚研
2023-03-14

例如,在nextTuple()方法中Hibernate一秒钟(1000毫秒),然后立即观察流量,

@Override
public void nextTuple() {
   try {
       Thread.sleep(1000);
   } catch(Exception ex){
        log.error("Ëxception while sleeping...",e);
   }
   List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
   for (int i = 0; i < managers.size(); i++) {
     ...
     ...
     ...
     ...
}

原因是,Kafka消费者基于拉动方法工作,这意味着消费者将从Kafka经纪人那里拉动数据。因此,从消费者的角度来看(Kafka Spoot)将持续向Kafka代理发出一个获取请求,这是一个TCP网络请求。因此,您面临着发送/接收数据包的大量统计数据<代码>虽然使用者不使用任何消息,但拉取请求和空响应也将计入网络数据包发送/接收统计数据中

 类似资料:
  • 编辑我试图找出我的代码有什么问题,我开始绘制简单的图形,看看箭头在较小的图形上会是什么样子。我听到以下命令: 我试图生成一个网络的情节,出于某种原因,我的箭头看起来像小矩形,而不是通常的三角形箭头。 以下是我用于绘图的代码: 下面是一个示例图: 当我为。 我的代码有什么问题?这可能与R的版本有关吗?我以前用非常类似的命令绘制了很多图,我从来没有遇到过问题。 这里是带有节点信息和边缘列表的文件。

  • 1.自我介绍 2.为什么投游戏测试岗位? 3.对游戏测试工作有什么了解? 4.游戏测试都需要做什么? 5.平时玩什么游戏 6.一天会花多长时间在游戏 7.最近玩了什么游戏 8.端游呢? 9.对我们公司有什么了解? 10.对工作的计划? 11.linux操作命令 12.对数据库的增删改查了解吗?删除是什么? 13.sql插入语句 14.能接受加班吗? 15.反问

  • 一面电话 自我介绍,项目介绍 关于你的项目进行的测试 压力测试的了解 为什么项目不进行性能测试 你最喜欢的游戏? 王者荣耀的一个英雄的技能点进行测试用例的书写 你在王者中遇到过哪些bug,进行下原因分析 为什么想做测试,自己性格为什么适合 自己为了测试进行过哪些学习 你了解SQL,Linux吗(了解,就没深问) 面完直接就说过了等二面通知,一面还是比较基础的 小哥哥说那我们的面试就到这里结束啦,谢

  • 我目前正在使用自定义JWT身份验证进行SpringCloudGateway。身份验证后,我希望使用GlobalFilter将标头中的JWT令牌字符串传递到下游服务: JWT令牌字符串可以通过调用主体来获得。getName(); 我的问题是:我如何实现

  • 问题内容: 我可以在Java监视程序上创建网络流量吗?该程序必须控制从计算机程序(包括OS模块)到网络驱动程序再返回的所有网络流量。如果是,如何? 注意: 我不仅要监视流量,还要对其进行控制。我想在Windows NT上实现这样的系统。仅靠Java无法实现它。如何在JNI的帮助下执行它? 也许是另一个变体。我不熟悉Windows服务,但仍然如此。我将在C 上编写一个程序并将其注册为Windows服

  • 知道Spark每个工作节点使用多个执行器,并且每个执行器都在自己的JVM中运行,我想知道Spark /if如何优化广播变量的流量。希望它为每个工作节点进行一次下载,然后将已经序列化的数据发送到该特定节点上的执行器。另一种选择是每次执行器需要时下载广播数据(因此必须在特定节点上多次下载相同的数据)。