我相信,这不是实施这种转变的最佳方式。我想知道是否有更好的方法从数据流作业中进行查找。
跟进:
试图将用例实现为侧输入:
> PcollectionView<DatabaseReader> reader = pipeline.apply(
> ParDo.of(
> new DoFn<Long, DatabaseReader>() {
> @ProcessElement
> public void process(
> @Element Long input,
OutputReceiver<DatabaseReader> o) throws IOException { //Following
> MaxMind Documentation to read file
> File database = new File(“GeoIP2-City.mmdb”);
> DatabaseReader reader = new DatabaseReader
> .Builder(database)
> .fileMode(com.maxmind.db.Reader.FileMode.MEMORY)
> .withCache(new CHMCache())
> .build();
> o.output(reader);
> }
> })) .apply(View.asSingleton());
> PCollection<TableRow> newTableRow =
> tableRowBeforeTransformation.apply("mmdb lookup", ParDo.of(new
> MmdbReaderFn(reader)).withSideInputs(reader));
> DatabaseReader reader; PCollectionView<DatabaseReader> readerView;
>
> public MmdbReaderFn(PCollectionView readerView){ this.readerView =
> readerView;
>
> }
>
> @ProcessElement public void processElement(ProcessContext context)
> throws IOException { TableRow tableRow = context.element();
> reader = context.sideInput(readerView); String ip =
> String.valueOf(tableRow.get("ip")); try {
> InetAddress ipAddress = InetAddress.getByName(ip);
>
> CityResponse response = reader.city(ipAddress);
> if(response == null) {
> log.info("Invalid IP address");
> } else {
>
> if(response.getCountry() != null && response.getCountry().getName() != null) {
> tableRow.set(COUNTRY_NAME, response.getCountry().getName()); // 'Unite States'
> } else {
> tableRow.set(COUNTRY_NAME, null);
> }
>
>
> if(response.getCity() != null && response.getCity().getName() != null) {
> tableRow.set(CITY_NAME, response.getCity().getName());
> } else {
> tableRow.set(CITY_NAME, null);
> }
>
> } catch (Exception e) {
> log.error("Error process GeoIP2", e);
>
> } context.output(tableRow); }
如果预计130MB文件将来不会增长,您可以使用SideInput,这将避免管道外的rpc调用。
请注意,侧输入确实需要适合工人的内存,本文提供了一些关于内存需求分析的好内容。
如果文件定期更改,则可以使用缓慢更新的侧输入模式。
我在处理流时遇到了一些麻烦。本质上,我有一个
我想把下面的代码转换成Java8s。
我当前正尝试将Dataflow与pub/sub一起使用,但出现以下错误: 工作流失败。原因:(6E74E8516C0638CA):刷新凭据时出现问题。请检查:1。已为项目启用Dataflow API。2.您的项目有一个机器人服务帐户:service-[project number]@dataflow-service-producer-prod.iam.gserviceAccount.com应该可以
我有两个收藏: 现在我的解决办法是: 我的问题:是否有其他方法来配对和收集Foo对象?
假设我有这样一个列表: 是否可以使用Java8流从该列表中每隔一秒获取一个元素以获得以下内容? 或者甚至每三个元素? 基本上,我正在寻找一个函数来获取流的每n个元素:
我正在使用来自我无法控制的java库的数据发布者。发布者库使用典型的回调设置;在库代码的某个地方(库是java,但我将在scala中描述简洁): 库的用户需要编写一个实现方法的类,并将其传递给,库代码如下所示: 有自己的内部线程我无法控制,以及随附的数据缓冲区,即每当有另一个对象要使用时调用。 所以,我的问题是:如何编写一个层,将原始库模式转换/转换为akka流源对象? 提前谢谢你。