当前位置: 首页 > 知识库问答 >
问题:

在集群环境中运行的Spring计划任务

封弘伟
2023-03-14

我正在编写一个应用程序,它有一个cron作业,每60秒执行一次。应用程序配置为在需要时扩展到多个实例。我只想每60秒在一个实例上执行一次任务(在任何节点上)。开箱即用,我找不到解决这个问题的方法,我很惊讶以前没有人问过多次。我使用的是Spring 4.1.6。

    <task:scheduled-tasks>
        <task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>

共有3个答案

龙隐水
2023-03-14

是在集群中安全执行作业的另一种简单而健壮的方法。只有当节点是集群中的“领导者”时,才能基于数据库执行任务。

此外,当集群中的一个节点失败或关闭时,另一个节点成为领导者。

你所要做的就是创建一个“领导人选举”机制,每次都要检查你的上司是否是领导人:

@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}

遵循以下步骤:

1.定义集群中每个节点都包含一个条目的对象和表:

@Entity(name = "SYS_NODE")
public class SystemNode {

/** The id. */
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

/** The name. */
@Column(name = "TIMESTAMP")
private String timestamp;

/** The ip. */
@Column(name = "IP")
private String ip;

/** The last ping. */
@Column(name = "LAST_PING")
private Date lastPing;

/** The last ping. */
@Column(name = "CREATED_AT")
private Date createdAt = new Date();

/** The last ping. */
@Column(name = "IS_LEADER")
private Boolean isLeader = Boolean.FALSE;

public Long getId() {
    return id;
}

public void setId(final Long id) {
    this.id = id;
}

public String getTimestamp() {
    return timestamp;
}

public void setTimestamp(final String timestamp) {
    this.timestamp = timestamp;
}

public String getIp() {
    return ip;
}

public void setIp(final String ip) {
    this.ip = ip;
}

public Date getLastPing() {
    return lastPing;
}

public void setLastPing(final Date lastPing) {
    this.lastPing = lastPing;
}

public Date getCreatedAt() {
    return createdAt;
}

public void setCreatedAt(final Date createdAt) {
    this.createdAt = createdAt;
}

public Boolean getIsLeader() {
    return isLeader;
}

public void setIsLeader(final Boolean isLeader) {
    this.isLeader = isLeader;
}

@Override
public String toString() {
    return "SystemNode{" +
            "id=" + id +
            ", timestamp='" + timestamp + '\'' +
            ", ip='" + ip + '\'' +
            ", lastPing=" + lastPing +
            ", createdAt=" + createdAt +
            ", isLeader=" + isLeader +
            '}';
}

}

2.创建服务,a)在数据库中插入节点,b)检查leader

@Service
@Transactional
public class SystemNodeServiceImpl implements SystemNodeService,    ApplicationListener {

/** The logger. */
private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class);

/** The constant NO_ALIVE_NODES. */
private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}";

/** The ip. */
private String ip;

/** The system service. */
private SystemService systemService;

/** The system node repository. */
private SystemNodeRepository systemNodeRepository;

@Autowired
public void setSystemService(final SystemService systemService) {
    this.systemService = systemService;
}

@Autowired
public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) {
    this.systemNodeRepository = systemNodeRepository;
}

@Override
public void pingNode() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    if (node == null) {
        createNode();
    } else {
        updateNode(node);
    }
}

@Override
public void checkLeaderShip() {
    final List<SystemNode> allList = systemNodeRepository.findAll();
    final List<SystemNode> aliveList = filterAliveNodes(allList);

    SystemNode leader = findLeader(allList);
    if (leader != null && aliveList.contains(leader)) {
        setLeaderFlag(allList, Boolean.FALSE);
        leader.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    } else {
        final SystemNode node = findMinNode(aliveList);

        setLeaderFlag(allList, Boolean.FALSE);
        node.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    }
}

/**
 * Returns the leaded
 * @param list
 *          the list
 * @return  the leader
 */
private SystemNode findLeader(final List<SystemNode> list) {
    for (SystemNode systemNode : list) {
        if (systemNode.getIsLeader()) {
            return systemNode;
        }
    }
    return null;
}

@Override
public boolean isLeader() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    return node != null && node.getIsLeader();
}

@Override
public void onApplicationEvent(final ApplicationEvent applicationEvent) {
    try {
        ip = InetAddress.getLocalHost().getHostAddress();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    if (applicationEvent instanceof ContextRefreshedEvent) {
        pingNode();
    }
}

/**
 * Creates the node
 */
private void createNode() {
    final SystemNode node = new SystemNode();
    node.setIp(ip);
    node.setTimestamp(String.valueOf(System.currentTimeMillis()));
    node.setCreatedAt(new Date());
    node.setLastPing(new Date());
    node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll()));
    systemNodeRepository.save(node);
}

/**
 * Updates the node
 */
