Return to Snippet

Revision: 63533
at May 15, 2013 05:03 by laurenceosx


Updated Code
#!/usr/bin/env groovy

@Grapes([
	@Grab(group='org.apache.activemq', module='activemq-all', version='5.8.0', transitive=false)
])

// file: EmbeddedBroker.groovy

/*
	ActiveMQ 5.8 Groovy Embeded Broker Example - Laurence Toenjes - 5/14/2013
	This example overcomes some limitations of the basic ActiveMQ embedded
	brokers examples I found online
	
	Some of the challenges were:
		# Multiple instances on same machine and be able to use JMX.
		# Running on a machine with less than 50G or 100G disk space 
		  caused combinations of ActiveMQ errors or warnings.
		# Groovy Grapes/Grab syntax to use that would work on pc and mac.
		
	The broker in this example uses a nonpersistent store and 
	is multicast discoverable and should allow you to run multiple instances 
	of it (in separate processes of course) which is the reason for all the 
	code snips containing random port nums and random thread sleeps 
	to increase the odds of success of each new embedded broker process 
	to get a working set of port nums.
*/

import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;

import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory;
import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent; // http://activemq.apache.org/maven/apidocs/src-html/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.html#line.54
import org.apache.activemq.transport.discovery.DiscoveryListener;

public final class EmbeddedBroker {
    
	static random = new java.util.Random();

	static def calcPid = { java.lang.management.ManagementFactory.getRuntimeMXBean().getName().split('@')[0].toInteger() } ;
	
	static Integer javaPid = calcPid();
	static String sJavaPid = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
	
	static String C_DEFAULT_DISCOVERY_URI_STRING = MulticastDiscoveryAgent.DEFAULT_DISCOVERY_URI_STRING;

    private EmbeddedBroker() {} // not used - from Java example
    
    static def sockets // to pre-allocate ports for ActiveMQ
    static def ports   // randomly generated list of free ports
    static def calcMqPort = { ports.last() }
    
    public static void main(String[] args) throws Exception {
		//
		/*
		SET _JAVA_OPTIONS=-Dcom.sun.management.jmxremote ^                                     
		-Dcom.sun.management.jmxremote.port=51001 ^
		-Dcom.sun.management.jmxremote.local.only=false ^
		-Dcom.sun.management.jmxremote.authenticate=false ^
		-DmyJmxConnectorPort=41001
		*/
		
		def tryCounter = 1000;
		sockets = [];   
		
		def base      = 4000; // lowest port num to try
		def portRange = 5000;
		
		def calcPorts = {
		    ports = [];
            def rnd = { base + random.nextInt(portRange) }
            def p = rnd();
            ports = (0 ..< 3).collect { ((p + it) as Integer) }
            // lets make activemq port same as pid so easy to use jconsole
            ports[-1] = javaPid 
            ports; // return
		}
		
		calcPorts();
		while ( tryCounter-- >= 0 ) {
		    try {
				Thread.sleep random.nextInt( 100 );
		        // sockets = ports.collect { new Socket(it) }
		        ports.each { itPort -> sockets << new ServerSocket(itPort) }
		        assert sockets.size() == ports.size(); // need at least 3
		        break;
		    } catch(Exception ex) {
		        if ( !(ex instanceof java.net.BindException) ) {
		            System.err.println ex
		        }
		        sockets.findAll { it != null }.each { itSocketToClose ->
		            try { itSocketToClose.close(); } catch(Exception ex2) {}
		        }
		        sockets.clear();
		        calcPorts();
		        Thread.sleep( random.nextInt( 200 ) + 500 );
		    }
		}
		Thread.sleep random.nextInt( 200 );
		sockets.each { it.close() }
		
		def sm = [:] // for system map props
		sm.'com.sun.management.jmxremote.port'         = ports[0].toString()
		sm.'com.sun.management.jmxremote.local.only'   = 'false'
		sm.'com.sun.management.jmxremote.authenticate' = 'false'
		sm.'myJmxConnectorPort'                        = ports[1].toString()
		
		// ports[0] is for com.sun.management.jmxremote.port
		// ports[1] is for broker.getManagementContext().setConnectorPort
		
		sm.keySet().each { key -> System.properties[ key ] = sm[key] }
		
		BrokerService broker
		def brokerCreated = false;
		
		tryCounter = 100;
		while( (!brokerCreated) && (tryCounter-- >= 0) ) {
			try {
				broker = createBroker();
				brokerCreated = true;
				
				// run forever
				Object lock = new Object();
                synchronized (lock) {
                    lock.wait();
                }
                
                break; //
			} catch(Exception ex) {
				println "### Oops: ${ex}"
			}
		} // end while
    }

