当前位置: 首页 > 知识库问答 >
问题:

带有TimerService的Java EE多个SessionBeans

养翔
2023-03-14

我有一个带有TimerService的无状态会话Bean。在超时时,它开始使用JMS队列。在处理消息时,它需要访问可能暂时不可用的外部资源。timeout方法在循环中调用MessageConsumer.ReceiveNowAit(),直到:

  • 没有更多的消息要处理:它注册一个新的定时器=现在+10分钟。和结尾。
  • 处理过程中发生错误:它回滚消息并注册一个新的计时器:现在+30分钟。和结尾。

通过这种方式,我可以控制何时重新启动,而且由于TimerService回调,我没有Hibernate线程。

我希望多次出现此会话bean,以预测队列上的瓶颈:

                    +-----<ejb>-------+
                    | timerService    |
                    |                 |              +---------------------+
                ----| onTimeout() {}  | -----------> | external dependency |
               /    |                 |         /    +---------------------+ 
              /     +-----------------+        /
             /                                /
+---------+ /                                /
|||queue|||K                                /
+---------+ \                              /
             \      +-----<ejb>-------+   /
              \     | timerService    |  /
               \    |                 | /
                ----| onTimeout() {}  | 
                    |                 |
                    +-----------------+
@Stateless
public class MyJob {
    
    @Resource
    private TimerService timerService;

    @PostConstruct
    public void init() {
        registerNewTimer(1000L); // -> problem: timerService not accessible
        System.out.println("Initial Timer created");
    }

    private void registerNewTimer(long duration) {
        TimerConfig config = new TimerConfig();
        config.setPersistent(false);
        timerService.createSingleActionTimer(duration, config);
    }

    @Timeout
    @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
    public void execute() {
        try {
            // instantiate JMS session and consumer
            while ((message = consumer.receiveNoWait()) != null) {
                // business logic with message
                message.acknowledge();
            }
            // queue empty, let's stop for now and register a new timer + 10min
            registerNewTimer(10*60*1000);
        } catch (ResourceException re) {
            // external resource unavailable, let's wait 30min
            registerNewTimer(30*60*1000);
            // last message not acknowledged, so rolled back
        }
    }
}

问题:错误出现在@postconstruct带注释的init()方法中:此时不允许使用TimerService。当我将sessionbean设为@singleton时,这是允许的,但随后我就失去了并行处理队列的可能性。有没有人有办法解决这个问题?如果TimerService不是正确的机制,还有什么可供选择的。是否有一个PostConstruct替代方案,允许访问引用的资源,并且在实例化之后只调用一次?

提前感谢任何建设性的信息。

共有1个答案

葛高澹
2023-03-14

我自己找到了一个解决方案,所以我把它贴在这里,这样它可能会帮助其他人。

添加了一个新的单例Startup bean,它包含一个列表,其中列出了通过CDI实现MyJobExecutor接口的所有bean。在JEE环境中,CDI可以很好地与EJB一起工作,因此它可以注入会话bean!警告:CDI只会注入直接实现MyJobExecutor的类,因此如果您有一个实现MyJobExecutor的抽象类和一个从这个抽象类扩展的具体bean,它就不起作用了。它必须显式地实现MyJobExecutor。在startup类的postconstruct中,我调用每个MyJobExecutor bean在其TimerService中注册一个新的计时器。这样,我就可以为每个会话bean创建第一个singleActionTimer(s)。

public interface MyJobExecutor {
    // how many timers to register for this bean meaning how many parallel processes.
    public int nrOfParallellInstances();
}

import javax.annotation.PostConstruct;
import javax.ejb.EJB;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;

@Singleton
@Startup
public class MyJobStartup {

    @Inject
    @Any
    private Instance<MyJobExecutor> myJobs;
    
    @PostConstruct
    public void startIt() {
        for (MyJobExecutor ajob : myJobs) {
            for(int i=0; i<ajob.nrOfParallellInstances(); i++)
                // register every instance with a 1 second delay between each timeout
                ajob.registerNewTimer(1000L*(i+1));
        }
    }
}

