SubscriptionFactoryImpl.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.subscription;
022 
023 import org.apache.qpid.server.protocol.AMQProtocolSession;
024 import org.apache.qpid.server.flow.FlowCreditManager;
025 import org.apache.qpid.server.subscription.Subscription;
026 import org.apache.qpid.server.subscription.SubscriptionFactory;
027 import org.apache.qpid.server.AMQChannel;
028 import org.apache.qpid.framing.AMQShortString;
029 import org.apache.qpid.framing.FieldTable;
030 import org.apache.qpid.AMQException;
031 import org.apache.qpid.protocol.AMQConstant;
032 import org.apache.qpid.common.AMQPFilterTypes;
033 
034 public class SubscriptionFactoryImpl implements SubscriptionFactory
035 {
036 
037   /*  private SubscriptionFactoryImpl()
038     {
039 
040     }*/
041 
042     public Subscription createSubscription(int channelId, AMQProtocolSession protocolSession,
043                                            AMQShortString consumerTag, boolean acks, FieldTable filters,
044                                            boolean noLocal, FlowCreditManager creditManagerthrows AMQException
045     {
046         AMQChannel channel = protocolSession.getChannel(channelId);
047         if (channel == null)
048         {
049             throw new AMQException(AMQConstant.NOT_FOUND, "channel :" + channelId + " not found in protocol session");
050         }
051         ClientDeliveryMethod clientMethod = channel.getClientDeliveryMethod();
052         RecordDeliveryMethod recordMethod = channel.getRecordDeliveryMethod();
053 
054 
055         return createSubscription(channel, protocolSession, consumerTag, acks, filters,
056                                   noLocal,
057                                   creditManager,
058                                   clientMethod,
059                                   recordMethod
060         );
061     }
062 
063     public Subscription createSubscription(final AMQChannel channel,
064                                             final AMQProtocolSession protocolSession,
065                                             final AMQShortString consumerTag,
066                                             final boolean acks,
067                                             final FieldTable filters,
068                                             final boolean noLocal,
069                                             final FlowCreditManager creditManager,
070                                             final ClientDeliveryMethod clientMethod,
071                                             final RecordDeliveryMethod recordMethod
072     )
073             throws AMQException
074     {
075         boolean isBrowser;
076 
077         if (filters != null)
078         {
079             Boolean isBrowserObj = (Booleanfilters.get(AMQPFilterTypes.NO_CONSUME.getValue());
080             isBrowser = (isBrowserObj != null&& isBrowserObj.booleanValue();
081         }
082         else
083         {
084             isBrowser = false;
085         }
086 
087         if(isBrowser)
088         {
089             return new SubscriptionImpl.BrowserSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod);
090         }
091         else if(acks)
092         {
093             return new SubscriptionImpl.AckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod);
094         }
095         else
096         {
097             return new SubscriptionImpl.NoAckSubscription(channel, protocolSession, consumerTag,  filters, noLocal, creditManager, clientMethod, recordMethod);
098         }
099     }
100 
101 
102     public static final SubscriptionFactoryImpl INSTANCE = new SubscriptionFactoryImpl();
103 }