001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client;
022
023 import java.io.IOException;
024 import java.net.ConnectException;
025 import java.nio.channels.UnresolvedAddressException;
026 import java.text.MessageFormat;
027 import java.util.ArrayList;
028 import java.util.EnumSet;
029 import java.util.Iterator;
030 import java.util.Set;
031
032 import javax.jms.JMSException;
033 import javax.jms.XASession;
034
035 import org.apache.qpid.AMQException;
036 import org.apache.qpid.client.failover.FailoverException;
037 import org.apache.qpid.client.failover.FailoverProtectedOperation;
038 import org.apache.qpid.client.failover.FailoverRetrySupport;
039 import org.apache.qpid.client.state.AMQState;
040 import org.apache.qpid.client.state.StateWaiter;
041 import org.apache.qpid.client.transport.TransportConnection;
042 import org.apache.qpid.framing.BasicQosBody;
043 import org.apache.qpid.framing.BasicQosOkBody;
044 import org.apache.qpid.framing.ChannelOpenBody;
045 import org.apache.qpid.framing.ChannelOpenOkBody;
046 import org.apache.qpid.framing.ProtocolVersion;
047 import org.apache.qpid.framing.TxSelectBody;
048 import org.apache.qpid.framing.TxSelectOkBody;
049 import org.apache.qpid.jms.BrokerDetails;
050 import org.apache.qpid.jms.ChannelLimitReachedException;
051 import org.slf4j.Logger;
052 import org.slf4j.LoggerFactory;
053
054 public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
055 {
056 private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
057 private AMQConnection _conn;
058
059
060 public void closeConnection(long timeout) throws JMSException, AMQException
061 {
062 _conn.getProtocolHandler().closeConnection(timeout);
063
064 }
065
066 public AMQConnectionDelegate_8_0(AMQConnection conn)
067 {
068 _conn = conn;
069 }
070
071 protected boolean checkException(Throwable thrown)
072 {
073 Throwable cause = thrown.getCause();
074
075 if (cause == null)
076 {
077 cause = thrown;
078 }
079
080 return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
081 }
082
083 public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws AMQException, IOException
084 {
085 final Set<AMQState> openOrClosedStates =
086 EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
087
088
089 StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
090
091 // TODO: use system property thingy for this
092 if (System.getProperty("UseTransportIo", "false").equals("false"))
093 {
094 TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
095 }
096 else
097 {
098 _conn.getProtocolHandler().createIoTransportSession(brokerDetail);
099 }
100
101 // this blocks until the connection has been set up or when an error
102 // has prevented the connection being set up
103
104 AMQState state = waiter.await();
105
106 if(state == AMQState.CONNECTION_OPEN)
107 {
108 _conn._failoverPolicy.attainedConnection();
109 _conn._connected = true;
110 }
111
112 return null;
113 }
114
115 public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
116 throws JMSException
117 {
118 return createSession(transacted, acknowledgeMode, prefetch, prefetch);
119 }
120
121 public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
122 {
123 throw new UnsupportedOperationException("0_8 version does not provide XA support");
124 }
125
126 public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
127 final int prefetchHigh, final int prefetchLow) throws JMSException
128 {
129 _conn.checkNotClosed();
130
131 if (_conn.channelLimitReached())
132 {
133 throw new ChannelLimitReachedException(_conn._maximumChannelCount);
134 }
135
136 return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
137 new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
138 {
139 public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
140 {
141 int channelId = _conn._idFactory.incrementAndGet();
142
143 if (_logger.isDebugEnabled())
144 {
145 _logger.debug("Write channel open frame for channel id " + channelId);
146 }
147
148 // We must create the session and register it before actually sending the frame to the server to
149 // open it, so that there is no window where we could receive data on the channel and not be set
150 // up to handle it appropriately.
151 AMQSession session =
152 new AMQSession_0_8(_conn, channelId, transacted, acknowledgeMode, prefetchHigh,
153 prefetchLow);
154 // _protocolHandler.addSessionByChannel(channelId, session);
155 _conn.registerSession(channelId, session);
156
157 boolean success = false;
158 try
159 {
160 createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
161 success = true;
162 }
163 catch (AMQException e)
164 {
165 JMSException jmse = new JMSException("Error creating session: " + e);
166 jmse.setLinkedException(e);
167 throw jmse;
168 }
169 finally
170 {
171 if (!success)
172 {
173 _conn.deregisterSession(channelId);
174 }
175 }
176
177 if (_conn._started)
178 {
179 try
180 {
181 session.start();
182 }
183 catch (AMQException e)
184 {
185 throw new JMSAMQException(e);
186 }
187 }
188
189 return session;
190 }
191 }, _conn).execute();
192 }
193
194 private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
195 throws AMQException, FailoverException
196 {
197 ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
198 // TODO: Be aware of possible changes to parameter order as versions change.
199 _conn._protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class);
200
201 // todo send low water mark when protocol allows.
202 // todo Be aware of possible changes to parameter order as versions change.
203 BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
204 _conn._protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
205
206 if (transacted)
207 {
208 if (_logger.isDebugEnabled())
209 {
210 _logger.debug("Issuing TxSelect for " + channelId);
211 }
212 TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody();
213
214 // TODO: Be aware of possible changes to parameter order as versions change.
215 _conn._protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
216 }
217 }
218
219 public void failoverPrep()
220 {
221 // do nothing
222 }
223
224 /**
225 * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
226 * The caller must hold the failover mutex before calling this method.
227 */
228 public void resubscribeSessions() throws JMSException, AMQException, FailoverException
229 {
230 ArrayList sessions = new ArrayList(_conn.getSessions().values());
231 _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
232 for (Iterator it = sessions.iterator(); it.hasNext();)
233 {
234 AMQSession s = (AMQSession) it.next();
235 // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
236 reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
237 s.resubscribe();
238 }
239 }
240
241 private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
242 throws AMQException, FailoverException
243 {
244 try
245 {
246 createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
247 }
248 catch (AMQException e)
249 {
250 _conn.deregisterSession(channelId);
251 throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
252 }
253 }
254
255 public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
256 {
257 while (true)
258 {
259 try
260 {
261 _conn.blockUntilNotFailingOver();
262 }
263 catch (InterruptedException e)
264 {
265 _logger.debug("Interrupted: " + e, e);
266
267 return null;
268 }
269
270 synchronized (_conn.getFailoverMutex())
271 {
272 try
273 {
274 return operation.execute();
275 }
276 catch (FailoverException e)
277 {
278 _logger.debug("Failover exception caught during operation: " + e, e);
279 }
280 catch (IllegalStateException e)
281 {
282 if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support")))
283 {
284 throw e;
285 }
286 }
287 }
288 }
289 }
290
291 public void setIdleTimeout(long l){}
292 }
|