Skip to content

JavaAMQPImplementation

Tony Woode edited this page Apr 8, 2015 · 1 revision

The RPC Client:

import java.io.IOException;
import java.util.Properties;

import org.apache.log4j.Logger;

import com.openshare.util.properties.PropertiesLoader;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
 * A client to send a message
 * @author james.mcilroy
 *
 */
public class RPCClient{
	
	private static final Logger logger = Logger.getLogger(RPCClient.class);
	
	private static final String RPC_PROP_FILE_NAME = "openshare.conf";
	private Connection connection;
	private Channel channel;
	private String host = "localhost";
	private String replyQueueName;
	private QueueingConsumer consumer;

	/**
	 * constructor
	 */
	public RPCClient() {
		try{
			Properties props = null;
			try {
				PropertiesLoader loader = new PropertiesLoader(RPC_PROP_FILE_NAME);
				props = loader.getProps();
				host = props.getProperty("message_switch_hostname");
			} 
			catch (IOException e) {
				logger.error("failed to load config file: " + RPC_PROP_FILE_NAME);
			}
		    
		    ConnectionFactory factory = new ConnectionFactory();
		    factory.setHost(host);
		    connection = factory.newConnection();
		    channel = connection.createChannel();

		    replyQueueName = channel.queueDeclare().getQueue(); 
		    consumer = new QueueingConsumer(channel);
		    channel.basicConsume(replyQueueName, true, consumer);
		}
		catch(Throwable t){
			throw new RuntimeException("failed to connect to queues, cause: ",t);
		}
	}

	/**
	 * Send a message
	 * @param message
	 * @param txId
	 * @param module
	 * @param method
	 * @return
	 * @throws Exception
	 */
	public String send(String message,String txId,String module,String method,boolean ignoreReply) throws Exception {     
	    String response = null;
	    String corrId = txId;

	    if(!ignoreReply){
		    BasicProperties props = new BasicProperties
	                .Builder()
	                .correlationId(corrId)
	                .replyTo(replyQueueName)
	                .build();
	
			channel.basicPublish("", module+"."+method, props, message.getBytes());
			
			while (true) {
				QueueingConsumer.Delivery delivery = consumer.nextDelivery();
				if (delivery.getProperties().getCorrelationId().equals(corrId)) {
					response = new String(delivery.getBody());
					break;
				}
			}
	    }
	    else{
	    	BasicProperties props = new BasicProperties
	                .Builder()
	                .correlationId(corrId)
	                .build();
	
			channel.basicPublish("", module+"."+method, props, message.getBytes());
	    }
		
		return response;
	}

	public String send(String message,String txId,String module,String method) throws Exception {
		return send(message, txId, module, method,false);
	}
	
	public void close() throws Exception {
	    connection.close();
	}
}

The RPC Server

import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.log4j.Logger;

import com.openshare.util.properties.PropertiesLoader;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
 * A RPC server to consume messages on a queue
 * @author james.mcilroy
 *
 */
public class RPCServer implements Runnable{
	
	private static final Logger logger = Logger.getLogger(RPCServer.class);
	
	private static final String RPC_PROP_FILE_NAME = "openshare.conf";
	
	private String inBoundQueueName = "inBoundQueueName";
	private String host = "localhost";
	private String replyQueueName;
	private AtomicBoolean stop = new AtomicBoolean(false);
	
	/**
	 * RPC Server COnstructor
	 * @param methodName
	 */
	public RPCServer(String methodName) {
		try{
			Properties props = null;
			PropertiesLoader loader = new PropertiesLoader(RPC_PROP_FILE_NAME);
			props = loader.getProps();
			host = props.getProperty("message_switch_hostname");
			String moduleName = props.getProperty("workflow_module_queue_name");
			inBoundQueueName = moduleName + "." + methodName;
		
		}
		catch(Throwable t){
			throw new RuntimeException("failed to connect to queues, cause: ",t);
		}
	}

	/**
	 * Stop message handling
	 */
	public void stop(){
		this.stop.set(true);
	}
	
