AMQConnectionDelegate_8_0.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client;
022 
023 import java.io.IOException;
024 import java.net.ConnectException;
025 import java.nio.channels.UnresolvedAddressException;
026 import java.text.MessageFormat;
027 import java.util.ArrayList;
028 import java.util.EnumSet;
029 import java.util.Iterator;
030 import java.util.Set;
031 
032 import javax.jms.JMSException;
033 import javax.jms.XASession;
034 
035 import org.apache.qpid.AMQException;
036 import org.apache.qpid.client.failover.FailoverException;
037 import org.apache.qpid.client.failover.FailoverProtectedOperation;
038 import org.apache.qpid.client.failover.FailoverRetrySupport;
039 import org.apache.qpid.client.state.AMQState;
040 import org.apache.qpid.client.state.StateWaiter;
041 import org.apache.qpid.client.transport.TransportConnection;
042 import org.apache.qpid.framing.BasicQosBody;
043 import org.apache.qpid.framing.BasicQosOkBody;
044 import org.apache.qpid.framing.ChannelOpenBody;
045 import org.apache.qpid.framing.ChannelOpenOkBody;
046 import org.apache.qpid.framing.ProtocolVersion;
047 import org.apache.qpid.framing.TxSelectBody;
048 import org.apache.qpid.framing.TxSelectOkBody;
049 import org.apache.qpid.jms.BrokerDetails;
050 import org.apache.qpid.jms.ChannelLimitReachedException;
051 import org.slf4j.Logger;
052 import org.slf4j.LoggerFactory;
053 
054 public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
055 {
056     private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
057     private AMQConnection _conn;
058 
059 
060     public void closeConnection(long timeoutthrows JMSException, AMQException
061     {
062         _conn.getProtocolHandler().closeConnection(timeout);
063 
064     }
065 
066     public AMQConnectionDelegate_8_0(AMQConnection conn)
067     {
068         _conn = conn;
069     }
070 
071     protected boolean checkException(Throwable thrown)
072     {
073         Throwable cause = thrown.getCause();
074 
075         if (cause == null)
076         {
077             cause = thrown;
078         }
079 
080         return ((cause instanceof ConnectException|| (cause instanceof UnresolvedAddressException));
081     }
082 
083     public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetailthrows AMQException, IOException
084     {
085         final Set<AMQState> openOrClosedStates =
086                 EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
087 
088 
089         StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates);
090 
091         // TODO: use system property thingy for this
092         if (System.getProperty("UseTransportIo""false").equals("false"))   
093         {
094             TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
095         
096         else 
097         {
098             _conn.getProtocolHandler().createIoTransportSession(brokerDetail);
099         }
100         
101         // this blocks until the connection has been set up or when an error
102         // has prevented the connection being set up
103 
104         AMQState state = waiter.await();
105 
106         if(state == AMQState.CONNECTION_OPEN)
107         {
108             _conn._failoverPolicy.attainedConnection();
109             _conn._connected = true;
110         }
111 
112         return null;
113     }
114 
115     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
116             throws JMSException
117     {
118         return createSession(transacted, acknowledgeMode, prefetch, prefetch);
119     }
120 
121     public XASession createXASession(int prefetchHigh, int prefetchLowthrows JMSException
122     {
123         throw new UnsupportedOperationException("0_8 version does not provide XA support");
124     }
125 
126     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
127                                                      final int prefetchHigh, final int prefetchLowthrows JMSException
128     {
129         _conn.checkNotClosed();
130 
131         if (_conn.channelLimitReached())
132         {
133             throw new ChannelLimitReachedException(_conn._maximumChannelCount);
134         }
135 
136         return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
137                 new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
138                 {
139                     public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
140                     {
141                         int channelId = _conn._idFactory.incrementAndGet();
142 
143                         if (_logger.isDebugEnabled())
144                         {
145                             _logger.debug("Write channel open frame for channel id " + channelId);
146                         }
147 
148                         // We must create the session and register it before actually sending the frame to the server to
149                         // open it, so that there is no window where we could receive data on the channel and not be set
150                         // up to handle it appropriately.
151                         AMQSession session =
152                                 new AMQSession_0_8(_conn, channelId, transacted, acknowledgeMode, prefetchHigh,
153                                                prefetchLow);
154                         // _protocolHandler.addSessionByChannel(channelId, session);
155                         _conn.registerSession(channelId, session);
156 
157                         boolean success = false;
158                         try
159                         {
160                             createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
161                             success = true;
162                         }
163                         catch (AMQException e)
164                         {
165                             JMSException jmse = new JMSException("Error creating session: " + e);
166                             jmse.setLinkedException(e);
167                             throw jmse;
168                         }
169                         finally
170                         {
171                             if (!success)
172                             {
173                                 _conn.deregisterSession(channelId);
174                             }
175                         }
176 
177                         if (_conn._started)
178                         {
179                             try
180                             {
181                                 session.start();
182                             }
183                             catch (AMQException e)
184                             {
185                                 throw new JMSAMQException(e);
186                             }
187                         }
188 
189                         return session;
190                     }
191                 }, _conn).execute();
192     }
193 
194     private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
195             throws AMQException, FailoverException
196     {
197         ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
198         // TODO: Be aware of possible changes to parameter order as versions change.
199         _conn._protocolHandler.syncWrite(channelOpenBody.generateFrame(channelId),  ChannelOpenOkBody.class);
200 
201         // todo send low water mark when protocol allows.
202         // todo Be aware of possible changes to parameter order as versions change.
203         BasicQosBody basicQosBody = _conn.getProtocolHandler().getMethodRegistry().createBasicQosBody(0,prefetchHigh,false);
204         _conn._protocolHandler.syncWrite(basicQosBody.generateFrame(channelId),BasicQosOkBody.class);
205         
206         if (transacted)
207         {
208             if (_logger.isDebugEnabled())
209             {
210                 _logger.debug("Issuing TxSelect for " + channelId);
211             }
212             TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody();
213             
214             // TODO: Be aware of possible changes to parameter order as versions change.
215             _conn._protocolHandler.syncWrite(body.generateFrame(channelId), TxSelectOkBody.class);
216         }
217     }
218 
219     public void failoverPrep()
220     {
221         // do nothing
222     }
223 
224     /**
225      * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
226      * The caller must hold the failover mutex before calling this method.
227      */
228     public void resubscribeSessions() throws JMSException, AMQException, FailoverException
229     {
230         ArrayList sessions = new ArrayList(_conn.getSessions().values());
231         _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size()))// FIXME: removeKey?
232         for (Iterator it = sessions.iterator(); it.hasNext();)
233         {
234             AMQSession s = (AMQSessionit.next();
235             // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
236             reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
237             s.resubscribe();
238         }
239     }
240 
241     private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
242     throws AMQException, FailoverException
243     {
244         try
245         {
246             createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
247         }
248         catch (AMQException e)
249         {
250             _conn.deregisterSession(channelId);
251             throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
252         }
253     }
254 
255     public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operationthrows E
256     {
257         while (true)
258         {
259             try
260             {
261                 _conn.blockUntilNotFailingOver();
262             }
263             catch (InterruptedException e)
264             {
265                 _logger.debug("Interrupted: " + e, e);
266 
267                 return null;
268             }
269 
270             synchronized (_conn.getFailoverMutex())
271             {
272                 try
273                 {
274                     return operation.execute();
275                 }
276                 catch (FailoverException e)
277                 {
278                     _logger.debug("Failover exception caught during operation: " + e, e);
279                 }
280                 catch (IllegalStateException e)
281                 {
282                     if (!(e.getMessage().startsWith("Fail-over interupted no-op failover support")))
283                     {
284                         throw e;
285                     }
286                 }
287             }
288         }
289     }
290     
291     public void setIdleTimeout(long l){}
292 }