    public static BrokerService createBroker() throws Exception {
		def gi = groovy.inspect.swingui.ObjectBrowser.&inspect;
		
		BrokerService broker = new BrokerService();
		broker.persistent = false; // SET THIS FIRST!!! - setting on url did not work for me
		broker.setUseShutdownHook(true);
		
		// Stop ActiveMQ 5.8 Errors or Warnings when running on machines with 
		// less than 50G to 100G of diskspace
		Long HundredGig = 107374182400L
        File fileVisitor = broker.tmpDataDirectory.canonicalFile;
        while( !fileVisitor.exists() ) {
        		fileVisitor = new File(fileVisitor, '..').canonicalFile
        }
        if ( fileVisitor.usableSpace < HundredGig ) {
        		broker.systemUsage.tempUsage.limit  = fileVisitor.usableSpace/2;
        		broker.systemUsage.storeUsage.limit = fileVisitor.usableSpace/2;
        }
        broker.systemUsage.setSendFailIfNoSpace(false);
        broker.systemUsage.setSendFailIfNoSpaceExplicitySet(true);
        
        // String theBrokerSuffix = sJavaPid.replace('@','_');
        broker.brokerName = 'broker1'
        
        broker.setUseJmx(true);
        
        // sometimes set in bat/sh starter
        Integer myJmxConnectorPort = System.properties.'myJmxConnectorPort'.toString().toInteger();
        broker.getManagementContext().setConnectorPort( myJmxConnectorPort );
                
        // !!! for jmx usage
        broker.setBrokerObjectName(
			BrokerMBeanSupport.createBrokerObjectName(broker.getManagementContext().getJmxDomainName(), broker.brokerName)
        )
        
        def conn = broker.addConnector("tcp://0.0.0.0:${calcMqPort()}"); // use 0.0.0.0 , makes discovery work better
        // conn.name += "_port_${javaPid}"
        // for discovery
        conn.discoveryUri = new URI( "${C_DEFAULT_DISCOVERY_URI_STRING}?useLocalHost=false".trim() ); // optional add ?
        
        broker.start();
    }
}

Revision: 63532
at May 15, 2013 05:00 by laurenceosx


Initial Code
#!/usr/bin/env groovy

@Grapes([
	@Grab(group='org.apache.activemq', module='activemq-all', version='5.8.0', transitive=false)
])

// file: EmbeddedBroker.groovy

/*
	ActiveMQ 5.8 Groovy Embeded Broker Example - Laurence Toenjes - 5/14/2013
	This example overcomes some limitations of the basic ActiveMQ embedded
	brokers examples I found online
	
	Some of the challenges were:
		# Multiple instances on same machine and be able to use JMX.
		# Running on a machine with less than 50G or 100G disk space 
		  caused combinations of ActiveMQ errors or warnings.
		# Groovy Grapes/Grab syntax to use that would work on pc and mac.
		
	The broker in this example uses a nonperistent store and 
	is multicast discoverable and should allow you to run multiple instances 
	of it (in separate processes of course) which is the reason for all the 
	code snips containing random port nums and random thread sleeps 
	to increase the odds of success of each new embedded broker process 
	to get a working set of port nums.
*/

import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;

import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory;
import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent; // http://activemq.apache.org/maven/apidocs/src-html/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.html#line.54
import org.apache.activemq.transport.discovery.DiscoveryListener;

public final class EmbeddedBroker {
    
	static random = new java.util.Random();

	static def calcPid = { java.lang.management.ManagementFactory.getRuntimeMXBean().getName().split('@')[0].toInteger() } ;
	
	static Integer javaPid = calcPid();
	static String sJavaPid = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
	
	static String C_DEFAULT_DISCOVERY_URI_STRING = MulticastDiscoveryAgent.DEFAULT_DISCOVERY_URI_STRING;

    private EmbeddedBroker() {} // not used - from Java example
    
    static def sockets // to pre-allocate ports for ActiveMQ
    static def ports   // randomly generated list of free ports
    static def calcMqPort = { ports.last() }
    
