001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client;
022
023 import org.apache.qpid.AMQException;
024
025 import javax.jms.JMSException;
026 import javax.jms.Message;
027 import javax.jms.MessageListener;
028 import javax.jms.Topic;
029 import javax.jms.TopicSubscriber;
030
031 /**
032 * Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
033 *
034 */
035 class TopicSubscriberAdaptor<C extends BasicMessageConsumer> implements TopicSubscriber
036 {
037 private final Topic _topic;
038 private final C _consumer;
039 private final boolean _noLocal;
040
041 TopicSubscriberAdaptor(Topic topic, C consumer, boolean noLocal)
042 {
043 _topic = topic;
044 _consumer = consumer;
045 _noLocal = noLocal;
046 }
047
048 TopicSubscriberAdaptor(Topic topic, C consumer)
049 {
050 this(topic, consumer, consumer.isNoLocal());
051 }
052
053 public Topic getTopic() throws JMSException
054 {
055 checkPreConditions();
056 return _topic;
057 }
058
059 public boolean getNoLocal() throws JMSException
060 {
061 checkPreConditions();
062 return _noLocal;
063 }
064
065 public String getMessageSelector() throws JMSException
066 {
067 checkPreConditions();
068 return _consumer.getMessageSelector();
069 }
070
071 public MessageListener getMessageListener() throws JMSException
072 {
073 checkPreConditions();
074 return _consumer.getMessageListener();
075 }
076
077 public void setMessageListener(MessageListener messageListener) throws JMSException
078 {
079 checkPreConditions();
080 _consumer.setMessageListener(messageListener);
081 }
082
083 public Message receive() throws JMSException
084 {
085 checkPreConditions();
086 return _consumer.receive();
087 }
088
089 public Message receive(long l) throws JMSException
090 {
091 return _consumer.receive(l);
092 }
093
094 public Message receiveNoWait() throws JMSException
095 {
096 checkPreConditions();
097 return _consumer.receiveNoWait();
098 }
099
100 public void close() throws JMSException
101 {
102 _consumer.close();
103 }
104
105 private void checkPreConditions() throws javax.jms.IllegalStateException{
106 C msgConsumer = _consumer;
107
108 if (msgConsumer.isClosed() ){
109 throw new javax.jms.IllegalStateException("Consumer is closed");
110 }
111
112 if(_topic == null){
113 throw new UnsupportedOperationException("Topic is null");
114 }
115
116 AMQSession session = msgConsumer.getSession();
117
118 if(session == null || session.isClosed()){
119 throw new javax.jms.IllegalStateException("Invalid Session");
120 }
121 }
122
123 C getMessageConsumer()
124 {
125 return _consumer;
126 }
127
128 public void addBindingKey(Topic topic, String bindingKey) throws AMQException
129 {
130 _consumer.addBindingKey((AMQDestination) topic, bindingKey);
131 }
132 }
|