private void updateNode(final SystemNode node) {
    node.setLastPing(new Date());
    systemNodeRepository.save(node);
}

/**
 * Returns the alive nodes.
 *
 * @param list
 *         the list
 * @return the alive nodes
 */
private List<SystemNode> filterAliveNodes(final List<SystemNode> list) {
    int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class);
    final List<SystemNode> finalList = new LinkedList<>();
    for (SystemNode systemNode : list) {
        if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) {
            finalList.add(systemNode);
        }
    }
    if (CollectionUtils.isEmpty(finalList)) {
        LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list));
        throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list));
    }
    return finalList;
}

/**
 * Finds the min name node.
 *
 * @param list
 *         the list
 * @return the min node
 */
private SystemNode findMinNode(final List<SystemNode> list) {
    SystemNode min = list.get(0);
    for (SystemNode systemNode : list) {
        if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) {
            min = systemNode;
        }
    }
    return min;
}

/**
 * Sets the leader flag.
 *
 * @param list
 *         the list
 * @param value
 *         the value
 */
private void setLeaderFlag(final List<SystemNode> list, final Boolean value) {
    for (SystemNode systemNode : list) {
        systemNode.setIsLeader(value);
    }
}

}

3.ping数据库以发送您的帐户处于活动状态

@Override
@Scheduled(cron = "0 0/5 * * * ?")
public void executeSystemNodePing() {
    systemNodeService.pingNode();
}

@Override
@Scheduled(cron = "0 0/10 * * * ?")
public void executeLeaderResolution() {
    systemNodeService.checkLeaderShip();
}

4.你准备好了!在执行任务之前,只需检查您是否是领导者:

@Override
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}
周宏胜
2023-03-14

我认为必须将Quartz集群与JDBC JobStore结合使用才能实现这一目的

金昌胤
2023-03-14

有一个ShedLock项目正是为了这个目的。您只需对执行时应锁定的任务进行注释

@Scheduled( ... )
@SchedulerLock(name = "scheduledTaskName")
public void scheduledTask() {
   // do something
}

配置Spring和LockProvider

@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "10m")
class MySpringConfiguration {
    ...
    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
       return new JdbcTemplateLockProvider(dataSource);
    }
    ...
}
 类似资料:
  • 问题内容: 我正在编写一个具有cron作业的应用程序,该作业每60秒执行一次。该应用程序被配置为在需要时扩展到多个实例。我只想每60秒(在任何节点上)在1个实例上执行任务。开箱即用,我找不到解决方案,但令我惊讶的是,之前没有多次被问到。我正在使用Spring 4.1.6。 问题答案: 批处理和计划的作业通常在自己的独立服务器上运行,而不是面向客户的应用程序,因此将作业包含在预期在群集中运行的应用程

  • 我使用Spring调度程序,使用@调度注释来调度运行文件生成服务的作业。应用程序部署在集群环境中Tomcat的5个单独节点上,用于负载平衡和故障转移。正因为如此,服务被调度了5次,这是不可能的。有没有办法将调度程序配置为仅在当前节点上运行? 有一种方法使用数据库找出当前活动节点,并在这里调用该特定实例的调度器 另一种方法是使用石英调度器 由于我无法对部署的应用程序进行重大更改,是否有简单的解决方案

  • 我有一套使用spring框架用Java写的服务。一些服务有预定的任务(使用spring的< code>@Scheduled注释)来做一些内务处理(生成处理过的数据,数据清理等。).由于这些计划任务,我不能运行服务的多个实例,因为所有实例都选择相同的任务,并且多次执行,导致重复/损坏的数据。为了解决这个问题,我想在任务执行时进行检查,并且只允许在一个实例上执行。我该怎么做?这个问题有更好的解决方法吗

  • 我在使用spring Boot开发的Web应用程序中有一个预定的任务。我在tomcat集群上运行它,所以在X小时,计划的任务从每个节点开始。 我读到了:https://github.com/lukas-krecan/shedlock,所以我跟着指南走了,但它不起作用..以下是我所做的: 我在POM中包含了这些依赖项: 然后我把这个添加到我的方法中: 如何避免使用spring Boot同时执行多次任

  • 我在context.xml文件中定义了一个Spring调度任务,它每分钟运行一次。该任务调用postgres存储过程。存储过程运行时可以持续一分钟以上。如果当前运行没有完成,spring框架会调用相同的调度程序吗?谢谢,

  • 问题内容: 是否可以在确切指定的时间仅安排一次Spring服务方法?例如,当前时间是下午2点,但是当我按下操作按钮时,我希望我的服务方法从晚上8点开始。我熟悉@Scheduled批注,但不确定如何编写cron表达式以使其不定期运行。这一次,每天晚上8点触发。 有什么建议? 问题答案: 您可以使用Spring的TaskScheduler的实现之一。我在下面提供了一个示例,该示例不需要太多配置(包装了