AMQStateManager.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.state;
022 
023 import java.util.EnumMap;
024 import java.util.HashMap;
025 import java.util.Map;
026 import java.util.concurrent.CopyOnWriteArraySet;
027 
028 import org.apache.log4j.Logger;
029 
030 import org.apache.qpid.AMQException;
031 import org.apache.qpid.AMQConnectionException;
032 import org.apache.qpid.framing.*;
033 import org.apache.qpid.protocol.AMQMethodEvent;
034 import org.apache.qpid.protocol.AMQMethodListener;
035 import org.apache.qpid.protocol.AMQConstant;
036 import org.apache.qpid.server.handler.BasicAckMethodHandler;
037 import org.apache.qpid.server.handler.BasicCancelMethodHandler;
038 import org.apache.qpid.server.handler.BasicConsumeMethodHandler;
039 import org.apache.qpid.server.handler.BasicGetMethodHandler;
040 import org.apache.qpid.server.handler.BasicPublishMethodHandler;
041 import org.apache.qpid.server.handler.BasicQosHandler;
042 import org.apache.qpid.server.handler.BasicRecoverMethodHandler;
043 import org.apache.qpid.server.handler.BasicRejectMethodHandler;
044 import org.apache.qpid.server.handler.ChannelCloseHandler;
045 import org.apache.qpid.server.handler.ChannelCloseOkHandler;
046 import org.apache.qpid.server.handler.ChannelFlowHandler;
047 import org.apache.qpid.server.handler.ChannelOpenHandler;
048 import org.apache.qpid.server.handler.ConnectionCloseMethodHandler;
049 import org.apache.qpid.server.handler.ConnectionCloseOkMethodHandler;
050 import org.apache.qpid.server.handler.ConnectionOpenMethodHandler;
051 import org.apache.qpid.server.handler.ConnectionSecureOkMethodHandler;
052 import org.apache.qpid.server.handler.ConnectionStartOkMethodHandler;
053 import org.apache.qpid.server.handler.ConnectionTuneOkMethodHandler;
054 import org.apache.qpid.server.handler.ExchangeBoundHandler;
055 import org.apache.qpid.server.handler.ExchangeDeclareHandler;
056 import org.apache.qpid.server.handler.ExchangeDeleteHandler;
057 import org.apache.qpid.server.handler.QueueBindHandler;
058 import org.apache.qpid.server.handler.QueueDeclareHandler;
059 import org.apache.qpid.server.handler.QueueDeleteHandler;
060 import org.apache.qpid.server.handler.QueuePurgeHandler;
061 import org.apache.qpid.server.handler.TxCommitHandler;
062 import org.apache.qpid.server.handler.TxRollbackHandler;
063 import org.apache.qpid.server.handler.TxSelectHandler;
064 import org.apache.qpid.server.protocol.AMQProtocolSession;
065 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
066 
067 /**
068  * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
069  * there is a separate state manager.
070  */
071 public class AMQStateManager implements AMQMethodListener
072 {
073     private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
074 
075     private final VirtualHostRegistry _virtualHostRegistry;
076     private final AMQProtocolSession _protocolSession;
077     /** The current state */
078     private AMQState _currentState;
079 
080     /**
081      * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
082      * AMQFrame.
083      */
084 /*    private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
085         new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(
086             AMQState.class);
087   */
088 
089 
090     private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
091 
092     public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
093     {
094 
095         _virtualHostRegistry = virtualHostRegistry;
096         _protocolSession = protocolSession;
097         _currentState = AMQState.CONNECTION_NOT_STARTED;
098 
099     }
100 
101     /*
102     protected void registerListeners()
103     {
104         Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> frame2handlerMap;
105 
106         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
107         _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
108 
109         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
110         _state2HandlersMap.put(AMQState.CONNECTION_NOT_AUTH, frame2handlerMap);
111 
112         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
113         _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
114 
115         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
116         frame2handlerMap.put(ConnectionOpenBody.class, ConnectionOpenMethodHandler.getInstance());
117         _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap);
118 
119         //
120         // ConnectionOpen handlers
121         //
122         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
123         ChannelOpenHandler.getInstance();
124         ChannelCloseHandler.getInstance();
125         ChannelCloseOkHandler.getInstance();
126         ConnectionCloseMethodHandler.getInstance();
127         ConnectionCloseOkMethodHandler.getInstance();
128         ConnectionTuneOkMethodHandler.getInstance();
129         ConnectionSecureOkMethodHandler.getInstance();
130         ConnectionStartOkMethodHandler.getInstance();
131         ExchangeDeclareHandler.getInstance();
132         ExchangeDeleteHandler.getInstance();
133         ExchangeBoundHandler.getInstance();
134         BasicAckMethodHandler.getInstance();
135         BasicRecoverMethodHandler.getInstance();
136         BasicConsumeMethodHandler.getInstance();
137         BasicGetMethodHandler.getInstance();
138         BasicCancelMethodHandler.getInstance();
139         BasicPublishMethodHandler.getInstance();
140         BasicQosHandler.getInstance();
141         QueueBindHandler.getInstance();
142         QueueDeclareHandler.getInstance();
143         QueueDeleteHandler.getInstance();
144         QueuePurgeHandler.getInstance();
145         ChannelFlowHandler.getInstance();
146         TxSelectHandler.getInstance();
147         TxCommitHandler.getInstance();
148         TxRollbackHandler.getInstance();
149         BasicRejectMethodHandler.getInstance();
150 
151         _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
152 
153         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
154 
155         _state2HandlersMap.put(AMQState.CONNECTION_CLOSING, frame2handlerMap);
156 
157     } */
158 
159     public AMQState getCurrentState()
160     {
161         return _currentState;
162     }
163 
164     public void changeState(AMQState newStatethrows AMQException
165     {
166         _logger.debug("State changing to " + newState + " from old state " + _currentState);
167         final AMQState oldState = _currentState;
168         _currentState = newState;
169 
170         for (StateListener l : _stateListeners)
171         {
172             l.stateChanged(oldState, newState);
173         }
174     }
175 
176     public void error(Exception e)
177     {
178         _logger.error("State manager received error notification[Current State:" + _currentState + "]: " + e, e);
179         for (StateListener l : _stateListeners)
180         {
181             l.error(e);
182         }
183     }
184 
185     public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evtthrows AMQException
186     {
187         MethodDispatcher dispatcher = _protocolSession.getMethodDispatcher();
188 
189         final int channelId = evt.getChannelId();
190         B body = evt.getMethod();
191 
192         if(channelId != && _protocolSession.getChannel(channelId)== null)
193         {
194 
195             if(((body instanceof ChannelOpenBody)
196                   || (body instanceof ChannelCloseOkBody)
197                   || (body instanceof ChannelCloseBody)))
198             {
199                 throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "channel is closed");
200             }
201 
202         }
203 
204         return body.execute(dispatcher, channelId);
205 
206     }
207 
208     private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession)
209         throws AMQException
210     {
211         if ((evt.getChannelId() != 0&& !(evt.getMethod() instanceof ChannelOpenBody)
212                 && (protocolSession.getChannel(evt.getChannelId()) == null)
213                 && !protocolSession.channelAwaitingClosure(evt.getChannelId()))
214         {
215             throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
216         }
217     }
218 
219 /*
220     protected <B extends AMQMethodBody> StateAwareMethodListener<B> findStateTransitionHandler(AMQState currentState,
221         B frame)
222     // throws IllegalStateTransitionException
223     {
224         final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> classToHandlerMap =
225             _state2HandlersMap.get(currentState);
226 
227         final StateAwareMethodListener<B> handler =
228             (classToHandlerMap == null) ? null : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
229 
230         if (handler == null)
231         {
232             _logger.debug("No state transition handler defined for receiving frame " + frame);
233 
234             return null;
235         }
236         else
237         {
238             return handler;
239         }
240     }
241 */
242 
243     public void addStateListener(StateListener listener)
244     {
245         _logger.debug("Adding state listener");
246         _stateListeners.add(listener);
247     }
248 
249     public void removeStateListener(StateListener listener)
250     {
251         _stateListeners.remove(listener);
252     }
253 
254     public VirtualHostRegistry getVirtualHostRegistry()
255     {
256         return _virtualHostRegistry;
257     }
258 
259     public AMQProtocolSession getProtocolSession()
260     {
261         return _protocolSession;
262     }
263 }