	public void run() {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(host);
		try{
			Connection connection = factory.newConnection();
			Channel channelReceive = connection.createChannel();
			channelReceive.queueDeclare(inBoundQueueName, true, false, false, null);
			logger.info(" [*] Waiting for messages.");
			channelReceive.basicQos(1);
			QueueingConsumer consumerReceive = new QueueingConsumer(channelReceive);
			channelReceive.basicConsume(inBoundQueueName, false, consumerReceive);
			stop.set(false);
			while (!stop.get()) {
				logger.info("checking for messages");
				try{
					QueueingConsumer.Delivery delivery = consumerReceive.nextDelivery();
					//hand off to the method handler
					RPCMethodHandler handler = new RPCMethodHandler(channelReceive, delivery, replyQueueName);
					Thread handlerThread = new Thread(handler);
					logger.info("handing of to method handler thread");
					handlerThread.start();
				}
				catch(Throwable t){
					logger.error("Failed to consume message, cause: ",t);
				}
			}
		}
		catch(Throwable t){
			logger.error("failed to run queue receive startup, cause: ",t);
			this.stop.set(true);
		}
	}
	
	
	public String getInBoundQueueName() {
		return inBoundQueueName;
	}
}

The RPC Method Handler for non blocking message processing:

import java.util.UUID;

import org.apache.log4j.Logger;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.openshare.service.workflow.rpc.MethodHandler;
import com.openshare.service.workflow.rpc.OpenShareResponse;
import com.openshare.service.workflow.rpc.ServiceMethodMapper;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
/**
 * RPC Message handler class allows non blocking method handling
 * @author james.mcilroy
 *
 */
public class RPCMethodHandler implements Runnable{
	
	private static final Logger logger = Logger.getLogger(RPCMethodHandler.class);
	
	private QueueingConsumer.Delivery delivery;
	private Channel channelReceive;
	private String replyQueueName;
	
	/**
	 * Constructor
	 * @param channelReceive
	 * @param delivery
	 * @param replyQueueName
	 */
	public RPCMethodHandler(Channel channelReceive,QueueingConsumer.Delivery delivery,String replyQueueName){
		this.delivery = delivery;
		this.channelReceive = channelReceive;
		this.replyQueueName = replyQueueName;
	}

	@Override
	public void run() {
		try{
			String message = new String(delivery.getBody());
			logger.info("Message [x] Received '" + message + "'");
			String transactionId = delivery.getProperties().getCorrelationId();
			String replyToQueue = delivery.getProperties().getReplyTo();
			String routingKey = delivery.getEnvelope().getRoutingKey();
			//assume routing key is "service.method"
			String method = routingKey.substring(routingKey.indexOf(".")+1);
			MethodHandler<?> handler = ServiceMethodMapper.getMethodHandler(transactionId, method, new String(delivery.getBody()));
			OpenShareResponse response = handler.handleExecution();
			logger.info("Message Processing [x] Done");
			//do reply to message (if queue set)
			UUID uuid = UUID.randomUUID();
			if(replyToQueue!=null && !replyToQueue.isEmpty()){
				BasicProperties props = new BasicProperties
	                    .Builder()
	                    .correlationId(transactionId)
	                    .replyTo(replyQueueName+uuid.toString())
	                    .build();
				ObjectMapper mapper = new ObjectMapper();
				String responseMessage = mapper.writeValueAsString(response.getPayload());
				channelReceive.basicPublish("", replyToQueue, props, responseMessage.getBytes());
			}
			channelReceive.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}
		catch(Throwable t){
			logger.error("Failed to consume message, cause: ",t);
		}
		
	}
	
}

The RPC Server Container to set up all RPC Servers (one per method of module)

import java.util.HashMap;

import org.apache.log4j.Logger;

import com.openshare.service.workflow.rpc.ServiceMethodMapper;

/**
 * Class that contains all the RPC Servers (one per method)
 * @author james.mcilroy
 *
 */
public class RPCServerContainer {
	
	private static final Logger logger = Logger.getLogger(RPCServerContainer.class);

	private final HashMap<ServiceMethodMapper,RPCServer> serverMap;
	
	private static class SingletonHolder { 
        public static final RPCServerContainer INSTANCE = new RPCServerContainer();
	}

	/**
	 * singleton access
	 * @return
	 */
	public static RPCServerContainer getInstance() {
	        return SingletonHolder.INSTANCE;
	}
	
	private RPCServerContainer(){
		logger.info("initialising RPC server er available method:");
		serverMap = new HashMap<ServiceMethodMapper,RPCServer>();
		for(ServiceMethodMapper smm : ServiceMethodMapper.values()){
			//create rpc server
			RPCServer rpcServer = new RPCServer(smm.getMethodName());
			//place in map
			serverMap.put(smm, rpcServer);
			//attempt to start server
			logger.info("satring RPC server for " + rpcServer.getInBoundQueueName());
			Thread serverThread = new Thread(rpcServer);
			serverThread.start();
		}
	}
	
