001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.state;
022
023 import java.util.EnumMap;
024 import java.util.HashMap;
025 import java.util.Map;
026 import java.util.concurrent.CopyOnWriteArraySet;
027
028 import org.apache.log4j.Logger;
029
030 import org.apache.qpid.AMQException;
031 import org.apache.qpid.AMQConnectionException;
032 import org.apache.qpid.framing.*;
033 import org.apache.qpid.protocol.AMQMethodEvent;
034 import org.apache.qpid.protocol.AMQMethodListener;
035 import org.apache.qpid.protocol.AMQConstant;
036 import org.apache.qpid.server.handler.BasicAckMethodHandler;
037 import org.apache.qpid.server.handler.BasicCancelMethodHandler;
038 import org.apache.qpid.server.handler.BasicConsumeMethodHandler;
039 import org.apache.qpid.server.handler.BasicGetMethodHandler;
040 import org.apache.qpid.server.handler.BasicPublishMethodHandler;
041 import org.apache.qpid.server.handler.BasicQosHandler;
042 import org.apache.qpid.server.handler.BasicRecoverMethodHandler;
043 import org.apache.qpid.server.handler.BasicRejectMethodHandler;
044 import org.apache.qpid.server.handler.ChannelCloseHandler;
045 import org.apache.qpid.server.handler.ChannelCloseOkHandler;
046 import org.apache.qpid.server.handler.ChannelFlowHandler;
047 import org.apache.qpid.server.handler.ChannelOpenHandler;
048 import org.apache.qpid.server.handler.ConnectionCloseMethodHandler;
049 import org.apache.qpid.server.handler.ConnectionCloseOkMethodHandler;
050 import org.apache.qpid.server.handler.ConnectionOpenMethodHandler;
051 import org.apache.qpid.server.handler.ConnectionSecureOkMethodHandler;
052 import org.apache.qpid.server.handler.ConnectionStartOkMethodHandler;
053 import org.apache.qpid.server.handler.ConnectionTuneOkMethodHandler;
054 import org.apache.qpid.server.handler.ExchangeBoundHandler;
055 import org.apache.qpid.server.handler.ExchangeDeclareHandler;
056 import org.apache.qpid.server.handler.ExchangeDeleteHandler;
057 import org.apache.qpid.server.handler.QueueBindHandler;
058 import org.apache.qpid.server.handler.QueueDeclareHandler;
059 import org.apache.qpid.server.handler.QueueDeleteHandler;
060 import org.apache.qpid.server.handler.QueuePurgeHandler;
061 import org.apache.qpid.server.handler.TxCommitHandler;
062 import org.apache.qpid.server.handler.TxRollbackHandler;
063 import org.apache.qpid.server.handler.TxSelectHandler;
064 import org.apache.qpid.server.protocol.AMQProtocolSession;
065 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
066
067 /**
068 * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
069 * there is a separate state manager.
070 */
071 public class AMQStateManager implements AMQMethodListener
072 {
073 private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
074
075 private final VirtualHostRegistry _virtualHostRegistry;
076 private final AMQProtocolSession _protocolSession;
077 /** The current state */
078 private AMQState _currentState;
079
080 /**
081 * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
082 * AMQFrame.
083 */
084 /* private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
085 new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(
086 AMQState.class);
087 */
088
089
090 private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
091
092 public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
093 {
094
095 _virtualHostRegistry = virtualHostRegistry;
096 _protocolSession = protocolSession;
097 _currentState = AMQState.CONNECTION_NOT_STARTED;
098
099 }
100
101 /*
102 protected void registerListeners()
103 {
104 Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> frame2handlerMap;
105
106 frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
107 _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
108
109 frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
110 _state2HandlersMap.put(AMQState.CONNECTION_NOT_AUTH, frame2handlerMap);
111
112 frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
113 _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
114
115 frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
116 frame2handlerMap.put(ConnectionOpenBody.class, ConnectionOpenMethodHandler.getInstance());
117 _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap);
118
119 //
120 // ConnectionOpen handlers
121 //
122 frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
123 ChannelOpenHandler.getInstance();
124 ChannelCloseHandler.getInstance();
125 ChannelCloseOkHandler.getInstance();
126 ConnectionCloseMethodHandler.getInstance();
127 ConnectionCloseOkMethodHandler.getInstance();
128 ConnectionTuneOkMethodHandler.getInstance();
129 ConnectionSecureOkMethodHandler.getInstance();
130 ConnectionStartOkMethodHandler.getInstance();
131 ExchangeDeclareHandler.getInstance();
132 ExchangeDeleteHandler.getInstance();
133 ExchangeBoundHandler.getInstance();
134 BasicAckMethodHandler.getInstance();
135 BasicRecoverMethodHandler.getInstance();
136 BasicConsumeMethodHandler.getInstance();
137 BasicGetMethodHandler.getInstance();
138 BasicCancelMethodHandler.getInstance();
139 BasicPublishMethodHandler.getInstance();
140 BasicQosHandler.getInstance();
141 QueueBindHandler.getInstance();
142 QueueDeclareHandler.getInstance();
143 QueueDeleteHandler.getInstance();
144 QueuePurgeHandler.getInstance();
145 ChannelFlowHandler.getInstance();
146 TxSelectHandler.getInstance();
147 TxCommitHandler.getInstance();
148 TxRollbackHandler.getInstance();
149 BasicRejectMethodHandler.getInstance();
150
151 _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
152
153 frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
154
155 _state2HandlersMap.put(AMQState.CONNECTION_CLOSING, frame2handlerMap);
156
157 } */
158
159 public AMQState getCurrentState()
160 {
161 return _currentState;
162 }
163
164 public void changeState(AMQState newState) throws AMQException
165 {
166 _logger.debug("State changing to " + newState + " from old state " + _currentState);
167 final AMQState oldState = _currentState;
168 _currentState = newState;
169
170 for (StateListener l : _stateListeners)
171 {
172 l.stateChanged(oldState, newState);
173 }
174 }
175
176 public void error(Exception e)
177 {
178 _logger.error("State manager received error notification[Current State:" + _currentState + "]: " + e, e);
179 for (StateListener l : _stateListeners)
180 {
181 l.error(e);
182 }
183 }
184
185 public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
186 {
187 MethodDispatcher dispatcher = _protocolSession.getMethodDispatcher();
188
189 final int channelId = evt.getChannelId();
190 B body = evt.getMethod();
191
192 if(channelId != 0 && _protocolSession.getChannel(channelId)== null)
193 {
194
195 if(! ((body instanceof ChannelOpenBody)
196 || (body instanceof ChannelCloseOkBody)
197 || (body instanceof ChannelCloseBody)))
198 {
199 throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed");
200 }
201
202 }
203
204 return body.execute(dispatcher, channelId);
205
206 }
207
208 private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession)
209 throws AMQException
210 {
211 if ((evt.getChannelId() != 0) && !(evt.getMethod() instanceof ChannelOpenBody)
212 && (protocolSession.getChannel(evt.getChannelId()) == null)
213 && !protocolSession.channelAwaitingClosure(evt.getChannelId()))
214 {
215 throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
216 }
217 }
218
219 /*
220 protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState,
221 B frame)
222 // throws IllegalStateTransitionException
223 {
224 final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> classToHandlerMap =
225 _state2HandlersMap.get(currentState);
226
227 final StateAwareMethodListener<B> handler =
228 (classToHandlerMap == null) ? null : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
229
230 if (handler == null)
231 {
232 _logger.debug("No state transition handler defined for receiving frame " + frame);
233
234 return null;
235 }
236 else
237 {
238 return handler;
239 }
240 }
241 */
242
243 public void addStateListener(StateListener listener)
244 {
245 _logger.debug("Adding state listener");
246 _stateListeners.add(listener);
247 }
248
249 public void removeStateListener(StateListener listener)
250 {
251 _stateListeners.remove(listener);
252 }
253
254 public VirtualHostRegistry getVirtualHostRegistry()
255 {
256 return _virtualHostRegistry;
257 }
258
259 public AMQProtocolSession getProtocolSession()
260 {
261 return _protocolSession;
262 }
263 }
|