Revision: 63533
Updated Code
at May 15, 2013 05:03 by laurenceosx
Updated Code
#!/usr/bin/env groovy @Grapes([ @Grab(group='org.apache.activemq', module='activemq-all', version='5.8.0', transitive=false) ]) // file: EmbeddedBroker.groovy /* ActiveMQ 5.8 Groovy Embeded Broker Example - Laurence Toenjes - 5/14/2013 This example overcomes some limitations of the basic ActiveMQ embedded brokers examples I found online Some of the challenges were: # Multiple instances on same machine and be able to use JMX. # Running on a machine with less than 50G or 100G disk space caused combinations of ActiveMQ errors or warnings. # Groovy Grapes/Grab syntax to use that would work on pc and mac. The broker in this example uses a nonpersistent store and is multicast discoverable and should allow you to run multiple instances of it (in separate processes of course) which is the reason for all the code snips containing random port nums and random thread sleeps to increase the odds of success of each new embedded broker process to get a working set of port nums. */ import javax.management.ObjectName; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.BrokerMBeanSupport; import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory; import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent; // http://activemq.apache.org/maven/apidocs/src-html/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.html#line.54 import org.apache.activemq.transport.discovery.DiscoveryListener; public final class EmbeddedBroker { static random = new java.util.Random(); static def calcPid = { java.lang.management.ManagementFactory.getRuntimeMXBean().getName().split('@')[0].toInteger() } ; static Integer javaPid = calcPid(); static String sJavaPid = java.lang.management.ManagementFactory.getRuntimeMXBean().getName(); static String C_DEFAULT_DISCOVERY_URI_STRING = MulticastDiscoveryAgent.DEFAULT_DISCOVERY_URI_STRING; private EmbeddedBroker() {} // not used - from Java example static def sockets // to pre-allocate ports for ActiveMQ static def ports // randomly generated list of free ports static def calcMqPort = { ports.last() } public static void main(String[] args) throws Exception { // /* SET _JAVA_OPTIONS=-Dcom.sun.management.jmxremote ^ -Dcom.sun.management.jmxremote.port=51001 ^ -Dcom.sun.management.jmxremote.local.only=false ^ -Dcom.sun.management.jmxremote.authenticate=false ^ -DmyJmxConnectorPort=41001 */ def tryCounter = 1000; sockets = []; def base = 4000; // lowest port num to try def portRange = 5000; def calcPorts = { ports = []; def rnd = { base + random.nextInt(portRange) } def p = rnd(); ports = (0 ..< 3).collect { ((p + it) as Integer) } // lets make activemq port same as pid so easy to use jconsole ports[-1] = javaPid ports; // return } calcPorts(); while ( tryCounter-- >= 0 ) { try { Thread.sleep random.nextInt( 100 ); // sockets = ports.collect { new Socket(it) } ports.each { itPort -> sockets << new ServerSocket(itPort) } assert sockets.size() == ports.size(); // need at least 3 break; } catch(Exception ex) { if ( !(ex instanceof java.net.BindException) ) { System.err.println ex } sockets.findAll { it != null }.each { itSocketToClose -> try { itSocketToClose.close(); } catch(Exception ex2) {} } sockets.clear(); calcPorts(); Thread.sleep( random.nextInt( 200 ) + 500 ); } } Thread.sleep random.nextInt( 200 ); sockets.each { it.close() } def sm = [:] // for system map props sm.'com.sun.management.jmxremote.port' = ports[0].toString() sm.'com.sun.management.jmxremote.local.only' = 'false' sm.'com.sun.management.jmxremote.authenticate' = 'false' sm.'myJmxConnectorPort' = ports[1].toString() // ports[0] is for com.sun.management.jmxremote.port // ports[1] is for broker.getManagementContext().setConnectorPort sm.keySet().each { key -> System.properties[ key ] = sm[key] } BrokerService broker def brokerCreated = false; tryCounter = 100; while( (!brokerCreated) && (tryCounter-- >= 0) ) { try { broker = createBroker(); brokerCreated = true; // run forever Object lock = new Object(); synchronized (lock) { lock.wait(); } break; // } catch(Exception ex) { println "### Oops: ${ex}" } } // end while } public static BrokerService createBroker() throws Exception { def gi = groovy.inspect.swingui.ObjectBrowser.&inspect; BrokerService broker = new BrokerService(); broker.persistent = false; // SET THIS FIRST!!! - setting on url did not work for me broker.setUseShutdownHook(true); // Stop ActiveMQ 5.8 Errors or Warnings when running on machines with // less than 50G to 100G of diskspace Long HundredGig = 107374182400L File fileVisitor = broker.tmpDataDirectory.canonicalFile; while( !fileVisitor.exists() ) { fileVisitor = new File(fileVisitor, '..').canonicalFile } if ( fileVisitor.usableSpace < HundredGig ) { broker.systemUsage.tempUsage.limit = fileVisitor.usableSpace/2; broker.systemUsage.storeUsage.limit = fileVisitor.usableSpace/2; } broker.systemUsage.setSendFailIfNoSpace(false); broker.systemUsage.setSendFailIfNoSpaceExplicitySet(true); // String theBrokerSuffix = sJavaPid.replace('@','_'); broker.brokerName = 'broker1' broker.setUseJmx(true); // sometimes set in bat/sh starter Integer myJmxConnectorPort = System.properties.'myJmxConnectorPort'.toString().toInteger(); broker.getManagementContext().setConnectorPort( myJmxConnectorPort ); // !!! for jmx usage broker.setBrokerObjectName( BrokerMBeanSupport.createBrokerObjectName(broker.getManagementContext().getJmxDomainName(), broker.brokerName) ) def conn = broker.addConnector("tcp://0.0.0.0:${calcMqPort()}"); // use 0.0.0.0 , makes discovery work better // conn.name += "_port_${javaPid}" // for discovery conn.discoveryUri = new URI( "${C_DEFAULT_DISCOVERY_URI_STRING}?useLocalHost=false".trim() ); // optional add ? broker.start(); } }
Revision: 63532
Initial Code
Initial URL
Initial Description
Initial Title
Initial Tags
Initial Language
at May 15, 2013 05:00 by laurenceosx
Initial Code
#!/usr/bin/env groovy @Grapes([ @Grab(group='org.apache.activemq', module='activemq-all', version='5.8.0', transitive=false) ]) // file: EmbeddedBroker.groovy /* ActiveMQ 5.8 Groovy Embeded Broker Example - Laurence Toenjes - 5/14/2013 This example overcomes some limitations of the basic ActiveMQ embedded brokers examples I found online Some of the challenges were: # Multiple instances on same machine and be able to use JMX. # Running on a machine with less than 50G or 100G disk space caused combinations of ActiveMQ errors or warnings. # Groovy Grapes/Grab syntax to use that would work on pc and mac. The broker in this example uses a nonperistent store and is multicast discoverable and should allow you to run multiple instances of it (in separate processes of course) which is the reason for all the code snips containing random port nums and random thread sleeps to increase the odds of success of each new embedded broker process to get a working set of port nums. */ import javax.management.ObjectName; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.BrokerMBeanSupport; import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory; import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent; // http://activemq.apache.org/maven/apidocs/src-html/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.html#line.54 import org.apache.activemq.transport.discovery.DiscoveryListener; public final class EmbeddedBroker { static random = new java.util.Random(); static def calcPid = { java.lang.management.ManagementFactory.getRuntimeMXBean().getName().split('@')[0].toInteger() } ; static Integer javaPid = calcPid(); static String sJavaPid = java.lang.management.ManagementFactory.getRuntimeMXBean().getName(); static String C_DEFAULT_DISCOVERY_URI_STRING = MulticastDiscoveryAgent.DEFAULT_DISCOVERY_URI_STRING; private EmbeddedBroker() {} // not used - from Java example static def sockets // to pre-allocate ports for ActiveMQ static def ports // randomly generated list of free ports static def calcMqPort = { ports.last() } public static void main(String[] args) throws Exception { // /* SET _JAVA_OPTIONS=-Dcom.sun.management.jmxremote ^ -Dcom.sun.management.jmxremote.port=51001 ^ -Dcom.sun.management.jmxremote.local.only=false ^ -Dcom.sun.management.jmxremote.authenticate=false ^ -DmyJmxConnectorPort=41001 */ def tryCounter = 1000; sockets = []; def base = 4000; // lowest port num to try def portRange = 5000; def calcPorts = { ports = []; def rnd = { base + random.nextInt(portRange) } def p = rnd(); ports = (0 ..< 3).collect { ((p + it) as Integer) } // lets make activemq port same as pid so easy to use jconsole ports[-1] = javaPid ports; // return } calcPorts(); while ( tryCounter-- >= 0 ) { try { Thread.sleep random.nextInt( 100 ); // sockets = ports.collect { new Socket(it) } ports.each { itPort -> sockets << new ServerSocket(itPort) } assert sockets.size() == ports.size(); // need at least 3 break; } catch(Exception ex) { if ( !(ex instanceof java.net.BindException) ) { System.err.println ex } sockets.findAll { it != null }.each { itSocketToClose -> try { itSocketToClose.close(); } catch(Exception ex2) {} } sockets.clear(); calcPorts(); Thread.sleep( random.nextInt( 200 ) + 500 ); } } Thread.sleep random.nextInt( 200 ); sockets.each { it.close() } def sm = [:] // for system map props sm.'com.sun.management.jmxremote.port' = ports[0].toString() sm.'com.sun.management.jmxremote.local.only' = 'false' sm.'com.sun.management.jmxremote.authenticate' = 'false' sm.'myJmxConnectorPort' = ports[1].toString() // ports[0] is for com.sun.management.jmxremote.port // ports[1] is for broker.getManagementContext().setConnectorPort sm.keySet().each { key -> System.properties[ key ] = sm[key] } BrokerService broker def brokerCreated = false; tryCounter = 100; while( (!brokerCreated) && (tryCounter-- >= 0) ) { try { broker = createBroker(); brokerCreated = true; // run forever Object lock = new Object(); synchronized (lock) { lock.wait(); } break; // } catch(Exception ex) { println "### Oops: ${ex}" } } // end while } public static BrokerService createBroker() throws Exception { def gi = groovy.inspect.swingui.ObjectBrowser.&inspect; BrokerService broker = new BrokerService(); broker.persistent = false; // SET THIS FIRST!!! - setting on url did not work for me broker.setUseShutdownHook(true); // Stop ActiveMQ 5.8 Errors or Warnings when running on machines with // less than 50G to 100G of diskspace Long HundredGig = 107374182400L File fileVisitor = broker.tmpDataDirectory.canonicalFile; while( !fileVisitor.exists() ) { fileVisitor = new File(fileVisitor, '..').canonicalFile } if ( fileVisitor.usableSpace < HundredGig ) { broker.systemUsage.tempUsage.limit = fileVisitor.usableSpace/2; broker.systemUsage.storeUsage.limit = fileVisitor.usableSpace/2; } broker.systemUsage.setSendFailIfNoSpace(false); broker.systemUsage.setSendFailIfNoSpaceExplicitySet(true); // String theBrokerSuffix = sJavaPid.replace('@','_'); broker.brokerName = 'broker1' broker.setUseJmx(true); // sometimes set in bat/sh starter Integer myJmxConnectorPort = System.properties.'myJmxConnectorPort'.toString().toInteger(); broker.getManagementContext().setConnectorPort( myJmxConnectorPort ); // !!! for jmx usage broker.setBrokerObjectName( BrokerMBeanSupport.createBrokerObjectName(broker.getManagementContext().getJmxDomainName(), broker.brokerName) ) def conn = broker.addConnector("tcp://0.0.0.0:${calcMqPort()}"); // use 0.0.0.0 , makes discovery work better // conn.name += "_port_${javaPid}" // for discovery conn.discoveryUri = new URI( "${C_DEFAULT_DISCOVERY_URI_STRING}?useLocalHost=false".trim() ); // optional add ? broker.start(); } }
Initial URL
Initial Description
ActiveMQ 5.8 Groovy Embeded Broker Example - Laurence Toenjes - 5/14/2013 This example overcomes some limitations of the basic ActiveMQ embedded brokers examples I found online Some of the challenges were: # Multiple instances on same machine and be able to use JMX. # Running on a machine with less than 50G or 100G disk space caused combinations of ActiveMQ errors or warnings. # Groovy Grapes/Grab syntax to use that would work on pc and mac. The broker in this example uses a nonpersistent store and is multicast discoverable and should allow you to run multiple instances of it (in separate processes of course) which is the reason for all the code snips containing random port nums and random thread sleeps to increase the odds of success of each new embedded broker process to get a working set of port nums.
Initial Title
Groovy ActiveMQ 5.8 Embedded Broker
Initial Tags
groovy
Initial Language
Groovy