当前位置: 首页 > 知识库问答 >
问题:

Spring重试模板阻止我的响应队列

单凯捷
2023-03-14

我想要实现的是,我有一个来自UI的REST调用,它调用添加一个用户。因此,用户必须执行异步队列(这是一个约束),但在将结果发送回UI之前,必须等待响应队列一段配置的时间并对其进行处理。如果队列返回时引用号为空,那么我必须删除用户记录并抛出异常,说明用户无效。如果响应返回时引用有效(或者超时发生),那么我假设它有效并返回success。

我有一个应用程序,我在其中发送一条队列消息,以获取我的用户对象的referenceNumber。然后等待队列响应,然后再回复REST调用。但是,我必须等待配置的时间,等待队列响应返回。

UserManagerImpl

// REST CALL to persist
public User Persist(User user) {
...
...
 // Building the message for sending to QUEUE
 UserEnvelopeV1_0 userEnvelope =buildUserEnvelope(user);
// This is the place i send the queue message
userQueueClient.send(userEnvelope);
// Update Request time
updateRequestDetails(user.getUserId);
// This is the call i am going retry
boolean userValid = userRetryTemplate.doUserReferenceRetry(userId);
if (!userValid ) {
                  //remove User Object
                  throw Exception
                }
...
}

// update the request time for reference Number
private void updateRequestDetails(String userId) {
 User user = userRepository.findById(userId);
        if (user != null) {
            user.setRefRequestDateItem(DateHelper.createXMLGregorianCalendar());
            userRepository.saveAndFlush(user);
        }

public void updateReference(String userId, String referenceNumber) {

        User user = userRepository.findById(userId);
        if (user != null) {
            user.setReference(referenceNumber);
            user.setResponseDate(DateHelper.createXMLGregorianCalendar());
            userRepository.saveAndFlush(user);
        }
    }

用户队列客户端:

@Component
public class UserQueueClient {



    @JmsListener(id = "#{T(java.util.UUID).nameUUIDFromBytes('${in.res}",
            destination = "${in.res}", containerFactory = "containerFactory")
    public void receive(Message message, UserEnvelopeV1_0 envelope) throws{


        try {
            String userId = envelope.getHeader().getMessageIdentification().getUserId();
 ApplicationInformationStructure applicationInformation = envelope.getBody().getApplicationInformation();

if(CollectionUtils.isNotEmpty(applicationInformation.getApplicationInformationResult())) {
          String referenceNumber = applicationInformation.getApplicationInformationResult().getRefNumber();      

                userManager.updateReference(userId, referenceNumber);
            }

        } catch (Exception e) {
            //
        }
    }

    @Transactional(propagation = Propagation.MANDATORY)
    public void send(UserEnvelopeV1_0 sarsSoapEnvelope) throws JMSException {


        envelope.setHeader();

        Message message = sendToQueue(envelope, requestQueue, responseQueue,
                userId);

        applicationEventPublisher.publishEvent(new MessageLogEvent("USER_GET_REF_NUMBER", message, MessageType.XML,
                requestQueue, MessageDirection.SEND, true, false, new Date(), userId));

    }
}

UserRetryTemplate



@Component
public class UserRetryTemplate {


    @Value("${retry.max.attempts:5}")
    private int maxAttempts;

    @Value("${response.waiting.time.in.seconds:60}")
    private long maxDelay;

    @Autowired
    private UserRepository userRepository;

    private static final long INITIAL_INTERVAL = 2000L;

    public RetryTemplate retryTemplate() {

        // Max timeout in milliseconds
        long maxTimeout = maxDelay*1000;

        //double multiplier = (maxTimeout - INITIAL_INTERVAL)/((maxAttempts-2)*6000);

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(maxAttempts);


        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(maxTimeout/(maxAttempts-1));

        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(retryPolicy);
        template.setBackOffPolicy(backOffPolicy);
        return template;
    }

    public boolean doUserReferenceRetry(String userId) {
        boolean isUserReferenceValid = true;
        try {
            boolean isValidUser = retryTemplate().execute(context -> {
                logger.info("Attempted {} times", context.getRetryCount());
                User user = userRepository.findById(userId);
                logger.info("User Retry :" + user);

                if (user.getResponseDateItem() == null || user.getReferenceNumber == null) {
                    logger.info("response not yet received");
                    throw new IllegalStateException("User Response not yet received");
                }
                if (user.getReferenceNumber != null)) {
                    return true;
                }
                throw new IllegalStateException("Response not yet received");
            });
            return isUserReferenceValid ;
        } catch (IllegalArgumentException e) {

        }
        return true;
    }

}

