- Publish-Subscribe Messaging
- Point-To-Point Messaging
- Request-Reply Messaging
-
Required Software
- Developemnt
- Testing
- Java test class used for testing.
- Create a Simple spring project name "spring-jms-example". Following image will guide us. Add the required libraries in classpath. - spring-aop-4.1.3.RELEASE.jar - spring-beans-4.1.3.RELEASE.jar - spring-context-4.1.3.RELEASE.jar - spring-context-support-4.1.3.RELEASE.jar - spring-core-4.1.3.RELEASE.jar - spring-expression-4.1.3.RELEASE.jar - spring-jms-4.1.3.RELEASE.jar - activemq-broker-5.10.0.jar - activemq-client-5.10.0.jar - activemq-jaas-5.10.0.jar - geronimo-jms_1.1_spec-1.1.1.jar - hawtbuf-1.10.jar - activemq-spring-5.10.0.jar - geronimo-j2ee-management_1.1_spec-1.0.1.jar - log4j-1.2.14.jar - slf4j-jcl-1.7.5.jar
- Create following packages. - example.spring.simple.jms.producer - example.spring.simple.jms.consumer - example.spring.dao
- Create simple pojo class Member in example.spring.dao package and add following snippet.
- Create SimpleMessageProducer class in example.spring.simple.jms.producer and add following snippet.
- Create class ProducerApp in example.spring.simple.jms.producer and add following snippet.
- Create spring bean configuration simple-producer-jms-context.xml in resource folder. This bean configuration will be used for jms message producer.
- Create class SimpleMessageReceiver in package example.spring.simple.jms.consumer and add following snippet.
- Create class customize AttributesMapper name SingleAttributesMapper and add following snippet.
- Create spring bean configuration simple-consumer-jms-context.xml in resource folder. This bean configuration will be used for jms message consumer.
- Create log4j.xml in resource folder and add following snippet.
/** * * Copyright © Kaustuv Maji , 2014 * Repos - https://github.com/kaustuvmaji * Blog - http://kaustuvmaji.blogspot.in * */ package example.spring.dao; import java.io.Serializable; /** * @author KMaji * */ public class Member implements Serializable { /** * */ private static final long serialVersionUID = 8944832256293484687L; private String name; private Integer id; private Integer age; /** * Default constructor */ public Member() { } /** * Constructor with argument * @param name * @param age */ public Member(String name, Integer age) { super(); this.name = name; this.age = age; } /** * @return the name */ public final String getName() { return name; } /** * @param name the name to set */ public final void setName(String name) { this.name = name; } /** * @return the id */ public final Integer getId() { return id; } /** * @param id the id to set */ public final void setId(Integer id) { this.id = id; } /** * @return the age */ public final Integer getAge() { return age; } /** * @param age the age to set */ public final void setAge(Integer age) { this.age = age; } /* (non-Javadoc) * @see java.lang.Object#toString() */ @Override public String toString() { StringBuilder builder = new StringBuilder(); builder.append("Member ["); if (name != null) builder.append("name=").append(name).append(", "); if (id != null) builder.append("id=").append(id).append(", "); if (age != null) builder.append("age=").append(age); builder.append("]"); return builder.toString(); } }
/** * * Copyright © Kaustuv Maji , 2014 * Repos - https://github.com/kaustuvmaji * Blog - http://kaustuvmaji.blogspot.in * */ package example.spring.simple.jms.producer; import java.util.Date; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.Session; import org.apache.log4j.Logger; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import example.spring.dao.Member; /** * Example spring jms producer. * * @author KMaji * */ public class SimpleMessageProducer { private static final Logger LOG = Logger.getLogger(SimpleMessageProducer.class.getName()); protected JmsTemplate jmsTemplate; /** * Getter Method of JMSTemplate * * @return the jmsTemplate */ public final JmsTemplate getJmsTemplate() { return jmsTemplate; } /** * Setter Method of JMSTemplate * * @param jmsTemplate * the jmsTemplate to set */ public final void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } protected final void sendMessages(String sendType, int i) { try { if ("textMessage".equalsIgnoreCase(sendType)) { jmsSendTextMessages(i); } else if ("objectMessage".equalsIgnoreCase(sendType)) { jmsSendObjectMessages(i); } else if ("ByteMessage".equalsIgnoreCase(sendType)) { jmsSendBytesMessages(i); } else if ("MapMessage".equalsIgnoreCase(sendType)) { jmsSendMapMessages(i); } } catch (JMSException e) { LOG.error("unable to process", e); } } /** * This method is used to produce TextMessages * * @throws JMSException */ protected void jmsSendTextMessages(final int i) throws JMSException { final StringBuilder buffer = new StringBuilder(); buffer.append("jmsSendTextMessages '").append(i).append("' sent at: ").append(new Date()); final String payload = buffer.toString(); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { Message message = session.createTextMessage(payload); message.setIntProperty("messageCount", i); LOG.info("Sending message number ->" + i + " message [" + message + "]"); return message; } }); } /** * This method is used to produce ObjectMessages * * @throws JMSException */ protected void jmsSendObjectMessages(final int i) { jmsTemplate.send(new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { Member member = new Member(); member.setId(i); member.setName("jmsSendObjectMessage_" + i); member.setAge(10 + i); Message message = session.createObjectMessage(member); LOG.info("Sending message number ->" + i + " message [" + message + "]"); return message; } }); } /** * This method is used to produce BytesMessages. * * @throws JMSException */ public void jmsSendBytesMessages(final int i) throws JMSException { final StringBuilder buffer = new StringBuilder(); buffer.append("Byte message [").append(i).append("] sent at -> ").append(new Date()); final String payload = buffer.toString(); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { BytesMessage message = session.createBytesMessage(); message.writeUTF(payload); message.setIntProperty("messageCount", i); LOG.info("Sending message number ->" + i + " message [" + message + "]"); return message; } }); } /** * This method is used to produce MapMessages. * * @throws JMSException */ public void jmsSendMapMessages(final int i) throws JMSException { final StringBuilder buffer = new StringBuilder(); buffer.append("Map message [").append(i).append("] sent at -> ").append(new Date()); final String payload = buffer.toString(); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { MapMessage message = session.createMapMessage(); message.setString("payload", payload); LOG.info("Sending message number ->" + i + " message [" + message + "]"); return message; } }); } /** * This method is used to produce StreamMessages. * * @throws JMSException */ public void jmsSendStreamMessages(final int i) throws JMSException { final StringBuilder buffer = new StringBuilder(); buffer.append("Stream message [").append(i).append("] sent at -> ").append(new Date()); final String payload = buffer.toString(); jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { javax.jms.StreamMessage message = session.createStreamMessage(); message.writeObject(payload); LOG.info("Sending message number ->" + i + " message [" + message + "]"); return message; } }); } }Following table explains the different types of JMS messages.
Method name | Message Type | Body Contains |
---|---|---|
jmsSendTextMessages() | TextMessage | java.lang.String object |
jmsSendMapMessages() | MapMessage | A set of name-value pairs <String,Object> |
jmsSendBytesMessages() | BytesMessage | A stream of uninterpreted bytes. |
jmsSendStreamMessages | StreamMessage | A stream of primitive values, filled and read sequentially. |
jmsSendObjectMessages() | ObjectMessage | A Serializable object. |
/** * * Copyright © Kaustuv Maji , 2014 * Repos - https://github.com/kaustuvmaji * Blog - http://kaustuvmaji.blogspot.in * */ package example.spring.simple.jms.producer; import org.apache.log4j.Logger; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * @author KMaji * */ public class ProducerApp implements Runnable { private static final Logger LOG = Logger .getLogger(ProducerApp.class); /** * @param args */ public static void main(String[] args) { new ProducerApp().run(); } @Override public void run() { String sendType = "TextMessage"; @SuppressWarnings("resource") ApplicationContext context = new ClassPathXmlApplicationContext( "simple-producer-jms-context.xml"); SimpleMessageProducer producer = (SimpleMessageProducer) context .getBean("messageProducer"); LOG.info("Using the sendType "+sendType); int i = 0 ; while (true) { producer.sendMessages(sendType, i++); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
<?xml version="1.0" encoding="UTF-8"?> <!-- Copyright © Kaustuv Maji , 2014 Repos - https://github.com/kaustuvmaji Blog - http://kaustuvmaji.blogspot.in --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:p="http://www.springframework.org/schema/p" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616" p:userName="admin" p:password="admin" p:maxThreadPoolSize="15"/> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="myQ" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="connectionFactory" p:defaultDestination-ref="destination"> </bean> <bean class="example.spring.simple.jms.producer.SimpleMessageProducer" id="messageProducer"> <property name="jmsTemplate" ref="jmsTemplate" /> </bean> </beans>
/** * * Copyright © Kaustuv Maji , 2014 * Repos - https://github.com/kaustuvmaji * Blog - http://kaustuvmaji.blogspot.in * */ package example.spring.simple.jms.consumer; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.StreamMessage; import javax.jms.TextMessage; import org.apache.log4j.Logger; import org.springframework.jms.core.JmsTemplate; import example.spring.dao.Member; /** * * @author KMaji * */ public class SimpleMessageReceiver { private static final Logger LOG = Logger.getLogger(SimpleMessageReceiver.class); protected JmsTemplate jmsTemplate; public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public void receive() throws JMSException { Message message = jmsTemplate.receive(); if (message == null) { // do not work for null message ignore } if (message instanceof TextMessage) { LOG.info("Received a JMS message: -> [" + message + "]"); } else if (message instanceof ObjectMessage) { try { Member obj = (Member) ((ObjectMessage) message).getObject(); LOG.info("Received a JMS message: ObjectMessage -> [" + obj.toString() + "]"); } catch (JMSException e) { LOG.error("unable to process message", e); } } else if (message instanceof MapMessage) { MapMessage msg = (MapMessage) message; LOG.info("Received a JMS message: MapMessage -> [" + msg.getString("payload") + "]"); } else if (message instanceof BytesMessage) { BytesMessage bInMsg = (BytesMessage) message; int msgLength = (int) bInMsg.getBodyLength(); byte[] msg = new byte[msgLength]; bInMsg.readBytes(msg); LOG.info("Received a JMS message: BytesMessage -> [" + msg + "]"); }else if (message instanceof StreamMessage) { StreamMessage msg = (StreamMessage) message; LOG.info("Received a JMS message: MapMessage -> [" + msg.readObject() + "]"); }else { // only 5 type of message will come so relax } } }
/** * * Copyright © Kaustuv Maji , 2014 * Repos - https://github.com/kaustuvmaji * Blog - http://kaustuvmaji.blogspot.in * */ package example.spring.simple.jms.consumer; import javax.jms.JMSException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * Spring JMS Consumer example. * * @author KMaji * */ public class ConsumerApp implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(ConsumerApp.class); /** * @param args */ public static void main(String[] args) { new ConsumerApp().run(); LOG.debug("Consumer started ..."); } @Override public void run() { @SuppressWarnings("resource") ApplicationContext context = new ClassPathXmlApplicationContext("simple-consumer-jms-context.xml"); SimpleMessageReceiver receiver = (SimpleMessageReceiver) context.getBean("messageReceiver"); while (true) { try { receiver.receive(); Thread.sleep(5000); } catch (InterruptedException | JMSException e) { LOG.error("unable to process", e); } } } }
<?xml version="1.0" encoding="UTF-8"?> <!-- Copyright © Kaustuv Maji , 2014 Repos - https://github.com/kaustuvmaji Blog - http://kaustuvmaji.blogspot.in --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:p="http://www.springframework.org/schema/p" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616" p:userName="admin" p:password="admin" p:maxThreadPoolSize="15" /> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="myQ" /> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="connectionFactory" p:defaultDestination-ref="destination" p:receiveTimeout="1000"/> <bean id="messageReceiver" class="example.spring.simple.jms.consumer.SimpleMessageReceiver"> <property name="jmsTemplate" ref="jmsTemplate" /> </bean> </beans>
<?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN" "log4j.dtd"> <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="true"> <!-- Appenders --> <appender name="console" class="org.apache.log4j.ConsoleAppender"> <param name="Target" value="System.out" /> <layout class="org.apache.log4j.PatternLayout"> <param name="ConversionPattern" value="%d{ISO8601} %-5p %-1X{TID} %t [%c] %m%n" /> </layout> </appender> <!-- Appenders --> <appender name="file" class="org.apache.log4j.RollingFileAppender"> <param name="Threshold" value="ALL" /> <param name="File" value="diag.log" /> <param name="Append" value="true" /> <param name="MaxFileSize" value="500000KB" /> <param name="MaxBackupIndex" value="10" /> <layout class="org.apache.log4j.PatternLayout"> <param name="ConversionPattern" value="%d{ISO8601} %-5p %-1X{TID} %t [%c] %m%n" /> </layout> </appender> <!-- Root Logger --> <root> <priority value="ALL" /> <appender-ref ref="console" /> <appender-ref ref="file" /> </root> </log4j:configuration>
- Run ProducerApp class in package example.spring.simple.jms.producer. This class will continuously push message to "myQ". Keep in mind until unless we stop this class it will continuously push message to queue. We can check number of messages in queue by activeMQ web app. check following image.
- Run ConsumerApp class in package example.spring.simple.jms.consumer. This class will continuously pull message from "myQ". Keep in mind until unless we stop this class it will continuously push message from queue. We can check status of messages processed in queue by activeMQ web app. check following image.
Post Development testing
Source Code
- References:
European Union laws require you to give European Union visitors information about cookies used on your blog. In many cases, these laws also require you to obtain consent.
As a courtesy, we have added a notice on your blog to explain Google's use of certain Blogger and Google cookies, including use of Google Analytics and AdSense cookies.