001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.subscription;
022
023 import org.apache.qpid.server.protocol.AMQProtocolSession;
024 import org.apache.qpid.server.flow.FlowCreditManager;
025 import org.apache.qpid.server.subscription.Subscription;
026 import org.apache.qpid.server.subscription.SubscriptionFactory;
027 import org.apache.qpid.server.AMQChannel;
028 import org.apache.qpid.framing.AMQShortString;
029 import org.apache.qpid.framing.FieldTable;
030 import org.apache.qpid.AMQException;
031 import org.apache.qpid.protocol.AMQConstant;
032 import org.apache.qpid.common.AMQPFilterTypes;
033
034 public class SubscriptionFactoryImpl implements SubscriptionFactory
035 {
036
037 /* private SubscriptionFactoryImpl()
038 {
039
040 }*/
041
042 public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession,
043 AMQShortString consumerTag, boolean acks, FieldTable filters,
044 boolean noLocal, FlowCreditManager creditManager) throws AMQException
045 {
046 AMQChannel channel = protocolSession.getChannel(channelId);
047 if (channel == null)
048 {
049 throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session");
050 }
051 ClientDeliveryMethod clientMethod = channel.getClientDeliveryMethod();
052 RecordDeliveryMethod recordMethod = channel.getRecordDeliveryMethod();
053
054
055 return createSubscription(channel, protocolSession, consumerTag, acks, filters,
056 noLocal,
057 creditManager,
058 clientMethod,
059 recordMethod
060 );
061 }
062
063 public Subscription createSubscription(final AMQChannel channel,
064 final AMQProtocolSession protocolSession,
065 final AMQShortString consumerTag,
066 final boolean acks,
067 final FieldTable filters,
068 final boolean noLocal,
069 final FlowCreditManager creditManager,
070 final ClientDeliveryMethod clientMethod,
071 final RecordDeliveryMethod recordMethod
072 )
073 throws AMQException
074 {
075 boolean isBrowser;
076
077 if (filters != null)
078 {
079 Boolean isBrowserObj = (Boolean) filters.get(AMQPFilterTypes.NO_CONSUME.getValue());
080 isBrowser = (isBrowserObj != null) && isBrowserObj.booleanValue();
081 }
082 else
083 {
084 isBrowser = false;
085 }
086
087 if(isBrowser)
088 {
089 return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
090 }
091 else if(acks)
092 {
093 return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
094 }
095 else
096 {
097 return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag, filters, noLocal, creditManager, clientMethod, recordMethod);
098 }
099 }
100
101
102 public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
103 }
|