    public static void main(String[] args) throws Exception {
		//
		/*
		SET _JAVA_OPTIONS=-Dcom.sun.management.jmxremote ^                                     
		-Dcom.sun.management.jmxremote.port=51001 ^
		-Dcom.sun.management.jmxremote.local.only=false ^
		-Dcom.sun.management.jmxremote.authenticate=false ^
		-DmyJmxConnectorPort=41001
		*/
		
		def tryCounter = 1000;
		sockets = [];   
		
		def base      = 4000; // lowest port num to try
		def portRange = 5000;
		
		def calcPorts = {
		    ports = [];
            def rnd = { base + random.nextInt(portRange) }
            def p = rnd();
            ports = (0 ..< 3).collect { ((p + it) as Integer) }
            // lets make activemq port same as pid so easy to use jconsole
            ports[-1] = javaPid 
            ports; // return
		}
		
		calcPorts();
		while ( tryCounter-- >= 0 ) {
		    try {
				Thread.sleep random.nextInt( 100 );
		        // sockets = ports.collect { new Socket(it) }
		        ports.each { itPort -> sockets << new ServerSocket(itPort) }
		        assert sockets.size() == ports.size(); // need at least 3
		        break;
		    } catch(Exception ex) {
		        if ( !(ex instanceof java.net.BindException) ) {
		            System.err.println ex
		        }
		        sockets.findAll { it != null }.each { itSocketToClose ->
		            try { itSocketToClose.close(); } catch(Exception ex2) {}
		        }
		        sockets.clear();
		        calcPorts();
		        Thread.sleep( random.nextInt( 200 ) + 500 );
		    }
		}
		Thread.sleep random.nextInt( 200 );
		sockets.each { it.close() }
		
		def sm = [:] // for system map props
		sm.'com.sun.management.jmxremote.port'         = ports[0].toString()
		sm.'com.sun.management.jmxremote.local.only'   = 'false'
		sm.'com.sun.management.jmxremote.authenticate' = 'false'
		sm.'myJmxConnectorPort'                        = ports[1].toString()
		
		// ports[0] is for com.sun.management.jmxremote.port
		// ports[1] is for broker.getManagementContext().setConnectorPort
		
		sm.keySet().each { key -> System.properties[ key ] = sm[key] }
		
		BrokerService broker
		def brokerCreated = false;
		
		tryCounter = 100;
		while( (!brokerCreated) && (tryCounter-- >= 0) ) {
			try {
				broker = createBroker();
				brokerCreated = true;
				
				// run forever
				Object lock = new Object();
                synchronized (lock) {
                    lock.wait();
                }
                
                break; //
			} catch(Exception ex) {
				println "### Oops: ${ex}"
			}
		} // end while
    }

    public static BrokerService createBroker() throws Exception {
		def gi = groovy.inspect.swingui.ObjectBrowser.&inspect;
		
		BrokerService broker = new BrokerService();
		broker.persistent = false; // SET THIS FIRST!!! - setting on url did not work for me
		broker.setUseShutdownHook(true);
		
		// Stop ActiveMQ 5.8 Errors or Warnings when running on machines with 
		// less than 50G to 100G of diskspace
		Long HundredGig = 107374182400L
        File fileVisitor = broker.tmpDataDirectory.canonicalFile;
        while( !fileVisitor.exists() ) {
        		fileVisitor = new File(fileVisitor, '..').canonicalFile
        }
        if ( fileVisitor.usableSpace < HundredGig ) {
        		broker.systemUsage.tempUsage.limit  = fileVisitor.usableSpace/2;
        		broker.systemUsage.storeUsage.limit = fileVisitor.usableSpace/2;
        }
        broker.systemUsage.setSendFailIfNoSpace(false);
        broker.systemUsage.setSendFailIfNoSpaceExplicitySet(true);
        
        // String theBrokerSuffix = sJavaPid.replace('@','_');
        broker.brokerName = 'broker1'
        
        broker.setUseJmx(true);
        
        // sometimes set in bat/sh starter
        Integer myJmxConnectorPort = System.properties.'myJmxConnectorPort'.toString().toInteger();
        broker.getManagementContext().setConnectorPort( myJmxConnectorPort );
                
        // !!! for jmx usage
        broker.setBrokerObjectName(
			BrokerMBeanSupport.createBrokerObjectName(broker.getManagementContext().getJmxDomainName(), broker.brokerName)
        )
        
        def conn = broker.addConnector("tcp://0.0.0.0:${calcMqPort()}"); // use 0.0.0.0 , makes discovery work better
        // conn.name += "_port_${javaPid}"
        // for discovery
        conn.discoveryUri = new URI( "${C_DEFAULT_DISCOVERY_URI_STRING}?useLocalHost=false".trim() ); // optional add ?
        
        broker.start();
    }
}

Initial URL

                                

Initial Description
ActiveMQ 5.8 Groovy Embeded Broker Example - Laurence Toenjes - 5/14/2013
	This example overcomes some limitations of the basic ActiveMQ embedded
	brokers examples I found online
	
	Some of the challenges were:
		# Multiple instances on same machine and be able to use JMX.
		# Running on a machine with less than 50G or 100G disk space 
		  caused combinations of ActiveMQ errors or warnings.
		# Groovy Grapes/Grab syntax to use that would work on pc and mac.
		
	The broker in this example uses a nonpersistent store and 
	is multicast discoverable and should allow you to run multiple instances 
	of it (in separate processes of course) which is the reason for all the 
	code snips containing random port nums and random thread sleeps 
	to increase the odds of success of each new embedded broker process 
	to get a working set of port nums.

Initial Title
Groovy ActiveMQ 5.8 Embedded Broker

Initial Tags
groovy

Initial Language
Groovy