001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.handler;
022
023 import org.apache.log4j.Logger;
024 import org.apache.qpid.AMQException;
025 import org.apache.qpid.exchange.ExchangeDefaults;
026 import org.apache.qpid.framing.BasicPublishBody;
027 import org.apache.qpid.framing.AMQShortString;
028 import org.apache.qpid.protocol.AMQConstant;
029 import org.apache.qpid.server.AMQChannel;
030 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
031 import org.apache.qpid.server.exchange.Exchange;
032 import org.apache.qpid.server.protocol.AMQProtocolSession;
033 import org.apache.qpid.server.security.access.Permission;
034 import org.apache.qpid.server.state.AMQStateManager;
035 import org.apache.qpid.server.state.StateAwareMethodListener;
036 import org.apache.qpid.server.virtualhost.VirtualHost;
037
038 public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody>
039 {
040 private static final Logger _logger = Logger.getLogger(BasicPublishMethodHandler.class);
041
042 private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler();
043
044
045 public static BasicPublishMethodHandler getInstance()
046 {
047 return _instance;
048 }
049
050 private BasicPublishMethodHandler()
051 {
052 }
053
054 public void methodReceived(AMQStateManager stateManager, BasicPublishBody body, int channelId) throws AMQException
055 {
056 AMQProtocolSession session = stateManager.getProtocolSession();
057 if (_logger.isDebugEnabled())
058 {
059 _logger.debug("Publish received on channel " + channelId);
060 }
061
062 AMQShortString exchange = body.getExchange();
063 // TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
064 if (exchange == null)
065 {
066 exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
067
068 }
069
070 VirtualHost vHost = session.getVirtualHost();
071 Exchange e = vHost.getExchangeRegistry().getExchange(exchange);
072 // if the exchange does not exist we raise a channel exception
073 if (e == null)
074 {
075 throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name");
076 }
077 else
078 {
079 // The partially populated BasicDeliver frame plus the received route body
080 // is stored in the channel. Once the final body frame has been received
081 // it is routed to the exchange.
082 AMQChannel channel = session.getChannel(channelId);
083
084 if (channel == null)
085 {
086 throw body.getChannelNotFoundException(channelId);
087 }
088
089 //Access Control
090 if (!vHost.getAccessManager().authorisePublish(session,
091 body.getImmediate(), body.getMandatory(),
092 body.getRoutingKey(), e))
093 {
094 throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
095 }
096
097 MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
098 info.setExchange(exchange);
099 channel.setPublishFrame(info, e);
100 }
101 }
102
103 }
104
105
|