当前位置: 首页 > 知识库问答 >
问题:

如何对写入BigTable的数据流管道进行集成测试?

蓬琦
2023-03-14

据Beam网站报道,

通常,对管道代码执行本地单元测试比调试管道的远程执行更快更简单。

出于这个原因,我想对写到Bigtable的Beam/DataFlow应用程序使用测试驱动开发。

但是,在Beam测试文档之后,我遇到了一个僵局--Passert并不有用,因为输出PCollection包含org.apache.hadoop.hbase.client.Put对象,这些对象不重写equals方法。

我也无法获取PCollection的内容来对它们进行验证,因为

直接获取PCollection的内容是不可能的--Apache Beam或Dataflow管道更像是应该执行什么处理的查询计划,PCollection是计划中的逻辑中间节点,而不是包含数据。

那么,除了手动运行它之外,如何测试这个管道呢?我正在使用Maven和JUnit(在Java中,因为这是Dataflow Bigtable Connector所支持的全部)。

共有1个答案

辛渝
2023-03-14

Bigtable Emulator Maven插件可用于为此编写集成测试:

>

  • 配置Maven Failsafe插件,并将测试用例的结尾从*test更改为*it,以作为集成测试运行。
  • 在gcloud sdk的命令行上安装Bigtable模拟器:

    gcloud components install bigtable   
    

    请注意,这一必要步骤将降低代码的可移植性(例如,它将在您的构建系统上运行吗?在其他开发人员的机器上运行吗?)因此,在部署到构建系统之前,我将使用Docker对其进行容器化。

    根据自述文件向pom添加模拟器插件

    使用HBase客户端API,并查看示例Bigtable Emulator integration test来设置会话和表。

    按照Beam文档正常编写测试,只是不使用PAssert,而是调用CloudBigTableIo.WriteTotable,然后使用HBase客户机从表中读取数据来验证它。

    下面是一个集成测试示例:

    package adair.example;
    
    import static org.apache.hadoop.hbase.util.Bytes.toBytes;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.UUID;
    import java.util.stream.Collectors;
    
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.hamcrest.collection.IsIterableContainingInAnyOrder;
    import org.junit.Assert;
    import org.junit.Test;
    
    import com.google.cloud.bigtable.beam.CloudBigtableIO;
    import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
    import com.google.cloud.bigtable.hbase.BigtableConfiguration;
    
    /**
     *  A simple integration test example for use with the Bigtable Emulator maven plugin.
     */
    public class DataflowWriteExampleIT {
    
      private static final String PROJECT_ID = "fake";
      private static final String INSTANCE_ID = "fakeinstance";
      private static final String TABLE_ID = "example_table";
      private static final String COLUMN_FAMILY = "cf";
      private static final String COLUMN_QUALIFIER = "cq";
    
      private static final CloudBigtableTableConfiguration TABLE_CONFIG =
        new CloudBigtableTableConfiguration.Builder()
          .withProjectId(PROJECT_ID)
          .withInstanceId(INSTANCE_ID)
          .withTableId(TABLE_ID)
          .build();
    
      public static final List<String> VALUES_TO_PUT = Arrays
        .asList("hello", "world", "introducing", "Bigtable", "plus", "Dataflow", "IT");
    
      @Test
      public void testPipelineWrite() throws IOException {
        try (Connection connection = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID)) {
          Admin admin = connection.getAdmin();
          createTable(admin);
    
          List<Mutation> puts = createTestPuts();
    
          //Use Dataflow to write the data--this is where you'd call the pipeline you want to test.
          Pipeline p = Pipeline.create();
          p.apply(Create.of(puts)).apply(CloudBigtableIO.writeToTable(TABLE_CONFIG));
          p.run().waitUntilFinish();
    
          //Read the data from the table using the regular hbase api for validation
          ResultScanner scanner = getTableScanner(connection);
          List<String> resultValues = new ArrayList<>();
          for (Result row : scanner) {
            String cellValue = getRowValue(row);
            System.out.println("Found value in table: " + cellValue);
            resultValues.add(cellValue);
          }
    
          Assert.assertThat(resultValues,
            IsIterableContainingInAnyOrder.containsInAnyOrder(VALUES_TO_PUT.toArray()));
        }
      }
    
      private void createTable(Admin admin) throws IOException {
        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(TABLE_ID));
        tableDesc.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
    
        admin.createTable(tableDesc);
      }
    
      private ResultScanner getTableScanner(Connection connection) throws IOException {
        Scan scan = new Scan();
        Table table = connection.getTable(TableName.valueOf(TABLE_ID));
        return table.getScanner(scan);
      }
    
      private String getRowValue(Result row) {
        return Bytes.toString(row.getValue(toBytes(COLUMN_FAMILY), toBytes(COLUMN_QUALIFIER)));
      }
    
      private List<Mutation> createTestPuts() {
        return VALUES_TO_PUT
              .stream()
              .map(this::stringToPut)
              .collect(Collectors.toList());
      }
    
      private Mutation stringToPut(String cellValue){
        String key = UUID.randomUUID().toString();
        Put put = new Put(toBytes(key));
        put.addColumn(toBytes(COLUMN_FAMILY), toBytes(COLUMN_QUALIFIER), toBytes(cellValue));
        return put;
      }
    
    }
    

  •  类似资料:
    • 假设我有一个测试来断言系统中新用户的注册是否真的成功: 现在,这将在实际数据库中创建一个新的测试用户,因为我希望这个测试在实际环境中运行。这意味着测试不能一直运行,对吗?我应该如何进行此类测试?那些使用系统的实际环境并操纵真实数据的人?

    • 集成测试是对已经进行单元测试的各个部分的一种整合测试。集成是昂贵的,并且它出现在测试中。你必须把这个考虑到你的预计和时间表里。 理想情况下,你应该这样组织一个项目,使得最后没有一个阶段是必须通过显式集成来进行的。这比在项目过程中,随着事情完成逐渐集成事情要好得多。如果这是不可避免的,请仔细评估。

    • 因此,服务器和客户端都发生这种情况。我有来自通道活动方法的通道处理程序上下文,我正在尝试使用写入AndFlush(对象消息)方法向其写入对象,但似乎消息永远不会进入创建的管道。 下面是我的客户端处理程序的样子(我重写了包解码器和编码器中的一些方法来调试) 这是我如何写入ChannelHandlerContext通道变量 当我运行我的代码时,“从客户端写入服务器的数据”是打印机,但“在客户端上编码的

    • 我正在使用sdk version并尝试使用运行器将数据拉至bigtable。不幸的是,当我使用作为我的接收器时,我在执行我的数据流管道时得到了。已经检查了我的并且参数很好,根据我的需要。 基本上,我创建并在我的管道的某个点上完成了编写 ,但我甚至无法设置断点来调试正好是null的地方。对于如何解决这个问题,有什么建议吗? 谢谢。

    • 作为以下问答的后续问题: https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey 我想与谷歌数据流工程团队(@jkff)确认尤金提出的第三个选项是否有可能使用谷歌数据流: