TopicSubscriberAdaptor.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client;
022 
023 import org.apache.qpid.AMQException;
024 
025 import javax.jms.JMSException;
026 import javax.jms.Message;
027 import javax.jms.MessageListener;
028 import javax.jms.Topic;
029 import javax.jms.TopicSubscriber;
030 
031 /**
032  * Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
033  *
034  */
035 class TopicSubscriberAdaptor<C extends BasicMessageConsumer> implements TopicSubscriber
036 {
037     private final Topic _topic;
038     private final C _consumer;
039     private final boolean _noLocal;
040 
041     TopicSubscriberAdaptor(Topic topic, C consumer, boolean noLocal)
042     {
043         _topic = topic;
044         _consumer = consumer;
045         _noLocal = noLocal;
046     }
047     
048     TopicSubscriberAdaptor(Topic topic, C consumer)
049     {
050         this(topic, consumer, consumer.isNoLocal());
051     }
052     
053     public Topic getTopic() throws JMSException
054     {
055       checkPreConditions();
056         return _topic;
057     }
058 
059     public boolean getNoLocal() throws JMSException
060     {
061       checkPreConditions();
062         return _noLocal;
063     }
064 
065     public String getMessageSelector() throws JMSException
066     {
067       checkPreConditions();
068         return _consumer.getMessageSelector();
069     }
070 
071     public MessageListener getMessageListener() throws JMSException
072     {
073       checkPreConditions();
074         return _consumer.getMessageListener();
075     }
076 
077     public void setMessageListener(MessageListener messageListenerthrows JMSException
078     {
079       checkPreConditions();
080         _consumer.setMessageListener(messageListener);
081     }
082 
083     public Message receive() throws JMSException
084     {
085       checkPreConditions();
086         return _consumer.receive();
087     }
088 
089     public Message receive(long lthrows JMSException
090     {
091         return _consumer.receive(l);
092     }
093 
094     public Message receiveNoWait() throws JMSException
095     {
096       checkPreConditions();
097         return _consumer.receiveNoWait();
098     }
099 
100     public void close() throws JMSException
101     {
102         _consumer.close();
103     }
104     
105     private void checkPreConditions() throws javax.jms.IllegalStateException{
106       C msgConsumer = _consumer;
107       
108       if (msgConsumer.isClosed() ){
109       throw new javax.jms.IllegalStateException("Consumer is closed");
110     }
111     
112     if(_topic == null){
113       throw new UnsupportedOperationException("Topic is null");
114     }
115     
116     AMQSession session = msgConsumer.getSession();
117     
118     if(session == null || session.isClosed()){
119       throw new javax.jms.IllegalStateException("Invalid Session");
120     }
121   }
122 
123     C getMessageConsumer()
124     {
125         return _consumer;
126     }
127 
128     public void addBindingKey(Topic topic, String bindingKeythrows AMQException
129     {
130         _consumer.addBindingKey((AMQDestinationtopic, bindingKey);
131     }
132 }