001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing,
013 * software distributed under the License is distributed on an
014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 * KIND, either express or implied. See the License for the
016 * specific language governing permissions and limitations
017 * under the License.
018 *
019 *
020 */
021
022 package org.apache.qpid.client;
023
024 import javax.jms.Destination;
025 import javax.jms.IllegalStateException;
026 import javax.jms.InvalidDestinationException;
027 import javax.jms.JMSException;
028 import javax.jms.Message;
029 import javax.jms.Topic;
030 import javax.jms.TopicPublisher;
031
032 public class TopicPublisherAdapter implements TopicPublisher
033 {
034
035 private BasicMessageProducer _delegate;
036 private Topic _topic;
037
038 public TopicPublisherAdapter(BasicMessageProducer msgProducer, Topic topic)
039 {
040 _delegate = msgProducer;
041 _topic = topic;
042 }
043
044 public Topic getTopic() throws JMSException
045 {
046 checkPreConditions();
047 return _topic;
048 }
049
050 public void publish(Message msg) throws JMSException
051 {
052 checkPreConditions();
053 checkTopic(_topic);
054 _delegate.send(msg);
055 }
056
057 public void publish(Topic topic, Message msg) throws JMSException
058 {
059 checkPreConditions();
060 checkTopic(topic);
061 _delegate.send(topic, msg);
062 }
063
064 public void publish(Message msg, int deliveryMode, int priority, long timeToLive)
065 throws JMSException
066 {
067 checkPreConditions();
068 checkTopic(_topic);
069 _delegate.send(msg, deliveryMode, priority, timeToLive);
070 }
071
072 public int getDeliveryMode() throws JMSException {
073 checkPreConditions();
074 return _delegate.getDeliveryMode();
075 }
076
077 public void publish(Topic topic, Message msg, int deliveryMode, int priority, long timeToLive)
078 throws JMSException
079 {
080 checkPreConditions();
081 checkTopic(topic);
082 _delegate.send(topic, msg, deliveryMode, priority, timeToLive);
083 }
084
085 public void close() throws JMSException
086 {
087 _delegate.close();
088 }
089
090 public boolean getDisableMessageID() throws JMSException {
091 checkPreConditions();
092 return _delegate.getDisableMessageID();
093 }
094
095 public boolean getDisableMessageTimestamp() throws JMSException {
096 checkPreConditions();
097 return _delegate.getDisableMessageTimestamp();
098 }
099
100 public Destination getDestination() throws JMSException
101 {
102 checkPreConditions();
103 return _delegate.getDestination();
104 }
105
106 public int getPriority() throws JMSException {
107 checkPreConditions();
108 return _delegate.getPriority();
109 }
110
111 public long getTimeToLive() throws JMSException {
112 checkPreConditions();
113 return _delegate.getTimeToLive();
114 }
115
116 public void send(Message msg) throws JMSException
117 {
118 checkPreConditions();
119 checkTopic(_topic);
120 _delegate.send(msg);
121 }
122
123 public void send(Destination dest, Message msg) throws JMSException
124 {
125 checkPreConditions();
126 checkTopic(_topic);
127 _delegate.send(dest, msg);
128 }
129
130 public void send(Message msg, int deliveryMode, int priority, long timeToLive)
131 throws JMSException
132 {
133 checkPreConditions();
134 checkTopic(_topic);
135 _delegate.send(msg, deliveryMode, priority, timeToLive);
136 }
137
138 public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
139 {
140 checkPreConditions();
141 checkTopic(dest);
142 _delegate.send(dest, msg, deliveryMode, priority, timeToLive);
143 }
144
145 public void setDeliveryMode(int deliveryMode) throws JMSException
146 {
147 checkPreConditions();
148 _delegate.setDeliveryMode(deliveryMode);
149 }
150
151 public void setDisableMessageID(boolean disableMessageID) throws JMSException
152 {
153 checkPreConditions();
154 _delegate.setDisableMessageID(disableMessageID);
155 }
156
157 public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException
158 {
159 checkPreConditions();
160 _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
161 }
162
163 public void setPriority(int priority) throws JMSException
164 {
165 checkPreConditions();
166 _delegate.setPriority(priority);
167 }
168
169 public void setTimeToLive(long timeToLive) throws JMSException
170 {
171 checkPreConditions();
172 _delegate.setTimeToLive(timeToLive);
173 }
174
175 private void checkPreConditions() throws IllegalStateException
176 {
177 if (_delegate.isClosed())
178 {
179 throw new javax.jms.IllegalStateException("Publisher is _closed");
180 }
181
182 AMQSession session = _delegate.getSession();
183 if (session == null || session.isClosed())
184 {
185 throw new javax.jms.IllegalStateException("Invalid Session");
186 }
187 }
188
189 private void checkTopic(Destination topic) throws InvalidDestinationException
190 {
191 if (topic == null)
192 {
193 throw new UnsupportedOperationException("Topic is null");
194 }
195 if (!(topic instanceof Topic))
196 {
197 throw new InvalidDestinationException("Destination " + topic + " is not a topic");
198 }
199 if(!(topic instanceof AMQDestination))
200 {
201 throw new InvalidDestinationException("Destination " + topic + " is not a Qpid topic");
202 }
203
204 }
205 }
|