Fix Spring Batch job restart parameters handling

Fix the `JobLauncherCommandLineRunner` to correctly deal with job
parameters when restarting a job.

Prior to this commit, we were was calling the `getNextJobParameters`
method of the `JobParametersBuilder` from batch. This method was getting
the previous parameters of the wrong job instance in a restart scenario.

This commit fixes the issue by first getting the right job instance with
the provided parameters, then restarting it.

Closes gh-14933
pull/16246/head
Mahmoud Ben Hassine 6 years ago committed by Phillip Webb
parent d1ce315602
commit ad3c3ad361

@ -56,6 +56,7 @@ import org.springframework.util.StringUtils;
* @author Dave Syer
* @author Eddú Meléndez
* @author Kazuki Shimizu
* @author Mahmoud Ben Hassine
*/
@Configuration
@ConditionalOnClass({ JobLauncher.class, DataSource.class, JdbcOperations.class })
@ -88,9 +89,10 @@ public class BatchAutoConfiguration {
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "spring.batch.job", name = "enabled", havingValue = "true", matchIfMissing = true)
public JobLauncherCommandLineRunner jobLauncherCommandLineRunner(
JobLauncher jobLauncher, JobExplorer jobExplorer) {
JobLauncher jobLauncher, JobExplorer jobExplorer,
JobRepository jobRepository) {
JobLauncherCommandLineRunner runner = new JobLauncherCommandLineRunner(
jobLauncher, jobExplorer);
jobLauncher, jobExplorer, jobRepository);
String jobNames = this.properties.getJob().getNames();
if (StringUtils.hasText(jobNames)) {
runner.setJobNames(jobNames);

@ -1,5 +1,5 @@
/*
* Copyright 2012-2017 the original author or authors.
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,16 +19,21 @@ package org.springframework.boot.autoconfigure.batch;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.converter.DefaultJobParametersConverter;
@ -39,6 +44,7 @@ import org.springframework.batch.core.launch.JobParametersNotFoundException;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
@ -55,6 +61,7 @@ import org.springframework.util.StringUtils;
*
* @author Dave Syer
* @author Jean-Pierre Bergamin
* @author Mahmoud Ben Hassine
*/
public class JobLauncherCommandLineRunner
implements CommandLineRunner, Ordered, ApplicationEventPublisherAware {
@ -75,6 +82,8 @@ public class JobLauncherCommandLineRunner
private JobExplorer jobExplorer;
private JobRepository jobRepository;
private String jobNames;
private Collection<Job> jobs = Collections.emptySet();
@ -83,12 +92,37 @@ public class JobLauncherCommandLineRunner
private ApplicationEventPublisher publisher;
/**
* Create a new {@link JobLauncherCommandLineRunner}.
* @param jobLauncher to launch jobs
* @param jobExplorer to check the job repository for previous executions
* @deprecated This constructor is deprecated in favor of
* {@link JobLauncherCommandLineRunner#JobLauncherCommandLineRunner(JobLauncher, JobExplorer, JobRepository)}.
* A job repository is required to check if a job instance exists with the given
* parameters when running a job (which is not possible with the job explorer). This
* constructor will be removed in a future version.
*/
@Deprecated
public JobLauncherCommandLineRunner(JobLauncher jobLauncher,
JobExplorer jobExplorer) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
}
/**
* Create a new {@link JobLauncherCommandLineRunner}.
* @param jobLauncher to launch jobs
* @param jobExplorer to check the job repository for previous executions
* @param jobRepository to check if a job instance exists with the given parameters
* when running a job
*/
public JobLauncherCommandLineRunner(JobLauncher jobLauncher, JobExplorer jobExplorer,
JobRepository jobRepository) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
}
public void setOrder(int order) {
this.order = order;
}
@ -158,9 +192,39 @@ public class JobLauncherCommandLineRunner
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException,
JobParametersNotFoundException {
JobParameters nextParameters = new JobParametersBuilder(jobParameters,
this.jobExplorer).getNextJobParameters(job).toJobParameters();
JobExecution execution = this.jobLauncher.run(job, nextParameters);
String jobName = job.getName();
JobParameters parameters = jobParameters;
boolean jobInstanceExists = this.jobRepository.isJobInstanceExists(jobName,
parameters);
if (jobInstanceExists) {
JobExecution lastJobExecution = this.jobRepository
.getLastJobExecution(jobName, jobParameters);
if (lastJobExecution != null && isStoppedOrFailed(lastJobExecution)
&& job.isRestartable()) {
// Retry a failed or stopped execution with previous parameters
JobParameters previousParameters = lastJobExecution.getJobParameters();
/*
* remove Non-identifying parameters from the previous execution's
* parameters since there is no way to remove them programmatically. If
* they are required (or need to be modified) on a restart, they need to
* be (re)specified.
*/
JobParameters previousIdentifyingParameters = removeNonIdentifying(
previousParameters);
// merge additional parameters with previous ones (overriding those with
// the same key)
parameters = merge(previousIdentifyingParameters, jobParameters);
}
}
else {
JobParametersIncrementer incrementer = job.getJobParametersIncrementer();
if (incrementer != null) {
JobParameters nextParameters = new JobParametersBuilder(jobParameters,
this.jobExplorer).getNextJobParameters(job).toJobParameters();
parameters = merge(nextParameters, jobParameters);
}
}
JobExecution execution = this.jobLauncher.run(job, parameters);
if (this.publisher != null) {
this.publisher.publishEvent(new JobExecutionEvent(execution));
}
@ -180,4 +244,27 @@ public class JobLauncherCommandLineRunner
}
}
private JobParameters removeNonIdentifying(JobParameters parameters) {
Map<String, JobParameter> parameterMap = parameters.getParameters();
HashMap<String, JobParameter> copy = new HashMap<>(parameterMap);
for (Map.Entry<String, JobParameter> parameter : copy.entrySet()) {
if (!parameter.getValue().isIdentifying()) {
parameterMap.remove(parameter.getKey());
}
}
return new JobParameters(parameterMap);
}
private boolean isStoppedOrFailed(JobExecution execution) {
BatchStatus status = execution.getStatus();
return (status == BatchStatus.STOPPED || status == BatchStatus.FAILED);
}
private JobParameters merge(JobParameters parameters, JobParameters additionals) {
Map<String, JobParameter> merged = new HashMap<>();
merged.putAll(parameters.getParameters());
merged.putAll(additionals.getParameters());
return new JobParameters(merged);
}
}

