BytesOnlyCreditManager.java
01 package org.apache.qpid.server.flow;
02 
03 import org.apache.qpid.server.queue.AMQMessage;
04 
05 import java.util.concurrent.atomic.AtomicLong;
06 
07 /*
08 *
09 * Licensed to the Apache Software Foundation (ASF) under one
10 * or more contributor license agreements.  See the NOTICE file
11 * distributed with this work for additional information
12 * regarding copyright ownership.  The ASF licenses this file
13 * to you under the Apache License, Version 2.0 (the
14 * "License"); you may not use this file except in compliance
15 * with the License.  You may obtain a copy of the License at
16 *
17 *   http://www.apache.org/licenses/LICENSE-2.0
18 *
19 * Unless required by applicable law or agreed to in writing,
20 * software distributed under the License is distributed on an
21 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22 * KIND, either express or implied.  See the License for the
23 * specific language governing permissions and limitations
24 * under the License.
25 *
26 */
27 public class BytesOnlyCreditManager extends AbstractFlowCreditManager
28 {
29     private final AtomicLong _bytesCredit;
30 
31     public BytesOnlyCreditManager(long initialCredit)
32     {
33         _bytesCredit = new AtomicLong(initialCredit);
34     }
35 
36     public void addCredit(long messageCredit, long bytesCredit)
37     {
38         _bytesCredit.addAndGet(bytesCredit);
39         setSuspended(false);
40     }
41 
42     public void removeAllCredit()
43     {
44         _bytesCredit.set(0L);
45     }
46 
47     public boolean hasCredit()
48     {
49         return _bytesCredit.get() 0L;
50     }
51 
52     public boolean useCreditForMessage(AMQMessage msg)
53     {
54         final long msgSize = msg.getSize();
55         if(hasCredit())
56         {
57             if(_bytesCredit.addAndGet(-msgSize>= 0)
58             {
59                 return true;
60             }
61             else
62             {
63                 _bytesCredit.addAndGet(msgSize);
64                 setSuspended(true);
65                 return false;
66             }
67         }
68         else
69         {
70             return false;
71         }
72 
73     }
74 }