	public void stop(){
		logger.info("stopping all RPC Servers");
		for(ServiceMethodMapper smm : ServiceMethodMapper.values()){
			RPCServer rpcServer = serverMap.get(smm);
			logger.info("stopping RPC server for " + rpcServer.getInBoundQueueName());
			rpcServer.stop();
		}
	}
}

The Method Handler

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;

import com.openshare.service.workflow.exception.OpenshareException;


/**
 * a method handler class.
 * @author james.mcilroy
 *
 */
public abstract class MethodHandler<T> {

	private static final Logger logger = Logger.getLogger(MethodHandler.class);
	
	private Object payload;
	protected String transactionId;
	protected Class<T> type;
	
	/**
	 * Default constructor
	 */
	@SuppressWarnings("unchecked")
	public MethodHandler(){
		Type t = getClass().getGenericSuperclass();
		ParameterizedType pt = (ParameterizedType) t;
		type = (Class<T>) pt.getActualTypeArguments()[0];
	}
	
	/**
	 * Handles the method execution
	 * @return
	 * @throws OpenshareException
	 */
	public final OpenShareResponse handleExecution() throws OpenshareException{
		try{
			T payloadConverted = getEntryFromPayload();
			return executeWithConvertedPayload(payloadConverted);
		}
		catch(Throwable t){
			logger.error("Failed to execute method handler, cause:",t);
			throw new OpenshareException("Failed to execute method handler, cause:",t);
		}
	}
	
	protected abstract OpenShareResponse executeWithConvertedPayload(T convertedPayload) throws OpenshareException;
	
	protected T getEntryFromPayload() throws OpenshareException{
		try{
			ObjectMapper mapper = new ObjectMapper();
			T entry = mapper.convertValue(payload, type);
			return entry;
		}
		catch(Throwable t){
			throw new OpenshareException("failed to convert payload: " + payload);
		}
	}
	
	public Object getPayload() {
		return payload;
	}

	public void setPayload(Object payload) {
		this.payload = payload;
	}
	
	public String getTransactionId() {
		return transactionId;
	}

	public void setTransactionId(String transactionId) {
		this.transactionId = transactionId;
	}

	protected OpenShareResponse generateErrorResponse(String error){
		OpenShareResponse response = new OpenShareResponse();
		response.setTxid(transactionId);
//		response.setStatus(StatusEnum.ERROR);
		response.setPayload(error);
		return response;
	}
}

The example available method enumeration

import com.openshare.service.workflow.exception.OpenshareException;
import com.openshare.service.workflow.rpc.impl.handler.config.WorkflowConfigRetriever;
import com.openshare.service.workflow.rpc.impl.handler.config.WorkflowDefinitionRetriever;
import com.openshare.service.workflow.rpc.impl.handler.config.WorkflowInstanceRetriever;
import com.openshare.service.workflow.rpc.impl.handler.mapping.WorkflowMappingHandler;
import com.openshare.service.workflow.rpc.impl.handler.mapping.WorkflowMappingListHandler;
import com.openshare.service.workflow.rpc.impl.handler.ping.PingHandler;
import com.openshare.service.workflow.rpc.impl.handler.process.definition.WorkflowDefinitionRemovalHandler;
import com.openshare.service.workflow.rpc.impl.handler.process.definition.WorkflowDefinitionUploadHandler;
import com.openshare.service.workflow.rpc.impl.handler.process.instance.WorkflowInstanceResumeHandler;
import com.openshare.service.workflow.rpc.impl.handler.process.instance.WorkflowInstanceRunnerHandler;
import com.openshare.service.workflow.rpc.impl.handler.process.instance.WorkflowInstanceSuspenderHandler;
import com.openshare.service.workflow.rpc.impl.handler.process.instance.WorkflowInstanceTriggerHandler;
import com.openshare.service.workflow.rpc.impl.palyoad.mapping.TriggerWorkflowMappingPayload;
import com.openshare.service.workflow.rpc.impl.palyoad.process.definition.WorkFlowDefinitionPayload;
import com.openshare.service.workflow.rpc.impl.palyoad.process.instance.WorkflowInstanceResumePayload;
import com.openshare.service.workflow.rpc.impl.palyoad.process.instance.WorkflowInstanceRunPayload;
import com.openshare.service.workflow.rpc.impl.palyoad.process.instance.WorkflowInstanceTriggerPayload;
/**
 * default service methods, if no service method is defined to override this in the factory, then these are the ones used.
 * @author james.mcilroy
 *
 */
