AMQStateManager.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client.state;
022 
023 import org.apache.qpid.AMQException;
024 import org.apache.qpid.client.protocol.AMQProtocolSession;
025 import org.apache.qpid.framing.*;
026 import org.apache.qpid.protocol.AMQMethodEvent;
027 import org.apache.qpid.protocol.AMQMethodListener;
028 import org.slf4j.Logger;
029 import org.slf4j.LoggerFactory;
030 
031 import java.util.Set;
032 import java.util.List;
033 import java.util.concurrent.CopyOnWriteArrayList;
034 
035 /**
036  * The state manager is responsible for managing the state of the protocol session. <p/>
037  * For each {@link org.apache.qpid.client.protocol.AMQProtocolHandler} there is a separate state manager.
038  *
039  * The AMQStateManager is now attached to the {@link org.apache.qpid.client.protocol.AMQProtocolHandler} and that is the sole point of reference so that
040  * As the {@link AMQProtocolSession} changes due to failover the AMQStateManager need not be copied around.
041  *
042  * The StateManager works by any component can wait for a state change to occur by using the following sequence.
043  *
044  <li>StateWaiter waiter = stateManager.createWaiter(Set<AMQState> states);
045  <li> // Perform action that will cause state change
046  <li>waiter.await();
047  *
048  * The two step process is required as there is an inherit race condition between starting a process that will cause
049  * the state to change and then attempting to wait for that change. The interest in the change must be first set up so
050  * that any asynchrous errors that occur can be delivered to the correct waiters.
051  */
052 public class AMQStateManager implements AMQMethodListener
053 {
054     private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class);
055 
056     private AMQProtocolSession _protocolSession;
057 
058     /** The current state */
059     private AMQState _currentState;
060 
061     private final Object _stateLock = new Object();
062 
063     private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait""30000"));
064 
065     protected final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>();
066     private Exception _lastException;
067 
068     public AMQStateManager()
069     {
070         this(null);
071     }
072 
073     public AMQStateManager(AMQProtocolSession protocolSession)
074     {
075         this(AMQState.CONNECTION_NOT_STARTED, protocolSession);
076     }
077 
078     protected AMQStateManager(AMQState state, AMQProtocolSession protocolSession)
079     {
080         _protocolSession = protocolSession;
081         _currentState = state;
082     }
083 
084     public AMQState getCurrentState()
085     {
086         return _currentState;
087     }
088 
089     public void changeState(AMQState newStatethrows AMQException
090     {
091         _logger.debug("State changing to " + newState + " from old state " + _currentState);
092 
093         synchronized (_stateLock)
094         {
095             _currentState = newState;
096 
097             _logger.debug("Notififying State change to " + _waiters.size() " : " + _waiters);
098 
099             for (StateWaiter waiter : _waiters)
100             {
101                 waiter.received(newState);
102             }
103         }
104     }
105 
106     public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evtthrows AMQException
107     {
108         B method = evt.getMethod();
109 
110         //    StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
111         method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId());
112         return true;
113     }
114 
115     /**
116      * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted.
117      *
118      * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
119      * connection to the network.
120      *
121      @param session The new protocol session
122      */
123     public void setProtocolSession(AMQProtocolSession session)
124     {
125         if (_logger.isInfoEnabled())
126         {
127             _logger.info("Setting ProtocolSession:" + session);
128         }
129         _protocolSession = session;
130     }
131 
132     /**
133      * Propogate error to waiters
134      *
135      @param error The error to propogate.
136      */
137     public void error(Exception error)
138     {
139         if (_waiters.size() == 0)
140         {
141             _logger.error("No Waiters for error saving as last error:" + error.getMessage());
142             _lastException = error;
143         }
144         for (StateWaiter waiter : _waiters)
145         {
146             _logger.error("Notifying Waiters(" + _waiters + ") for error:" + error.getMessage());
147             waiter.error(error);
148         }
149     }
150 
151     /**
152      * This provides a single place that the maximum time for state change to occur can be accessed.
153      * It is currently set via System property amqj.MaximumStateWait
154      *
155      @return long Milliseconds value for a timeout
156      */
157     public long getWaitTimeout()
158     {
159         return MAXIMUM_STATE_WAIT_TIME;
160     }
161 
162     /**
163      * Create and add a new waiter to the notifcation list.
164      *
165      @param states The waiter will attempt to wait for one of these desired set states to be achived.
166      *
167      @return the created StateWaiter.
168      */
169     public StateWaiter createWaiter(Set<AMQState> states)
170     {
171         final StateWaiter waiter;
172         synchronized (_stateLock)
173         {
174             waiter = new StateWaiter(this, _currentState, states);
175 
176             _waiters.add(waiter);
177         }
178 
179         return waiter;
180     }
181 
182     /**
183      * Remove the waiter from the notification list.
184      *
185      @param waiter The waiter to remove.
186      */
187     public void removeWaiter(StateWaiter waiter)
188     {
189         synchronized (_stateLock)
190         {
191             _waiters.remove(waiter);
192         }
193     }
194 
195     public Exception getLastException()
196     {
197         return _lastException;
198     }
199 }