01 package org.apache.qpid.server.flow;
02
03 import org.apache.qpid.server.queue.AMQMessage;
04
05 import java.util.concurrent.atomic.AtomicLong;
06
07 /*
08 *
09 * Licensed to the Apache Software Foundation (ASF) under one
10 * or more contributor license agreements. See the NOTICE file
11 * distributed with this work for additional information
12 * regarding copyright ownership. The ASF licenses this file
13 * to you under the Apache License, Version 2.0 (the
14 * "License"); you may not use this file except in compliance
15 * with the License. You may obtain a copy of the License at
16 *
17 * http://www.apache.org/licenses/LICENSE-2.0
18 *
19 * Unless required by applicable law or agreed to in writing,
20 * software distributed under the License is distributed on an
21 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22 * KIND, either express or implied. See the License for the
23 * specific language governing permissions and limitations
24 * under the License.
25 *
26 */
27 public class MessageOnlyCreditManager extends AbstractFlowCreditManager implements FlowCreditManager
28 {
29 private final AtomicLong _messageCredit;
30
31 public MessageOnlyCreditManager(final long initialCredit)
32 {
33 _messageCredit = new AtomicLong(initialCredit);
34 }
35
36 public void addCredit(long messageCredit, long bytesCredit)
37 {
38 setSuspended(false);
39 _messageCredit.addAndGet(messageCredit);
40 }
41
42 public void removeAllCredit()
43 {
44 setSuspended(true);
45 _messageCredit.set(0L);
46 }
47
48 public boolean hasCredit()
49 {
50 return _messageCredit.get() > 0L;
51 }
52
53 public boolean useCreditForMessage(AMQMessage msg)
54 {
55 if(hasCredit())
56 {
57 if(_messageCredit.addAndGet(-1L) >= 0)
58 {
59 setSuspended(false);
60 return true;
61 }
62 else
63 {
64 _messageCredit.addAndGet(1L);
65 setSuspended(true);
66 return false;
67 }
68 }
69 else
70 {
71 setSuspended(true);
72 return false;
73 }
74
75 }
76 }
|