Question:

Running a Google Cloud Dataflow pipeline from a Spring controller

郭业
2023-03-14

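How can I use Spring to launch an Apache Beam pipeline on Google Cloud Dataflow? This is similar to the question "Running Apache Beam pipeline in Spring Boot project on Google Data Flow", but here I am concerned with launching the pipeline from a Spring controller rather than from a CommandLineRunner.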

@RestController
@RequestMapping("/task/import-csv-file")
public class ImportCsvController {
    @PostMapping("/process-csv-file")
    private ResponseEntity<Void> processCsvFile(
            @RequestParam String gcsFileName,
            @RequestParam String bucketName
    ) {
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

        options.setProject("same-project-as-this-app-engine-instance");
        options.setStagingLocation("gs://" + bucketName + "/binaries");
        options.setRunner(DataflowRunner.class);
        options.setJobName("process-csv");

        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply("ReadFile", TextIO.read().from("gs://" + bucketName + "/" + gcsFileName));
        // ... apply some more transforms here, which will eventually 
        // write csv rows as Google Datastore entities ...
        pipeline.run().waitUntilFinish();
        return ResponseEntity.ok().build();
    }
}

I trigger this controller through Google Cloud Tasks, using the following code:

@Service
public class TaskQueueService {
    private Queue csvImportsQueue;

    public TaskQueueService() {
        this.csvImportsQueue = QueueFactory.getQueue("csv-import-queue");
    }

    public void queueImportCsvFile(String gcsFileName, String bucketName) {
        String url = "/task/import-csv-file/process-csv-file";
        TaskOptions taskOptions = TaskOptions.Builder
                .withUrl(url)
                .method(TaskOptions.Method.POST)
                .param("gcsFileName", gcsFileName)
                .param("bucketName", bucketName);
        csvImportsQueue.add(ofy().getTransaction(), taskOptions);
    }
}

The request fails with the following exception:

java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
Caused by: java.lang.IllegalArgumentException: Missing required value for [public abstract java.lang.String org.apache.beam.runners.dataflow.options.DataflowPipelineOptions.getProject(), "Project id. Required when running a Dataflow in the cloud. See https://cloud.google.com/storage/docs/projects for further details."]. 

Edit: I found a different stack trace with a different error message. Here is the full stack trace.

java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
    at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
    at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
    at org.apache.beam.sdk.Pipeline.create(Pipeline.java:149)
    at com.example.application.controllers.tasks.ImportCsvController.processCsvFile(ImportCsvController.java:64)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:877)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320)
    at com.example.application.filters.SwitchUserProfileFilter.doFilter(SwitchUserProfileFilter.java:127)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:119)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:170)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.SAMLLogoutFilter.processLogout(SAMLLogoutFilter.java:168)
    at org.springframework.security.saml.SAMLLogoutFilter.doFilter(SAMLLogoutFilter.java:110)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.SAMLDiscovery.doFilter(SAMLDiscovery.java:137)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.SAMLLogoutProcessingFilter.processLogout(SAMLLogoutProcessingFilter.java:209)
    at org.springframework.security.saml.SAMLLogoutProcessingFilter.doFilter(SAMLLogoutProcessingFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.SAMLEntryPoint.doFilter(SAMLEntryPoint.java:102)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.metadata.MetadataDisplayFilter.doFilter(MetadataDisplayFilter.java:84)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.saml.metadata.MetadataGeneratorFilter.doFilter(MetadataGeneratorFilter.java:87)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:66)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at com.example.application.filters.CustomDomainFilter.doFilterInternal(CustomDomainFilter.java:43)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
    at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)
    at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at com.google.apphosting.utils.servlet.JdbcMySqlConnectionCleanupFilter.doFilter(JdbcMySqlConnectionCleanupFilter.java:60)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.boot.web.servlet.support.ErrorPageFilter.doFilter(ErrorPageFilter.java:117)
    at org.springframework.boot.web.servlet.support.ErrorPageFilter.access$000(ErrorPageFilter.java:61)
    at org.springframework.boot.web.servlet.support.ErrorPageFilter$1.doFilterInternal(ErrorPageFilter.java:92)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.boot.web.servlet.support.ErrorPageFilter.doFilter(ErrorPageFilter.java:110)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at com.googlecode.objectify.ObjectifyFilter.doFilter(ObjectifyFilter.java:48)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:524)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:226)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at com.google.apphosting.runtime.jetty9.ParseBlobUploadHandler.handle(ParseBlobUploadHandler.java:119)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1182)
    at com.google.apphosting.runtime.jetty9.AppEngineWebAppContext.doHandle(AppEngineWebAppContext.java:187)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at com.google.apphosting.runtime.jetty9.AppVersionHandlerMap.handle(AppVersionHandlerMap.java:293)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.eclipse.jetty.server.Server.handle(Server.java:539)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:333)
    at com.google.apphosting.runtime.jetty9.RpcConnection.handle(RpcConnection.java:213)
    at com.google.apphosting.runtime.jetty9.RpcConnector.serviceRequest(RpcConnector.java:81)
    at com.google.apphosting.runtime.jetty9.JettyServletEngineAdapter.serviceRequest(JettyServletEngineAdapter.java:134)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchServletRequest(JavaRuntime.java:757)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchRequest(JavaRuntime.java:720)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.run(JavaRuntime.java:690)
    at com.google.apphosting.runtime.JavaRuntime$NullSandboxRequestRunnable.run(JavaRuntime.java:882)
    at com.google.apphosting.runtime.ThreadGroupPool$PoolEntry.run(ThreadGroupPool.java:270)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException: null
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
    ... 123 common frames omitted
Caused by: java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.<init>(GoogleCloudStorageImpl.java:149)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:243)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:82)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:104)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:87)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
    at com.sun.proxy.$Proxy164.getGcsUtil(Unknown Source)
    at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.tryCreateDefaultBucket(GcpOptions.java:354)
    at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:300)
    at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:288)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
    at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
    at com.sun.proxy.$Proxy149.getGcpTempLocation(Unknown Source)
    at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:249)
    ... 128 common frames omitted
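
In a trace this long, the decisive error is the innermost "Caused by" entry, here the NoSuchMethodError. As a minimal sketch, the cause chain can be walked programmatically so that the root cause is logged first. `RootCauseFinder` and `rootCauseOf` are hypothetical names introduced for illustration; they are not part of Beam or Spring.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical helper (not part of Beam or Spring): finds the innermost cause of an exception. */
final class RootCauseFinder {
    private RootCauseFinder() {}

    static Throwable rootCauseOf(Throwable t) {
        // Track visited throwables by identity to guard against cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Rebuild the same nesting as the trace above, with made-up messages.
        Throwable wrapped = new RuntimeException("Failed to construct instance",
                new java.lang.reflect.InvocationTargetException(
                        new NoSuchMethodError("expireAfterWrite(Ljava/time/Duration;)")));
        Throwable root = rootCauseOf(wrapped);
        System.out.println(root.getClass().getName() + ": " + root.getMessage());
    }
}
```

In the controller, a catch block around `Pipeline.create(options)` could pass the caught exception to such a helper before logging, which would surface the NoSuchMethodError without scrolling through the filter chain frames.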

Edit 2: I added these two imports to a source file:

import com.google.common.cache.CacheBuilder;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl;

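I then Cmd+clicked through to the implementations in IntelliJ and found that the method overload being called does not exist, which is the real error buried at the bottom of the long stack trace above. GoogleCloudStorageImpl calls expireAfterWrite with a java.time.Duration: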

private final LoadingCache<String, Boolean> autoBuckets =
    CacheBuilder.newBuilder()
        .expireAfterWrite(Duration.ofHours(1))
        .build(
            new CacheLoader<String, Boolean>() {
              final List<String> iamPermissions = ImmutableList.of("storage.buckets.get");

              @Override
              public Boolean load(String bucketName) throws Exception {
                try {
                  gcs.buckets()
                      .testIamPermissions(bucketName, iamPermissions)
                      .executeUnparsed()
                      .disconnect();
                } catch (IOException e) {
                  return errorExtractor.userProjectMissing(e);
                }
                return false;
              }
            });

~/.gradle/caches/modules-2/files-2.1/com.google.guava/guava/29.0-android/63f9bc5fbf2ebfe6b17683f8eac8419588295a28/guava-29.0-android-sources.jar!/com/google/common/cache/CacheBuilder.java

public CacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {
  checkState(
      expireAfterWriteNanos == UNSET_INT,
      "expireAfterWrite was already set to %s ns",
      expireAfterWriteNanos);
  checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit);
  this.expireAfterWriteNanos = unit.toNanos(duration);
  return this;
}
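
The same mismatch can be confirmed at runtime instead of by reading sources in the IDE. The sketch below uses reflection to ask whether a class on the classpath exposes a public method with a given parameter list. `OverloadCheck` and `hasPublicMethod` are hypothetical names for this illustration. Note that the helper returns false both when the method is missing and when the class itself cannot be loaded.

```java
/** Hypothetical diagnostic (not part of Guava or Beam): checks for a public method overload via reflection. */
final class OverloadCheck {
    private OverloadCheck() {}

    static boolean hasPublicMethod(String className, String methodName, Class<?>... parameterTypes) {
        try {
            // initialize=false: load the class without running its static initializers.
            Class<?> type = Class.forName(className, false, OverloadCheck.class.getClassLoader());
            type.getMethod(methodName, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String cacheBuilder = "com.google.common.cache.CacheBuilder";
        // The overload that GoogleCloudStorageImpl calls.
        System.out.println("expireAfterWrite(Duration): "
                + hasPublicMethod(cacheBuilder, "expireAfterWrite", java.time.Duration.class));
        // The overload shown in the guava-29.0-android sources above.
        System.out.println("expireAfterWrite(long, TimeUnit): "
                + hasPublicMethod(cacheBuilder, "expireAfterWrite",
                        long.class, java.util.concurrent.TimeUnit.class));
    }
}
```

Run inside the deployed application, a result of false for the Duration overload and true for the (long, TimeUnit) overload would match what the sources above show for guava-29.0-android.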

Edit 3: These are the Apache Beam versions I am using:

dependencies {
    compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.20.0'
    compile group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: '2.20.0'
}

1 answer

鲁炳
2023-03-14

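To fix this, use the non-Android Guava build, 29.0-jre, because the GCSIO library depends on that flavor of Guava: it calls CacheBuilder.expireAfterWrite(java.time.Duration), an overload that the 29.0-android artifact on your classpath does not provide.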
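
After changing the dependency, it may help to verify which jar actually supplies CacheBuilder at runtime, since another transitive dependency could still pull in the Android flavor. The following is a rough sketch under that assumption; `JarLocator` and `locationOf` are hypothetical names, not an API of Guava, Beam, or Gradle.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic: reports which jar or directory a class was loaded from. */
final class JarLocator {
    private JarLocator() {}

    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // Core JDK classes such as java.lang.String have no code source.
            return "(no code source)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) {
        String className = args.length > 0 ? args[0] : "com.google.common.cache.CacheBuilder";
        try {
            System.out.println(className + " -> " + locationOf(Class.forName(className)));
        } catch (ClassNotFoundException e) {
            System.out.println(className + " is not on the classpath");
        }
    }
}
```

If the printed location still names a guava-29.0-android jar, the Android flavor is still the one being loaded and the dependency resolution needs another look.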