public enum ServiceMethodMapper {
	
	//ping "hello world" handler
	PING				("ping",	String.class,							PingHandler.class),
	//workflow definition operations
	WORKFLOW_ADD		("add",		WorkFlowDefinitionPayload.class,		WorkflowDefinitionUploadHandler.class),
	WORKFLOW_DELETE		("remove",	String.class,							WorkflowDefinitionRemovalHandler.class),
	//workflow instance operations
	WORKFLOW_RUN		("run",		WorkflowInstanceRunPayload.class,		WorkflowInstanceRunnerHandler.class),
	WORKFLOW_SUSPEND	("stop",	String.class,							WorkflowInstanceSuspenderHandler.class),
	WORKFLOW_RESUME 	("resume",	WorkflowInstanceResumePayload.class,	WorkflowInstanceResumeHandler.class),
	//trigger operations that use the trigger-workflow map to instantiate workflows
	WORKFLOW_TRIGGER	("trigger",	WorkflowInstanceTriggerPayload.class,	WorkflowInstanceTriggerHandler.class),
	//do operations on the workflow-trigger mapper objects
	WORKFLOW_MAPPER_OP	("map", 	TriggerWorkflowMappingPayload.class,  	WorkflowMappingHandler.class),
	WORKFLOW_MAPPER_LIST("list", 	String.class,  							WorkflowMappingListHandler.class),
	//component config
	COMPONENT_CONFIG	("config",	String.class,							WorkflowConfigRetriever.class),
	//instance retrieval
	INSTANCE_DISPLAY	("instance_display", String.class, 					WorkflowInstanceRetriever.class),
	//definition retrieval
	DEFINITION_DISPLAY	("definition_display", String.class,				WorkflowDefinitionRetriever.class);
	
	private final String methodName;
	private final Class<?> payloadClass;
	private final Class<? extends MethodHandler<?>> methodHandlerClass;
	
	private ServiceMethodMapper(String methodName,Class<?> payloadClass, Class<? extends MethodHandler<?>> methodHandlerClass){
		this.methodName = methodName;
		this.payloadClass = payloadClass;
		this.methodHandlerClass = methodHandlerClass;
	}
	
	public String getMethodName() {
		return methodName;
	}

	public Class<?> getPayloadClass() {
		return payloadClass;
	}

	
	public Class<? extends MethodHandler<?>> getMethodHandlerClass() {
		return methodHandlerClass;
	}
	
	/**
	 * create a method handler
	 * @param transactionId
	 * @param method
	 * @param payload
	 * @return
	 * @throws OpenshareException
	 */
	public static MethodHandler<?> getMethodHandler(String transactionId, String method, Object payload) throws OpenshareException{
		try{
			for(ServiceMethodMapper smm : ServiceMethodMapper.values()){
				if(smm.getMethodName().equals(method)){
					Class<? extends MethodHandler<?>> handlerClass = smm.getMethodHandlerClass();
					MethodHandler<?> handler = handlerClass.newInstance();
					//check payload is correct at this point...
					handler.setTransactionId(transactionId);
					handler.setPayload(payload);
					return handler;
				}
			}
			throw new OpenshareException("method: "+ method + " is not a recognised method");
		}
		catch(Throwable t){
			throw new OpenshareException("failed to execute requested method " + method + " for txid " + transactionId + " failed, cause:",t);
		}
	}

}

And finally, a tester class that can be run once module is started up:

import org.apache.log4j.Logger;

public class RPCClientTester {
	
	private static final Logger logger = Logger.getLogger(RPCClientTester.class);

	public static void test(){
		for(int i=0 ; i < 10 ; i++){
			try {
				RPCClient client = new RPCClient();
				logger.info("sending message to AMPQ");
				String result = client.send("Message for test on ping service " + i,""+ 666 + i, "workflow", "ping");
				logger.info("result: " + result);
				client.close();
			} catch (Exception e) {
				logger.error("failed to send message / recieve response, cause: ",e);
			}
		}
	}
}
Clone this wiki locally