内部循环将根据ajob.nrofParallellInstances()值为同一个bean注册多个计时器。但是,当先前的超时仍在运行时,TimerService不会触发超时。这将阻止预期的并行处理:(

import javax.enterprise.concurrent.ManagedThreadFactory;
import javax.naming.InitialContext;

@Resource(name = "concurrent/AsyncJobThreadFactory")
private ManagedThreadFactory threadFactory;

@Timeout
private void onTimeout(Timer timer) {
    LOG.info("Timeout triggered for " + timer.getInfo());
    Thread thread = threadFactory.newThread(new AsyncExecRunnable((String) timer.getInfo()));
    if (thread != null) {
        thread.start();
    } else {
        LOG.warn("No thread available for this job. Retry in 5 minutes.");
        registerNewTimer(1000L * 60 * 5);
    }

}
private static class AsyncExecRunnable implements Runnable {

    private String extra info;
    
    public AsyncExecRunnable(String info) {
        this.info = info;
    }

    @Override
    public void run() {
        try {
            LOG.info("Job executor thread started for " + info);
            InitialContext ic=new InitialContext();
            // business logic. With ic you can lookup other EJBs
        } catch (Exception e) {
            LOG.error("Problem executing AsyncJobExecutor:", e);
        }
    }
}

在此设置下,我有:

  • 在部署时为实现MyJobExecutor接口的所有会话bean自动注册第一个计时器。
  • 注册多个具有相同时间的计时器的可能性,并且它们将并行运行。
  • 通过使用ManagedThreadFactory,线程还可以访问JNDI上下文并查找EJB。
 类似资料:
  • 一方面,我有一个CronScheduler类,用于每个应用程序启动一次,配置一个TimerService。 另一方面,我有一个繁重的任务(注释为),我想在计时器的中调用该任务。请注意,在计时器中,我创建了一个调用的线程 代码: 结果是被多次调用。注释工作正常吗?

  • 问题内容: 我需要构建一个具有多个窗口的应用程序。在其中一个窗口中,我需要能够玩一个简单的游戏,而另一个窗口必须显示问题并获得影响游戏的用户的响应。 (1)我想使用pygame来制作游戏。有一种 简单的 方法可以让pygame在多个窗口中运行吗? (2)如果没有简单的方法可以解决(1),是否有一种简单的方法来使用其他一些Python GUI结构,从而允许我同时运行pygame和另一个窗口? 问题答

  • 我现在一筹莫展。 请求代码收集在一个ArrayList中,这样当程序退出时,另一个函数可以清除所有警报。 现在的问题是:我的警报器打不响。我能够找到这个函数的错误。AlarmManager实例很好。我在底部设置了一个测试警报(在带有星号的行之后)。着火了。为什么???

  • 我在RestController中对一个实体使用QuerydslPredicate,它有一个date对象,我希望能够查询给定日期之前/之后/之间的日期,希望有类似的东西 null

  • 问题内容: 我正在尝试建立一个查询,该查询将找到所有用户文档(docType =用户),然后根据许多过滤器对其进行过滤。例如位置,性别,年龄等。过滤器是根据我正在构建的搜索功能上的用户输入来添加/删除的。 以下没有结果: 以下返回结果: 后者虽然返回结果,但从长远来看是行不通的,因为我可能想为年龄,性别等添加一个额外的过滤器,而且我似乎无法添加多个字段。如果我删除位置过滤器,则第一个查询有效。 问

  • 问题内容: 我的桌子上有40多个列,我还需要添加一些其他字段,例如当前的城市,家乡,学校,工作,大学,拼贴画。 对于作为共同朋友(与其他用户朋友一起加入朋友表以查看共同朋友)并且未被阻止并且还尚未与该用户成为朋友的许多匹配用户,将提取这些用户数据。 上面的请求有点复杂,所以我认为将额外的数据放在同一用户表中以进行快速访问是个好主意,而不是向该表中添加更多的联接,这将使查询速度变慢。但是我想得到你的