DiagnosticExchange.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.extras.exchanges.diagnostic;
022 
023 import java.util.List;
024 import java.util.Map;
025 import java.util.ArrayList;
026 import java.util.Collection;
027 
028 import javax.management.JMException;
029 import javax.management.openmbean.OpenDataException;
030 import javax.management.openmbean.TabularData;
031 
032 import org.apache.qpid.AMQException;
033 import org.apache.qpid.framing.AMQShortString;
034 import org.apache.qpid.framing.BasicContentHeaderProperties;
035 import org.apache.qpid.framing.FieldTable;
036 import org.apache.qpid.server.exchange.AbstractExchange;
037 import org.apache.qpid.server.management.MBeanConstructor;
038 import org.apache.qpid.server.management.MBeanDescription;
039 import org.apache.qpid.server.queue.IncomingMessage;
040 import org.apache.qpid.server.queue.AMQQueue;
041 
042 import org.apache.qpid.junit.extensions.util.SizeOf;
043 
044 /**
045  
046  * This is a special diagnostic exchange type which doesn't actually do anything
047  * with messages. When it receives a message, it writes information about the
048  * current memory usage to the "memory" property of the message and places it on the
049  * diagnosticqueue for retrieval 
050  
051  @author Aidan Skinner
052  
053  */
054 
055 public class DiagnosticExchange extends AbstractExchange
056 {
057    
058     public static final AMQShortString DIAGNOSTIC_EXCHANGE_CLASS = new AMQShortString("x-diagnostic");
059     public static final AMQShortString DIAGNOSTIC_EXCHANGE_NAME = new AMQShortString("diagnostic");
060 
061     /**
062      * the logger.
063      */
064    //private static final Logger _logger = Logger.getLogger(DiagnosticExchange.class);
065 
066     /**
067      * MBean class implementing the management interfaces.
068      */
069     @MBeanDescription("Management Bean for Diagnostic Exchange")
070     private final class DiagnosticExchangeMBean extends ExchangeMBean
071     {
072 
073         /**
074          * Usual constructor.
075          
076          @throws JMException
077          */
078         @MBeanConstructor("Creates an MBean for AMQ Diagnostic exchange")
079         public DiagnosticExchangeMBean() throws JMException
080         {
081             super();
082             _exchangeType = "diagnostic";
083             init();
084         }
085 
086         /**
087          * Returns nothing, there can be no tabular data for this...
088          
089          @throws OpenDataException
090          * @returns null
091          * @todo ... or can there? Could this actually return all the
092          *       information in one easy to read table?
093          */
094         public TabularData bindings() throws OpenDataException
095         {
096             return null;
097         }
098 
099         /**
100          * This exchange type doesn't support queues, so this method does
101          * nothing.
102          
103          @param queueName
104          *            the queue you'll fail to create
105          @param binding
106          *            the binding you'll fail to create
107          @throws JMException
108          *             an exception that will never be thrown
109          */
110         public void createNewBinding(String queueName, String bindingthrows JMException
111         {
112             // No Op
113         }
114 
115     // End of MBean class
116 
117     /**
118      * Creates a new MBean instance
119      
120      @return the newly created MBean
121      @throws AMQException
122      *             if something goes wrong
123      */
124     protected ExchangeMBean createMBean() throws AMQException
125     {
126         try
127         {
128             return new DiagnosticExchange.DiagnosticExchangeMBean();
129         }
130         catch (JMException ex)
131         {
132          //   _logger.error("Exception occured in creating the direct exchange mbean", ex);
133             throw new AMQException(null, "Exception occured in creating the direct exchange mbean", ex);
134         }
135     }
136 
137     public AMQShortString getType()
138     {
139         return DIAGNOSTIC_EXCHANGE_CLASS;
140     }
141 
142     /**
143      * Does nothing.
144      
145      @param routingKey
146      *            pointless
147      @param queue
148      *            pointless
149      @param args
150      *            pointless
151      @throws AMQException
152      *             never
153      */
154     public void registerQueue(AMQShortString routingKey, AMQQueue queue, FieldTable argsthrows AMQException
155     {
156         // No op
157     }
158 
159     /**
160      * Does nothing.
161      
162      @param routingKey
163      *            pointless
164      @param queue
165      *            pointless
166      @param args
167      *            pointless
168      @throws AMQException
169      *             never
170      */
171     public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable argsthrows AMQException
172     {
173         // No op
174     }
175 
176     public boolean isBound(AMQShortString routingKey, AMQQueue queue)
177     {
178         return false;
179     }
180 
181     public boolean isBound(AMQShortString routingKey)
182     {
183         return false;
184     }
185 
186     public boolean isBound(AMQQueue queue)
187     {
188         return false;
189     }
190 
191     public boolean hasBindings()
192     {
193         return false;
194     }
195 
196     public void route(IncomingMessage payloadthrows AMQException
197     {
198         
199         Long value = new Long(SizeOf.getUsedMemory());
200         AMQShortString key = new AMQShortString("memory");
201         
202         FieldTable headers = ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).getHeaders();
203         headers.put(key, value);
204         ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
205         AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
206 
207         ArrayList<AMQQueue> queues =  new ArrayList<AMQQueue>();
208         queues.add(q);
209         payload.enqueue(queues);
210         
211     }
212 
213   
214   public boolean isBound(AMQShortString routingKey, FieldTable arguments,
215       AMQQueue queue) {
216     // TODO Auto-generated method stub
217     return false;
218   }
219 }