001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.client;
022
023 import org.slf4j.Logger;
024 import org.slf4j.LoggerFactory;
025 import org.apache.qpid.AMQException;
026
027 import javax.jms.IllegalStateException;
028 import javax.jms.JMSException;
029 import javax.jms.Message;
030 import javax.jms.Queue;
031 import javax.jms.QueueBrowser;
032
033 import java.util.ArrayList;
034 import java.util.Enumeration;
035 import java.util.concurrent.atomic.AtomicBoolean;
036
037 public class AMQQueueBrowser implements QueueBrowser
038 {
039 private static final Logger _logger = LoggerFactory.getLogger(AMQQueueBrowser.class);
040
041 private AtomicBoolean _isClosed = new AtomicBoolean();
042 private final AMQSession _session;
043 private final AMQQueue _queue;
044 private final ArrayList<BasicMessageConsumer> _consumers = new ArrayList<BasicMessageConsumer>();
045 private final String _messageSelector;
046
047 AMQQueueBrowser(AMQSession session, AMQQueue queue, String messageSelector) throws JMSException
048 {
049 _session = session;
050 _queue = queue;
051 _messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector;
052 // Create Consumer to verify message selector.
053 BasicMessageConsumer consumer =
054 (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
055 // Close this consumer as we are not looking to consume only to establish that, at least for now,
056 // the QB can be created
057 consumer.close();
058 }
059
060 public Queue getQueue() throws JMSException
061 {
062 checkState();
063
064 return _queue;
065 }
066
067 private void checkState() throws JMSException
068 {
069 if (_isClosed.get())
070 {
071 throw new IllegalStateException("Queue Browser");
072 }
073
074 if (_session.isClosed())
075 {
076 throw new IllegalStateException("Session is closed");
077 }
078
079 }
080
081 public String getMessageSelector() throws JMSException
082 {
083
084 checkState();
085
086 return _messageSelector;
087 }
088
089 public Enumeration getEnumeration() throws JMSException
090 {
091 checkState();
092 final BasicMessageConsumer consumer =
093 (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
094
095 _consumers.add(consumer);
096
097 return new Enumeration()
098 {
099
100 Message _nextMessage = consumer == null ? null : consumer.receive(1000);
101
102 public boolean hasMoreElements()
103 {
104 _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
105 return (_nextMessage != null);
106 }
107
108 public Object nextElement()
109 {
110 Message msg = _nextMessage;
111 try
112 {
113 _logger.info("QB:nextElement about to receive");
114 _nextMessage = consumer.receive(1000);
115 _logger.info("QB:nextElement received:" + _nextMessage);
116 }
117 catch (JMSException e)
118 {
119 _logger.warn("Exception caught while queue browsing", e);
120 _nextMessage = null;
121 }
122 return msg;
123 }
124 };
125 }
126
127 public void close() throws JMSException
128 {
129 for (BasicMessageConsumer consumer : _consumers)
130 {
131 consumer.close();
132 }
133
134 _consumers.clear();
135 }
136
137 }
|