001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client.state;
022
023 import org.apache.qpid.AMQException;
024 import org.apache.qpid.client.protocol.AMQProtocolSession;
025 import org.apache.qpid.framing.*;
026 import org.apache.qpid.protocol.AMQMethodEvent;
027 import org.apache.qpid.protocol.AMQMethodListener;
028 import org.slf4j.Logger;
029 import org.slf4j.LoggerFactory;
030
031 import java.util.Set;
032 import java.util.List;
033 import java.util.concurrent.CopyOnWriteArrayList;
034
035 /**
036 * The state manager is responsible for managing the state of the protocol session. <p/>
037 * For each {@link org.apache.qpid.client.protocol.AMQProtocolHandler} there is a separate state manager.
038 *
039 * The AMQStateManager is now attached to the {@link org.apache.qpid.client.protocol.AMQProtocolHandler} and that is the sole point of reference so that
040 * As the {@link AMQProtocolSession} changes due to failover the AMQStateManager need not be copied around.
041 *
042 * The StateManager works by any component can wait for a state change to occur by using the following sequence.
043 *
044 * <li>StateWaiter waiter = stateManager.createWaiter(Set<AMQState> states);
045 * <li> // Perform action that will cause state change
046 * <li>waiter.await();
047 *
048 * The two step process is required as there is an inherit race condition between starting a process that will cause
049 * the state to change and then attempting to wait for that change. The interest in the change must be first set up so
050 * that any asynchrous errors that occur can be delivered to the correct waiters.
051 */
052 public class AMQStateManager implements AMQMethodListener
053 {
054 private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class);
055
056 private AMQProtocolSession _protocolSession;
057
058 /** The current state */
059 private AMQState _currentState;
060
061 private final Object _stateLock = new Object();
062
063 private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
064
065 protected final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>();
066 private Exception _lastException;
067
068 public AMQStateManager()
069 {
070 this(null);
071 }
072
073 public AMQStateManager(AMQProtocolSession protocolSession)
074 {
075 this(AMQState.CONNECTION_NOT_STARTED, protocolSession);
076 }
077
078 protected AMQStateManager(AMQState state, AMQProtocolSession protocolSession)
079 {
080 _protocolSession = protocolSession;
081 _currentState = state;
082 }
083
084 public AMQState getCurrentState()
085 {
086 return _currentState;
087 }
088
089 public void changeState(AMQState newState) throws AMQException
090 {
091 _logger.debug("State changing to " + newState + " from old state " + _currentState);
092
093 synchronized (_stateLock)
094 {
095 _currentState = newState;
096
097 _logger.debug("Notififying State change to " + _waiters.size() + " : " + _waiters);
098
099 for (StateWaiter waiter : _waiters)
100 {
101 waiter.received(newState);
102 }
103 }
104 }
105
106 public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
107 {
108 B method = evt.getMethod();
109
110 // StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
111 method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId());
112 return true;
113 }
114
115 /**
116 * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted.
117 *
118 * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
119 * connection to the network.
120 *
121 * @param session The new protocol session
122 */
123 public void setProtocolSession(AMQProtocolSession session)
124 {
125 if (_logger.isInfoEnabled())
126 {
127 _logger.info("Setting ProtocolSession:" + session);
128 }
129 _protocolSession = session;
130 }
131
132 /**
133 * Propogate error to waiters
134 *
135 * @param error The error to propogate.
136 */
137 public void error(Exception error)
138 {
139 if (_waiters.size() == 0)
140 {
141 _logger.error("No Waiters for error saving as last error:" + error.getMessage());
142 _lastException = error;
143 }
144 for (StateWaiter waiter : _waiters)
145 {
146 _logger.error("Notifying Waiters(" + _waiters + ") for error:" + error.getMessage());
147 waiter.error(error);
148 }
149 }
150
151 /**
152 * This provides a single place that the maximum time for state change to occur can be accessed.
153 * It is currently set via System property amqj.MaximumStateWait
154 *
155 * @return long Milliseconds value for a timeout
156 */
157 public long getWaitTimeout()
158 {
159 return MAXIMUM_STATE_WAIT_TIME;
160 }
161
162 /**
163 * Create and add a new waiter to the notifcation list.
164 *
165 * @param states The waiter will attempt to wait for one of these desired set states to be achived.
166 *
167 * @return the created StateWaiter.
168 */
169 public StateWaiter createWaiter(Set<AMQState> states)
170 {
171 final StateWaiter waiter;
172 synchronized (_stateLock)
173 {
174 waiter = new StateWaiter(this, _currentState, states);
175
176 _waiters.add(waiter);
177 }
178
179 return waiter;
180 }
181
182 /**
183 * Remove the waiter from the notification list.
184 *
185 * @param waiter The waiter to remove.
186 */
187 public void removeWaiter(StateWaiter waiter)
188 {
189 synchronized (_stateLock)
190 {
191 _waiters.remove(waiter);
192 }
193 }
194
195 public Exception getLastException()
196 {
197 return _lastException;
198 }
199 }
|