MessageFactory.java
001 /*
002  *
003  * Licensed to the Apache Software Foundation (ASF) under one
004  * or more contributor license agreements.  See the NOTICE file
005  * distributed with this work for additional information
006  * regarding copyright ownership.  The ASF licenses this file
007  * to you under the Apache License, Version 2.0 (the
008  * "License"); you may not use this file except in compliance
009  * with the License.  You may obtain a copy of the License at
010  *
011  *   http://www.apache.org/licenses/LICENSE-2.0
012  *
013  * Unless required by applicable law or agreed to in writing,
014  * software distributed under the License is distributed on an
015  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016  * KIND, either express or implied.  See the License for the
017  * specific language governing permissions and limitations
018  * under the License.
019  *
020  */
021 package org.apache.qpid.topic;
022 
023 import javax.jms.*;
024 
025 import org.apache.qpid.client.AMQSession;
026 import org.apache.qpid.client.AMQTopic;
027 import org.apache.qpid.exchange.ExchangeDefaults;
028 
029 /**
030  */
031 class MessageFactory
032 {
033     private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
034 
035     private final Session _session;
036     private final Topic _topic;
037     private final Topic _control;
038     private final byte[] _payload;
039 
040     MessageFactory(Session sessionthrows JMSException
041     {
042         this(session, 256);
043     }
044 
045     MessageFactory(Session session, int sizethrows JMSException
046     {
047         _session = session;
048         if (session instanceof AMQSession)
049         {
050             _topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topic_control");
051             _control = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, "topictest.control");
052         }
053         else
054         {
055             _topic = session.createTopic("topic_control");
056             _control = session.createTopic("topictest.control");
057         }
058 
059         _payload = new byte[size];
060 
061         for (int i = 0; i < size; i++)
062         {
063             _payload[i(byteDATA[i % DATA.length];
064         }
065     }
066 
067     private static boolean checkText(Message m, String s)
068     {
069         try
070         {
071             return (instanceof TextMessage&& ((TextMessagem).getText().equals(s);
072         }
073         catch (JMSException e)
074         {
075             e.printStackTrace(System.out);
076 
077             return false;
078         }
079     }
080 
081     Topic getTopic()
082     {
083         return _topic;
084     }
085 
086     Message createEventMessage() throws JMSException
087     {
088         BytesMessage msg = _session.createBytesMessage();
089         msg.writeBytes(_payload);
090 
091         return msg;
092     }
093 
094     Message createShutdownMessage() throws JMSException
095     {
096         return _session.createTextMessage("SHUTDOWN");
097     }
098 
099     Message createReportRequestMessage() throws JMSException
100     {
101         return _session.createTextMessage("REPORT");
102     }
103 
104     Message createReportResponseMessage(String msgthrows JMSException
105     {
106         return _session.createTextMessage(msg);
107     }
108 
109     boolean isShutdown(Message m)
110     {
111         return checkText(m, "SHUTDOWN");
112     }
113 
114     boolean isReport(Message m)
115     {
116         return checkText(m, "REPORT");
117     }
118 
119     Object getReport(Message m)
120     {
121         try
122         {
123             return ((TextMessagem).getText();
124         }
125         catch (JMSException e)
126         {
127             e.printStackTrace(System.out);
128 
129             return e.toString();
130         }
131     }
132 
133     MessageConsumer createTopicConsumer() throws Exception
134     {
135         return _session.createConsumer(_topic);
136     }
137 
138     MessageConsumer createDurableTopicConsumer(String namethrows Exception
139     {
140         return _session.createDurableSubscriber(_topic, name);
141     }
142 
143     MessageConsumer createControlConsumer() throws Exception
144     {
145         return _session.createConsumer(_control);
146     }
147 
148     MessageProducer createTopicPublisher() throws Exception
149     {
150         return _session.createProducer(_topic);
151     }
152 
153     MessageProducer createControlPublisher() throws Exception
154     {
155         return _session.createProducer(_control);
156     }
157 }