BasicPublishMethodHandler.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.handler;
022 
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.exchange.ExchangeDefaults;
026 import org.apache.qpid.framing.BasicPublishBody;
027 import org.apache.qpid.framing.AMQShortString;
028 import org.apache.qpid.protocol.AMQConstant;
029 import org.apache.qpid.server.AMQChannel;
030 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
031 import org.apache.qpid.server.exchange.Exchange;
032 import org.apache.qpid.server.protocol.AMQProtocolSession;
033 import org.apache.qpid.server.security.access.Permission;
034 import org.apache.qpid.server.state.AMQStateManager;
035 import org.apache.qpid.server.state.StateAwareMethodListener;
036 import org.apache.qpid.server.virtualhost.VirtualHost;
037 
/**
 * Method listener for {@link BasicPublishBody} frames.
 *
 * <p>The handler holds no per-call state and is exposed as a single shared
 * instance through {@link #getInstance()}.
 */
public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody>
{
    private static final Logger _logger = Logger.getLogger(BasicPublishMethodHandler.class);

    private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler();

    /**
     * Returns the shared handler instance.
     *
     * @return the single {@code BasicPublishMethodHandler} created at class initialisation
     */
    public static BasicPublishMethodHandler getInstance()
    {
        return _instance;
    }

    /** Private so that the only instance is the one returned by {@link #getInstance()}. */
    private BasicPublishMethodHandler()
    {
    }

    /**
     * Processes a publish method received on the given channel.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>take the exchange name from the body, substituting
     *       {@link ExchangeDefaults#DEFAULT_EXCHANGE_NAME} when it is {@code null};</li>
     *   <li>look the exchange up in the session's virtual host exchange registry;</li>
     *   <li>look the channel up on the session;</li>
     *   <li>ask the virtual host's access manager to authorise the publish;</li>
     *   <li>convert the body to a {@link MessagePublishInfo}, set its exchange name and
     *       hand it to the channel via {@code setPublishFrame}.</li>
     * </ol>
     *
     * @param stateManager supplies the protocol session the method arrived on
     * @param body         the received publish method body
     * @param channelId    id of the channel the method was received on
     * @throws AMQException a channel exception with {@link AMQConstant#NOT_FOUND} when the
     *                      exchange is unknown; the body's channel-not-found exception when
     *                      the session has no such channel; a connection exception with
     *                      {@link AMQConstant#ACCESS_REFUSED} when authorisation fails
     */
    public void methodReceived(AMQStateManager stateManager, BasicPublishBody body, int channelId) throws AMQException
    {
        AMQProtocolSession session = stateManager.getProtocolSession();
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Publish received on channel " + channelId);
        }

        AMQShortString exchange = body.getExchange();
        // TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
        if (exchange == null)
        {
            // No exchange named in the method: fall back to the default exchange.
            exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
        }

        VirtualHost vHost = session.getVirtualHost();
        Exchange e = vHost.getExchangeRegistry().getExchange(exchange);
        // if the exchange does not exist we raise a channel exception
        if (e == null)
        {
            throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name");
        }

        // The partially populated BasicDeliver frame plus the received route body
        // is stored in the channel. Once the final body frame has been received
        // it is routed to the exchange.
        AMQChannel channel = session.getChannel(channelId);
        if (channel == null)
        {
            throw body.getChannelNotFoundException(channelId);
        }

        // Access control: a refused publish is reported as a connection-level exception.
        if (!vHost.getAccessManager().authorisePublish(session,
                body.getImmediate(), body.getMandatory(),
                body.getRoutingKey(), e))
        {
            throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
        }

        MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
        // Record the resolved name so a null exchange in the body becomes the default name.
        info.setExchange(exchange);
        channel.setPublishFrame(info, e);
    }

}
104 
105