@ -1,5 +1,5 @@
/*
* Copyright 2012-2017 the original author or authors.
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -21,6 +21,8 @@ import org.junit.Before;
import org.junit.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
@ -34,6 +36,7 @@ import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
@ -43,12 +46,14 @@ import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
/**
* Tests for {@link JobLauncherCommandLineRunner}.
*
* @author Dave Syer
* @author Jean-Pierre Bergamin
* @author Mahmoud Ben Hassine
*/
public class JobLauncherCommandLineRunnerTests {
@ -80,7 +85,8 @@ public class JobLauncherCommandLineRunnerTests {
this.step = this.steps.get("step").tasklet(tasklet).build();
this.job = this.jobs.get("job").start(this.step).build();
this.jobExplorer = this.context.getBean(JobExplorer.class);
this.runner = new JobLauncherCommandLineRunner(jobLauncher, this.jobExplorer);
this.runner = new JobLauncherCommandLineRunner(jobLauncher, this.jobExplorer,
jobRepository);
this.context.getBean(BatchConfiguration.class).clear();
}
@ -113,10 +119,27 @@ public class JobLauncherCommandLineRunnerTests {
.start(this.steps.get("step").tasklet(throwingTasklet()).build())
.incrementer(new RunIdIncrementer()).build();
this.runner.execute(this.job, new JobParameters());
this.runner.execute(this.job, new JobParameters());
this.runner.execute(this.job,
new JobParametersBuilder().addLong("run.id", 1L).toJobParameters());
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
}
@Test
public void runDifferentInstances() throws Exception {
this.job = this.jobs.get("job")
.start(this.steps.get("step").tasklet(throwingTasklet()).build()).build();
// start a job instance
JobParameters jobParameters = new JobParametersBuilder().addString("name", "foo")
.toJobParameters();
this.runner.execute(this.job, jobParameters);
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
// start a different job instance
JobParameters otherJobParameters = new JobParametersBuilder()
.addString("name", "bar").toJobParameters();
this.runner.execute(this.job, otherJobParameters);
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(2);
}
@Test
public void retryFailedExecutionOnNonRestartableJob() throws Exception {
this.job = this.jobs.get("job").preventRestart()
@ -127,6 +150,17 @@ public class JobLauncherCommandLineRunnerTests {
// A failed job that is not restartable does not re-use the job params of
// the last execution, but creates a new job instance when running it again.
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(2);
try {
// try to re-run a failed execution
this.runner.execute(this.job,
new JobParametersBuilder().addLong("run.id", 1L).toJobParameters());
fail("expected JobRestartException");
}
catch (JobRestartException ex) {
assertThat(ex.getMessage())
.isEqualTo("JobInstance already exists and is not restartable");
// expected
}
}
@Test
@ -137,8 +171,43 @@ public class JobLauncherCommandLineRunnerTests {
JobParameters jobParameters = new JobParametersBuilder().addLong("id", 1L, false)
.addLong("foo", 2L, false).toJobParameters();
this.runner.execute(this.job, jobParameters);
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
// try to re-run a failed execution with non identifying parameters
this.runner.execute(this.job, new JobParametersBuilder(jobParameters)
.addLong("run.id", 1L).toJobParameters());
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
}
@Test
public void retryFailedExecutionWithDifferentNonIdentifyingParametersFromPreviousExecution()
throws Exception {
this.job = this.jobs.get("job")
.start(this.steps.get("step").tasklet(throwingTasklet()).build())
.incrementer(new RunIdIncrementer()).build();
JobParameters jobParameters = new JobParametersBuilder().addLong("id", 1L, false)
.addLong("foo", 2L, false).toJobParameters();
this.runner.execute(this.job, jobParameters);
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
// try to re-run a failed execution with non identifying parameters
this.runner.execute(this.job, new JobParametersBuilder().addLong("run.id", 1L)
.addLong("id", 2L, false).addLong("foo", 3L, false).toJobParameters());
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
JobInstance jobInstance = this.jobExplorer.getJobInstance(0L);
assertThat(this.jobExplorer.getJobExecutions(jobInstance)).hasSize(2);
// first execution
JobExecution firstJobExecution = this.jobExplorer.getJobExecution(0L);
JobParameters parameters = firstJobExecution.getJobParameters();
assertThat(parameters.getLong("run.id")).isEqualTo(1L);
assertThat(parameters.getLong("id")).isEqualTo(1L);
assertThat(parameters.getLong("foo")).isEqualTo(2L);
// second execution
JobExecution secondJobExecution = this.jobExplorer.getJobExecution(1L);
parameters = secondJobExecution.getJobParameters();
// identifying parameters should be the same as previous execution
assertThat(parameters.getLong("run.id")).isEqualTo(1L);
// non-identifying parameters should be the newly specified ones
assertThat(parameters.getLong("id")).isEqualTo(2L);
assertThat(parameters.getLong("foo")).isEqualTo(3L);
}
private Tasklet throwingTasklet() {

Loading…
Cancel
Save