AMQQueueBrowser.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.client;
022 
023 import org.slf4j.Logger;
024 import org.slf4j.LoggerFactory;
025 import org.apache.qpid.AMQException;
026 
027 import javax.jms.IllegalStateException;
028 import javax.jms.JMSException;
029 import javax.jms.Message;
030 import javax.jms.Queue;
031 import javax.jms.QueueBrowser;
032 
033 import java.util.ArrayList;
034 import java.util.Enumeration;
035 import java.util.concurrent.atomic.AtomicBoolean;
036 
037 public class AMQQueueBrowser implements QueueBrowser
038 {
039     private static final Logger _logger = LoggerFactory.getLogger(AMQQueueBrowser.class);
040 
041     private AtomicBoolean _isClosed = new AtomicBoolean();
042     private final AMQSession _session;
043     private final AMQQueue _queue;
044     private final ArrayList<BasicMessageConsumer> _consumers = new ArrayList<BasicMessageConsumer>();
045     private final String _messageSelector;
046 
047     AMQQueueBrowser(AMQSession session, AMQQueue queue, String messageSelectorthrows JMSException
048     {
049         _session = session;
050         _queue = queue;
051         _messageSelector = ((messageSelector == null|| (messageSelector.trim().length() == 0)) null : messageSelector;
052         // Create Consumer to verify message selector.
053         BasicMessageConsumer consumer =
054                 (BasicMessageConsumer_session.createBrowserConsumer(_queue, _messageSelector, false);
055         // Close this consumer as we are not looking to consume only to establish that, at least for now,
056         // the QB can be created
057         consumer.close();
058     }
059 
060     public Queue getQueue() throws JMSException
061     {
062         checkState();
063 
064         return _queue;
065     }
066 
067     private void checkState() throws JMSException
068     {
069         if (_isClosed.get())
070         {
071             throw new IllegalStateException("Queue Browser");
072         }
073 
074         if (_session.isClosed())
075         {
076             throw new IllegalStateException("Session is closed");
077         }
078 
079     }
080 
081     public String getMessageSelector() throws JMSException
082     {
083 
084         checkState();
085 
086         return _messageSelector;
087     }
088 
089     public Enumeration getEnumeration() throws JMSException
090     {
091         checkState();
092         final BasicMessageConsumer consumer =
093             (BasicMessageConsumer_session.createBrowserConsumer(_queue, _messageSelector, false);
094 
095         _consumers.add(consumer);
096 
097         return new Enumeration()
098         {
099 
100             Message _nextMessage = consumer == null null : consumer.receive(1000);
101 
102             public boolean hasMoreElements()
103             {
104                 _logger.info("QB:hasMoreElements:" (_nextMessage != null));
105                 return (_nextMessage != null);
106             }
107 
108             public Object nextElement()
109             {
110                 Message msg = _nextMessage;
111                 try
112                 {
113                     _logger.info("QB:nextElement about to receive");
114                     _nextMessage = consumer.receive(1000);
115                     _logger.info("QB:nextElement received:" + _nextMessage);
116                 }
117                 catch (JMSException e)
118                 {
119                     _logger.warn("Exception caught while queue browsing", e);
120                     _nextMessage = null;
121                 }
122                 return msg;
123             }
124         };
125     }
126 
127     public void close() throws JMSException
128     {
129         for (BasicMessageConsumer consumer : _consumers)
130         {
131             consumer.close();
132         }
133 
134         _consumers.clear();
135     }
136 
137 }