Better support for HornetQ embedded broker

Prior to this commit it was not safe to start several contexts
using the HornetQAutoConfiguration in the same VM. Each context
was trying to start their own HornetQ embedded broker by default but
only the first was really starting. Worse, the various InVM connection
factories were all silently connecting to the first broker.

This commit introduces a new "serverId" property that is an auto-
incremented integer by default. This identifies the server to connect
to and allows each context to start its own embedded broker in total
isolation of other contexts.

This commits makes it possible for a context to disable its own
embedded broker and connect to an existing one, potentially started
by another context.

Fixes gh-1063
pull/1116/head
Stephane Nicoll 11 years ago
parent f6ae300ec3
commit 2bff12a7b5

@ -105,7 +105,7 @@ public class HornetQAutoConfiguration {
private ConnectionFactory createEmbeddedConnectionFactory() {
try {
TransportConfiguration transportConfiguration = new TransportConfiguration(
InVMConnectorFactory.class.getName());
InVMConnectorFactory.class.getName(), properties.getEmbedded().generateTransportParameters());
ServerLocator serviceLocator = HornetQClient
.createServerLocatorWithoutHA(transportConfiguration);
return new HornetQConnectionFactory(serviceLocator);

@ -62,7 +62,7 @@ class HornetQEmbeddedConfigurationFactory {
}
TransportConfiguration transportConfiguration = new TransportConfiguration(
InVMAcceptorFactory.class.getName());
InVMAcceptorFactory.class.getName(), properties.generateTransportParameters());
configuration.getAcceptorConfigurations().add(transportConfiguration);
// HORNETQ-1143

@ -16,7 +16,12 @@
package org.springframework.boot.autoconfigure.jms.hornetq;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ -70,6 +75,10 @@ public class HornetQProperties {
*/
public static class Embedded {
private static final AtomicInteger serverIdCounter = new AtomicInteger();
private int serverId = serverIdCounter.getAndIncrement();
private boolean enabled = true;
private boolean persistent;
@ -84,6 +93,14 @@ public class HornetQProperties {
private boolean defaultClusterPassword = true;
public int getServerId() {
return serverId;
}
public void setServerId(int serverId) {
this.serverId = serverId;
}
public boolean isEnabled() {
return this.enabled;
}
@ -137,6 +154,18 @@ public class HornetQProperties {
return this.defaultClusterPassword;
}
/**
* Creates the minimal transport parameters for an embedded transport configuration.
* <p>Specifies the identifier of the server.
*
* @see TransportConstants#SERVER_ID_PROP_NAME
*/
public Map<String,Object> generateTransportParameters() {
Map<String, Object> parameters = new HashMap<String, Object>();
parameters.put(TransportConstants.SERVER_ID_PROP_NAME, getServerId());
return parameters;
}
}
}

@ -16,6 +16,8 @@
package org.springframework.boot.autoconfigure.jms.hornetq;
import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
@ -54,10 +56,6 @@ import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jms.support.destination.DynamicDestinationResolver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
/**
* Tests for {@link HornetQAutoConfiguration}.
*
@ -238,6 +236,56 @@ public class HornetQAutoConfigurationTests {
((TextMessage) message).getText());
}
@Test
public void severalEmbeddedBrokers() {
load(EmptyConfiguration.class,
"spring.hornetq.embedded.queues=Queue1");
AnnotationConfigApplicationContext anotherContext = doLoad(EmptyConfiguration.class,
"spring.hornetq.embedded.queues=Queue2");
try {
HornetQProperties properties = this.context.getBean(HornetQProperties.class);
HornetQProperties anotherProperties = anotherContext.getBean(HornetQProperties.class);
assertTrue("ServerId should not match",
properties.getEmbedded().getServerId() < anotherProperties.getEmbedded().getServerId());
DestinationChecker checker = new DestinationChecker(this.context);
checker.checkQueue("Queue1", true);
checker.checkQueue("Queue2", false);
DestinationChecker anotherChecker = new DestinationChecker(anotherContext);
anotherChecker.checkQueue("Queue2", true);
anotherChecker.checkQueue("Queue1", false);
}
finally {
anotherContext.close();
}
}
@Test
public void connectToASpecificEmbeddedBroker() {
load(EmptyConfiguration.class,
"spring.hornetq.embedded.serverId=93",
"spring.hornetq.embedded.queues=Queue1");
AnnotationConfigApplicationContext anotherContext = doLoad(EmptyConfiguration.class,
"spring.hornetq.mode=embedded",
"spring.hornetq.embedded.serverId=93", // Connect to the "main" broker
"spring.hornetq.embedded.enabled=false"); // do not start a specific one
try {
DestinationChecker checker = new DestinationChecker(this.context);
checker.checkQueue("Queue1", true);
DestinationChecker anotherChecker = new DestinationChecker(anotherContext);
anotherChecker.checkQueue("Queue1", true);
}
finally {
anotherContext.close();
}
}
private TransportConfiguration assertInVmConnectionFactory(
HornetQConnectionFactory connectionFactory) {
TransportConfiguration transportConfig = getSingleTransportConfiguration(connectionFactory);
@ -265,11 +313,16 @@ public class HornetQAutoConfigurationTests {
}
private void load(Class<?> config, String... environment) {
this.context = new AnnotationConfigApplicationContext();
this.context.register(config);
this.context.register(HornetQAutoConfiguration.class, JmsAutoConfiguration.class);
EnvironmentTestUtils.addEnvironment(this.context, environment);
this.context.refresh();
this.context = doLoad(config, environment);
}
private AnnotationConfigApplicationContext doLoad(Class<?> config, String... environment) {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
applicationContext.register(config);
applicationContext.register(HornetQAutoConfiguration.class, JmsAutoConfiguration.class);
EnvironmentTestUtils.addEnvironment(applicationContext, environment);
applicationContext.refresh();
return applicationContext;
}
private static class DestinationChecker {

@ -258,6 +258,7 @@ content into your application; rather pick only the properties that you need.
spring.hornetq.host=localhost # hornetQ host (native mode)
spring.hornetq.port=5445 # hornetQ port (native mode)
spring.hornetq.embedded.enabled=true # if the embedded server is enabled (needs hornetq-jms-server.jar)
spring.hornetq.embedded.serverId= # auto-generated id of the embedded server (integer)
spring.hornetq.embedded.persistent=false # message persistence
spring.hornetq.embedded.data-directory= # location of data content (when persistence is enabled)
spring.hornetq.embedded.queues= # comma separate queues to create on startup

Loading…
Cancel
Save