我将如何使用Spring执行Apache Beam管道到Google Cloud Dataflow?这个问题类似于在Google Data Flow上的Spring Boot项目中运行Apache Beam管道,但这个问题更关心的是从Spring控制器启动管道,而不是从CommandLineRunner启动管道。
@RestController
@RequestMapping("/task/import-csv-file")
public class ImportCsvController {
@PostMapping("/process-csv-file")
private ResponseEntity<Void> processCsvFile(
@RequestParam String gcsFileName,
@RequestParam String bucketName
) {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject("same-project-as-this-app-engine-instance");
options.setStagingLocation("gs://" + bucketName + "/binaries");
options.setRunner(DataflowRunner.class);
options.setJobname("process-csv");
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("ReadFile", TextIO.read().from("gs://" + bucketName + "/" + gcsFileName));
// ... apply some more transforms here, which will eventually
// write csv rows as Google Datastore entities ...
pipeline.run().waitUntilFinish();
return ResponseEntity.ok().build();
}
}
我正在使用Google Cloud Tasks运行这个控制器,使用下面的代码:
@Service
public class TaskQueueService {
private Queue csvImportsQueue;
public TaskQueueService() {
this.csvImportsQueue = QueueFactory.getQueue("csv-import-queue");
}
public void queueImportCsvFile(String gcsFileName, String bucketName) {
String url = "/task/import-csv-file/process-csv-file";
TaskOptions taskOptions = TaskOptions.Builder
.withUrl(url)
.method(POST)
.param("gcsFileName", gcsFileName)
.param("bucketName", bucketName);
queue.add(ofy().getTransaction(), taskOptions);
}
}
java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
Caused by: java.lang.IllegalArgumentException: Missing required value for [public abstract java.lang.String org.apache.beam.runners.dataflow.options.DataflowPipelineOptions.getProject(), "Project id. Required when running a Dataflow in the cloud. See https://cloud.google.com/storage/docs/projects for further details."].
编辑:我发现了一个不同的stacktrace,它有不同的错误消息。以下是整个stacktrace。
java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:155)
at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
at org.apache.beam.sdk.Pipeline.create(Pipeline.java:149)
at com.example.application.controllers.tasks.ImportCsvController.processCsvFile(ImportCsvController.java:64)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974)
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:877)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320)
at com.example.application.filters.SwitchUserProfileFilter.doFilter(SwitchUserProfileFilter.java:127)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:127)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:91)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:119)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:170)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.saml.SAMLLogoutFilter.processLogout(SAMLLogoutFilter.java:168)
at org.springframework.security.saml.SAMLLogoutFilter.doFilter(SAMLLogoutFilter.java:110)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.saml.SAMLDiscovery.doFilter(SAMLDiscovery.java:137)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.saml.SAMLLogoutProcessingFilter.processLogout(SAMLLogoutProcessingFilter.java:209)
at org.springframework.security.saml.SAMLLogoutProcessingFilter.doFilter(SAMLLogoutProcessingFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.saml.SAMLEntryPoint.doFilter(SAMLEntryPoint.java:102)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.saml.metadata.MetadataDisplayFilter.doFilter(MetadataDisplayFilter.java:84)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.saml.metadata.MetadataGeneratorFilter.doFilter(MetadataGeneratorFilter.java:87)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:66)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at com.example.application.filters.CustomDomainFilter.doFilterInternal(CustomDomainFilter.java:43)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)
at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)
at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)
at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:357)
at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:270)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at com.google.apphosting.utils.servlet.JdbcMySqlConnectionCleanupFilter.doFilter(JdbcMySqlConnectionCleanupFilter.java:60)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.springframework.boot.web.servlet.support.ErrorPageFilter.doFilter(ErrorPageFilter.java:117)
at org.springframework.boot.web.servlet.support.ErrorPageFilter.access$000(ErrorPageFilter.java:61)
at org.springframework.boot.web.servlet.support.ErrorPageFilter$1.doFilterInternal(ErrorPageFilter.java:92)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.boot.web.servlet.support.ErrorPageFilter.doFilter(ErrorPageFilter.java:110)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at com.googlecode.objectify.ObjectifyFilter.doFilter(ObjectifyFilter.java:48)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:524)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:226)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at com.google.apphosting.runtime.jetty9.ParseBlobUploadHandler.handle(ParseBlobUploadHandler.java:119)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1182)
at com.google.apphosting.runtime.jetty9.AppEngineWebAppContext.doHandle(AppEngineWebAppContext.java:187)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at com.google.apphosting.runtime.jetty9.AppVersionHandlerMap.handle(AppVersionHandlerMap.java:293)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.eclipse.jetty.server.Server.handle(Server.java:539)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:333)
at com.google.apphosting.runtime.jetty9.RpcConnection.handle(RpcConnection.java:213)
at com.google.apphosting.runtime.jetty9.RpcConnector.serviceRequest(RpcConnector.java:81)
at com.google.apphosting.runtime.jetty9.JettyServletEngineAdapter.serviceRequest(JettyServletEngineAdapter.java:134)
at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchServletRequest(JavaRuntime.java:757)
at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchRequest(JavaRuntime.java:720)
at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.run(JavaRuntime.java:690)
at com.google.apphosting.runtime.JavaRuntime$NullSandboxRequestRunnable.run(JavaRuntime.java:882)
at com.google.apphosting.runtime.ThreadGroupPool$PoolEntry.run(ThreadGroupPool.java:270)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException: null
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:214)
... 123 common frames omitted
Caused by: java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.expireAfterWrite(Ljava/time/Duration;)Lcom/google/common/cache/CacheBuilder;
at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.<init>(GoogleCloudStorageImpl.java:149)
at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:243)
at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.<init>(GcsUtil.java:82)
at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:104)
at org.apache.beam.sdk.extensions.gcp.util.GcsUtil$GcsUtilFactory.create(GcsUtil.java:87)
at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
at com.sun.proxy.$Proxy164.getGcsUtil(Unknown Source)
at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.tryCreateDefaultBucket(GcpOptions.java:354)
at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:300)
at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:288)
at org.apache.beam.sdk.options.ProxyInvocationHandler.returnDefaultHelper(ProxyInvocationHandler.java:592)
at org.apache.beam.sdk.options.ProxyInvocationHandler.getDefault(ProxyInvocationHandler.java:533)
at org.apache.beam.sdk.options.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:158)
at com.sun.proxy.$Proxy149.getGcpTempLocation(Unknown Source)
at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:249)
... 128 common frames omitted
编辑2:我在一个源文件中键入了这两个导入:
import com.google.common.cache.CacheBuilder;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl;
然后我按下cmd+click,转到Intellij中的实现,我发现方法重载并不存在(正如上面长长的stacktrace中所隐藏的那样)。
private final LoadingCache<String, Boolean> autoBuckets =
CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofHours(1))
.build(
new CacheLoader<String, Boolean>() {
final List<String> iamPermissions = ImmutableList.of("storage.buckets.get");
@Override
public Boolean load(String bucketName) throws Exception {
try {
gcs.buckets()
.testIamPermissions(bucketName, iamPermissions)
.executeUnparsed()
.disconnect();
} catch (IOException e) {
return errorExtractor.userProjectMissing(e);
}
return false;
}
});
~/.gradle/caches/modules-2/files-2.1/com.google.guava/guava/29.0-android/63f9bc5fbf2ebfe6b17683f8eac8419588295a28/guava-29.0-android-sources.jar!/com/google/common/cache/cachebuilder.java
public CacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {
checkState(
expireAfterWriteNanos == UNSET_INT,
"expireAfterWrite was already set to %s ns",
expireAfterWriteNanos);
checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit);
this.expireAfterWriteNanos = unit.toNanos(duration);
return this;
}
编辑3:以下是我正在使用的Apache Beam版本:
dependencies {
compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.20.0'
compile group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: '2.20.0'
}
要解决这个问题,您应该使用非Android Guava版本-29.0-jre
,因为GCSIO库依赖于这个Guava版本。
我最近偶然发现了一个非常扭曲的Spring批次问题。要求如下: 我有两个主要步骤: 第一个从oracle数据库中读取一些数据,从一个表中写入另一个表 第二个基于第一步处理的数据,完成其他一些数据库工作 从设计的角度来看,第一步如下所示: 复合项目编写器: 虽然前两位作者并不复杂,但我的兴趣集中在第三位。 正如您可能已经猜到的,这一个将用于获取之前正在处理的一些数据,以便在我的第二步中进行升级: 问
如何计算所有Spring控制器的Spring控制器执行时间?假设我的应用程序中有200个Spring REST控制器,我如何计算每个控制器的执行时间并将时间保存在数据库中?
在我看来,将myObject序列化为JSON和f的Spring代码将同时尝试在get()返回时访问myObject。除了返回MyObject的深度副本之外,还有什么方法可以防止这种情况发生吗?
问题内容: 我了解AngularJS会在某些代码中运行两次,有时甚至更多,例如事件,不断检查模型状态等。 但是我的代码: 执行两次,将2条记录插入我的数据库。很显然,我一直在为此奋斗,我一直在学习! 问题答案: 应用路由器将导航指定为如下所示: 但我在: 这使控制器消化了两次。从HTML中删除属性可以解决此问题。或者,可以从路由指令中删除该属性。 使用选项卡式导航时也会出现此问题。例如,可能包含:
如果可以从Spring返回,我很好奇 这样做可以吗?我试过了,Spring返回的不是流的值。 我应该继续返回
我已经升级到Spring靴2,这反过来又更新了执行器。我使用了Spring Boot1.*执行器度量字段“处理器”,如下所示: