当前位置: 首页 > 知识库问答 >
问题:

实现流云数据流转换,为流中的每个元素调用API

鞠子轩
2023-03-14

我相信,这不是实施这种转变的最佳方式。我想知道是否有更好的方法从数据流作业中进行查找。

跟进:

试图将用例实现为侧输入:

> PcollectionView<DatabaseReader> reader = pipeline.apply(
>        ParDo.of(
>                new DoFn<Long, DatabaseReader>() {
>                    @ProcessElement
>                    public void process(
>                            @Element Long input, 
    OutputReceiver<DatabaseReader> o) throws IOException { //Following
> MaxMind Documentation to read file
>                            File database = new File(“GeoIP2-City.mmdb”);
>                            DatabaseReader reader = new DatabaseReader
>                                    .Builder(database)
>                                    .fileMode(com.maxmind.db.Reader.FileMode.MEMORY)
>                                    .withCache(new CHMCache())
>                                    .build();
>                            o.output(reader);
>                    }
>                })) .apply(View.asSingleton());
> PCollection<TableRow> newTableRow =   
> tableRowBeforeTransformation.apply("mmdb lookup", ParDo.of(new
> MmdbReaderFn(reader)).withSideInputs(reader));
> DatabaseReader reader; PCollectionView<DatabaseReader> readerView;
> 
> public MmdbReaderFn(PCollectionView readerView){    this.readerView =
> readerView;
> 
> }
> 
> @ProcessElement public void processElement(ProcessContext context)
> throws IOException {    TableRow tableRow = context.element();   
> reader = context.sideInput(readerView);    String ip =
> String.valueOf(tableRow.get("ip"));    try {
>        InetAddress ipAddress = InetAddress.getByName(ip);
> 
>        CityResponse response = reader.city(ipAddress);
>        if(response == null) {
>            log.info("Invalid IP address");
>        } else {
> 
>            if(response.getCountry() != null && response.getCountry().getName() != null) {
>                tableRow.set(COUNTRY_NAME, response.getCountry().getName());      // 'Unite States'
>            } else {
>                tableRow.set(COUNTRY_NAME, null);
>            }
> 
>           
>            if(response.getCity() != null && response.getCity().getName() != null) {
>                tableRow.set(CITY_NAME, response.getCity().getName());
>            } else {
>                tableRow.set(CITY_NAME, null);
>            }
> 
>                } catch (Exception e) {
>        log.error("Error process GeoIP2", e);
> 
>    }    context.output(tableRow); }

共有1个答案

袁永贞
2023-03-14

如果预计130MB文件将来不会增长,您可以使用SideInput,这将避免管道外的rpc调用。

请注意,侧输入确实需要适合工人的内存,本文提供了一些关于内存需求分析的好内容。

如果文件定期更改,则可以使用缓慢更新的侧输入模式。

 类似资料:
  • 我在处理流时遇到了一些麻烦。本质上,我有一个

  • 我想把下面的代码转换成Java8s。

  • 我当前正尝试将Dataflow与pub/sub一起使用,但出现以下错误: 工作流失败。原因:(6E74E8516C0638CA):刷新凭据时出现问题。请检查:1。已为项目启用Dataflow API。2.您的项目有一个机器人服务帐户:service-[project number]@dataflow-service-producer-prod.iam.gserviceAccount.com应该可以

  • 我有两个收藏: 现在我的解决办法是: 我的问题:是否有其他方法来配对和收集Foo对象?

  • 假设我有这样一个列表: 是否可以使用Java8流从该列表中每隔一秒获取一个元素以获得以下内容? 或者甚至每三个元素? 基本上,我正在寻找一个函数来获取流的每n个元素:

  • 我正在使用来自我无法控制的java库的数据发布者。发布者库使用典型的回调设置;在库代码的某个地方(库是java,但我将在scala中描述简洁): 库的用户需要编写一个实现方法的类,并将其传递给,库代码如下所示: 有自己的内部线程我无法控制,以及随附的数据缓冲区,即每当有另一个对象要使用时调用。 所以,我的问题是:如何编写一个层,将原始库模式转换/转换为akka流源对象? 提前谢谢你。