Revision: 63533
Updated Code
at May 15, 2013 05:03 by laurenceosx
Updated Code
#!/usr/bin/env groovy
@Grapes([
@Grab(group='org.apache.activemq', module='activemq-all', version='5.8.0', transitive=false)
])
// file: EmbeddedBroker.groovy
/*
ActiveMQ 5.8 Groovy Embeded Broker Example - Laurence Toenjes - 5/14/2013
This example overcomes some limitations of the basic ActiveMQ embedded
brokers examples I found online
Some of the challenges were:
# Multiple instances on same machine and be able to use JMX.
# Running on a machine with less than 50G or 100G disk space
caused combinations of ActiveMQ errors or warnings.
# Groovy Grapes/Grab syntax to use that would work on pc and mac.
The broker in this example uses a nonpersistent store and
is multicast discoverable and should allow you to run multiple instances
of it (in separate processes of course) which is the reason for all the
code snips containing random port nums and random thread sleeps
to increase the odds of success of each new embedded broker process
to get a working set of port nums.
*/
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory;
import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent; // http://activemq.apache.org/maven/apidocs/src-html/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.html#line.54
import org.apache.activemq.transport.discovery.DiscoveryListener;
public final class EmbeddedBroker {
static random = new java.util.Random();
static def calcPid = { java.lang.management.ManagementFactory.getRuntimeMXBean().getName().split('@')[0].toInteger() } ;
static Integer javaPid = calcPid();
static String sJavaPid = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
static String C_DEFAULT_DISCOVERY_URI_STRING = MulticastDiscoveryAgent.DEFAULT_DISCOVERY_URI_STRING;
private EmbeddedBroker() {} // not used - from Java example
static def sockets // to pre-allocate ports for ActiveMQ
static def ports // randomly generated list of free ports
static def calcMqPort = { ports.last() }
public static void main(String[] args) throws Exception {
//
/*
SET _JAVA_OPTIONS=-Dcom.sun.management.jmxremote ^
-Dcom.sun.management.jmxremote.port=51001 ^
-Dcom.sun.management.jmxremote.local.only=false ^
-Dcom.sun.management.jmxremote.authenticate=false ^
-DmyJmxConnectorPort=41001
*/
def tryCounter = 1000;
sockets = [];
def base = 4000; // lowest port num to try
def portRange = 5000;
def calcPorts = {
ports = [];
def rnd = { base + random.nextInt(portRange) }
def p = rnd();
ports = (0 ..< 3).collect { ((p + it) as Integer) }
// lets make activemq port same as pid so easy to use jconsole
ports[-1] = javaPid
ports; // return
}
calcPorts();
while ( tryCounter-- >= 0 ) {
try {
Thread.sleep random.nextInt( 100 );
// sockets = ports.collect { new Socket(it) }
ports.each { itPort -> sockets << new ServerSocket(itPort) }
assert sockets.size() == ports.size(); // need at least 3
break;
} catch(Exception ex) {
if ( !(ex instanceof java.net.BindException) ) {
System.err.println ex
}
sockets.findAll { it != null }.each { itSocketToClose ->
try { itSocketToClose.close(); } catch(Exception ex2) {}
}
sockets.clear();
calcPorts();
Thread.sleep( random.nextInt( 200 ) + 500 );
}
}
Thread.sleep random.nextInt( 200 );
sockets.each { it.close() }
def sm = [:] // for system map props
sm.'com.sun.management.jmxremote.port' = ports[0].toString()
sm.'com.sun.management.jmxremote.local.only' = 'false'
sm.'com.sun.management.jmxremote.authenticate' = 'false'
sm.'myJmxConnectorPort' = ports[1].toString()
// ports[0] is for com.sun.management.jmxremote.port
// ports[1] is for broker.getManagementContext().setConnectorPort
sm.keySet().each { key -> System.properties[ key ] = sm[key] }
BrokerService broker
def brokerCreated = false;
tryCounter = 100;
while( (!brokerCreated) && (tryCounter-- >= 0) ) {
try {
broker = createBroker();
brokerCreated = true;
// run forever
Object lock = new Object();
synchronized (lock) {
lock.wait();
}
break; //
} catch(Exception ex) {
println "### Oops: ${ex}"
}
} // end while
}
public static BrokerService createBroker() throws Exception {
def gi = groovy.inspect.swingui.ObjectBrowser.&inspect;
BrokerService broker = new BrokerService();
broker.persistent = false; // SET THIS FIRST!!! - setting on url did not work for me
broker.setUseShutdownHook(true);
// Stop ActiveMQ 5.8 Errors or Warnings when running on machines with
// less than 50G to 100G of diskspace
Long HundredGig = 107374182400L
File fileVisitor = broker.tmpDataDirectory.canonicalFile;
while( !fileVisitor.exists() ) {
fileVisitor = new File(fileVisitor, '..').canonicalFile
}
if ( fileVisitor.usableSpace < HundredGig ) {
broker.systemUsage.tempUsage.limit = fileVisitor.usableSpace/2;
broker.systemUsage.storeUsage.limit = fileVisitor.usableSpace/2;
}
broker.systemUsage.setSendFailIfNoSpace(false);
broker.systemUsage.setSendFailIfNoSpaceExplicitySet(true);
// String theBrokerSuffix = sJavaPid.replace('@','_');
broker.brokerName = 'broker1'
broker.setUseJmx(true);
// sometimes set in bat/sh starter
Integer myJmxConnectorPort = System.properties.'myJmxConnectorPort'.toString().toInteger();
broker.getManagementContext().setConnectorPort( myJmxConnectorPort );
// !!! for jmx usage
broker.setBrokerObjectName(
BrokerMBeanSupport.createBrokerObjectName(broker.getManagementContext().getJmxDomainName(), broker.brokerName)
)
def conn = broker.addConnector("tcp://0.0.0.0:${calcMqPort()}"); // use 0.0.0.0 , makes discovery work better
// conn.name += "_port_${javaPid}"
// for discovery
conn.discoveryUri = new URI( "${C_DEFAULT_DISCOVERY_URI_STRING}?useLocalHost=false".trim() ); // optional add ?
broker.start();
}
}
Revision: 63532
Initial Code
Initial URL
Initial Description
Initial Title
Initial Tags
Initial Language
at May 15, 2013 05:00 by laurenceosx
Initial Code
#!/usr/bin/env groovy
@Grapes([
@Grab(group='org.apache.activemq', module='activemq-all', version='5.8.0', transitive=false)
])
// file: EmbeddedBroker.groovy
/*
ActiveMQ 5.8 Groovy Embeded Broker Example - Laurence Toenjes - 5/14/2013
This example overcomes some limitations of the basic ActiveMQ embedded
brokers examples I found online
Some of the challenges were:
# Multiple instances on same machine and be able to use JMX.
# Running on a machine with less than 50G or 100G disk space
caused combinations of ActiveMQ errors or warnings.
# Groovy Grapes/Grab syntax to use that would work on pc and mac.
The broker in this example uses a nonperistent store and
is multicast discoverable and should allow you to run multiple instances
of it (in separate processes of course) which is the reason for all the
code snips containing random port nums and random thread sleeps
to increase the odds of success of each new embedded broker process
to get a working set of port nums.
*/
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory;
import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent; // http://activemq.apache.org/maven/apidocs/src-html/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.html#line.54
import org.apache.activemq.transport.discovery.DiscoveryListener;
public final class EmbeddedBroker {
static random = new java.util.Random();
static def calcPid = { java.lang.management.ManagementFactory.getRuntimeMXBean().getName().split('@')[0].toInteger() } ;
static Integer javaPid = calcPid();
static String sJavaPid = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
static String C_DEFAULT_DISCOVERY_URI_STRING = MulticastDiscoveryAgent.DEFAULT_DISCOVERY_URI_STRING;
private EmbeddedBroker() {} // not used - from Java example
static def sockets // to pre-allocate ports for ActiveMQ
static def ports // randomly generated list of free ports
static def calcMqPort = { ports.last() }
public static void main(String[] args) throws Exception {
//
/*
SET _JAVA_OPTIONS=-Dcom.sun.management.jmxremote ^
-Dcom.sun.management.jmxremote.port=51001 ^
-Dcom.sun.management.jmxremote.local.only=false ^
-Dcom.sun.management.jmxremote.authenticate=false ^
-DmyJmxConnectorPort=41001
*/
def tryCounter = 1000;
sockets = [];
def base = 4000; // lowest port num to try
def portRange = 5000;
def calcPorts = {
ports = [];
def rnd = { base + random.nextInt(portRange) }
def p = rnd();
ports = (0 ..< 3).collect { ((p + it) as Integer) }
// lets make activemq port same as pid so easy to use jconsole
ports[-1] = javaPid
ports; // return
}
calcPorts();
while ( tryCounter-- >= 0 ) {
try {
Thread.sleep random.nextInt( 100 );
// sockets = ports.collect { new Socket(it) }
ports.each { itPort -> sockets << new ServerSocket(itPort) }
assert sockets.size() == ports.size(); // need at least 3
break;
} catch(Exception ex) {
if ( !(ex instanceof java.net.BindException) ) {
System.err.println ex
}
sockets.findAll { it != null }.each { itSocketToClose ->
try { itSocketToClose.close(); } catch(Exception ex2) {}
}
sockets.clear();
calcPorts();
Thread.sleep( random.nextInt( 200 ) + 500 );
}
}
Thread.sleep random.nextInt( 200 );
sockets.each { it.close() }
def sm = [:] // for system map props
sm.'com.sun.management.jmxremote.port' = ports[0].toString()
sm.'com.sun.management.jmxremote.local.only' = 'false'
sm.'com.sun.management.jmxremote.authenticate' = 'false'
sm.'myJmxConnectorPort' = ports[1].toString()
// ports[0] is for com.sun.management.jmxremote.port
// ports[1] is for broker.getManagementContext().setConnectorPort
sm.keySet().each { key -> System.properties[ key ] = sm[key] }
BrokerService broker
def brokerCreated = false;
tryCounter = 100;
while( (!brokerCreated) && (tryCounter-- >= 0) ) {
try {
broker = createBroker();
brokerCreated = true;
// run forever
Object lock = new Object();
synchronized (lock) {
lock.wait();
}
break; //
} catch(Exception ex) {
println "### Oops: ${ex}"
}
} // end while
}
public static BrokerService createBroker() throws Exception {
def gi = groovy.inspect.swingui.ObjectBrowser.&inspect;
BrokerService broker = new BrokerService();
broker.persistent = false; // SET THIS FIRST!!! - setting on url did not work for me
broker.setUseShutdownHook(true);
// Stop ActiveMQ 5.8 Errors or Warnings when running on machines with
// less than 50G to 100G of diskspace
Long HundredGig = 107374182400L
File fileVisitor = broker.tmpDataDirectory.canonicalFile;
while( !fileVisitor.exists() ) {
fileVisitor = new File(fileVisitor, '..').canonicalFile
}
if ( fileVisitor.usableSpace < HundredGig ) {
broker.systemUsage.tempUsage.limit = fileVisitor.usableSpace/2;
broker.systemUsage.storeUsage.limit = fileVisitor.usableSpace/2;
}
broker.systemUsage.setSendFailIfNoSpace(false);
broker.systemUsage.setSendFailIfNoSpaceExplicitySet(true);
// String theBrokerSuffix = sJavaPid.replace('@','_');
broker.brokerName = 'broker1'
broker.setUseJmx(true);
// sometimes set in bat/sh starter
Integer myJmxConnectorPort = System.properties.'myJmxConnectorPort'.toString().toInteger();
broker.getManagementContext().setConnectorPort( myJmxConnectorPort );
// !!! for jmx usage
broker.setBrokerObjectName(
BrokerMBeanSupport.createBrokerObjectName(broker.getManagementContext().getJmxDomainName(), broker.brokerName)
)
def conn = broker.addConnector("tcp://0.0.0.0:${calcMqPort()}"); // use 0.0.0.0 , makes discovery work better
// conn.name += "_port_${javaPid}"
// for discovery
conn.discoveryUri = new URI( "${C_DEFAULT_DISCOVERY_URI_STRING}?useLocalHost=false".trim() ); // optional add ?
broker.start();
}
}
Initial URL
Initial Description
ActiveMQ 5.8 Groovy Embeded Broker Example - Laurence Toenjes - 5/14/2013 This example overcomes some limitations of the basic ActiveMQ embedded brokers examples I found online Some of the challenges were: # Multiple instances on same machine and be able to use JMX. # Running on a machine with less than 50G or 100G disk space caused combinations of ActiveMQ errors or warnings. # Groovy Grapes/Grab syntax to use that would work on pc and mac. The broker in this example uses a nonpersistent store and is multicast discoverable and should allow you to run multiple instances of it (in separate processes of course) which is the reason for all the code snips containing random port nums and random thread sleeps to increase the odds of success of each new embedded broker process to get a working set of port nums.
Initial Title
Groovy ActiveMQ 5.8 Embedded Broker
Initial Tags
groovy
Initial Language
Groovy