001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.subscription;
022
023 import java.util.concurrent.atomic.AtomicBoolean;
024 import java.util.concurrent.atomic.AtomicReference;
025 import java.util.concurrent.locks.Lock;
026 import java.util.concurrent.locks.ReentrantLock;
027
028 import org.apache.log4j.Logger;
029 import org.apache.qpid.AMQException;
030 import org.apache.qpid.common.AMQPFilterTypes;
031 import org.apache.qpid.common.ClientProperties;
032 import org.apache.qpid.framing.AMQShortString;
033 import org.apache.qpid.framing.FieldTable;
034 import org.apache.qpid.server.AMQChannel;
035 import org.apache.qpid.server.queue.QueueEntry;
036 import org.apache.qpid.server.queue.AMQQueue;
037 import org.apache.qpid.server.subscription.Subscription;
038 import org.apache.qpid.server.flow.FlowCreditManager;
039 import org.apache.qpid.server.filter.FilterManager;
040 import org.apache.qpid.server.filter.FilterManagerFactory;
041 import org.apache.qpid.server.protocol.AMQProtocolSession;
042 import org.apache.qpid.server.store.StoreContext;
043
044 /**
045 * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
046 * that was given out by the broker and the channel id. <p/>
047 */
048 public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener
049 {
050
051 private StateListener _stateListener = new StateListener()
052 {
053
054 public void stateChange(Subscription sub, State oldState, State newState)
055 {
056
057 }
058 };
059
060
061 private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
062 private final AtomicReference<QueueEntry> _queueContext = new AtomicReference<QueueEntry>(null);
063 private final ClientDeliveryMethod _deliveryMethod;
064 private final RecordDeliveryMethod _recordMethod;
065
066 private QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
067 private final Lock _stateChangeLock;
068
069 static final class BrowserSubscription extends SubscriptionImpl
070 {
071 public BrowserSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
072 AMQShortString consumerTag, FieldTable filters,
073 boolean noLocal, FlowCreditManager creditManager,
074 ClientDeliveryMethod deliveryMethod,
075 RecordDeliveryMethod recordMethod)
076 throws AMQException
077 {
078 super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
079 }
080
081
082 public boolean isBrowser()
083 {
084 return true;
085 }
086
087 /**
088 * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
089 * thread safe.
090 *
091 * @param msg The message to send
092 * @throws AMQException
093 */
094 @Override
095 public void send(QueueEntry msg) throws AMQException
096 {
097 // We don't decrement the reference here as we don't want to consume the message
098 // but we do want to send it to the client.
099
100 synchronized (getChannel())
101 {
102 long deliveryTag = getChannel().getNextDeliveryTag();
103 sendToClient(msg, deliveryTag);
104 }
105
106 }
107
108 @Override
109 public boolean wouldSuspend(QueueEntry msg)
110 {
111 return false;
112 }
113
114 }
115
116 public static class NoAckSubscription extends SubscriptionImpl
117 {
118 public NoAckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
119 AMQShortString consumerTag, FieldTable filters,
120 boolean noLocal, FlowCreditManager creditManager,
121 ClientDeliveryMethod deliveryMethod,
122 RecordDeliveryMethod recordMethod)
123 throws AMQException
124 {
125 super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
126 }
127
128
129 public boolean isBrowser()
130 {
131 return false;
132 }
133
134 /**
135 * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
136 * thread safe.
137 *
138 * @param entry The message to send
139 * @throws AMQException
140 */
141 @Override
142 public void send(QueueEntry entry) throws AMQException
143 {
144
145 StoreContext storeContext = getChannel().getStoreContext();
146 try
147 {
148 // if we do not need to wait for client acknowledgements
149 // we can decrement the reference count immediately.
150
151 // By doing this _before_ the send we ensure that it
152 // doesn't get sent if it can't be dequeued, preventing
153 // duplicate delivery on recovery.
154
155 // The send may of course still fail, in which case, as
156 // the message is unacked, it will be lost.
157 entry.dequeueAndDelete(storeContext);
158
159
160 synchronized (getChannel())
161 {
162 long deliveryTag = getChannel().getNextDeliveryTag();
163
164 sendToClient(entry, deliveryTag);
165
166 }
167 }
168 finally
169 {
170 //Only set delivered if it actually was writen successfully..
171 // using a try->finally would set it even if an error occured.
172 // Is this what we want?
173
174 entry.setDeliveredToSubscription();
175 }
176 }
177
178 @Override
179 public boolean wouldSuspend(QueueEntry msg)
180 {
181 return false;
182 }
183
184 }
185
186 static final class AckSubscription extends SubscriptionImpl
187 {
188 public AckSubscription(AMQChannel channel, AMQProtocolSession protocolSession,
189 AMQShortString consumerTag, FieldTable filters,
190 boolean noLocal, FlowCreditManager creditManager,
191 ClientDeliveryMethod deliveryMethod,
192 RecordDeliveryMethod recordMethod)
193 throws AMQException
194 {
195 super(channel, protocolSession, consumerTag, filters, noLocal, creditManager, deliveryMethod, recordMethod);
196 }
197
198
199 public boolean isBrowser()
200 {
201 return false;
202 }
203
204
205 /**
206 * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
207 * thread safe.
208 *
209 * @param entry The message to send
210 * @throws AMQException
211 */
212 @Override
213 public void send(QueueEntry entry) throws AMQException
214 {
215
216 try
217 { // if we do not need to wait for client acknowledgements
218 // we can decrement the reference count immediately.
219
220 // By doing this _before_ the send we ensure that it
221 // doesn't get sent if it can't be dequeued, preventing
222 // duplicate delivery on recovery.
223
224 // The send may of course still fail, in which case, as
225 // the message is unacked, it will be lost.
226
227 synchronized (getChannel())
228 {
229 long deliveryTag = getChannel().getNextDeliveryTag();
230
231
232 recordMessageDelivery(entry, deliveryTag);
233 sendToClient(entry, deliveryTag);
234
235
236 }
237 }
238 finally
239 {
240 //Only set delivered if it actually was writen successfully..
241 // using a try->finally would set it even if an error occured.
242 // Is this what we want?
243
244 entry.setDeliveredToSubscription();
245 }
246 }
247
248
249 }
250
251
252 private static final Logger _logger = Logger.getLogger(SubscriptionImpl.class);
253
254 private final AMQChannel _channel;
255
256 private final AMQShortString _consumerTag;
257
258
259 private final boolean _noLocal;
260
261 private final FlowCreditManager _creditManager;
262
263 private FilterManager _filters;
264
265 private final Boolean _autoClose;
266
267
268 private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
269
270 private AMQQueue _queue;
271 private final AtomicBoolean _deleted = new AtomicBoolean(false);
272
273
274
275
276 public SubscriptionImpl(AMQChannel channel , AMQProtocolSession protocolSession,
277 AMQShortString consumerTag, FieldTable arguments,
278 boolean noLocal, FlowCreditManager creditManager,
279 ClientDeliveryMethod deliveryMethod,
280 RecordDeliveryMethod recordMethod)
281 throws AMQException
282 {
283
284 _channel = channel;
285 _consumerTag = consumerTag;
286
287 _creditManager = creditManager;
288 creditManager.addStateListener(this);
289
290 _noLocal = noLocal;
291
292
293 _filters = FilterManagerFactory.createManager(arguments);
294
295 _deliveryMethod = deliveryMethod;
296 _recordMethod = recordMethod;
297
298
299 _stateChangeLock = new ReentrantLock();
300
301
302 if (arguments != null)
303 {
304 Object autoClose = arguments.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
305 if (autoClose != null)
306 {
307 _autoClose = (Boolean) autoClose;
308 }
309 else
310 {
311 _autoClose = false;
312 }
313 }
314 else
315 {
316 _autoClose = false;
317 }
318
319 _logger.info(debugIdentity()+" Created subscription:");
320 }
321
322
323
324 public synchronized void setQueue(AMQQueue queue)
325 {
326 if(getQueue() != null)
327 {
328 throw new IllegalStateException("Attempt to set queue for subscription " + this + " to " + queue + "when already set to " + getQueue());
329 }
330 _queue = queue;
331 }
332
333 public String toString()
334 {
335 String subscriber = "[channel=" + _channel +
336 ", consumerTag=" + _consumerTag +
337 ", session=" + getProtocolSession().getKey() ;
338
339 return subscriber + "]";
340 }
341
342 /**
343 * This method can be called by each of the publisher threads. As a result all changes to the channel object must be
344 * thread safe.
345 *
346 * @param msg The message to send
347 * @throws AMQException
348 */
349 abstract public void send(QueueEntry msg) throws AMQException;
350
351
352 public boolean isSuspended()
353 {
354 return !isActive() || _channel.isSuspended() || _deleted.get();
355 }
356
357 /**
358 * Callback indicating that a queue has been deleted.
359 *
360 * @param queue The queue to delete
361 */
362 public void queueDeleted(AMQQueue queue)
363 {
364 _deleted.set(true);
365 // _channel.queueDeleted(queue);
366 }
367
368 public boolean filtersMessages()
369 {
370 return _filters != null || _noLocal;
371 }
372
373 public boolean hasInterest(QueueEntry entry)
374 {
375 //check that the message hasn't been rejected
376 if (entry.isRejectedBy(this))
377 {
378 if (_logger.isDebugEnabled())
379 {
380 _logger.debug("Subscription:" + debugIdentity() + " rejected message:" + entry.debugIdentity());
381 }
382 // return false;
383 }
384
385
386
387 //todo - client id should be recoreded and this test removed but handled below
388 if (_noLocal)
389 {
390 final Object publisherId = entry.getMessage().getPublisherClientInstance();
391
392 // We don't want local messages so check to see if message is one we sent
393 Object localInstance;
394
395 if (publisherId != null && (getProtocolSession().getClientProperties() != null) &&
396 (localInstance = getProtocolSession().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
397 {
398 if(publisherId.equals(localInstance))
399 {
400 return false;
401 }
402 }
403 else
404 {
405
406 localInstance = getProtocolSession().getClientIdentifier();
407 //todo - client id should be recoreded and this test removed but handled here
408
409
410 if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier()))
411 {
412 return false;
413 }
414 }
415
416
417 }
418
419
420 if (_logger.isDebugEnabled())
421 {
422 _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
423 }
424 return checkFilters(entry);
425
426 }
427
428 private String id = String.valueOf(System.identityHashCode(this));
429
430 private String debugIdentity()
431 {
432 return id;
433 }
434
435 private boolean checkFilters(QueueEntry msg)
436 {
437 return (_filters == null) || _filters.allAllow(msg);
438 }
439
440 public boolean isAutoClose()
441 {
442 return _autoClose;
443 }
444
445 public FlowCreditManager getCreditManager()
446 {
447 return _creditManager;
448 }
449
450
451 public void close()
452 {
453 boolean closed = false;
454 State state = getState();
455
456 _stateChangeLock.lock();
457 try
458 {
459 while(!closed && state != State.CLOSED)
460 {
461 closed = _state.compareAndSet(state, State.CLOSED);
462 if(!closed)
463 {
464 state = getState();
465 }
466 else
467 {
468 _stateListener.stateChange(this,state, State.CLOSED);
469 }
470 }
471 _creditManager.removeListener(this);
472 }
473 finally
474 {
475 _stateChangeLock.unlock();
476 }
477
478
479 if (closed)
480 {
481 if (_logger.isDebugEnabled())
482 {
483 _logger.debug("Called close() on a closed subscription");
484 }
485
486 return;
487 }
488
489 if (_logger.isInfoEnabled())
490 {
491 _logger.info("Closing subscription (" + debugIdentity() + "):" + this);
492 }
493 }
494
495 public boolean isClosed()
496 {
497 return getState() == State.CLOSED;
498 }
499
500
501 public boolean wouldSuspend(QueueEntry msg)
502 {
503 return !_creditManager.useCreditForMessage(msg.getMessage());//_channel.wouldSuspend(msg.getMessage());
504 }
505
506 public void getSendLock()
507 {
508 _stateChangeLock.lock();
509 }
510
511 public void releaseSendLock()
512 {
513 _stateChangeLock.unlock();
514 }
515
516 public void resend(final QueueEntry entry) throws AMQException
517 {
518 _queue.resend(entry, this);
519 }
520
521 public AMQChannel getChannel()
522 {
523 return _channel;
524 }
525
526 public AMQShortString getConsumerTag()
527 {
528 return _consumerTag;
529 }
530
531 public AMQProtocolSession getProtocolSession()
532 {
533 return _channel.getProtocolSession();
534 }
535
536 public AMQQueue getQueue()
537 {
538 return _queue;
539 }
540
541 public void restoreCredit(final QueueEntry queueEntry)
542 {
543 _creditManager.addCredit(1, queueEntry.getSize());
544 }
545
546
547 public void creditStateChanged(boolean hasCredit)
548 {
549
550 if(hasCredit)
551 {
552 if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
553 {
554 _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
555 }
556 else
557 {
558 // this is a hack to get round the issue of increasing bytes credit
559 _stateListener.stateChange(this, State.ACTIVE, State.ACTIVE);
560 }
561 }
562 else
563 {
564 if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
565 {
566 _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
567 }
568 }
569 }
570
571 public State getState()
572 {
573 return _state.get();
574 }
575
576
577 public void setStateListener(final StateListener listener)
578 {
579 _stateListener = listener;
580 }
581
582
583 public QueueEntry getLastSeenEntry()
584 {
585 return _queueContext.get();
586 }
587
588 public boolean setLastSeenEntry(QueueEntry expected, QueueEntry newvalue)
589 {
590 return _queueContext.compareAndSet(expected,newvalue);
591 }
592
593
594 protected void sendToClient(final QueueEntry entry, final long deliveryTag)
595 throws AMQException
596 {
597 _logger.info("Sending Message(" + entry + ") DTag:" + deliveryTag + " to subscription:" + debugIdentity());
598 _deliveryMethod.deliverToClient(this,entry,deliveryTag);
599 }
600
601
602 protected void recordMessageDelivery(final QueueEntry entry, final long deliveryTag)
603 {
604 _recordMethod.recordMessageDelivery(this,entry,deliveryTag);
605 }
606
607
608 public boolean isActive()
609 {
610 return getState() == State.ACTIVE;
611 }
612
613 public QueueEntry.SubscriptionAcquiredState getOwningState()
614 {
615 return _owningState;
616 }
617
618 }
|