当前位置: 首页 > 知识库问答 >
问题:

使用外部依赖项对apache beam有状态管道进行单元测试

漆雕伟志
2023-03-14

我有一个apache beam管道,它从pubsub读取数据,使用Redis丰富数据,最后写入pubsub。我试图编写测试来测试浓缩Dofn,这是一个有状态Dofn。在这里,内部状态充当近缓存,以减少对Redis的调用。为了实例化我的Redis客户机,我使用PipelineOptions中声明的工厂,例如

@Default.InstanceFactory(RedisClientFactory.class)
RedisClient getRedisClient();

void setRedisClient(RedisClient client);

理论上,上述客户机应该是每个工人的单人。在我的单元测试中,我试图模拟redis客户端中的一些东西。我的测试是这样的-

//setup pipeline
TestStream<MetricsInstance> inputStream =
        TestStream.create(...).advanceWatermarkToInfinity();
PCollection<MetricsInstance> enrichedDataStream  = pipeline.apply(inputStream)
.apply(ParDo.of(new ConvertToKeyValuePairDoFn<>()))
.apply(ParDo.of(new EnrichMetricsInstanceDoFn()));


CommonPipelineOptions options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
RedisClient redisClient = options.getRedisClient();
JedisPool jedisPool = Mockito.mock(JedisPool.class);
jedis = Mockito.mock(Jedis.class);
Mockito.when(jedisPool.getResource()).thenReturn(jedis);
redisClient.setPool(jedisPool);
... some stubbing code and finally the pipeline run
PAssert.that(enrichedDataStream).containsInAnyOrder(expectedDataStream);
pipeline.run(options);

当我尝试运行这个测试时,我得到了一个类似这样的错误

java.lang.IllegalArgumentException: Failed to serialize and deserialize property 'redisClient' with value 'xxx.xxx.RedisClientImpl@529cfee5'

为了使框架不尝试序列化客户机,我可以在Options类的getRedisclient()上添加@jsonignore。但这会导致Redis实例在某个时候被重新创建,而我所有的嘲笑和攻击都消失了。我想知道测试这种场景的最佳方法是什么。

共有1个答案

卓俊晖
2023-03-14

在对Apache Beam邮件列表进行了一些讨论后,我能够让这个东西工作起来。诀窍是设置RedisClientFactory,它使用管道选项中的另一个字段,该字段公开RedisClient类的名称。

所以选项看起来是这样的-

    @Default.Class(RedisClientImpl.class)
    Class<? extends RedisClient> getRedisClientClass();

    void setRedisClientClass(Class<? extends RedisClient> redisClientClass);

    @Default.InstanceFactory(RedisClientFactory.class)
    RedisClient getRedisClient();

    void setRedisClient(RedisClient client);

工厂是这样实施的--

public class RedisClientFactory implements DefaultValueFactory<RedisClient> {
  @Override
  public RedisClient create(PipelineOptions options) {

    CommonPipelineOptions pipelineOptions = options.as(CommonPipelineOptions.class);
    return InstanceBuilder.ofType(RedisClient.class)
        .fromClass(pipelineOptions.getRedisClientClass())
        .fromFactoryMethod("fromOptions")
        .withArg(PipelineOptions.class, options)
        .build();
  }

}
  public static RedisClientImpl fromOptions(PipelineOptions options) {
    return new RedisClientImpl(options.as(CommonPipelineOptions.class));
  }

使用此设置,我现在可以在单元测试中创建RedisClient的模拟实例。

options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
options.setRedisClientClass(FakeRedisClient.class);
...
// setup fake data in the FakeRedisClient by calling static methods
FakeRedisClient.keyToValueMap.put(redisKey, redisReturnVal);
...
pipeline.run(options);

我们还需要确保FakeRedisClient类还公开了一个名为fromOptions的方法

  public static FakeRedisClient fromOptions(PipelineOptions options) {
    return new FakeRedisClient();
  }
 类似资料:
  • 问题内容: 在对Angular工厂进行单元测试(使用Karma + Jasmine)时,如何将存根依赖项注入到要测试的工厂中? 这是我的工厂: 实例化我的工厂时需要。 这是我的测试: 注意:我知道这允许用于控制器,但是我没有看到与之等效的工厂。 问题答案: 我知道有两种方法可以完成这样的事情: 使用和匿名模块注入模拟。 注入您要模拟的服务,并使用茉莉的间谍功能提供模拟值。 第二个选项仅在您确切知道

  • 问题内容: 我有一段代码,我不知道如何进行单元测试!该模块使用urllib2从外部XML提要(twitter,flickr,youtube等)中提取内容。这是一些伪代码: 我的第一个想法是腌制响应并加载它以进行测试,但是显然urllib的响应对象是不可序列化的(它引发了异常)。 仅从响应主体保存XML是不理想的,因为我的代码也使用标头信息。它旨在作用于响应对象。 当然,在单元测试中依赖外部数据源是

  • 我有以下方法,它采用UNIX时间戳并以天、小时或分钟的形式返回年龄。我想用JUnit单元测试它,但我不确定如何开始这样做,因为当前时间不断变化。有什么建议吗?谢谢! 方法如下: }

  • 我有这样一个简单的课程: 我想为它写一个测试,下面是一个框架: ErrorLogger类中的logger是由StaticLoggerBinder提供的,所以我的问题是-如何让它工作,以便那些检查“1*logger.error(u作为字符串)”可以工作?在ErrorLogger类中,我找不到一种恰当的方式来嘲笑那个记录器。我曾考虑过反射,并以某种方式访问它,此外,mockito注入也有一个想法(但如

  • 无状态管道是纯粹的功能,通过输入数据流动而不记住任何东西或引起可检测的副作用。 大多数管道是无状态的。 我们使用的CurrencyPipe和我们创建的长度管是无状态管的示例。 状态管道是能够管理它们转换的数据的状态的管道。 创建HTTP请求,存储响应并显示输出的管道是有状态的管道。 有状态管道应谨慎使用。 Angular 2提供 ,这是有状态的。 View Example 实现有状态管道 // n

  • 我们正在将基于spring的camel应用程序迁移到基于蓝图的camel应用程序。我们还将迁移到fuse 6.1版本。我正在使用带有Junit runner的Pax考试和apache felix容器来执行我的单元测试。我面临与ehcache manager相关的未解决捆绑包问题。我曾尝试将pax配置作为mavenBundle提供所需的依赖性,但没有任何运气。异常并没有给出缺少依赖项的细节,而是只给