Clover coverage report -
Coverage timestamp: So Nov 6 2005 14:19:51 CET
file stats: LOC: 180   Methods: 4
NCLOC: 86   Classes: 1
 
 Source file Conditionals Statements Methods TOTAL
JMSBroadcastingListener.java 0% 0% 0% 0%
coverage
 1    /*
 2    * Copyright (c) 2002-2003 by OpenSymphony
 3    * All rights reserved.
 4    */
 5    package com.opensymphony.oscache.plugins.clustersupport;
 6   
 7    import com.opensymphony.oscache.base.Cache;
 8    import com.opensymphony.oscache.base.Config;
 9    import com.opensymphony.oscache.base.FinalizationException;
 10    import com.opensymphony.oscache.base.InitializationException;
 11   
 12    import org.apache.commons.logging.Log;
 13    import org.apache.commons.logging.LogFactory;
 14   
 15    import javax.jms.*;
 16   
 17    import javax.naming.InitialContext;
 18   
 19    /**
 20    * A JMS based clustering implementation. This implementation is independent of the
 21    * JMS provider and uses non-persistent messages on a publish subscribe protocol.
 22    *
 23    * @author <a href="mailto:motoras@linuxmail.org">Romulus Pasca</a>
 24    */
 25    public class JMSBroadcastingListener extends AbstractBroadcastingListener {
 26    private final static Log log = LogFactory.getLog(JMSBroadcastingListener.class);
 27   
 28    /**
 29    *The JMS connection used
 30    */
 31    private Connection connection;
 32   
 33    /**
 34    * Th object used to publish new messages
 35    */
 36    private MessageProducer messagePublisher;
 37   
 38    /**
 39    * The current JMS session
 40    */
 41    private Session publisherSession;
 42   
 43    /**
 44    * The name of this cluster. Used to identify the sender of a message.
 45    */
 46    private String clusterNode;
 47   
 48    /**
 49    * <p>Called by the cache administrator class when a cache is instantiated.</p>
 50    * <p>The JMS broadcasting implementation requires the following configuration
 51    * properties to be specified in <code>oscache.properties</code>:
 52    * <ul>
 53    * <li><b>cache.cluster.jms.topic.factory</b> - The JMS connection factory to use</li>
 54    * <li><b>cache.cluster.jms.topic.name</b> - The JMS topic name</li>
 55    * <li><b>cache.cluster.jms.node.name</b> - The name of this node in the cluster. This
 56    * should be unique for each node.</li>
 57    * Please refer to the clustering documentation for further details on configuring
 58    * the JMS clustered caching.</p>
 59    *
 60    * @param cache the cache instance that this listener is attached to.
 61    *
 62    * @throws com.opensymphony.oscache.base.InitializationException thrown when there was a
 63    * problem initializing the listener. The cache administrator will log this error and
 64    * disable the listener.
 65    */
 66  0 public void initialize(Cache cache, Config config) throws InitializationException {
 67  0 super.initialize(cache, config);
 68   
 69    // Get the name of this node
 70  0 clusterNode = config.getProperty("cache.cluster.jms.node.name");
 71   
 72  0 String topic = config.getProperty("cache.cluster.jms.topic.name");
 73  0 String topicFactory = config.getProperty("cache.cluster.jms.topic.factory");
 74   
 75  0 if (log.isInfoEnabled()) {
 76  0 log.info("Starting JMS clustering (node name=" + clusterNode + ", topic=" + topic + ", topic factory=" + topicFactory + ")");
 77    }
 78   
 79  0 try {
 80    // Make sure you have specified the necessary JNDI properties (usually in
 81    // a jndi.properties resource file, or as system properties)
 82  0 InitialContext jndi = new InitialContext();
 83   
 84    // Look up a JMS connection factory
 85  0 ConnectionFactory connectionFactory = (ConnectionFactory) jndi.lookup(topicFactory);
 86   
 87    // Create a JMS connection
 88  0 connection = connectionFactory.createConnection();
 89   
 90    // Create session objects
 91  0 publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 92   
 93  0 Session subSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 94   
 95    // Look up the JMS topic
 96  0 Topic chatTopic = (Topic) jndi.lookup(topic);
 97   
 98    // Create the publisher and subscriber
 99  0 messagePublisher = publisherSession.createProducer(chatTopic);
 100   
 101  0 MessageConsumer messageConsumer = subSession.createConsumer(chatTopic);
 102   
 103    // Set the message listener
 104  0 messageConsumer.setMessageListener(new MessageListener() {
 105  0 public void onMessage(Message message) {
 106  0 try {
 107    //check the message type
 108  0 ObjectMessage objectMessage = null;
 109   
 110  0 if (!(message instanceof ObjectMessage)) {
 111  0 log.error("Cannot handle message of type (class=" + message.getClass().getName() + "). Notification ignored.");
 112  0 return;
 113    }
 114   
 115  0 objectMessage = (ObjectMessage) message;
 116   
 117    //check the message content
 118  0 if (!(objectMessage.getObject() instanceof ClusterNotification)) {
 119  0 log.error("An unknown cluster notification message received (class=" + objectMessage.getObject().getClass().getName() + "). Notification ignored.");
 120  0 return;
 121    }
 122   
 123  0 if (log.isDebugEnabled()) {
 124  0 log.debug(objectMessage.getObject());
 125    }
 126   
 127    // This prevents the notification sent by this node from being handled by itself
 128  0 if (!objectMessage.getStringProperty("nodeName").equals(clusterNode)) {
 129    //now handle the message
 130  0 ClusterNotification notification = (ClusterNotification) objectMessage.getObject();
 131  0 handleClusterNotification(notification);
 132    }
 133    } catch (JMSException jmsEx) {
 134  0 log.error("Cannot handle cluster Notification", jmsEx);
 135    }
 136    }
 137    });
 138   
 139    // Start the JMS connection; allows messages to be delivered
 140  0 connection.start();
 141    } catch (Exception e) {
 142  0 throw new InitializationException("Initialization of the JMSBroadcastingListener failed: " + e);
 143    }
 144    }
 145   
 146    /**
 147    * Called by the cache administrator class when a cache is destroyed.
 148    *
 149    * @throws com.opensymphony.oscache.base.FinalizationException thrown when there was a problem finalizing the
 150    * listener. The cache administrator will catch and log this error.
 151    */
 152  0 public void finialize() throws FinalizationException {
 153  0 try {
 154  0 if (log.isInfoEnabled()) {
 155  0 log.info("Shutting down JMS clustering...");
 156    }
 157   
 158  0 connection.close();
 159   
 160  0 if (log.isInfoEnabled()) {
 161  0 log.info("JMS clustering shutdown complete.");
 162    }
 163    } catch (JMSException e) {
 164  0 log.warn("A problem was encountered when closing the JMS connection", e);
 165    }
 166    }
 167   
 168  0 protected void sendNotification(ClusterNotification message) {
 169  0 try {
 170  0 ObjectMessage objectMessage = publisherSession.createObjectMessage();
 171  0 objectMessage.setObject(message);
 172   
 173    //sign the message, with the name of this node
 174  0 objectMessage.setStringProperty("nodeName", clusterNode);
 175  0 messagePublisher.send(objectMessage);
 176    } catch (JMSException e) {
 177  0 log.error("Cannot send notification " + message, e);
 178    }
 179    }
 180    }