因此,我实现了一个逻辑,在这个逻辑中,我将发送队列消息,并执行Spring重试(针对配置的时间),以检查数据库中的referenceNumber是否在DB中更新。此外,当队列响应返回时,我将使用referenceNumber更新数据库。

但是,当我实现上述逻辑时,spring重试会一直重试到配置的时间,但我的spring应用程序不会处理任何响应队列。Spring应用程序是否可以并行运行这两个进程。

问题是,如果我删除Spring重试机制,响应队列正在处理我的响应,并用引用号更新用户记录。

但是当我添加重试逻辑时,响应队列不再处理我的队列。

共有1个答案

梁丘飞鸾
2023-03-14

我发现下面的线令人困惑。

“在这里,我将发送队列消息并执行Spring重试(针对配置的时间),以检查数据库中的referenceNumber是否在DB中更新。此外,当队列响应返回时,我将使用referenceNumber更新DB。”

在一行中,你说你正在等待参考号更新,在另一行中,你说你正在更新数据库。这里谁是生产者?有两个不同的线程吗?生产者和消费者在这种情况下是你。

如果您想阻塞当前线程的配置时间,可以考虑使用轮询(长超时,时间单位)的阻塞队列吗?

poll(long timeout, TimeUnit unit) – retrieves and removes the head of the queue, waiting up to the specified wait time if necessary for an element to become available. Returns null after a timeout

请编辑问题与足够的细节。

 类似资料:
  • 问题内容: 创建多处理/ GUI编码系统的最佳方法是什么? 我想为互联网社区创建一个场所,以找到有关如何在python中使用该模块的示例。 我已经在互联网上看到了几个简单的全局函数处理过程的小例子,这些过程在一个主模块中被调用,但是我发现这很少会轻易地转化为任何人实际上对GUI所做的任何事情。我认为许多程序将具有在单独的过程中作为对象方法使用的功能(可能是其他对象的集合等),也许单个GUI元素将具

  • 相当新的Spring开发者.. 过去几天我一直在使用Spring,并设法使用JPA和Spring Rest创建了一个简单的CRUD API。现在,我希望能够灵活地改变返回的JSON的组成方式。 例如,我有以下简单实体: GET请求返回以下JSON: 现在我想删除部分并添加其他内容。 这在Spring可能吗? 课程: FaqsCategory(实体) FaqsCategoryRepository

  • 这是一个非阻塞的解决方案,但看起来并不那么优雅。它能以某种方式改进/简化吗?

  • 因此,在我的客户端代码中,我有: 一切正常。之后我要测试错误。因此,我删除了授权头。 当我用像postman这样的工具进行测试时,我会收到401响应。但是对于我的rest模板,我只收到一个IllegalArgumentException。 我也测试了ResponseErrorHandler。 所以我的问题是如何使用rest模板找到我的401错误响应。 这里有个例外: 和堆栈跟踪:

  • 我有一个方法,调用一个endpoint来发布客户我怎么才能只得到消息"EMAIL ALREADY EXISTS"从响应体的Rest模板,以便在FacesContext中显示它 这是回应机构

  • 我正在使用RabbitMQ和Spring amqp,我希望它们不会丢失消息。通过对重试使用指数回退策略,我可能会阻止我的消费者,因为他们可能正在处理他们可以处理的消息。我想给失败的消息几天时间重试指数回退策略,但我不想让使用者阻塞几天,我想让它继续对其他消息工作。 我知道我们可以通过ActiveMQ(在将来的某个时刻重试消息(ActiveMQ))实现这种功能,但无法为RabbitMQ找到类似的解决