MessageTokenizer.java
001 /*
002 *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 
022 package org.apache.qpid.management.domain.services;
023 
024 import java.io.IOException;
025 import java.util.Enumeration;
026 import java.util.Iterator;
027 import java.util.LinkedList;
028 
029 import org.apache.qpid.api.Message;
030 import org.apache.qpid.management.Messages;
031 import org.apache.qpid.management.Protocol;
032 import org.apache.qpid.nclient.util.ByteBufferMessage;
033 import org.apache.qpid.transport.codec.BBDecoder;
034 import org.apache.qpid.transport.util.Logger;
035 
036 /**
037  * The message tokenizer class allows a multi message listener to break a 
038  * message into tokens where each token is itself a valid AMQP message.
039  
040  @author  Andrea Gazzarini
041  @see QPID-1368
042  */
043 class MessageTokenizer implements Enumeration<Message> 
044 {
045   private final static Logger LOGGER = Logger.get(MessageTokenizer.class);
046   
047   static byte [] MAGIC_NUMBER_BYTES;
048   
049   private LinkedList<Message> _messages = new LinkedList<Message>();
050   private Iterator<Message> _iterator;
051   
052   static 
053   {
054     try 
055     {
056       MAGIC_NUMBER_BYTES = Protocol.MAGIC_NUMBER.getBytes("UTF-8");
057     catch(Exception exception
058     {
059       throw new ExceptionInInitializerError(exception);
060     }
061   }
062   
063   /**
064    * Builds a new Message tokenizer with the given message.
065    * Note that if the given message is not a "compound" message this tokenizer will producer only one token; 
066    * That is, the token is a message equals to the given message.
067    
068    @param compoundMessage the compound message
069    @throws IOException when it's not possible to read the given message content.
070    */
071   MessageTokenizer(Message compoundMessagethrows IOException
072   {
073     build(compoundMessage);
074   }
075   
076   public boolean hasMoreElements() 
077   {
078     return _iterator.hasNext();
079   }
080 
081   public Message nextElement() 
082   {
083     return _iterator.next();
084   }
085 
086   /**
087    * Retruns the number of the tokens produced by this tokenizer.
088    
089    @return the number of the tokens produced by this tokenizer.
090    */
091   public int countTokens() 
092   {
093     return _messages.size();
094   }
095 
096   // Internal methods used for splitting the multi message byte array.
097   int indexOf(byte[] source, int startIndex
098   {
099     int currentSourceIndex; 
100     int currentExampleIndex;
101     
102     if (startIndex + > source.length)
103       return -1;
104 
105     for (currentSourceIndex = startIndex; currentSourceIndex <= source.length - 3; currentSourceIndex++
106     {
107       for (currentExampleIndex = 0; currentExampleIndex < 3; currentExampleIndex++
108       {
109         if (source[currentSourceIndex + currentExampleIndex!= MAGIC_NUMBER_BYTES[currentExampleIndex])
110           break;
111       }
112       
113       if (currentExampleIndex == 3)
114         return currentSourceIndex;
115     }
116     return -1;
117   }  
118   
119   // Internal method used for building the tokens.
120   private void build(Message compoundMessagethrows IOException 
121   {      
122     int startIndex = 0;
123     int indexOfMagicNumber = 0;
124     
125     BBDecoder decoder = new BBDecoder();
126     decoder.init(compoundMessage.readData());
127     byte [] source = decoder.readReaminingBytes();      
128     
129     int howManyTokens = 1;
130     
131     while ((indexOfMagicNumber = indexOf(source, startIndex+1)) != -1)
132     {
133       addMessageToken(source, startIndex, (indexOfMagicNumber-startIndex));
134       startIndex = indexOfMagicNumber;
135       howManyTokens++;
136     }
137     addMessageToken(source, startIndex, (source.length-startIndex));
138     _iterator = _messages.iterator();
139     
140     LOGGER.debug(Messages.QMAN_200031_COMPOUND_MESSAGE_CONTAINS,howManyTokens);
141   };
142 
143   // Builds & adds a new "message" token
144   private void addMessageToken(byte [] source,int startIndex,int lengththrows IOException
145   {
146     byte [] messageData = new byte[length];
147     System.arraycopy(source, startIndex, messageData, 0, messageData.length);
148     Message message = new ByteBufferMessage();
149     message.appendData(messageData);
150     _messages.add(message);    
151   }
152 }