当前位置: 首页 > 知识库问答 >
问题:

如何同时运行共享相同读取器和写入器实例的spring批处理作业?

杭令
2023-03-14

这就是我现有系统的工作方式。

我使用spring批处理编写了批处理,它将消息异步写入队列。Writer一旦向queue发送了一定数量的消息,就开始监听LINKED_BLOCKING_QUEUE中相同数量的消息。

我有spring amqp监听器,它使用并处理消息。一旦处理完毕,使用者就会在回复队列中回复。有侦听器侦听应答队列,以检查消息是否被成功处理。应答侦听器检索响应并将其添加到LINKED_BLOCKING_QUEUE,然后由Writer获取。一旦writer获取所有响应,批处理完成。如果有异常,它将停止批处理。

<beans:bean id="computeListener" class="com.st.symfony.Foundation"
    p:symfony-ref="symfony" p:replyTimeout="${compute.reply.timeout}" />

<rabbit:queue name="${compute.queue}" />
<rabbit:queue name="${compute.reply.queue}" />

<rabbit:direct-exchange name="${compute.exchange}">
    <rabbit:bindings>
        <rabbit:binding queue="${compute.queue}" key="${compute.routing.key}" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container
    connection-factory="rabbitConnectionFactory" concurrency="${compute.listener.concurrency}"
    requeue-rejected="false" prefetch="1">
    <rabbit:listener queues="${compute.queue}" ref="computeListener"
        method="run" />
</rabbit:listener-container>


<beans:beans profile="master">

    <beans:bean id="computeLbq" class="java.util.concurrent.LinkedBlockingQueue" />

    <beans:bean id="computeReplyHandler" p:blockingQueue-ref="computeLbq"
        class="com.st.batch.foundation.ReplyHandler" />

    <rabbit:listener-container
        connection-factory="rabbitConnectionFactory" concurrency="1"
        requeue-rejected="false">
        <rabbit:listener queues="${compute.reply.queue}" ref="computeReplyHandler"
            method="onMessage" />
    </rabbit:listener-container>


    <beans:bean id="computeItemWriter"
        class="com.st.batch.foundation.AmqpAsynchItemWriter"
        p:template-ref="amqpTemplate" p:queue="${compute.queue}"
        p:replyQueue="${compute.reply.queue}" p:exchange="${compute.exchange}"
        p:replyTimeout="${compute.reply.timeout}" p:routingKey="${compute.routing.key}"
        p:blockingQueue-ref="computeLbq"
        p:logFilePath="${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/log.txt"
        p:admin-ref="rabbitmqAdmin" scope="step" />


    <job id="computeJob" restartable="true">
        <step id="computeStep">
            <tasklet transaction-manager="transactionManager">
                <chunk reader="computeFileItemReader" processor="computeItemProcessor"
                    writer="computeItemWriter" commit-interval="${compute.commit.interval}" />
            </tasklet>
        </step>
    </job>      


</beans:beans>

这是我的writer代码,

public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T> {

    protected String exchange;
    protected String routingKey;
    protected String queue;
    protected String replyQueue;
    protected RabbitTemplate template;
    protected AmqpAdmin admin;
    BlockingQueue<Object> blockingQueue;
    String logFilePath;
    long replyTimeout;


    // Getters and Setters

    @Override
    public void write(List<? extends T> items) throws Exception {

        for (T item : items) {


            Message message = MessageBuilder
                    .withBody(item.toString().getBytes())
                    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                    .setReplyTo(this.replyQueue)
                    .setCorrelationId(item.toString().getBytes()).build();

            template.send(this.exchange, this.routingKey, message);
        }

        for (T item : items) {

            Object msg = blockingQueue
                    .poll(this.replyTimeout, TimeUnit.MILLISECONDS);

            if (msg instanceof Exception) {

                admin.purgeQueue(this.queue, true);
                throw (Exception) msg;

            } else if (msg == null) {
                throw new Exception("reply timeout...");
            } 

        }

        System.out.println("All items are processed.. Command completed.  ");

    }

}

监听器pojo

public class Foundation {

    Symfony symfony;

    long replyTimeout;

    //Getters Setters

    public Object run(String command) {

        System.out.println("Running:" + command);

        try {
            symfony.run(command, this.replyTimeout);
        } catch (Exception e) {
            return e;
        }

        return "Completed : " + command;
    }
}

这是回复处理程序

public class ReplyHandler {

    BlockingQueue<Object> blockingQueue;

    public void onMessage(Object msgContent) {

        try {

            blockingQueue.put(msgContent);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }
    }

}

对于并行运行的多个批处理实例,我们是否可以使用相同的项编写器实例,这些项编写器使用相同的阻塞队列实例?

共有1个答案

巩光誉
2023-03-14

您可能需要查看JMS消息选择器。

根据单据

createConsumer和createDurableSubscriber方法允许您在创建消息使用者时将消息选择器指定为参数。

 类似资料:
  • 我有一个批处理步骤 读取器和处理器流程如何工作?读取器是读取块并等待处理器处理它,还是一次读取所有块。

  • 我有“N”没有的。客户/客户。对于每个客户/客户,我需要从数据库(读取器)中获取记录,然后我必须处理(处理器)客户/客户的所有记录,然后我必须将记录写入文件(写入器)。 如何将spring批处理作业循环N次?

  • 我刚开始使用Spring批处理,我有一个特殊问题。我希望使用从3个不同的jpa查询中获取结果,并分别处理它们,然后使用将它们写入一个统一的XML文件。 对于eg,生成的XML看起来像是,

  • 项目读取器将数据从特定源代码读入Spring批处理应用程序,而项目写入器将数据从Spring Batch应用程序写入特定目标。 Item处理器是一个包含处理代码的类,该代码处理读入spring批处理的数据。 如果应用程序读取条记录,则处理器中的代码将在每条记录上执行。 块(chunk)是该tasklet的子元素。 它用于执行读取,写入和处理操作。 可以在如下所示的步骤中配置使用此元素的读取器,写入

  • 如何写这个问题?老实说,我不明白这个问题的意思。A) 编写读者和作者优先于读者的解决方案,并评论每个信号量的功能。(记住变量和信号量的定义和初始化)B)读卡器的优先级意味着什么?当一个作家在写作时,到达的读者会发生什么?当编写器结束其操作时会发生什么?

  • 根据已接受的答案代码,对该代码的以下调整对我起作用: 我已经将这个问题更新到了一个可以正确循环的版本,但是由于应用程序将扩展,能够处理并行是很重要的,我仍然不知道如何在运行时用javaconfig动态地做到这一点... 基于查询列表(HQL查询),我希望每个查询都有一个读取器-处理器-写入器。我当前的配置如下所示: 工单 处理机 作家 目前,该过程对于单个查询来说工作得很好。然而,我实际上有一个查