01 package org.apache.qpid.server.flow;
02
03 import org.apache.qpid.server.queue.AMQMessage;
04
05 import java.util.concurrent.atomic.AtomicLong;
06
07 /*
08 *
09 * Licensed to the Apache Software Foundation (ASF) under one
10 * or more contributor license agreements. See the NOTICE file
11 * distributed with this work for additional information
12 * regarding copyright ownership. The ASF licenses this file
13 * to you under the Apache License, Version 2.0 (the
14 * "License"); you may not use this file except in compliance
15 * with the License. You may obtain a copy of the License at
16 *
17 * http://www.apache.org/licenses/LICENSE-2.0
18 *
19 * Unless required by applicable law or agreed to in writing,
20 * software distributed under the License is distributed on an
21 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22 * KIND, either express or implied. See the License for the
23 * specific language governing permissions and limitations
24 * under the License.
25 *
26 */
27 public class BytesOnlyCreditManager extends AbstractFlowCreditManager
28 {
29 private final AtomicLong _bytesCredit;
30
31 public BytesOnlyCreditManager(long initialCredit)
32 {
33 _bytesCredit = new AtomicLong(initialCredit);
34 }
35
36 public void addCredit(long messageCredit, long bytesCredit)
37 {
38 _bytesCredit.addAndGet(bytesCredit);
39 setSuspended(false);
40 }
41
42 public void removeAllCredit()
43 {
44 _bytesCredit.set(0L);
45 }
46
47 public boolean hasCredit()
48 {
49 return _bytesCredit.get() > 0L;
50 }
51
52 public boolean useCreditForMessage(AMQMessage msg)
53 {
54 final long msgSize = msg.getSize();
55 if(hasCredit())
56 {
57 if(_bytesCredit.addAndGet(-msgSize) >= 0)
58 {
59 return true;
60 }
61 else
62 {
63 _bytesCredit.addAndGet(msgSize);
64 setSuspended(true);
65 return false;
66 }
67 }
68 else
69 {
70 return false;
71 }
72
73 }
74 }
|