Wednesday, 24 December 2014

Simple example of Spring JMS template

This post shows how to access JMS queues using the Spring Framework JMS API.
The Java Message Service (JMS) lets application systems communicate in a loosely coupled, peer-to-peer fashion. Each client connects to a messaging agent that provides facilities for creating, sending, receiving, and reading messages. There are three message exchange models:
  • Publish-Subscribe Messaging

  • Point-To-Point Messaging

  • Request-Reply Messaging
This post walks through a simple example of Point-To-Point Messaging.
Please check the JMS specification for a detailed description.
Please check the spring-jms-<version>.RELEASE documentation for a detailed description.
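
To make the difference between the first two models concrete, here is a minimal in-memory sketch. It does not use JMS at all: the class name MessagingModelsSketch and both methods are made up for illustration, and the round-robin hand-off is only an assumption standing in for "whichever consumer is free". In point-to-point messaging each message ends up with exactly one consumer, while in publish-subscribe every subscriber sees every message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** In-memory illustration only; a real broker such as ActiveMQ does this across processes. */
class MessagingModelsSketch {

    /** Point-to-point: consumers compete, so each message is taken by exactly one of them. */
    static List<List<String>> pointToPoint(List<String> messages, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            received.add(new ArrayList<String>());
        }
        int next = 0;
        String message;
        while ((message = queue.poll()) != null) {
            // Round-robin stands in for whichever consumer happens to be free.
            received.get(next++ % consumers).add(message);
        }
        return received;
    }

    /** Publish-subscribe: every subscriber gets its own copy of every message. */
    static List<List<String>> publishSubscribe(List<String> messages, int subscribers) {
        List<List<String>> received = new ArrayList<>();
        for (int s = 0; s < subscribers; s++) {
            received.add(new ArrayList<>(messages));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m0", "m1", "m2", "m3");
        System.out.println("point-to-point:    " + pointToPoint(messages, 2));
        System.out.println("publish-subscribe: " + publishSubscribe(messages, 2));
    }
}
```

The queue "myQ" used in the rest of this post follows the first pattern: if two consumers listen on it, a given message is delivered to only one of them.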

  • Prerequisites
  • These instructions assume that the JDK and ActiveMQ are installed without errors.
    Start ActiveMQ and create one queue named "myQ".

  • Implementation instructions
    1. Create a simple Spring project named "spring-jms-example" and add the following libraries to the classpath.
    2. - spring-aop-4.1.3.RELEASE.jar
      - spring-beans-4.1.3.RELEASE.jar
      - spring-context-4.1.3.RELEASE.jar
      - spring-context-support-4.1.3.RELEASE.jar
      - spring-core-4.1.3.RELEASE.jar
      - spring-expression-4.1.3.RELEASE.jar
      - spring-jms-4.1.3.RELEASE.jar
      - activemq-broker-5.10.0.jar
      - activemq-client-5.10.0.jar
      - activemq-jaas-5.10.0.jar
      - geronimo-jms_1.1_spec-1.1.1.jar
      - hawtbuf-1.10.jar
      - activemq-spring-5.10.0.jar
      - geronimo-j2ee-management_1.1_spec-1.0.1.jar
      - log4j-1.2.14.jar
      - slf4j-jcl-1.7.5.jar

    3. Create the following packages.
      - example.spring.simple.jms.producer
      - example.spring.simple.jms.consumer
      - example.spring.dao
    4. Create a simple POJO class Member in the example.spring.dao package and add the following snippet.
    5.   /**
        *
        * Copyright © Kaustuv Maji , 2014
        * Repos - https://github.com/kaustuvmaji
        * Blog -  http://kaustuvmaji.blogspot.in
        *
        */
        package example.spring.dao;
      
        import java.io.Serializable;
      
        /**
         * @author KMaji
         *
         */
        public class Member implements Serializable {
      
         /**
          *
          */
         private static final long serialVersionUID = 8944832256293484687L;
      
      
         private String name;
         private Integer id;
         private Integer age;
      
         /**
          * Default constructor
          */
         public Member() {
      
         }
      
         /**
          * Constructor with argument
          * @param name
          * @param age
          */
         public Member(String name, Integer age) {
          super();
          this.name = name;
          this.age = age;
         }
      
         /**
          * @return the name
          */
         public final String getName() {
          return name;
         }
         /**
          * @param name the name to set
          */
         public final void setName(String name) {
          this.name = name;
         }
         /**
          * @return the id
          */
         public final Integer getId() {
          return id;
         }
         /**
          * @param id the id to set
          */
         public final void setId(Integer id) {
          this.id = id;
         }
         /**
          * @return the age
          */
         public final Integer getAge() {
          return age;
         }
         /**
          * @param age the age to set
          */
         public final void setAge(Integer age) {
          this.age = age;
         }
      
         /* (non-Javadoc)
          * @see java.lang.Object#toString()
          */
         @Override
         public String toString() {
          StringBuilder builder = new StringBuilder();
          builder.append("Member [");
          if (name != null)
           builder.append("name=").append(name).append(", ");
          if (id != null)
           builder.append("id=").append(id).append(", ");
          if (age != null)
           builder.append("age=").append(age);
          builder.append("]");
          return builder.toString();
         }
        }
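
Member implements Serializable because it is later sent as the body of an ObjectMessage, and the body of an ObjectMessage must be a Serializable object. The sketch below uses only JDK serialization to show the kind of round trip such a payload has to survive. MemberCopy is a hypothetical, trimmed-down stand-in for Member so that the example is self-contained, and the exact wire format a broker uses is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializableRoundTrip {

    /** Hypothetical stand-in for example.spring.dao.Member, used by this sketch only. */
    static class MemberCopy implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final Integer age;

        MemberCopy(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }

    /** Serializes the value to bytes and reads it back, as a sender and receiver would. */
    static Object roundTrip(Serializable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        MemberCopy copy = (MemberCopy) roundTrip(new MemberCopy("alice", 30));
        System.out.println(copy.name + " / " + copy.age);
    }
}
```

A class that does not implement Serializable would fail in writeObject with a NotSerializableException, which is why the marker interface on Member matters.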
    6. Create the SimpleMessageProducer class in example.spring.simple.jms.producer and add the following snippet.
    7.   /**
         *
         * Copyright © Kaustuv Maji , 2014
         * Repos - https://github.com/kaustuvmaji
         * Blog -  http://kaustuvmaji.blogspot.in
         *
         */
        package example.spring.simple.jms.producer;
      
        import java.util.Date;
      
        import javax.jms.BytesMessage;
        import javax.jms.JMSException;
        import javax.jms.MapMessage;
        import javax.jms.Message;
        import javax.jms.Session;
      
        import org.apache.log4j.Logger;
        import org.springframework.jms.core.JmsTemplate;
        import org.springframework.jms.core.MessageCreator;
      
        import example.spring.dao.Member;
      
        /**
         * Example spring jms producer.
         *
         * @author KMaji
         *
         */
        public class SimpleMessageProducer {
      
         private static final Logger LOG = Logger.getLogger(SimpleMessageProducer.class.getName());
      
         protected JmsTemplate jmsTemplate;
      
         /**
          * Getter Method of JMSTemplate
          *
          * @return the jmsTemplate
          */
         public final JmsTemplate getJmsTemplate() {
          return jmsTemplate;
         }
      
         /**
          * Setter Method of JMSTemplate
          *
          * @param jmsTemplate
          *            the jmsTemplate to set
          */
         public final void setJmsTemplate(JmsTemplate jmsTemplate) {
          this.jmsTemplate = jmsTemplate;
         }
      
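         protected final void sendMessages(String sendType, int i) {
          try {
           if ("textMessage".equalsIgnoreCase(sendType)) {
            jmsSendTextMessages(i);
           } else if ("objectMessage".equalsIgnoreCase(sendType)) {
            jmsSendObjectMessages(i);
           } else if ("ByteMessage".equalsIgnoreCase(sendType)) {
            jmsSendBytesMessages(i);
           } else if ("MapMessage".equalsIgnoreCase(sendType)) {
            jmsSendMapMessages(i);
           } else if ("StreamMessage".equalsIgnoreCase(sendType)) {
            jmsSendStreamMessages(i);
           } else {
            // make an unsupported type visible instead of silently sending nothing
            LOG.warn("Unknown sendType [" + sendType + "], nothing sent");
           }
          } catch (JMSException e) {
           LOG.error("unable to process", e);
          }
         }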
      
         /**
          * This method is used to produce TextMessages
          *
          * @throws JMSException
          */
         protected void jmsSendTextMessages(final int i) throws JMSException {
      
          final StringBuilder buffer = new StringBuilder();
      
          buffer.append("jmsSendTextMessages '").append(i).append("' sent at: ").append(new Date());
      
          final String payload = buffer.toString();
          jmsTemplate.send(new MessageCreator() {
           public Message createMessage(Session session) throws JMSException {
            Message message = session.createTextMessage(payload);
            message.setIntProperty("messageCount", i);
            LOG.info("Sending message number ->" + i + " message [" + message + "]");
            return message;
           }
          });
         }
      
         /**
          * This method is used to produce ObjectMessages
          *
          * @throws JMSException
          */
         protected void jmsSendObjectMessages(final int i) {
      
          jmsTemplate.send(new MessageCreator() {
      
           @Override
           public Message createMessage(Session session) throws JMSException {
            Member member = new Member();
            member.setId(i);
            member.setName("jmsSendObjectMessage_" + i);
            member.setAge(10 + i);
            Message message = session.createObjectMessage(member);
            LOG.info("Sending message number ->" + i + " message [" + message + "]");
            return message;
           }
          });
         }
      
         /**
          * This method is used to produce BytesMessages.
          *
          * @throws JMSException
          */
         public void jmsSendBytesMessages(final int i) throws JMSException {
          final StringBuilder buffer = new StringBuilder();
      
          buffer.append("Byte message [").append(i).append("] sent at -> ").append(new Date());
      
          final String payload = buffer.toString();
      
          jmsTemplate.send(new MessageCreator() {
           public Message createMessage(Session session) throws JMSException {
            BytesMessage message = session.createBytesMessage();
            message.writeUTF(payload);
            message.setIntProperty("messageCount", i);
            LOG.info("Sending message number ->" + i + " message [" + message + "]");
            return message;
           }
          });
         }
      
         /**
          * This method is used to produce MapMessages.
          *
          * @throws JMSException
          */
         public void jmsSendMapMessages(final int i) throws JMSException {
          final StringBuilder buffer = new StringBuilder();
      
          buffer.append("Map message [").append(i).append("] sent at -> ").append(new Date());
      
          final String payload = buffer.toString();
      
          jmsTemplate.send(new MessageCreator() {
           public Message createMessage(Session session) throws JMSException {
            MapMessage message = session.createMapMessage();
            message.setString("payload", payload);
            LOG.info("Sending message number ->" + i + " message [" + message + "]");
            return message;
           }
          });
         }
         
         /**
          * This method is used to produce StreamMessages.
          *
          * @throws JMSException
          */
         public void jmsSendStreamMessages(final int i) throws JMSException {
          final StringBuilder buffer = new StringBuilder();
      
          buffer.append("Stream message [").append(i).append("] sent at -> ").append(new Date());
      
          final String payload = buffer.toString();
      
          jmsTemplate.send(new MessageCreator() {
           public Message createMessage(Session session) throws JMSException {
            javax.jms.StreamMessage message = session.createStreamMessage();
            message.writeObject(payload);
            LOG.info("Sending message number ->" + i + " message [" + message + "]");
            return message;
           }
          });
         }
        }

      The following table explains the different types of JMS messages.

      Method name                Message type     Body contains
      jmsSendTextMessages()      TextMessage      A java.lang.String object.
      jmsSendMapMessages()       MapMessage       A set of name-value pairs <String,Object>.
      jmsSendBytesMessages()     BytesMessage     A stream of uninterpreted bytes.
      jmsSendStreamMessages()    StreamMessage    A stream of primitive values, filled and read sequentially.
      jmsSendObjectMessages()    ObjectMessage    A Serializable object.
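
The BytesMessage methods are modeled on java.io.DataOutputStream and java.io.DataInputStream, so the JDK classes make a reasonable stand-in for seeing what writeUTF puts into the "stream of uninterpreted bytes". The sketch below is an illustration under that assumption and does not touch JMS. UtfFramingSketch and its methods are made-up names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class UtfFramingSketch {

    /** Encodes the text the way DataOutputStream.writeUTF does: 2-byte length, then the characters. */
    static byte[] writeUtf(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(text);
        } catch (IOException e) {
            throw new IllegalStateException("write failed", e);
        }
        return bytes.toByteArray();
    }

    /** Decodes bytes produced by writeUtf back into the original string. */
    static String readUtf(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readUTF();
        } catch (IOException e) {
            throw new IllegalStateException("read failed", e);
        }
    }

    public static void main(String[] args) {
        byte[] raw = writeUtf("hello");
        // The raw array carries a length prefix, so it is longer than the text itself.
        System.out.println("raw length = " + raw.length);
        System.out.println("decoded    = " + readUtf(raw));
    }
}
```

The practical consequence for this example is that a body written with writeUTF is best read back with readUTF on the receiving side. Reading the raw bytes and printing the array does not give the original text.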

    8. Create the ProducerApp class in example.spring.simple.jms.producer and add the following snippet.
    9.   /**
         *
         * Copyright © Kaustuv Maji , 2014
         * Repos - https://github.com/kaustuvmaji
         * Blog -  http://kaustuvmaji.blogspot.in
         *
         */
        package example.spring.simple.jms.producer;
      
      
        import org.apache.log4j.Logger;
        import org.springframework.context.ApplicationContext;
        import org.springframework.context.support.ClassPathXmlApplicationContext;
      
        /**
         * @author KMaji
         *
         */
        public class ProducerApp implements Runnable {
      
         private static final Logger LOG = Logger
           .getLogger(ProducerApp.class);
      
         /**
          * @param args
          */
         public static void main(String[] args) {
          new ProducerApp().run();
         }
      
         @Override
         public void run() {
          String sendType = "TextMessage";
          @SuppressWarnings("resource")
          ApplicationContext context = new ClassPathXmlApplicationContext(
            "simple-producer-jms-context.xml");
      
          SimpleMessageProducer producer = (SimpleMessageProducer) context
            .getBean("messageProducer");
      
          LOG.info("Using the sendType "+sendType);
          int i = 0 ;
      
          while (true) {
      
           producer.sendMessages(sendType, i++);
           try {
      
            Thread.sleep(1000);
           } catch (InterruptedException e) {
            // restore the interrupt flag and stop sending
            Thread.currentThread().interrupt();
            return;
           }
          }
         }
        }
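
ProducerApp sends forever, which is fine for a demo but awkward when we want a fixed number of test messages. One possible variation is a loop with an upper bound that also stops when the thread is interrupted. The sketch below is self-contained: the Sender interface is a hypothetical stand-in for the call to producer.sendMessages(sendType, i), and BoundedSendLoop is not part of the original project.

```java
class BoundedSendLoop {

    /** Hypothetical callback standing in for producer.sendMessages(sendType, i). */
    interface Sender {
        void send(int sequence);
    }

    /**
     * Sends at most maxMessages messages, pausing between sends,
     * and returns how many were actually sent.
     */
    static int run(Sender sender, int maxMessages, long pauseMillis) {
        int sent = 0;
        while (sent < maxMessages && !Thread.currentThread().isInterrupted()) {
            sender.send(sent++);
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                // Keep the interrupt flag set so the loop condition ends the loop.
                Thread.currentThread().interrupt();
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        int sent = run(i -> System.out.println("sending message " + i), 3, 100);
        System.out.println("sent " + sent + " messages");
    }
}
```

In the real application the lambda would delegate to the SimpleMessageProducer bean. The bound and the pause are arbitrary values chosen for the sketch.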
    10. Create the Spring bean configuration simple-producer-jms-context.xml in the resource folder. This bean configuration is used by the JMS message producer.
    11.   <?xml version="1.0" encoding="UTF-8"?>
        <!-- Copyright © Kaustuv Maji , 2014
          Repos - https://github.com/kaustuvmaji
          Blog - http://kaustuvmaji.blogspot.in
        -->
        <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:aop="http://www.springframework.org/schema/aop"
         xmlns:context="http://www.springframework.org/schema/context"
         xmlns:jee="http://www.springframework.org/schema/jee"
         xmlns:jms="http://www.springframework.org/schema/jms"
         xmlns:p="http://www.springframework.org/schema/p"
         xmlns:util="http://www.springframework.org/schema/util"
         xsi:schemaLocation="http://www.springframework.org/schema/beans
              http://www.springframework.org/schema/beans/spring-beans.xsd
              http://www.springframework.org/schema/aop
              http://www.springframework.org/schema/aop/spring-aop.xsd
              http://www.springframework.org/schema/context
              http://www.springframework.org/schema/context/spring-context.xsd
              http://www.springframework.org/schema/jee
              http://www.springframework.org/schema/jee/spring-jee.xsd
              http://www.springframework.org/schema/jms
              http://www.springframework.org/schema/jms/spring-jms.xsd
              http://www.springframework.org/schema/util
              http://www.springframework.org/schema/util/spring-util.xsd">
      
         <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
         p:brokerURL="tcp://localhost:61616"
         p:userName="admin" p:password="admin"
         p:maxThreadPoolSize="15"/>
      
         <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
          <constructor-arg value="myQ" />
         </bean>
      
         <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
          p:connectionFactory-ref="connectionFactory"
          p:defaultDestination-ref="destination">
         </bean>
      
         <bean class="example.spring.simple.jms.producer.SimpleMessageProducer" id="messageProducer">
          <property name="jmsTemplate" ref="jmsTemplate" />
         </bean>
      
        </beans>

    12. Create the SimpleMessageReceiver class in package example.spring.simple.jms.consumer and add the following snippet.
    13.   /**
         *
         * Copyright © Kaustuv Maji , 2014
         * Repos - https://github.com/kaustuvmaji
         * Blog -  http://kaustuvmaji.blogspot.in
         *
         */
        package example.spring.simple.jms.consumer;
      
        import javax.jms.BytesMessage;
        import javax.jms.JMSException;
        import javax.jms.MapMessage;
        import javax.jms.Message;
        import javax.jms.ObjectMessage;
        import javax.jms.StreamMessage;
        import javax.jms.TextMessage;
      
        import org.apache.log4j.Logger;
        import org.springframework.jms.core.JmsTemplate;
      
        import example.spring.dao.Member;
      
        /**
         *
         * @author KMaji
         *
         */
        public class SimpleMessageReceiver {
      
         private static final Logger LOG = Logger.getLogger(SimpleMessageReceiver.class);
      
         protected JmsTemplate jmsTemplate;
      
         public JmsTemplate getJmsTemplate() {
          return jmsTemplate;
         }
      
         public void setJmsTemplate(JmsTemplate jmsTemplate) {
          this.jmsTemplate = jmsTemplate;
         }
      
         public void receive() throws JMSException {
      
          Message message = jmsTemplate.receive();
      
          if (message == null) {
           // receive() timed out without a message; nothing to process
           return;
          }
      
          if (message instanceof TextMessage) {
           LOG.info("Received a JMS message: TextMessage -> [" + ((TextMessage) message).getText() + "]");
          } else if (message instanceof ObjectMessage) {
           try {
            Member obj = (Member) ((ObjectMessage) message).getObject();
            LOG.info("Received a JMS message: ObjectMessage -> [" + obj.toString() + "]");
           } catch (JMSException e) {
            LOG.error("unable to process message", e);
           }
          } else if (message instanceof MapMessage) {
           MapMessage msg = (MapMessage) message;
           LOG.info("Received a JMS message: MapMessage -> [" + msg.getString("payload") + "]");
          } else if (message instanceof BytesMessage) {
           BytesMessage bInMsg = (BytesMessage) message;
           // the producer wrote the body with writeUTF(), so read it back with readUTF()
           LOG.info("Received a JMS message: BytesMessage -> [" + bInMsg.readUTF() + "]");
          } else if (message instanceof StreamMessage) {
           StreamMessage msg = (StreamMessage) message;
           LOG.info("Received a JMS message: StreamMessage -> [" + msg.readObject() + "]");
          } else {
           // the producer only sends the five types above; log anything else
           LOG.warn("Received an unsupported JMS message type: [" + message + "]");
          }
         }
        }
    14. Create the ConsumerApp class in package example.spring.simple.jms.consumer and add the following snippet.
    15.    /**
         *
         * Copyright © Kaustuv Maji , 2014
         * Repos - https://github.com/kaustuvmaji
         * Blog -  http://kaustuvmaji.blogspot.in
         *
         */
        package example.spring.simple.jms.consumer;
      
        import javax.jms.JMSException;
      
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import org.springframework.context.ApplicationContext;
        import org.springframework.context.support.ClassPathXmlApplicationContext;
      
        /**
         * Spring JMS Consumer example.
         *
         * @author KMaji
         *
         */
        public class ConsumerApp implements Runnable {
      
         private static final Logger LOG = LoggerFactory.getLogger(ConsumerApp.class);
      
         /**
          * @param args
          */
         public static void main(String[] args) {
          // log before run(), because run() loops until the process is stopped
          LOG.debug("Consumer starting ...");
          new ConsumerApp().run();
         }
      
         @Override
         public void run() {
          @SuppressWarnings("resource")
          ApplicationContext context = new ClassPathXmlApplicationContext("simple-consumer-jms-context.xml");
          SimpleMessageReceiver receiver = (SimpleMessageReceiver) context.getBean("messageReceiver");
          while (true) {
           try {
            receiver.receive();
            Thread.sleep(5000);
           } catch (InterruptedException | JMSException e) {
            LOG.error("unable to process", e);
           }
          }
         }
        }
    16. Create the Spring bean configuration simple-consumer-jms-context.xml in the resource folder. This bean configuration is used by the JMS message consumer.
    17.   <?xml version="1.0" encoding="UTF-8"?>
        <!-- Copyright © Kaustuv Maji , 2014
          Repos - https://github.com/kaustuvmaji
          Blog - http://kaustuvmaji.blogspot.in
        -->
        <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:aop="http://www.springframework.org/schema/aop"
         xmlns:context="http://www.springframework.org/schema/context"
         xmlns:jee="http://www.springframework.org/schema/jee"
         xmlns:jms="http://www.springframework.org/schema/jms"
         xmlns:p="http://www.springframework.org/schema/p"
         xmlns:util="http://www.springframework.org/schema/util"
         xsi:schemaLocation="http://www.springframework.org/schema/beans
              http://www.springframework.org/schema/beans/spring-beans.xsd
              http://www.springframework.org/schema/aop
              http://www.springframework.org/schema/aop/spring-aop.xsd
              http://www.springframework.org/schema/context
              http://www.springframework.org/schema/context/spring-context.xsd
              http://www.springframework.org/schema/jee
              http://www.springframework.org/schema/jee/spring-jee.xsd
              http://www.springframework.org/schema/jms
              http://www.springframework.org/schema/jms/spring-jms.xsd
              http://www.springframework.org/schema/util
              http://www.springframework.org/schema/util/spring-util.xsd">
      
         <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
            p:brokerURL="tcp://localhost:61616"
            p:userName="admin" p:password="admin"
            p:maxThreadPoolSize="15" />
      
         <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
          <constructor-arg value="myQ" />
         </bean>
      
         <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
          p:connectionFactory-ref="connectionFactory"
          p:defaultDestination-ref="destination"
          p:receiveTimeout="1000"/>
      
      
         <bean id="messageReceiver" class="example.spring.simple.jms.consumer.SimpleMessageReceiver">
          <property name="jmsTemplate" ref="jmsTemplate" />
         </bean>
      
        </beans>
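
The consumer configuration sets receiveTimeout to 1000 milliseconds. With a timeout in place, JmsTemplate.receive() returns null when no message arrives in time, so the caller has to handle that case. The same "wait a bit, then give up" behavior can be tried out without a broker using a JDK BlockingQueue. The sketch below is only an analogy, and ReceiveTimeoutSketch and its methods are made-up names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ReceiveTimeoutSketch {

    /** Waits up to timeoutMillis for a message; returns null if none arrives in time. */
    static String receive(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Shows the null check a caller needs before touching the message. */
    static String describe(String message) {
        return message == null ? "no message within timeout" : "got " + message;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m0");
        System.out.println(describe(receive(queue, 200))); // a message is waiting
        System.out.println(describe(receive(queue, 200))); // queue is now empty
    }
}
```

Without a receive timeout the call would block until a message shows up, which is why the timeout and the null check belong together.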

    18. Create log4j.xml in the resource folder and add the following snippet.
    19.      <?xml version="1.0" encoding="UTF-8"?>
           <!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN" "log4j.dtd">
           <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="true">
      
            <!-- Appenders -->
            <appender name="console" class="org.apache.log4j.ConsoleAppender">
             <param name="Target" value="System.out" />
             <layout class="org.apache.log4j.PatternLayout">
              <param name="ConversionPattern" value="%d{ISO8601} %-5p %-1X{TID} %t [%c] %m%n" />
             </layout>
            </appender>
      
             <!-- Appenders -->
            <appender name="file" class="org.apache.log4j.RollingFileAppender">
             <param name="Threshold" value="ALL" />
             <param name="File" value="diag.log" />
             <param name="Append" value="true" />
             <param name="MaxFileSize" value="500000KB" />
             <param name="MaxBackupIndex" value="10" />
             <layout class="org.apache.log4j.PatternLayout">
              <param name="ConversionPattern" value="%d{ISO8601} %-5p %-1X{TID} %t [%c] %m%n" />
             </layout>
            </appender>
      
            <!-- Root Logger -->
            <root>
             <priority value="ALL" />
             <appender-ref ref="console" />
             <appender-ref ref="file" />
            </root>
      
           </log4j:configuration>

        Post-development testing


    1. Run the ProducerApp class in package example.spring.simple.jms.producer. It pushes a message to "myQ" every second and keeps doing so until we stop it. We can check the number of messages in the queue in the ActiveMQ web app.
    2. Run the ConsumerApp class in package example.spring.simple.jms.consumer. It pulls messages from "myQ" and keeps doing so until we stop it. We can check the status of the processed messages in the ActiveMQ web app.
    Click here to download source code of above example

    Source Code

