MessageOnlyCreditManager.java
01 package org.apache.qpid.server.flow;
02 
03 import org.apache.qpid.server.queue.AMQMessage;
04 
05 import java.util.concurrent.atomic.AtomicLong;
06 
07 /*
08 *
09 * Licensed to the Apache Software Foundation (ASF) under one
10 * or more contributor license agreements.  See the NOTICE file
11 * distributed with this work for additional information
12 * regarding copyright ownership.  The ASF licenses this file
13 * to you under the Apache License, Version 2.0 (the
14 * "License"); you may not use this file except in compliance
15 * with the License.  You may obtain a copy of the License at
16 *
17 *   http://www.apache.org/licenses/LICENSE-2.0
18 *
19 * Unless required by applicable law or agreed to in writing,
20 * software distributed under the License is distributed on an
21 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22 * KIND, either express or implied.  See the License for the
23 * specific language governing permissions and limitations
24 * under the License.
25 *
26 */
27 public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
28 {
29     private final AtomicLong _messageCredit;
30 
31     public MessageOnlyCreditManager(final long initialCredit)
32     {
33         _messageCredit = new AtomicLong(initialCredit);
34     }
35 
36     public void addCredit(long messageCredit, long bytesCredit)
37     {
38         setSuspended(false);
39         _messageCredit.addAndGet(messageCredit);
40     }
41 
42     public void removeAllCredit()
43     {
44         setSuspended(true);
45         _messageCredit.set(0L);
46     }
47 
48     public boolean hasCredit()
49     {
50         return _messageCredit.get() 0L;
51     }
52 
53     public boolean useCreditForMessage(AMQMessage msg)
54     {
55         if(hasCredit())
56         {
57             if(_messageCredit.addAndGet(-1L>= 0)
58             {
59                 setSuspended(false);
60                 return true;
61             }
62             else
63             {
64                 _messageCredit.addAndGet(1L);
65                 setSuspended(true);
66                 return false;
67             }
68         }
69         else
70         {
71             setSuspended(true);
72             return false;
73         }
74                 
75     }
76 }