001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.management.domain.handler.impl;
022
023 import java.util.HashMap;
024 import java.util.Map;
025 import java.util.concurrent.BlockingQueue;
026
027 import org.apache.qpid.management.Messages;
028 import org.apache.qpid.management.domain.handler.base.BaseMessageHandler;
029 import org.apache.qpid.management.domain.model.DomainModel;
030 import org.apache.qpid.management.domain.model.InvocationEvent;
031 import org.apache.qpid.transport.codec.Decoder;
032 import org.apache.qpid.transport.util.Logger;
033
034 /**
035 * Message handler for method response messages.
036 * This handler is installed on domain model as a method invocation result listener.
037 * When a method is going to be invoked this listener is notified with the exchange channel that will be used between it and
038 * the event (method invocation) source object.
039 *
040 * @author Andrea Gazzarini
041 *
042 */
043 public class MethodResponseMessageHandler extends BaseMessageHandler
044 {
045 private final static Logger LOGGER = Logger.get(MethodResponseMessageHandler.class);
046
047 private Map<Integer, BlockingQueue<InvocationResult>> _exchangeChannels = new HashMap<Integer, BlockingQueue<InvocationResult>>();
048
049 /**
050 * This is the listener installed on domain model for method invocations.
051 */
052 private final IMethodInvocationListener methodInvocationListener = new IMethodInvocationListener()
053 {
054 /**
055 * Event source callback.
056 * A method is going to be invoked and this method lets this listener take the exchange channel that will be used
057 * with the event source for synchronous communication.
058 *
059 * @param event the operation invocation event.
060 */
061 public void operationIsGoingToBeInvoked (InvocationEvent event)
062 {
063 _exchangeChannels.put(event.getSequenceNumber(), event.getExchangeChannel());
064 }
065 };
066
067 /**
068 * Processes the incoming message.
069 *
070 * @param decoder the decoder used for parsing incoming data.
071 * @param sequenceNumber the sequence number of the incoming message.
072 */
073 public void process (Decoder decoder, int sequenceNumber)
074 {
075 InvocationResult result = new InvocationResult(decoder.readUint32(), decoder.readStr16(),decoder.readReaminingBytes());
076 BlockingQueue<InvocationResult> exchangeChannel = _exchangeChannels.remove(sequenceNumber);
077 if (exchangeChannel != null)
078 {
079 try
080 {
081 exchangeChannel.put(result);
082 } catch (InterruptedException exception)
083 {
084 LOGGER.error(exception,Messages.QMAN_100010_METHOD_INVOCATION_RESULT_FAILURE,sequenceNumber);
085 }
086 } else
087 {
088 LOGGER.warn(
089 "Unable to deal with incoming message because it contains a unknown sequence number (%s).",
090 sequenceNumber);
091 }
092 }
093
094 /**
095 * Sets the domain model on this handler.
096 * In addiction, this handler registers a method invocation listener on the domain model.
097 *
098 * @param domainModel the managed broker domain model.
099 */
100 @Override
101 public void setDomainModel (DomainModel domainModel)
102 {
103 super.setDomainModel(domainModel);
104 domainModel.setMethodInvocationListener(methodInvocationListener);
105 }
106 }
|