当前位置: 首页 > 知识库问答 >
问题:

测试Storm螺栓和喷口

江航
2023-03-14

因此,如果您使用基于JUnit的单元测试,是否建议您运行一个小型模拟拓扑(localmode?)并测试该拓扑下的bolt(或spout)的隐含契约?或者,是否可以使用JUnit,但这意味着我们必须仔细模拟Bolt的生命周期(创建它、调用prepare()、嘲弄config等)?在这种情况下,被测类(螺栓/喷口)有哪些一般的测试点需要考虑?

其他开发人员在创建正确的单元测试方面做了什么?

我注意到有一个拓扑测试API(参见:https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/testingapidemo.java)。使用这些API,并为每个Bolt&spout建立“测试拓扑”(并验证Bolt必须提供的隐式契约,例如-它声明的输出),是不是更好?

谢谢

共有1个答案

轩辕奕
2023-03-14

我们的方法是使用构造函数--将一个可序列化的工厂注入到喷口/螺栓中。然后,喷口/螺栓在其打开/准备方法中咨询工厂。工厂的唯一责任是以可串行化的方式封装获得喷口/螺栓的依赖关系。这种设计允许我们的单元测试注入假/测试/模拟工厂,当被咨询时,这些工厂返回模拟服务。通过这种方式,我们可以使用Mockito等模拟工具对喷口/螺栓进行狭义的单元测试。

下面是一个螺栓的通用示例和它的测试。我已经省略了工厂usernotificationFactory的实现,因为它取决于您的应用程序。您可以使用服务定位器来获取服务、序列化配置、HDFS可访问的配置,或者任何获取正确服务的方法,只要工厂能够在一个serde周期之后完成这些操作。您应该介绍该类的序列化。

螺栓

public class NotifyUserBolt extends BaseBasicBolt {
  public static final String NAME = "NotifyUser";
  private static final String USER_ID_FIELD_NAME = "userId";

  private final UserNotifierFactory factory;
  transient private UserNotifier notifier;

  public NotifyUserBolt(UserNotifierFactory factory) {
    checkNotNull(factory);

    this.factory = factory;
  }

  @Override
  public void prepare(Map stormConf, TopologyContext context) {
    notifier = factory.createUserNotifier();
  }

  @Override
  public void execute(Tuple input, BasicOutputCollector collector) {
    // This check ensures that the time-dependency imposed by Storm has been observed
    checkState(notifier != null, "Unable to execute because user notifier is unavailable.  Was this bolt successfully prepared?");

    long userId = input.getLongByField(PreviousBolt.USER_ID_FIELD_NAME);

    notifier.notifyUser(userId);

    collector.emit(new Values(userId));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields(USER_ID_FIELD_NAME));
  }
}
public class NotifyUserBoltTest {

  private NotifyUserBolt bolt;

  @Mock
  private TopologyContext topologyContext;

  @Mock
  private UserNotifier notifier;

  // This test implementation allows us to get the mock to the unit-under-test.
  private class TestFactory implements UserNotifierFactory {

    private final UserNotifier notifier;

    private TestFactory(UserNotifier notifier) {
      this.notifier = notifier;
    }

    @Override
    public UserNotifier createUserNotifier() {
      return notifier;
    }
  }

  @Before
  public void before() {
    MockitoAnnotations.initMocks(this);

    // The factory will return our mock `notifier`
    bolt = new NotifyUserBolt(new TestFactory(notifier));
    // Now the bolt is holding on to our mock and is under our control!
    bolt.prepare(new Config(), topologyContext);
  }

  @Test
  public void testExecute() {
    long userId = 24;
    Tuple tuple = mock(Tuple.class);
    when(tuple.getLongByField(PreviousBolt.USER_ID_FIELD_NAME)).thenReturn(userId);
    BasicOutputCollector collector = mock(BasicOutputCollector.class);

    bolt.execute(tuple, collector);

    // Here we just verify a call on `notifier`, but we could have stubbed out behavior befor
    //  the call to execute, too.
    verify(notifier).notifyUser(userId);
    verify(collector).emit(new Values(userId));
  }
}
 类似资料:
  • 在我的拓扑中,当元组从spout转移到bolt或从bolt转移到bolt时,我看到大约1-2 ms的延迟。我使用纳秒时间戳来计算延迟,因为整个拓扑运行在单个Worker中。拓扑是在集群中运行的,集群运行在具有生产能力的硬件中。 根据我的理解,在这种情况下,元组不需要序列化/反序列化,因为所有东西都在单个JVM中。我已经将大多数喷流和螺栓的并行性提示设置为5,并且喷流仅以每秒100的速率产生事件。我

  • 我对Apache Storm有一个奇怪的问题。我有一个Kafka连接到Kafka集群,里面有10条消息。 螺栓接收每条消息并正确处理,因为在Storm UI中,螺栓被列为“已确认”。然而,storm UI下面列出的喷口表示所有元组都失败了。 我相信这会导致喷口再次发出所有的信息。。。因此,我看到一个Storm螺栓打印出消息1-10,然后以相同的顺序一次又一次地打印出来。 我适当地调用了和方法,我只

  • 编辑:我向Bolt添加了一个。ack()(这要求我使用一个丰富的Bolt而不是基本的Bolt)并且遇到了同样的问题--没有任何信息告诉我Bolt正在处理元组。 如果有关系的话,我会在EC2实例上的CentOS映像上运行这个。如有任何帮助,不胜感激。 查看生成的Storm worker日志,我看到这一行: 下面几行如下: 工作日志的其余部分没有显示螺栓处理的消息的日志/打印。我不明白为什么螺栓似乎没

  • 我正在本地开发一个Storm拓扑。我正在使用Storm 0.9.2孵化,并开发了一个简单的拓扑。当我使用LocalCluster()选项部署它时,它工作得很好,但它不会显示在我的Storm UI中,它只是执行而已。 当我定期部署它时,它会在我的Storm UI中显示拓扑结构,但当我单击它时,不会看到喷口或螺栓。 我还尝试了许多Storm启动项目中的示例WordCountTopology。同样的行为

  • 我在本地模式下运行Apache Storm拓扑,它工作正常,但是当我将其提交给Storm时,喷口和螺栓不会显示在StormUI中,除了拓扑。 有人建议监督员应该运行,我也尝试过,即监督员、雨云和动物园管理员运行良好。提前谢谢。有人问了类似的问题,这表明监督员应该在阿帕奇Storm上——喷口和螺栓不存在于StormUI...但这在我的情况下不起作用任何想法,请。

  • 我正在尝试测量拓扑中每个bolt的延迟。Storm给出的延迟数是不够的,因为我们想要计算百分位数。在我当前的设置中,我通过测量完成execute方法(包括发出调用)所需的时间来测量bolt的延迟。该方法的假设是,即使当前bolt实例和下一个bolt实例在拓扑结构中共享同一个执行器,收集器的emit也会立即返回,而不需要调用下一个bolt实例执行方法。