Question:

Spring Cloud Data Flow (with Spring Batch job scaling considerations)

凤扬
2023-03-14

4. Breaking our monolithic jar of 20 jobs into separate Spring Boot über-jars is not a simple undertaking -- any thoughts/ideas/best practices would be welcome.

Best, Elad

1 Answer

壤驷骁
2023-03-14

I ran into the same problem as Elad's point 3 and eventually solved it by building on the basic framework demonstrated here, but with modified versions of DeployerPartitionHandler and DeployerStepExecutionHandler.

I first tried the naive approach of creating two levels of partitioning, where the step executed by each worker is itself partitioned into sub-partitions. But the framework does not seem to support this; it got confused about the steps' state.

So I went back to a flat set of partitions, but now passing multiple step execution ids to each worker. To make this work, I created DeployerMultiPartitionHandler, which launches the configured number of workers and passes each of them a list of step execution ids. Note that there are now two degrees of freedom: the number of workers, and the gridSize, which is the total number of partitions that get distributed to the workers as evenly as possible (for example, gridSize = 16 with four workers gives each worker four step execution ids). Unfortunately, I had to duplicate a lot of DeployerPartitionHandler's code here. The two modified classes follow, with a wiring sketch after them:

// Requires Java 11+ (for Predicate.not) and Lombok.
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.StepExecutionSplitter;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.poller.DirectPoller;
import org.springframework.batch.poller.Poller;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.deployer.spi.core.AppDefinition;
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest;
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
import org.springframework.cloud.task.batch.partition.*; // CommandLineArgsProvider, EnvironmentVariablesProvider, ...
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import static java.util.function.Predicate.not;
import static java.util.stream.Collectors.joining;

@Slf4j
@Getter
@Setter
public class DeployerMultiPartitionHandler implements PartitionHandler, EnvironmentAware, InitializingBean {

    public static final String SPRING_CLOUD_TASK_STEP_EXECUTION_IDS =
            "spring.cloud.task.step-execution-ids";

    public static final String SPRING_CLOUD_TASK_JOB_EXECUTION_ID =
            "spring.cloud.task.job-execution-id";

    public static final String SPRING_CLOUD_TASK_STEP_EXECUTION_ID =
            "spring.cloud.task.step-execution-id";

    public static final String SPRING_CLOUD_TASK_STEP_NAME =
            "spring.cloud.task.step-name";

    public static final String SPRING_CLOUD_TASK_PARENT_EXECUTION_ID =
            "spring.cloud.task.parentExecutionId";

    public static final String SPRING_CLOUD_TASK_NAME = "spring.cloud.task.name";

    // -1 means: launch one worker per partition.
    private int maxWorkers = -1;

    // Total number of partitions, spread as evenly as possible over the workers.
    private int gridSize = 1;

    private int currentWorkers = 0;

    private TaskLauncher taskLauncher;

    private JobExplorer jobExplorer;

    private TaskExecution taskExecution;

    private Resource resource;

    private String stepName;

    // Interval (ms) between polls of the job repository for worker completion.
    private long pollInterval = 10000;

    // Overall wait limit (ms); -1 means wait indefinitely.
    private long timeout = -1;

    private Environment environment;

    private Map<String, String> deploymentProperties;

    private EnvironmentVariablesProvider environmentVariablesProvider;

    private String applicationName;

    private CommandLineArgsProvider commandLineArgsProvider;

    private boolean defaultArgsAsEnvironmentVars = false;

    public DeployerMultiPartitionHandler(TaskLauncher taskLauncher,
                                    JobExplorer jobExplorer,
                                    Resource resource,
                                    String stepName) {
            Assert.notNull(taskLauncher, "A taskLauncher is required");
            Assert.notNull(jobExplorer, "A jobExplorer is required");
            Assert.notNull(resource, "A resource is required");
            Assert.hasText(stepName, "A step name is required");

            this.taskLauncher = taskLauncher;
            this.jobExplorer = jobExplorer;
            this.resource = resource;
            this.stepName = stepName;
    }

    @Override
    public Collection<StepExecution> handle(StepExecutionSplitter stepSplitter,
                                            StepExecution stepExecution) throws Exception {


        final Set<StepExecution> tempCandidates =
                stepSplitter.split(stepExecution, this.gridSize);

        // Following two lines due to https://jira.spring.io/browse/BATCH-2490
        final List<StepExecution> candidates = new ArrayList<>(tempCandidates.size());
        candidates.addAll(tempCandidates);

        int partitions = candidates.size();

        log.debug(String.format("%s partitions were returned", partitions));

        final Set<StepExecution> executed = new HashSet<>(candidates.size());

        if (CollectionUtils.isEmpty(candidates)) {
            return null;
        }

        launchWorkers(candidates, executed);

        candidates.removeAll(executed);

        return pollReplies(stepExecution, executed, partitions);
    }

    private void launchWorkers(List<StepExecution> candidates, Set<StepExecution> executed) {
        int partitions = candidates.size();
        int numWorkers = this.maxWorkers != -1 ? Math.min(this.maxWorkers, partitions) : partitions;
        IntStream.range(0, numWorkers).boxed()
                .map(i -> candidates.subList(partitionOffset(partitions, numWorkers, i), partitionOffset(partitions, numWorkers, i + 1)))
                .filter(not(List::isEmpty))
                .forEach(stepExecutions -> processStepExecutions(stepExecutions, executed));
    }

    private void processStepExecutions(List<StepExecution> stepExecutions, Set<StepExecution> executed) {
        launchWorker(stepExecutions);
        this.currentWorkers++;
        executed.addAll(stepExecutions);
    }

    private void launchWorker(List<StepExecution> workerStepExecutions) {
        List<String> arguments = new ArrayList<>();

        StepExecution firstWorkerStepExecution = workerStepExecutions.get(0);
        ExecutionContext copyContext = new ExecutionContext(firstWorkerStepExecution.getExecutionContext());

        arguments.addAll(
                this.commandLineArgsProvider
                        .getCommandLineArgs(copyContext));

        String jobExecutionId = String.valueOf(firstWorkerStepExecution.getJobExecution().getId());
        String stepExecutionIds = workerStepExecutions.stream().map(workerStepExecution -> String.valueOf(workerStepExecution.getId())).collect(joining(","));
        String taskName = String.format("%s_%s_%s",
                taskExecution.getTaskName(),
                firstWorkerStepExecution.getJobExecution().getJobInstance().getJobName(),
                firstWorkerStepExecution.getStepName());
        String parentExecutionId = String.valueOf(taskExecution.getExecutionId());

        if(!this.defaultArgsAsEnvironmentVars) {
            arguments.add(formatArgument(SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
                    jobExecutionId));
            arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS,
                    stepExecutionIds));
            arguments.add(formatArgument(SPRING_CLOUD_TASK_STEP_NAME, this.stepName));
            arguments.add(formatArgument(SPRING_CLOUD_TASK_NAME, taskName));
            arguments.add(formatArgument(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
                    parentExecutionId));
        }

        copyContext = new ExecutionContext(firstWorkerStepExecution.getExecutionContext());

        log.info("launchWorker context={}", copyContext);

        Map<String, String> environmentVariables = this.environmentVariablesProvider.getEnvironmentVariables(copyContext);

        if(this.defaultArgsAsEnvironmentVars) {
            environmentVariables.put(SPRING_CLOUD_TASK_JOB_EXECUTION_ID,
                    jobExecutionId);
            environmentVariables.put(SPRING_CLOUD_TASK_STEP_EXECUTION_ID,
                    String.valueOf(firstWorkerStepExecution.getId()));
            environmentVariables.put(SPRING_CLOUD_TASK_STEP_NAME, this.stepName);
            environmentVariables.put(SPRING_CLOUD_TASK_NAME, taskName);
            environmentVariables.put(SPRING_CLOUD_TASK_PARENT_EXECUTION_ID,
                    parentExecutionId);
        }

        AppDefinition definition =
                new AppDefinition(resolveApplicationName(),
                        environmentVariables);

        AppDeploymentRequest request =
                new AppDeploymentRequest(definition,
                        this.resource,
                        this.deploymentProperties,
                        arguments);

        taskLauncher.launch(request);
    }

    private String resolveApplicationName() {
        if(StringUtils.hasText(this.applicationName)) {
            return this.applicationName;
        }
        else {
            return this.taskExecution.getTaskName();
        }
    }

    private String formatArgument(String key, String value) {
        return String.format("--%s=%s", key, value);
    }

    private Collection<StepExecution> pollReplies(final StepExecution masterStepExecution,
                                                  final Set<StepExecution> executed,
                                                  final int size) throws Exception {

        final Collection<StepExecution> result = new ArrayList<>(executed.size());

        Callable<Collection<StepExecution>> callback = new Callable<Collection<StepExecution>>() {
            @Override
            public Collection<StepExecution> call() {
                for (StepExecution curStepExecution : executed) {
                    if (!result.contains(curStepExecution)) {
                        StepExecution partitionStepExecution =
                                jobExplorer.getStepExecution(masterStepExecution.getJobExecutionId(), curStepExecution.getId());

                        if (isComplete(partitionStepExecution.getStatus())) {
                            result.add(partitionStepExecution);
                            currentWorkers--;
                        }
                    }
                }

                if (result.size() == size) {
                    return result;
                }
                else {
                    return null;
                }
            }
        };

        Poller<Collection<StepExecution>> poller = new DirectPoller<>(this.pollInterval);
        Future<Collection<StepExecution>> resultsFuture = poller.poll(callback);

        if (timeout >= 0) {
            return resultsFuture.get(timeout, TimeUnit.MILLISECONDS);
        }
        else {
            return resultsFuture.get();
        }
    }

    private boolean isComplete(BatchStatus status) {
        // Finished successfully, or in any state past STARTED
        // (STOPPING, STOPPED, FAILED, ABANDONED, UNKNOWN).
        return status.equals(BatchStatus.COMPLETED) || status.isGreaterThan(BatchStatus.STARTED);
    }

    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    @Override
    public void afterPropertiesSet() {
        Assert.notNull(taskExecution, "A taskExecution is required");

        if(this.environmentVariablesProvider == null) {
            this.environmentVariablesProvider =
                    new CloudEnvironmentVariablesProvider(this.environment);
        }

        if(this.commandLineArgsProvider == null) {
            SimpleCommandLineArgsProvider simpleCommandLineArgsProvider = new SimpleCommandLineArgsProvider();
            simpleCommandLineArgsProvider.onTaskStartup(taskExecution);
            this.commandLineArgsProvider = simpleCommandLineArgsProvider;
        }
    }

    // Splits the candidate list among the workers: worker i receives the
    // sublist [partitionOffset(..., i), partitionOffset(..., i + 1)), so the
    // number of step execution ids per worker differs by at most one.
    static int partitionOffset(int length, int numberOfPartitions, int partitionIndex) {
        return partitionIndex * (length / numberOfPartitions) + Math.min(partitionIndex, length % numberOfPartitions);
    }
}

The step execution ids are handed out to the workers by the static helper partitionOffset, which guarantees that the worker loads differ in size by at most one.
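To illustrate the distribution, here is a throwaway sketch (not part of the original answer; the class and the numbers are hypothetical, and it assumes it sits in the same package as the handler): 10 partitions over 3 workers come out as sublists of sizes 4, 3, 3.

public class PartitionOffsetDemo {
    public static void main(String[] args) {
        int partitions = 10; // gridSize
        int workers = 3;     // number of launched workers
        for (int i = 0; i < workers; i++) {
            // prints: worker 0 gets [0, 4), worker 1 gets [4, 7), worker 2 gets [7, 10)
            System.out.printf("worker %d gets partitions [%d, %d)%n", i,
                    DeployerMultiPartitionHandler.partitionOffset(partitions, workers, i),
                    DeployerMultiPartitionHandler.partitionOffset(partitions, workers, i + 1));
        }
    }
}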
On the receiving end, DeployerMultiStepExecutionHandler inherits the iteration over step executions from TaskExecutorPartitionHandler and implements the command-line contract that matches DeployerMultiPartitionHandler:

import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.extern.slf4j.Slf4j;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.NoSuchStepException;
import org.springframework.batch.core.step.StepLocator;
import org.springframework.batch.integration.partition.BeanFactoryStepLocator;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.env.Environment;
import org.springframework.util.Assert;

import static java.util.stream.Collectors.joining;
// plus a static import of the SPRING_CLOUD_TASK_* constants from DeployerMultiPartitionHandler

@Slf4j
public class DeployerMultiStepExecutionHandler extends TaskExecutorPartitionHandler implements CommandLineRunner {

    private JobExplorer jobExplorer;

    private JobRepository jobRepository;

    @Autowired
    private Environment environment;

    private StepLocator stepLocator;

    public DeployerMultiStepExecutionHandler(BeanFactory beanFactory, JobExplorer jobExplorer, JobRepository jobRepository) {
        Assert.notNull(beanFactory, "A beanFactory is required");
        Assert.notNull(jobExplorer, "A jobExplorer is required");
        Assert.notNull(jobRepository, "A jobRepository is required");

        this.stepLocator = new BeanFactoryStepLocator();
        ((BeanFactoryStepLocator) this.stepLocator).setBeanFactory(beanFactory);

        this.jobExplorer = jobExplorer;
        this.jobRepository = jobRepository;
    }

    @Override
    public void run(String... args) throws Exception {

        validateRequest();

        Long jobExecutionId = Long.parseLong(environment.getProperty(SPRING_CLOUD_TASK_JOB_EXECUTION_ID));
        List<Long> stepExecutionIds = Stream.of(environment.getProperty(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS).split(","))
                .map(Long::parseLong)
                .collect(Collectors.toList());
        Set<StepExecution> stepExecutions = stepExecutionIds.stream()
                .map(stepExecutionId -> jobExplorer.getStepExecution(jobExecutionId, stepExecutionId))
                .filter(Objects::nonNull) // ids with no matching execution are skipped
                .collect(Collectors.toSet());

        log.info("found stepExecutions:\n{}", stepExecutions.stream()
                .map(stepExecution -> stepExecution.getId() + ":" + stepExecution.getExecutionContext())
                .collect(joining("\n")));

        if (stepExecutions.isEmpty()) {
            throw new NoSuchStepException(String.format(
                    "No StepExecution could be located for step execution ids %s within job execution %s",
                    stepExecutionIds, jobExecutionId));
        }

        String stepName = environment.getProperty(SPRING_CLOUD_TASK_STEP_NAME);
        setStep(stepLocator.getStep(stepName));

        doHandle(null, stepExecutions);
    }

    private void validateRequest() {
        Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_JOB_EXECUTION_ID), "A job execution id is required");
        Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_STEP_EXECUTION_IDS), "A step execution id is required");
        Assert.isTrue(environment.containsProperty(SPRING_CLOUD_TASK_STEP_NAME), "A step name is required");

        Assert.isTrue(this.stepLocator.getStepNames().contains(environment.getProperty(SPRING_CLOUD_TASK_STEP_NAME)), "The step requested cannot be found in the provided BeanFactory");
    }
}
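Finally, a minimal wiring sketch of how the two classes might be hooked into a task application. Nothing below comes from the answer above: the Maven coordinates of the worker über-jar, the "master"/"worker" profiles, the step names, and the sizes are all hypothetical, and a Partitioner bean is assumed to exist elsewhere.

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.deployer.spi.task.TaskLauncher;
import org.springframework.cloud.task.listener.annotation.BeforeTask;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;

@Configuration
public class PartitionWiringSketch {

    private DeployerMultiPartitionHandler partitionHandler;

    @Autowired
    private Environment environment;

    // Master side: at most 4 workers share 16 partitions (sizes are examples).
    // The handler is built by hand rather than exposed as a bean, because its
    // afterPropertiesSet() requires a TaskExecution that only exists at runtime.
    @Bean
    @Profile("master")
    public Step masterStep(StepBuilderFactory steps,
                           Partitioner partitioner, // your own Partitioner bean
                           TaskLauncher taskLauncher,
                           JobExplorer jobExplorer,
                           ResourceLoader resourceLoader) {
        Resource workerJar = resourceLoader.getResource(
                "maven://com.example:worker-app:1.0.0"); // hypothetical worker über-jar
        partitionHandler = new DeployerMultiPartitionHandler(
                taskLauncher, jobExplorer, workerJar, "workerStep");
        partitionHandler.setMaxWorkers(4);
        partitionHandler.setGridSize(16);
        return steps.get("masterStep")
                .partitioner("workerStep", partitioner)
                .partitionHandler(partitionHandler)
                .build();
    }

    // Complete the handler's initialization once Spring Cloud Task hands us
    // the master's TaskExecution, before the job itself starts.
    @BeforeTask
    public void beforeTask(TaskExecution taskExecution) {
        if (partitionHandler != null) {
            partitionHandler.setEnvironment(environment);
            partitionHandler.setTaskExecution(taskExecution);
            partitionHandler.afterPropertiesSet();
        }
    }

    // Worker side: the CommandLineRunner that picks up the step execution ids.
    @Bean
    @Profile("worker")
    public DeployerMultiStepExecutionHandler stepExecutionHandler(BeanFactory beanFactory,
                                                                  JobExplorer jobExplorer,
                                                                  JobRepository jobRepository) {
        return new DeployerMultiStepExecutionHandler(beanFactory, jobExplorer, jobRepository);
    }
}

The deferred afterPropertiesSet() call is deliberate: the handler asserts a non-null TaskExecution during initialization, so letting the container initialize it as a singleton would fail before the task ever starts.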