SubscriptionImpl.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.server.subscription;
022 
023 import java.util.concurrent.atomic.AtomicBoolean;
024 import java.util.concurrent.atomic.AtomicReference;
025 import java.util.concurrent.locks.Lock;
026 import java.util.concurrent.locks.ReentrantLock;
027 
028 import org.apache.log4j.Logger;
029 import org.apache.qpid.AMQException;
030 import org.apache.qpid.common.AMQPFilterTypes;
031 import org.apache.qpid.common.ClientProperties;
032 import org.apache.qpid.framing.AMQShortString;
033 import org.apache.qpid.framing.FieldTable;
034 import org.apache.qpid.server.AMQChannel;
035 import org.apache.qpid.server.queue.QueueEntry;
036 import org.apache.qpid.server.queue.AMQQueue;
037 import org.apache.qpid.server.subscription.Subscription;
038 import org.apache.qpid.server.flow.FlowCreditManager;
039 import org.apache.qpid.server.filter.FilterManager;
040 import org.apache.qpid.server.filter.FilterManagerFactory;
041 import org.apache.qpid.server.protocol.AMQProtocolSession;
042 import org.apache.qpid.server.store.StoreContext;
043 
044 /**
045  * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
046  * that was given out by the broker and the channel id. <p/>
047  */
048 public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener
049 {
050 
051     private StateListener _stateListener = new StateListener()
052                                             {
053 
054                                                 public void stateChange(Subscription sub, State oldState, State newState)
055                                                 {
056 
057                                                 }
058                                             };
059 
060 
061     private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
062     private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null);
063     private final ClientDeliveryMethod _deliveryMethod;
064     private final RecordDeliveryMethod _recordMethod;
065     
066     private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
067     private final Lock _stateChangeLock;
068 
069     static final class BrowserSubscription extends SubscriptionImpl
070     {
071         public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
072                                    AMQShortString consumerTag, FieldTable filters,
073                                    boolean noLocal, FlowCreditManager creditManager,
074                                    ClientDeliveryMethod deliveryMethod,
075                                    RecordDeliveryMethod recordMethod)
076             throws AMQException
077         {
078             super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
079         }
080 
081 
082         public boolean isBrowser()
083         {
084             return true;
085         }
086 
087         /**
088          * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
089          * thread safe.
090          *
091          @param msg   The message to send
092          @throws AMQException
093          */
094         @Override
095         public void send(QueueEntry msgthrows AMQException
096         {
097             // We don't decrement the reference here as we don't want to consume the message
098             // but we do want to send it to the client.
099 
100             synchronized (getChannel())
101             {
102                 long deliveryTag = getChannel().getNextDeliveryTag();
103                 sendToClient(msg, deliveryTag);
104             }
105 
106         }
107 
108         @Override
109         public boolean wouldSuspend(QueueEntry msg)
110         {
111             return false;
112         }
113 
114     }
115 
116     public static class NoAckSubscription extends SubscriptionImpl
117     {
118         public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
119                                  AMQShortString consumerTag, FieldTable filters,
120                                  boolean noLocal, FlowCreditManager creditManager,
121                                    ClientDeliveryMethod deliveryMethod,
122                                    RecordDeliveryMethod recordMethod)
123             throws AMQException
124         {
125             super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
126         }
127 
128 
129         public boolean isBrowser()
130         {
131             return false;
132         }
133 
134         /**
135          * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
136          * thread safe.
137          *
138          @param entry   The message to send
139          @throws AMQException
140          */
141         @Override
142         public void send(QueueEntry entrythrows AMQException
143         {
144 
145             StoreContext storeContext = getChannel().getStoreContext();
146             try
147             {
148                 // if we do not need to wait for client acknowledgements
149                 // we can decrement the reference count immediately.
150 
151                 // By doing this _before_ the send we ensure that it
152                 // doesn't get sent if it can't be dequeued, preventing
153                 // duplicate delivery on recovery.
154 
155                 // The send may of course still fail, in which case, as
156                 // the message is unacked, it will be lost.
157                 entry.dequeueAndDelete(storeContext);
158 
159 
160                 synchronized (getChannel())
161                 {
162                     long deliveryTag = getChannel().getNextDeliveryTag();
163 
164                     sendToClient(entry, deliveryTag);
165 
166                 }
167             }
168             finally
169             {
170                 //Only set delivered if it actually was writen successfully..
171                 // using a try->finally would set it even if an error occured.
172                 // Is this what we want?
173 
174                 entry.setDeliveredToSubscription();
175             }
176         }
177 
178         @Override
179         public boolean wouldSuspend(QueueEntry msg)
180         {
181             return false;
182         }
183 
184     }
185 
186     static final class AckSubscription extends SubscriptionImpl
187     {
188         public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
189                                AMQShortString consumerTag, FieldTable filters,
190                                boolean noLocal, FlowCreditManager creditManager,
191                                    ClientDeliveryMethod deliveryMethod,
192                                    RecordDeliveryMethod recordMethod)
193             throws AMQException
194         {
195             super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
196         }
197 
198 
199         public boolean isBrowser()
200         {
201             return false;
202         }
203 
204 
205         /**
206          * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
207          * thread safe.
208          *
209          @param entry   The message to send
210          @throws AMQException
211          */
212         @Override
213         public void send(QueueEntry entrythrows AMQException
214         {
215 
216             try
217             // if we do not need to wait for client acknowledgements
218                 // we can decrement the reference count immediately.
219 
220                 // By doing this _before_ the send we ensure that it
221                 // doesn't get sent if it can't be dequeued, preventing
222                 // duplicate delivery on recovery.
223 
224                 // The send may of course still fail, in which case, as
225                 // the message is unacked, it will be lost.
226 
227                 synchronized (getChannel())
228                 {
229                     long deliveryTag = getChannel().getNextDeliveryTag();
230 
231 
232                     recordMessageDelivery(entry, deliveryTag);
233                     sendToClient(entry, deliveryTag);
234 
235 
236                 }
237             }
238             finally
239             {
240                 //Only set delivered if it actually was writen successfully..
241                 // using a try->finally would set it even if an error occured.
242                 // Is this what we want?
243 
244                 entry.setDeliveredToSubscription();
245             }
246         }
247 
248 
249     }
250 
251 
252     private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
253 
254     private final AMQChannel _channel;
255 
256     private final AMQShortString _consumerTag;
257 
258 
259     private final boolean _noLocal;
260 
261     private final FlowCreditManager _creditManager;
262 
263     private FilterManager _filters;
264 
265     private final Boolean _autoClose;
266 
267 
268     private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
269 
270     private AMQQueue _queue;
271     private final AtomicBoolean _deleted = new AtomicBoolean(false);
272 
273 
274 
275     
276     public SubscriptionImpl(AMQChannel channel , AMQProtocolSession protocolSession,
277                             AMQShortString consumerTag, FieldTable arguments,
278                             boolean noLocal, FlowCreditManager creditManager,
279                             ClientDeliveryMethod deliveryMethod,
280                             RecordDeliveryMethod recordMethod)
281             throws AMQException
282     {
283 
284         _channel = channel;
285         _consumerTag = consumerTag;
286 
287         _creditManager = creditManager;
288         creditManager.addStateListener(this);
289 
290         _noLocal = noLocal;
291 
292 
293         _filters = FilterManagerFactory.createManager(arguments);
294 
295         _deliveryMethod = deliveryMethod;
296         _recordMethod = recordMethod;
297 
298 
299         _stateChangeLock = new ReentrantLock();
300 
301 
302         if (arguments != null)
303         {
304             Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
305             if (autoClose != null)
306             {
307                 _autoClose = (BooleanautoClose;
308             }
309             else
310             {
311                 _autoClose = false;
312             }
313         }
314         else
315         {
316             _autoClose = false;
317         }
318 
319         _logger.info(debugIdentity()+" Created subscription:");
320     }
321 
322 
323 
324     public synchronized void setQueue(AMQQueue queue)
325     {
326         if(getQueue() != null)
327         {
328             throw new IllegalStateException("Attempt to set queue for subscription " this " to " + queue + "when already set to " + getQueue());
329         }
330         _queue = queue;
331     }
332 
333     public String toString()
334     {
335         String subscriber = "[channel=" + _channel +
336                             ", consumerTag=" + _consumerTag +
337                             ", session=" + getProtocolSession().getKey()  ;
338 
339         return subscriber + "]";
340     }
341 
342     /**
343      * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
344      * thread safe.
345      *
346      @param msg   The message to send
347      @throws AMQException
348      */
349     abstract public void send(QueueEntry msgthrows AMQException;
350 
351 
352     public boolean isSuspended()
353     {
354         return !isActive() || _channel.isSuspended() || _deleted.get();
355     }
356 
357     /**
358      * Callback indicating that a queue has been deleted.
359      *
360      @param queue The queue to delete
361      */
362     public void queueDeleted(AMQQueue queue)
363     {
364         _deleted.set(true);
365 //        _channel.queueDeleted(queue);
366     }
367 
368     public boolean filtersMessages()
369     {
370         return _filters != null || _noLocal;
371     }
372 
373     public boolean hasInterest(QueueEntry entry)
374     {
375         //check that the message hasn't been rejected
376         if (entry.isRejectedBy(this))
377         {
378             if (_logger.isDebugEnabled())
379             {
380                 _logger.debug("Subscription:" + debugIdentity() " rejected message:" + entry.debugIdentity());
381             }
382 //            return false;
383         }
384 
385 
386 
387         //todo - client id should be recoreded and this test removed but handled below
388         if (_noLocal)
389         {
390             final Object publisherId = entry.getMessage().getPublisherClientInstance();
391 
392             // We don't want local messages so check to see if message is one we sent
393             Object localInstance;
394 
395             if (publisherId != null && (getProtocolSession().getClientProperties() != null&&
396                 (localInstance = getProtocolSession().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
397             {
398                 if(publisherId.equals(localInstance))
399                 {
400                     return false;
401                 }
402             }
403             else
404             {
405 
406                 localInstance = getProtocolSession().getClientIdentifier();
407                 //todo - client id should be recoreded and this test removed but handled here
408 
409 
410                 if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier()))
411                 {
412                     return false;
413                 }
414             }
415 
416 
417         }
418 
419 
420         if (_logger.isDebugEnabled())
421         {
422             _logger.debug("(" + debugIdentity() ") checking filters for message (" + entry.debugIdentity());
423         }
424         return checkFilters(entry);
425 
426     }
427 
428     private String id = String.valueOf(System.identityHashCode(this));
429 
430     private String debugIdentity()
431     {
432         return id;
433     }
434 
435     private boolean checkFilters(QueueEntry msg)
436     {
437         return (_filters == null|| _filters.allAllow(msg);
438     }
439 
440     public boolean isAutoClose()
441     {
442         return _autoClose;
443     }
444 
445     public FlowCreditManager getCreditManager()
446     {
447         return _creditManager;
448     }
449 
450 
451     public void close()
452     {
453         boolean closed = false;
454         State state = getState();
455 
456         _stateChangeLock.lock();
457         try
458         {
459             while(!closed && state != State.CLOSED)
460             {
461                 closed = _state.compareAndSet(state, State.CLOSED);
462                 if(!closed)
463                 {
464                     state = getState();
465                 }
466                 else
467                 {
468                     _stateListener.stateChange(this,state, State.CLOSED);
469                 }
470             }
471             _creditManager.removeListener(this);
472         }
473         finally
474         {
475             _stateChangeLock.unlock();
476         }
477 
478 
479         if (closed)
480         {
481             if (_logger.isDebugEnabled())
482             {
483                 _logger.debug("Called close() on a closed subscription");
484             }
485 
486             return;
487         }
488 
489         if (_logger.isInfoEnabled())
490         {
491             _logger.info("Closing subscription (" + debugIdentity() "):" this);
492         }
493     }
494 
495     public boolean isClosed()
496     {
497         return getState() == State.CLOSED;
498     }
499 
500 
501     public boolean wouldSuspend(QueueEntry msg)
502     {
503         return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage());
504     }
505 
506     public void getSendLock()
507     {
508         _stateChangeLock.lock();
509     }
510 
511     public void releaseSendLock()
512     {
513         _stateChangeLock.unlock();
514     }
515 
516     public void resend(final QueueEntry entrythrows AMQException
517     {
518         _queue.resend(entry, this);
519     }
520 
521     public AMQChannel getChannel()
522     {
523         return _channel;
524     }
525 
526     public AMQShortString getConsumerTag()
527     {
528         return _consumerTag;
529     }
530 
531     public AMQProtocolSession getProtocolSession()
532     {
533         return _channel.getProtocolSession();
534     }
535 
536     public AMQQueue getQueue()
537     {
538         return _queue;        
539     }
540 
541     public void restoreCredit(final QueueEntry queueEntry)
542     {
543         _creditManager.addCredit(1, queueEntry.getSize());
544     }
545 
546 
547     public void creditStateChanged(boolean hasCredit)
548     {
549         
550         if(hasCredit)
551         {
552             if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
553             {
554                 _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
555             }
556             else
557             {
558                 // this is a hack to get round the issue of increasing bytes credit
559                 _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
560             }
561         }
562         else
563         {
564             if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
565             {
566                 _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
567             }
568         }
569     }
570 
571     public State getState()
572     {
573         return _state.get();
574     }
575 
576 
577     public void setStateListener(final StateListener listener)
578     {
579         _stateListener = listener;
580     }
581 
582 
583     public QueueEntry getLastSeenEntry()
584     {
585         return _queueContext.get();
586     }
587 
588     public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue)
589     {
590         return _queueContext.compareAndSet(expected,newvalue);
591     }
592 
593 
594     protected void sendToClient(final QueueEntry entry, final long deliveryTag)
595             throws AMQException
596     {
597         _logger.info("Sending Message(" + entry + ") DTag:" + deliveryTag + " to subscription:" + debugIdentity());
598         _deliveryMethod.deliverToClient(this,entry,deliveryTag);
599     }
600 
601 
602     protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag)
603     {
604         _recordMethod.recordMessageDelivery(this,entry,deliveryTag);
605     }
606 
607 
608     public boolean isActive()
609     {
610         return getState() == State.ACTIVE;
611     }
612 
613     public QueueEntry.SubscriptionAcquiredState getOwningState()
614     {
615         return _owningState;
616     }
617 
618 }