我可以同时运行我的作业步骤,但我有点担心,如果我用不同的参数同时启动同一作业的多个实例,它将如何工作。
我正在使用importExchange作业导入Exchange数据,但如果我同时为不同的市场如美国市场、欧洲marktet启动importExchange作业。
Partitioner将输入exchange名称分区到不同的分区步骤执行上下文中,MessagePartitionHandler将stepExecutionRequests作为消息通过rabbitmq队列发送到不同的服务器,并在不同的服务器上并发执行步骤。
由于我们使用的是直接通道和交换,响应将只传递给一个侦听器,那么job1的响应会被Job2的侦听器拾取吗?
或者是否有路由器或过滤器选择性地选择回复?
我需要担心这个还是MessageChannelPartitionHandler来处理这个?或者我应该在应答队列前缀作业id?
<task:executor id="taskExecutor" pool-size="20" />
<int:channel id="importExchangesOutboundChannel">
<int:dispatcher task-executor="taskExecutor" />
</int:channel>
<int:channel id="importExchangesInboundStagingChannel" />
<amqp:outbound-gateway request-channel="importExchangesOutboundChannel"
reply-channel="importExchangesInboundStagingChannel" amqp-template="importExchangesAmqpTemplate"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<beans:bean id="importExchangesMessagingTemplate"
class="org.springframework.integration.core.MessagingTemplate"
p:defaultChannel-ref="importExchangesOutboundChannel"
p:receiveTimeout="150000" />
<beans:bean id="importExchangesPartitioner"
class="org.springframework.batch.core.partition.support.FlatFilePartitioner"
p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
scope="step" />
<beans:bean id="importExchangesPartitionHandler"
class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
p:stepName="importExchangesStep" p:gridSize="6"
p:messagingOperations-ref="importExchangesMessagingTemplate" />
<int:aggregator ref="importExchangesPartitionHandler"
send-partial-result-on-expiry="true" send-timeout="300000"
input-channel="importExchangesInboundStagingChannel" />
<amqp:inbound-gateway concurrent-consumers="6"
request-channel="importExchangesInboundChannel" receive-timeout="300000"
reply-channel="importExchangesOutboundStagingChannel" queue-names="importExchangesQueue"
connection-factory="rabbitConnectionFactory"
mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
<rabbit:template id="importExchangesAmqpTemplate" connection-factory="rabbitConnectionFactory"
routing-key="importExchangesQueue" reply-timeout="300000">
</rabbit:template>
<int:channel id="importExchangesInboundChannel" />
<int:service-activator ref="stepExecutionRequestHandler"
input-channel="importExchangesInboundChannel" output-channel="importExchangesOutboundStagingChannel" />
<int:channel id="importExchangesOutboundStagingChannel" />
<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />
<rabbit:direct-exchange name="${import.exchanges.exchange}">
<rabbit:bindings>
<rabbit:binding queue="${import.exchanges.queue}"
key="${import.exchanges.routing.key}" />
</rabbit:bindings>
</rabbit:direct-exchange>
<beans:bean id="stepExecutionRequestHandler"
class="org.springframework.batch.integration.partition.StepExecutionRequestHandler"
p:jobExplorer-ref="jobExplorer" p:stepLocator-ref="stepLocator" />
<beans:bean id="stepLocator"
class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />
<beans:bean id="importExchangesItemWriter"
class="com.st.batch.foundation.ImportExchangesItemWriter"
p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"
p:logFilePath="${batch.log.file.path}.#{jobParameters[batch_id]}"
scope="step" />
<beans:bean id="importExchangesFileItemReader"
class="org.springframework.batch.item.file.MultiThreadedFlatFileItemReader"
p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
p:lineMapper-ref="stLineMapper" p:startAt="#{stepExecutionContext['startAt']}"
p:maxItemCount="#{stepExecutionContext['itemsCount']}" scope="step" />
<step id="importExchangesStep">
<tasklet transaction-manager="transactionManager">
<chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter"
commit-interval="${import.exchanges.commit.interval}" />
</tasklet>
</step>
<job id="importExchangesJob" restartable="true">
<step id="importExchangesStep.master" next="importEclsStep.master">
<partition partitioner="importExchangesPartitioner"
handler="importExchangesPartitionHandler" />
</step>
</job>
<rabbit:template id="importExchangesAmqpTemplate" connection-factory="rabbitConnectionFactory"
routing-key="importExchangesQueue" reply-timeout="300000">
</rabbit:template>
我的工作ID是2014-06-08和2014-06-09。我在文件夹名为2014-06-08和2014-06-09下创建了Exchanges.txt。
/home/ubuntu/tmp/spring/batch/2015-06-08/exchanges.txt
/home/ubuntu/tmp/spring/batch/2015-06-09/exchanges.txt
/home/ubuntu/tmp/spring/batch/2015-06-08/exchanges.txt文件中的数据是
1
2
3
up to 30
在/home/ubuntu/tmp/spring/batch/2015-06-09/exchanges.txt中
31
32
33
up to 60
<beans:bean id="importExchangesFileItemReader"
class="org.springframework.batch.item.file.MultiThreadedFlatFileItemReader"
p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
p:lineMapper-ref="stLineMapper" p:startAt="#{stepExecutionContext['startAt']}"
p:maxItemCount="#{stepExecutionContext['itemsCount']}" scope="step" />
<beans:bean id="importExchangesItemWriter"
class="com.st.batch.foundation.ImportExchangesItemWriter"
p:symfony-ref="symfony" p:replyTimeout="${import.ecls.reply.timeout}"
p:logFilePath="${batch.log.file.path}.#{jobParameters[batch_id]}"
scope="step" />
@Override
public void write(List<? extends T> exchanges) throws Exception {
commandRunner.setLogFilePath(this.logFilePath);
for (T exchange : exchanges) {
String command = commandRunner.getConsolePath() + " "
+ "st:import exchange" + " " + exchange.toString();
commandRunner.run(command, this.replyTimeout);
}
}
里面的指挥官,
public void run(String command, long replyTimeout)
throws Exception {
String[] commands = command.split("\\s+");
ProcessBuilder pb = new ProcessBuilder(commands);
File log = new File(this.logFilePath);
pb.redirectErrorStream(true);
pb.redirectOutput(Redirect.appendTo(log));
Process p = pb.start();
.......
}
如果我只启动一个作业实例(8个,批处理id为2015-06-08或2015-06-09),一切都很好,但是如果我同时启动这两个作业实例的步骤输入数据会混合,我的意思是,这是我在日志文件中得到的
尾部-F/var/log/st/batch.log.2015-06-08
14 23 1 27 19 9 15 24 2 10 28 20 25 16 3 21 29 11 26 17 4 30 12 22 18 5 44 45 46
52 13 47 31 37 6 53 57 48 32 38 54 7 49 58 33 39 55 8 59 50 34 40 56 60 51 35 42 41 36 43
<beans:bean id="importExchangesItemWriter"
class="com.st.batch.foundation.ImportExchangesItemWriter"
p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"
p:logFilePath="${batch.log.file.path}.#{jobParameters[batch_id]}"
scope="step" />
它是由于出站和入站网关而发生的吗?spring集成通道、网关等的不同实例是为不同的作业实例创建的,还是像所有作业实例都相同的rabbitmq队列?
入站网关具有concurrent-consumers=“8”这些使用者对于所有作业实例是相同的,还是将为每个作业实例创建单独的8个使用者?
这个处理程序可以为多个作业分区吗?
<beans:bean id="importExchangesPartitioner"
class="org.springframework.batch.core.partition.support.FlatFilePartitioner"
p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
scope="step" />
这一切都由框架为您处理好了。当分区处理程序发出步骤执行请求时,他在头中设置一个相关id...
.setCorrelationId(stepExecutionRequest.getJobExecutionId() + ":" + stepExecutionRequest.getStepName())
聚合器使用它将所有响应(针对此作业)聚合成一条消息,当收到所有响应时,该消息将被释放给分区处理程序。
Message<Collection<StepExecution>> message = (Message<Collection<StepExecution>>) messagingGateway.receive(replyChannel);
这是使用序列大小标头实现的。
我配置了一个spring批处理作业,它在spring WebService中运行。这项工作有几个步骤。我已经在不同的tomcats中部署了这个webservice的两个实例(但两个实例都使用相同的mysql数据库)。 我希望用不同的参数在两个tomcats中同时运行spring批处理作业(每个tomcats中一个)。我没有使用分区,每个作业的参数是完全不同的。 我开始工作在一个汤姆猫和一切看起来很
使用spring batch/spring boot,是否可以在每个线程中使用不同的多次启动具有读取器、处理器和写入器的spring batch作业? 我的用例: 我有许多不同的文件夹,我需要观看。如果新文件进入一个文件夹,我需要调用该作业,并在作业处理期间锁定该文件夹。 这可能发生在不同数量的文件夹中,这就是为什么我需要一个spring批处理作业的多个实例,但每次使用不同的。 每个文件夹一个作业
我们的要求是同时写多个文件。我们正在使用spring批处理来编写文件,并且我们正在从不同的线程中启动spring批处理。每个线程都有自己的应用程序上下文。因此我们可以确保单例bean不会跨多个线程共享。下面是我的代码片段。 这就是我们调用spring批处理的方式。 ThreadPoolExecutor TPE=new ThreadPoolExecutor(10,10,1000000,TimeUni
我是冲刺批次的新手,我找不到问题的答案。 我正在尝试使用Spring引导和Spring批处理实现JOB。我的JOB需要一个参数,所以我像这样执行应用程序: java-jar-Dspring。配置文件。活动=gus/应用程序/botbit批处理/botbit-batch-1.0.0。jar——Spring。一批工作名称=persistCustomerSuccessMetrics日期=2015年12月
问题内容: 我有一个名为“开发”的工作,另一个名为“代码分析”的项目。目前,我们有两个不同的作业和不同的工作区,但是代码相同。有什么办法可以将同一工作空间用于多个作业? 我检查了Jenkins中可用的插件,但是没有找到合适的插件。 问题答案: 假设您的“开发” Jenkins工作空间为。在“代码分析”作业配置页面中,在选项卡下单击并选择选项,并提供与“开发”作业相同的工作空间。
我有两个不同的工作(实际上更多,但为了简单起见,假设2)。每个作业可以与另一个作业并行运行,但同一作业的每个实例应该顺序运行(否则实例将共享彼此的资源)。 基本上,我希望这些作业中的每一个都有自己的作业实例队列。我想我可以使用两个不同的线程池作业启动程序(每个都有一个线程),并将一个作业启动程序与每个作业相关联。 在从Spring Batch Admin web UI中启动作业时,是否有一种方法可