MessageAndBytesCreditManager.java
01 package org.apache.qpid.server.flow;
02 
03 import org.apache.qpid.server.queue.AMQMessage;
04 
05 /*
06 *
07 * Licensed to the Apache Software Foundation (ASF) under one
08 * or more contributor license agreements.  See the NOTICE file
09 * distributed with this work for additional information
10 * regarding copyright ownership.  The ASF licenses this file
11 * to you under the Apache License, Version 2.0 (the
12 * "License"); you may not use this file except in compliance
13 * with the License.  You may obtain a copy of the License at
14 *
15 *   http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing,
18 * software distributed under the License is distributed on an
19 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20 * KIND, either express or implied.  See the License for the
21 * specific language governing permissions and limitations
22 * under the License.
23 *
24 */
25 public class MessageAndBytesCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
26 {
27     private long _messageCredit;
28     private long _bytesCredit;
29 
30     MessageAndBytesCreditManager(final long messageCredit, final long bytesCredit)
31     {
32         _messageCredit = messageCredit;
33         _bytesCredit = bytesCredit;
34     }
35 
36     public synchronized void addCredit(long messageCredit, long bytesCredit)
37     {
38         _messageCredit += messageCredit;
39         _bytesCredit += bytesCredit;
40         setSuspended(hasCredit());
41     }
42 
43     public synchronized void removeAllCredit()
44     {
45         _messageCredit = 0L;
46         _bytesCredit = 0L;
47         setSuspended(true);
48     }
49 
50     public synchronized boolean hasCredit()
51     {
52         return (_messageCredit > 0L&& _bytesCredit > 0L );
53     }
54 
55     public synchronized boolean useCreditForMessage(AMQMessage msg)
56     {
57         if(_messageCredit == 0L)
58         {
59             setSuspended(true);
60             return false;
61         }
62         else
63         {
64             final long msgSize = msg.getSize();
65             if(msgSize > _bytesCredit)
66             {
67                 setSuspended(true);
68                 return false;
69             }
70             _messageCredit--;
71             _bytesCredit -= msgSize;
72             setSuspended(false);
73             return true;
74         }
75         
76     }
77 }