Merge branch '2.0.x'

pull/14975/head
Phillip Webb 6 years ago
commit 1a0a8470e5

@ -56,6 +56,7 @@ import org.springframework.util.StringUtils;
* @author Dave Syer
* @author Eddú Meléndez
* @author Kazuki Shimizu
* @author Mahmoud Ben Hassine
*/
@Configuration
@ConditionalOnClass({ JobLauncher.class, DataSource.class, JdbcOperations.class })
@ -88,9 +89,10 @@ public class BatchAutoConfiguration {
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "spring.batch.job", name = "enabled", havingValue = "true", matchIfMissing = true)
public JobLauncherCommandLineRunner jobLauncherCommandLineRunner(
JobLauncher jobLauncher, JobExplorer jobExplorer) {
JobLauncher jobLauncher, JobExplorer jobExplorer,
JobRepository jobRepository) {
JobLauncherCommandLineRunner runner = new JobLauncherCommandLineRunner(
jobLauncher, jobExplorer);
jobLauncher, jobExplorer, jobRepository);
String jobNames = this.properties.getJob().getNames();
if (StringUtils.hasText(jobNames)) {
runner.setJobNames(jobNames);

@ -1,5 +1,5 @@
/*
* Copyright 2012-2017 the original author or authors.
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,14 +19,19 @@ package org.springframework.boot.autoconfigure.batch;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
@ -39,12 +44,14 @@ import org.springframework.batch.core.launch.JobParametersNotFoundException;
import org.springframework.batch.core.launch.NoSuchJobException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.Ordered;
import org.springframework.util.Assert;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.StringUtils;
@ -55,6 +62,7 @@ import org.springframework.util.StringUtils;
*
* @author Dave Syer
* @author Jean-Pierre Bergamin
* @author Mahmoud Ben Hassine
*/
public class JobLauncherCommandLineRunner
implements CommandLineRunner, Ordered, ApplicationEventPublisherAware {
@ -69,11 +77,13 @@ public class JobLauncherCommandLineRunner
private JobParametersConverter converter = new DefaultJobParametersConverter();
private JobLauncher jobLauncher;
private final JobLauncher jobLauncher;
private JobRegistry jobRegistry;
private final JobExplorer jobExplorer;
private final JobRepository jobRepository;
private JobExplorer jobExplorer;
private JobRegistry jobRegistry;
private String jobNames;
@ -83,10 +93,38 @@ public class JobLauncherCommandLineRunner
private ApplicationEventPublisher publisher;
/**
* Create a new {@link JobLauncherCommandLineRunner}.
* @param jobLauncher to launch jobs
* @param jobExplorer to check the job repository for previous executions
* @deprecated since 2.0.7 in favor of
* {@link #JobLauncherCommandLineRunner(JobLauncher, JobExplorer, JobRepository)}. A
* job repository is required to check if a job instance exists with the given
* parameters when running a job (which is not possible with the job explorer).
*/
@Deprecated
public JobLauncherCommandLineRunner(JobLauncher jobLauncher,
JobExplorer jobExplorer) {
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = null;
}
/**
* Create a new {@link JobLauncherCommandLineRunner}.
* @param jobLauncher to launch jobs
* @param jobExplorer to check the job repository for previous executions
* @param jobRepository to check if a job instance exists with the given parameters
* when running a job
*/
public JobLauncherCommandLineRunner(JobLauncher jobLauncher, JobExplorer jobExplorer,
JobRepository jobRepository) {
Assert.notNull(jobLauncher, "JobLauncher must not be null");
Assert.notNull(jobExplorer, "JobExplorer must not be null");
Assert.notNull(jobRepository, "JobRepository must not be null");
this.jobLauncher = jobLauncher;
this.jobExplorer = jobExplorer;
this.jobRepository = jobRepository;
}
public void setOrder(int order) {
@ -135,6 +173,20 @@ public class JobLauncherCommandLineRunner
executeRegisteredJobs(jobParameters);
}
private void executeLocalJobs(JobParameters jobParameters)
throws JobExecutionException {
for (Job job : this.jobs) {
if (StringUtils.hasText(this.jobNames)) {
String[] jobsToRun = this.jobNames.split(",");
if (!PatternMatchUtils.simpleMatch(jobsToRun, job.getName())) {
logger.debug("Skipped job: " + job.getName());
continue;
}
}
execute(job, jobParameters);
}
}
private void executeRegisteredJobs(JobParameters jobParameters)
throws JobExecutionException {
if (this.jobRegistry != null && StringUtils.hasText(this.jobNames)) {
@ -158,26 +210,59 @@ public class JobLauncherCommandLineRunner
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException,
JobParametersNotFoundException {
JobParameters nextParameters = new JobParametersBuilder(jobParameters,
this.jobExplorer).getNextJobParameters(job).toJobParameters();
JobExecution execution = this.jobLauncher.run(job, nextParameters);
JobParameters parameters = getNextJobParameters(job, jobParameters);
JobExecution execution = this.jobLauncher.run(job, parameters);
if (this.publisher != null) {
this.publisher.publishEvent(new JobExecutionEvent(execution));
}
}
private void executeLocalJobs(JobParameters jobParameters)
throws JobExecutionException {
for (Job job : this.jobs) {
if (StringUtils.hasText(this.jobNames)) {
String[] jobsToRun = this.jobNames.split(",");
if (!PatternMatchUtils.simpleMatch(jobsToRun, job.getName())) {
logger.debug("Skipped job: " + job.getName());
continue;
}
}
execute(job, jobParameters);
private JobParameters getNextJobParameters(Job job, JobParameters jobParameters) {
if (this.jobRepository != null
&& this.jobRepository.isJobInstanceExists(job.getName(), jobParameters)) {
return getNextJobParametersForExisting(job, jobParameters);
}
if (job.getJobParametersIncrementer() == null) {
return jobParameters;
}
JobParameters nextParameters = new JobParametersBuilder(jobParameters,
this.jobExplorer).getNextJobParameters(job).toJobParameters();
return merge(nextParameters, jobParameters);
}
private JobParameters getNextJobParametersForExisting(Job job,
JobParameters jobParameters) {
JobExecution lastExecution = this.jobRepository.getLastJobExecution(job.getName(),
jobParameters);
if (isStoppedOrFailed(lastExecution) && job.isRestartable()) {
JobParameters previousIdentifyingParameters = getGetIdentifying(
lastExecution.getJobParameters());
return merge(previousIdentifyingParameters, jobParameters);
}
return jobParameters;
}
private boolean isStoppedOrFailed(JobExecution execution) {
BatchStatus status = (execution != null) ? execution.getStatus() : null;
return (status == BatchStatus.STOPPED || status == BatchStatus.FAILED);
}
private JobParameters getGetIdentifying(JobParameters parameters) {
HashMap<String, JobParameter> nonIdentifying = new LinkedHashMap<>(
parameters.getParameters().size());
parameters.getParameters().forEach((key, value) -> {
if (value.isIdentifying()) {
nonIdentifying.put(key, value);
}
});
return new JobParameters(nonIdentifying);
}
private JobParameters merge(JobParameters parameters, JobParameters additionals) {
Map<String, JobParameter> merged = new LinkedHashMap<>();
merged.putAll(parameters.getParameters());
merged.putAll(additionals.getParameters());
return new JobParameters(merged);
}
}

@ -1,5 +1,5 @@
/*
* Copyright 2012-2017 the original author or authors.
* Copyright 2012-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -21,6 +21,8 @@ import org.junit.Before;
import org.junit.Test;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
@ -34,6 +36,7 @@ import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
@ -43,12 +46,15 @@ import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.fail;
/**
* Tests for {@link JobLauncherCommandLineRunner}.
*
* @author Dave Syer
* @author Jean-Pierre Bergamin
* @author Mahmoud Ben Hassine
*/
public class JobLauncherCommandLineRunnerTests {
@ -80,7 +86,8 @@ public class JobLauncherCommandLineRunnerTests {
this.step = this.steps.get("step").tasklet(tasklet).build();
this.job = this.jobs.get("job").start(this.step).build();
this.jobExplorer = this.context.getBean(JobExplorer.class);
this.runner = new JobLauncherCommandLineRunner(jobLauncher, this.jobExplorer);
this.runner = new JobLauncherCommandLineRunner(jobLauncher, this.jobExplorer,
jobRepository);
this.context.getBean(BatchConfiguration.class).clear();
}
@ -113,8 +120,25 @@ public class JobLauncherCommandLineRunnerTests {
.start(this.steps.get("step").tasklet(throwingTasklet()).build())
.incrementer(new RunIdIncrementer()).build();
this.runner.execute(this.job, new JobParameters());
this.runner.execute(this.job, new JobParameters());
this.runner.execute(this.job,
new JobParametersBuilder().addLong("run.id", 1L).toJobParameters());
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
}
@Test
public void runDifferentInstances() throws Exception {
this.job = this.jobs.get("job")
.start(this.steps.get("step").tasklet(throwingTasklet()).build()).build();
// start a job instance
JobParameters jobParameters = new JobParametersBuilder().addString("name", "foo")
.toJobParameters();
this.runner.execute(this.job, jobParameters);
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
// start a different job instance
JobParameters otherJobParameters = new JobParametersBuilder()
.addString("name", "bar").toJobParameters();
this.runner.execute(this.job, otherJobParameters);
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(2);
}
@Test
@ -127,6 +151,12 @@ public class JobLauncherCommandLineRunnerTests {
// A failed job that is not restartable does not re-use the job params of
// the last execution, but creates a new job instance when running it again.
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(2);
assertThatExceptionOfType(JobRestartException.class).isThrownBy(() -> {
// try to re-run a failed execution
this.runner.execute(this.job,
new JobParametersBuilder().addLong("run.id", 1L).toJobParameters());
fail("expected JobRestartException");
}).withMessageContaining("JobInstance already exists and is not restartable");
}
@Test
@ -137,8 +167,43 @@ public class JobLauncherCommandLineRunnerTests {
JobParameters jobParameters = new JobParametersBuilder().addLong("id", 1L, false)
.addLong("foo", 2L, false).toJobParameters();
this.runner.execute(this.job, jobParameters);
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
// try to re-run a failed execution with non identifying parameters
this.runner.execute(this.job, new JobParametersBuilder(jobParameters)
.addLong("run.id", 1L).toJobParameters());
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
}
@Test
public void retryFailedExecutionWithDifferentNonIdentifyingParametersFromPreviousExecution()
throws Exception {
this.job = this.jobs.get("job")
.start(this.steps.get("step").tasklet(throwingTasklet()).build())
.incrementer(new RunIdIncrementer()).build();
JobParameters jobParameters = new JobParametersBuilder().addLong("id", 1L, false)
.addLong("foo", 2L, false).toJobParameters();
this.runner.execute(this.job, jobParameters);
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
// try to re-run a failed execution with non identifying parameters
this.runner.execute(this.job, new JobParametersBuilder().addLong("run.id", 1L)
.addLong("id", 2L, false).addLong("foo", 3L, false).toJobParameters());
assertThat(this.jobExplorer.getJobInstances("job", 0, 100)).hasSize(1);
JobInstance jobInstance = this.jobExplorer.getJobInstance(0L);
assertThat(this.jobExplorer.getJobExecutions(jobInstance)).hasSize(2);
// first execution
JobExecution firstJobExecution = this.jobExplorer.getJobExecution(0L);
JobParameters parameters = firstJobExecution.getJobParameters();
assertThat(parameters.getLong("run.id")).isEqualTo(1L);
assertThat(parameters.getLong("id")).isEqualTo(1L);
assertThat(parameters.getLong("foo")).isEqualTo(2L);
// second execution
JobExecution secondJobExecution = this.jobExplorer.getJobExecution(1L);
parameters = secondJobExecution.getJobParameters();
// identifying parameters should be the same as previous execution
assertThat(parameters.getLong("run.id")).isEqualTo(1L);
// non-identifying parameters should be the newly specified ones
assertThat(parameters.getLong("id")).isEqualTo(2L);
assertThat(parameters.getLong("foo")).isEqualTo(3L);
}
private Tasklet throwingTasklet() {

Loading…
Cancel
Save