Current page last modified at: 07-Dec-2017 12:32:44

RabbitMQ - ATTX MessageBroker

Image

rabbitmq:3.6.11-management

Debuging messages using Firehose

http://www.rabbitmq.com/firehose.html

Enable trace

rabbitmqctl trace_on

Enable tracing plugin

rabbitmq-plugins enable rabbitmq_tracing

Tracing plugin assumes that there is guest/guest user available (add if missing). Tracing UI can be found under the Admin tab.

Working with JMS messages

JMS support is based on JMS topic selector plugin and JMS client library. More detailed information can be found in https://www.rabbitmq.com/jms-client.html.

Setup

Enable jms plugin for a running RabbitMQ instance.

docker exec -it {containerID} rabbitmq-plugins enable rabbitmq_jms_topic_exchange

Another way to enable the plugin is to modify file /etc/rabbitmq/enabled_plugins and add "rabbitmq_jms_topic_exchange" to the array of enabled plugins before starting the container.

Usage

Add JMS client dependency to the project.

Maven:

<dependency>
    <groupId>com.rabbitmq.jms</groupId>
    <artifactId>rabbitmq-jms</artifactId>
    <version>1.7.0</version>
</dependency>

Gradle:

compile group: 'com.rabbitmq.jms', name: 'rabbitmq-jms', version: '1.7.0'

Sending messages

On the client side (ie. the one sending service requests), the only required change concerns the ConnectionFactory, which needs to setup in the following way:

RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setUsername("");
connectionFactory.setPassword("");
connectionFactory.setHost("");
connectionFactory.setPort(12345);

Receiving messages

Simple example service, which uses a temporary queue setup by the client to send its response.

import com.rabbitmq.jms.admin.RMQConnectionFactory;
import com.rabbitmq.jms.admin.RMQDestination;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

public class Service implements MessageListener {

    private final String messageQueueName = "serviceQueue";
    private final int ackMode = Session.AUTO_ACKNOWLEDGE;

    private Session session = null;
    private MessageConsumer consumer = null;
    private MessageProducer producer = null;

    public RMLService() throws Exception {
        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("");
        connectionFactory.setPassword("");

        Connection c = connectionFactory.createConnection();
        c.setClientID("Service");
        c.start();

        this.session = c.createSession(false, ackMode);
        this.consumer = session.createConsumer(session.createQueue(messageQueueName));
        this.consumer.setMessageListener(this);

        this.producer = session.createProducer(null);        
        log.info("Service created");

    }    

    @Override
    public void onMessage(Message msg) {
        try {
            TextMessage m = (TextMessage)msg;

            TextMessage response =  this.session.createTextMessage();            
            response.setText("response");          
            response.setStringProperty("correlation-id", m.getStringProperty("correlation-id"));
            String replyTo = msg.getStringProperty("reply-to");
            RMQDestination d = new RMQDestination(replyTo, true, true);
            d.setDeclared(true); // needed, because the sender has already created the queue
            this.producer.send(d, response);

        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

TODO: SpringBoot example

results matching ""

    No results matching ""