当前位置: 首页 > 知识库问答 >
问题:

Apache波束大可迭代突变

隆选
2023-03-14

我正在将我的google dataflow java 1.9迁移到beam 2.0,并尝试使用BigtableIO。写

    ....
.apply("", BigtableIO.write()
                .withBigtableOptions(bigtableOptions)
                .withTableId("twoSecondVitals"));

在大舞台前的巴黎公园里,我正在努力让它变得更容易接受。

          try{
        Mutation mutation = Mutation.parseFrom(new ObjectMapper().writeValueAsBytes(v));
        Mutation mu[] = {mutation};
        Iterable<Mutation> imu = Arrays.asList(mu);
        log.severe("imu");
        c.output(KV.of(ByteString.copyFromUtf8(rowKey+"_"+v.getEpoch()), imu));
      }catch (Exception e){
        log.severe(rowKey+"_"+v.getEpoch()+" error:"+e.getMessage());
      }

上述代码引发以下异常InvalidProtocolBufferException:协议消息结束组标记与预期标记不匹配

v是对象列表(Vitals.class)。hbase api使用Put方法创建变异。如何创建将与BigtableIO接收器一起工作的BigTable突变?

共有1个答案

梅耘豪
2023-03-14

通过浏览sdk的测试,我能够找到我的答案。

            Iterable<Mutation> mutations =
                ImmutableList.of(Mutation.newBuilder()
                .setSetCell(
                        Mutation.SetCell.newBuilder()
                        .setValue(ByteString.copyFrom(new ObjectMapper().writeValueAsBytes(v)))
                        .setFamilyName("vitals")
                ).build());
 类似资料:
  • 这与这个问题最为相似。 我正在Dataflow 2.x中创建一个管道,它从Pubsub队列获取流式输入。进入的每一条消息都需要通过来自Google BigQuery的一个非常大的数据集进行流式传输,并且在写入数据库之前附加了所有相关的值(基于一个键)。 问题是来自BigQuery的映射数据集非常大--任何将其用作侧输入的尝试都失败了,数据流运行程序会抛出错误“java.lang.IllegalAr

  • 在我的数据流作业中,我需要初始化配置工厂,并在实际处理开始之前将某些消息记录在审核日志中。 我将配置工厂初始化代码审计日志记录放在父类PlatformInitializer中,并在我的主管道类中扩展它。 因此,我还必须在我的管道类中实现可序列化接口,因为beam抛出了错误-<代码>java。io。NotSerializableException:组织。德维塔姆。自定义作业 在PlatformIni

  • 我正在构建一个读取Avro通用记录的管道。为了在各个阶段之间传递GenericRecord,我需要注册avrocoder。文档表明,如果我使用通用记录,模式参数可以是任意的:https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/avrocoder.html#of-java.lang.class-org.a

  • 我正在使用通过OpenMDAO实现的IPOPT,在理解和控制停止标准方面遇到了一些问题。 这是我具体经历的:最初,IPOPT能够找到一个看起来更好的解决方案,尽管约束有点违反(直觉告诉我,调整几个参数可能会使其进入可行区域)。从这次讨论中,我了解到“线性或非线性等式或不等式约束在解算器在最终迭代中完成收敛之前不一定会得到满足”,因此我想知道是否可以更改容差,使解算器能够更快地开始完全满足约束(我是

  • 当一个对象实现了属性时,我们认为它是可迭代的。 一些内置的类型如Array,Map,Set,String,Int32Array,Uint32Array等都已经实现了各自的Symbol.iterator。 对象上的Symbol.iterator函数负责返回供迭代的值。 for..of 语句 for..of会遍历可迭代的对象,调用对象上的Symbol.iterator属性。 下面是在数组上使用for.

  • 嗨,我已经创建了一个apache beam管道,测试了它,并在eclipse内部运行了它,包括本地和使用dataflow Runner。我可以在eclipse控制台中看到管道正在运行。e.控制台上的日志。 这是我在cmd提示符中使用的maven命令, 这是我用来创建管道和设置选项的代码段。