当前位置: 首页 > 知识库问答 >
问题:

非kV元件的GroupIntoBatches

龙景澄
2023-03-14
static class FakeKVFn extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(KV.of("", c.element()));
  }
}
public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  long batchSize = 100L;

  p.apply("ReadLines", TextIO.read().from("./input.txt"))
      .apply("FakeKV", ParDo.of(new FakeKVFn()))
      .apply(GroupIntoBatches.<String, String>ofSize(batchSize))
      .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())))
      .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(callWebService(c.element().getValue()));
        }
      }))
      .apply("WriteResults", TextIO.write().to("./output/"));

  p.run().waitUntilFinish();
}

有没有办法不引入“假”钥匙就分组成批?

共有1个答案

庄飞
2023-03-14
    null
 类似资料:
  • Consul相关功能: 根据项目及业务线配置一个ConsulACL(即:Consul权限)。 根据配置好的 Consul token,可配置具体 Key/Value。 Consul相关提示: 支持删除 ACL 删除后不可恢复。 支持删除 Key/Value 具体 字段 及 文件夹,删除后不可恢复。 Key/Value 的增、删、改,都需要用到 token,请妥善保管。 操作步骤 ACL 管理 Co

  • 一个简单的小工具,用于在两个独立的 Consul 集群之间迁移 KV 数据和比较 KV 差异。 命令行参数 $ consul-kv-migrate -hUsage of consul-kv-migrate: -action string 执行动作:diff-对比源和目标 Consul 配置差异,migrate-迁移源到目标并覆盖目标, 目标服务器的多余的 Key 不会被删除 (defau

  • 在很多情况下,日志内容本身都是一个类似于 key-value 的格式,但是格式具体的样式却是多种多样的。logstash 提供 filters/kv 插件,帮助处理不同样式的 key-value 日志,变成实际的 LogStash::Event 数据。 配置示例 filter { ruby { init => "@kname = ['method','uri','verb'

  • Compact 方法压缩在etcd键值存储中的事件历史。 键值存储应该定期压缩,否则事件历史会无限制的持续增长. rpc Compact(CompactionRequest) returns (CompactionResponse) {} 消息体 请求的消息体是 PutRequest: message CompactionRequest { // 用于比较操作的键值存储的修订版本 int6

  • Txn 方法在单个事务中处理多个请求。 一个 txn 请求增加键值存储的修订版本并为每个完成的请求生成带有相同修订版本的事件。 不容许在一个txn中多次修改同一个key。 rpc Txn(TxnRequest) returns (TxnResponse) {} 背景 以下内容翻译来自 proto文件中 TxnRequest 的注释,解释了Txn请求的工作方式. 来自 google paxosdb

  • DeleteRange 方法从键值存储中删除给定范围。 删除请求增加键值存储的修订版本并在事件历史中为每个被删除的key生成一个删除事件. rpc DeleteRange(DeleteRangeRequest) returns (DeleteRangeResponse) 消息体 请求的消息体是 DeleteRangeRequest: message DeleteRangeRequest { /