Polish "Add Prometheus push gateway support"
Rework Prometheus push gateway support so that the central class can be used outside of auto-configuration. The shutdown flags have also been replaced with a single "shutdown-operation" property since it's unlikely that both "push" and "delete" will be required. It's also possible now to supply a `TaskScheduler` to the manager. See gh-14353pull/14707/head
parent
4e71981f77
commit
20ecf73cd1
@ -0,0 +1,201 @@
|
||||
/*
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.actuate.metrics.export.prometheus;
|
||||
|
||||
import java.net.UnknownHostException;
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
import io.prometheus.client.CollectorRegistry;
|
||||
import io.prometheus.client.exporter.PushGateway;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.springframework.scheduling.TaskScheduler;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Class that can be used to managed the pushing of metrics to a {@link PushGateway
|
||||
* Prometheus PushGateway}. Handles the scheduling of push operations, error handling and
|
||||
* shutdown operations.
|
||||
*
|
||||
* @author David J. M. Karlsen
|
||||
* @author Phillip Webb
|
||||
* @since 2.1.0
|
||||
*/
|
||||
public class PrometheusPushGatewayManager {
|
||||
|
||||
private static final Logger logger = LoggerFactory
|
||||
.getLogger(PrometheusPushGatewayManager.class);
|
||||
|
||||
private final PushGateway pushGateway;
|
||||
|
||||
private final CollectorRegistry registry;
|
||||
|
||||
private final String job;
|
||||
|
||||
private final Map<String, String> groupingKey;
|
||||
|
||||
private final ShutdownOperation shutdownOperation;
|
||||
|
||||
private final TaskScheduler scheduler;
|
||||
|
||||
private ScheduledFuture<?> scheduled;
|
||||
|
||||
/**
|
||||
* Create a new {@link PrometheusPushGatewayManager} instance using a single threaded
|
||||
* {@link TaskScheduler}.
|
||||
* @param pushGateway the source push gateway
|
||||
* @param registry the collector registry to push
|
||||
* @param pushRate the rate at which push operations occur
|
||||
* @param job the job ID for the operation
|
||||
* @param groupingKeys an optional set of grouping keys for the operation
|
||||
* @param shutdownOperation the shutdown operation that should be performed when
|
||||
* context is closed.
|
||||
*/
|
||||
public PrometheusPushGatewayManager(PushGateway pushGateway,
|
||||
CollectorRegistry registry, Duration pushRate, String job,
|
||||
Map<String, String> groupingKeys, ShutdownOperation shutdownOperation) {
|
||||
this(pushGateway, registry, new PushGatewayTaskScheduler(), pushRate, job,
|
||||
groupingKeys, shutdownOperation);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new {@link PrometheusPushGatewayManager} instance.
|
||||
* @param pushGateway the source push gateway
|
||||
* @param registry the collector registry to push
|
||||
* @param scheduler the scheduler used for operations
|
||||
* @param pushRate the rate at which push operations occur
|
||||
* @param job the job ID for the operation
|
||||
* @param groupingKey an optional set of grouping keys for the operation
|
||||
* @param shutdownOperation the shutdown operation that should be performed when
|
||||
* context is closed.
|
||||
*/
|
||||
public PrometheusPushGatewayManager(PushGateway pushGateway,
|
||||
CollectorRegistry registry, TaskScheduler scheduler, Duration pushRate,
|
||||
String job, Map<String, String> groupingKey,
|
||||
ShutdownOperation shutdownOperation) {
|
||||
Assert.notNull(pushGateway, "PushGateway must not be null");
|
||||
Assert.notNull(registry, "Registry must not be null");
|
||||
Assert.notNull(scheduler, "Scheduler must not be null");
|
||||
Assert.notNull(pushRate, "PushRate must not be null");
|
||||
Assert.hasLength(job, "Job must not be empty");
|
||||
this.pushGateway = pushGateway;
|
||||
this.registry = registry;
|
||||
this.job = job;
|
||||
this.groupingKey = groupingKey;
|
||||
this.shutdownOperation = (shutdownOperation != null) ? shutdownOperation
|
||||
: ShutdownOperation.NONE;
|
||||
this.scheduler = scheduler;
|
||||
this.scheduled = this.scheduler.scheduleAtFixedRate(this::push, pushRate);
|
||||
}
|
||||
|
||||
private void push() {
|
||||
try {
|
||||
this.pushGateway.pushAdd(this.registry, this.job, this.groupingKey);
|
||||
}
|
||||
catch (UnknownHostException ex) {
|
||||
String host = ex.getMessage();
|
||||
String message = "Unable to locate prometheus push gateway host";
|
||||
message += StringUtils.hasLength(host) ? " '" + host + "'" : "";
|
||||
message += ". No longer attempting metrics publication to this host";
|
||||
logger.error(message, ex);
|
||||
shutdown(ShutdownOperation.NONE);
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
logger.error("Unable to push metrics to Prometheus Pushgateway", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void delete() {
|
||||
try {
|
||||
this.pushGateway.delete(this.job, this.groupingKey);
|
||||
}
|
||||
catch (Throwable ex) {
|
||||
logger.error("Unable to delete metrics from Prometheus Pushgateway", ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the manager, running any {@link ShutdownOperation}.
|
||||
*/
|
||||
public void shutdown() {
|
||||
shutdown(this.shutdownOperation);
|
||||
}
|
||||
|
||||
private void shutdown(ShutdownOperation shutdownOperation) {
|
||||
if (this.scheduler instanceof PushGatewayTaskScheduler) {
|
||||
((PushGatewayTaskScheduler) this.scheduler).shutdown();
|
||||
}
|
||||
this.scheduled.cancel(false);
|
||||
switch (shutdownOperation) {
|
||||
case PUSH:
|
||||
push();
|
||||
break;
|
||||
case DELETE:
|
||||
delete();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The operation that should be performed on shutdown.
|
||||
*/
|
||||
public enum ShutdownOperation {
|
||||
|
||||
/**
|
||||
* Don't perform any shutdown operation.
|
||||
*/
|
||||
NONE,
|
||||
|
||||
/**
|
||||
* Perform a 'push' before shutdown.
|
||||
*/
|
||||
PUSH,
|
||||
|
||||
/**
|
||||
* Perform a 'delete' before shutdown.
|
||||
*/
|
||||
DELETE
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link TaskScheduler} used when the user doesn't specify one.
|
||||
*/
|
||||
static class PushGatewayTaskScheduler extends ThreadPoolTaskScheduler {
|
||||
|
||||
PushGatewayTaskScheduler() {
|
||||
setPoolSize(1);
|
||||
setDaemon(true);
|
||||
setThreadGroupName("prometheus-push-gateway");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledExecutorService getScheduledExecutor()
|
||||
throws IllegalStateException {
|
||||
return Executors.newSingleThreadScheduledExecutor(this::newThread);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,215 @@
|
||||
/*
|
||||
* Copyright 2012-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.boot.actuate.metrics.export.prometheus;
|
||||
|
||||
import java.net.UnknownHostException;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
|
||||
import io.prometheus.client.CollectorRegistry;
|
||||
import io.prometheus.client.exporter.PushGateway;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Captor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import org.springframework.boot.actuate.metrics.export.prometheus.PrometheusPushGatewayManager.PushGatewayTaskScheduler;
|
||||
import org.springframework.boot.actuate.metrics.export.prometheus.PrometheusPushGatewayManager.ShutdownOperation;
|
||||
import org.springframework.scheduling.TaskScheduler;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.isA;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.BDDMockito.willThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
|
||||
/**
|
||||
* Tests for {@link PrometheusPushGatewayManager}.
|
||||
*
|
||||
* @author Phillip Webb
|
||||
*/
|
||||
public class PrometheusPushGatewayManagerTests {
|
||||
|
||||
@Mock
|
||||
private PushGateway pushGateway;
|
||||
|
||||
@Mock
|
||||
private CollectorRegistry registry;
|
||||
|
||||
private TaskScheduler scheduler;
|
||||
|
||||
private Duration pushRate = Duration.ofSeconds(1);
|
||||
|
||||
private Map<String, String> groupingKey = Collections.singletonMap("foo", "bar");
|
||||
|
||||
@Captor
|
||||
private ArgumentCaptor<Runnable> task;
|
||||
|
||||
@Mock
|
||||
private ScheduledFuture<Object> future;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
this.scheduler = mockScheduler(TaskScheduler.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWhenPushGatewayIsNullThrowsException() {
|
||||
assertThatIllegalArgumentException()
|
||||
.isThrownBy(() -> new PrometheusPushGatewayManager(null, this.registry,
|
||||
this.scheduler, this.pushRate, "job", this.groupingKey, null))
|
||||
.withMessage("PushGateway must not be null");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWhenCollectorRegistryIsNullThrowsException() {
|
||||
assertThatIllegalArgumentException()
|
||||
.isThrownBy(() -> new PrometheusPushGatewayManager(this.pushGateway, null,
|
||||
this.scheduler, this.pushRate, "job", this.groupingKey, null))
|
||||
.withMessage("Registry must not be null");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWhenSchedulerIsNullThrowsException() {
|
||||
assertThatIllegalArgumentException().isThrownBy(
|
||||
() -> new PrometheusPushGatewayManager(this.pushGateway, this.registry,
|
||||
null, this.pushRate, "job", this.groupingKey, null))
|
||||
.withMessage("Scheduler must not be null");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWhenPushRateIsNullThrowsException() {
|
||||
assertThatIllegalArgumentException().isThrownBy(
|
||||
() -> new PrometheusPushGatewayManager(this.pushGateway, this.registry,
|
||||
this.scheduler, null, "job", this.groupingKey, null))
|
||||
.withMessage("PushRate must not be null");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createWhenJobIsEmptyThrowsException() {
|
||||
assertThatIllegalArgumentException().isThrownBy(
|
||||
() -> new PrometheusPushGatewayManager(this.pushGateway, this.registry,
|
||||
this.scheduler, this.pushRate, "", this.groupingKey, null))
|
||||
.withMessage("Job must not be empty");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createShouldSchedulePushAsFixedRate() throws Exception {
|
||||
new PrometheusPushGatewayManager(this.pushGateway, this.registry, this.scheduler,
|
||||
this.pushRate, "job", this.groupingKey, null);
|
||||
verify(this.scheduler).scheduleAtFixedRate(this.task.capture(),
|
||||
eq(this.pushRate));
|
||||
this.task.getValue().run();
|
||||
verify(this.pushGateway).pushAdd(this.registry, "job", this.groupingKey);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shutdownWhenOwnsSchedulerDoesShutdownScheduler() {
|
||||
PushGatewayTaskScheduler ownedScheduler = mockScheduler(
|
||||
PushGatewayTaskScheduler.class);
|
||||
PrometheusPushGatewayManager manager = new PrometheusPushGatewayManager(
|
||||
this.pushGateway, this.registry, ownedScheduler, this.pushRate, "job",
|
||||
this.groupingKey, null);
|
||||
manager.shutdown();
|
||||
verify(ownedScheduler).shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shutdownWhenDoesNotOwnSchedulerDoesNotShutdownScheduler() {
|
||||
ThreadPoolTaskScheduler otherScheduler = mockScheduler(
|
||||
ThreadPoolTaskScheduler.class);
|
||||
PrometheusPushGatewayManager manager = new PrometheusPushGatewayManager(
|
||||
this.pushGateway, this.registry, otherScheduler, this.pushRate, "job",
|
||||
this.groupingKey, null);
|
||||
manager.shutdown();
|
||||
verify(otherScheduler, never()).shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shutdownWhenShutdownOperationIsPushPerformsPushOnShutdown()
|
||||
throws Exception {
|
||||
PrometheusPushGatewayManager manager = new PrometheusPushGatewayManager(
|
||||
this.pushGateway, this.registry, this.scheduler, this.pushRate, "job",
|
||||
this.groupingKey, ShutdownOperation.PUSH);
|
||||
manager.shutdown();
|
||||
verify(this.future).cancel(false);
|
||||
verify(this.pushGateway).pushAdd(this.registry, "job", this.groupingKey);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shutdownWhenShutdownOperationIsDeletePerformsDeleteOnShutdown()
|
||||
throws Exception {
|
||||
PrometheusPushGatewayManager manager = new PrometheusPushGatewayManager(
|
||||
this.pushGateway, this.registry, this.scheduler, this.pushRate, "job",
|
||||
this.groupingKey, ShutdownOperation.DELETE);
|
||||
manager.shutdown();
|
||||
verify(this.future).cancel(false);
|
||||
verify(this.pushGateway).delete("job", this.groupingKey);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shutdownWhenShutdownOperationIsNoneDoesNothing() {
|
||||
PrometheusPushGatewayManager manager = new PrometheusPushGatewayManager(
|
||||
this.pushGateway, this.registry, this.scheduler, this.pushRate, "job",
|
||||
this.groupingKey, ShutdownOperation.NONE);
|
||||
manager.shutdown();
|
||||
verify(this.future).cancel(false);
|
||||
verifyZeroInteractions(this.pushGateway);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pushWhenUnknownHostExceptionIsThrownDoesShutdown() throws Exception {
|
||||
new PrometheusPushGatewayManager(this.pushGateway, this.registry, this.scheduler,
|
||||
this.pushRate, "job", this.groupingKey, null);
|
||||
verify(this.scheduler).scheduleAtFixedRate(this.task.capture(),
|
||||
eq(this.pushRate));
|
||||
willThrow(new UnknownHostException("foo")).given(this.pushGateway)
|
||||
.pushAdd(this.registry, "job", this.groupingKey);
|
||||
this.task.getValue().run();
|
||||
verify(this.future).cancel(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pushDoesNotThrowException() throws Exception {
|
||||
new PrometheusPushGatewayManager(this.pushGateway, this.registry, this.scheduler,
|
||||
this.pushRate, "job", this.groupingKey, null);
|
||||
verify(this.scheduler).scheduleAtFixedRate(this.task.capture(),
|
||||
eq(this.pushRate));
|
||||
willThrow(RuntimeException.class).given(this.pushGateway).pushAdd(this.registry,
|
||||
"job", this.groupingKey);
|
||||
this.task.getValue().run();
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
private <T extends TaskScheduler> T mockScheduler(Class<T> type) {
|
||||
T scheduler = mock(type);
|
||||
given(scheduler.scheduleAtFixedRate(isA(Runnable.class), isA(Duration.class)))
|
||||
.willReturn((ScheduledFuture) this.future);
|
||||
return scheduler;
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue