001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements. See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership. The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License. You may obtain a copy of the License at
010 *
011 * http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied. See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.flow;
022
023 import org.apache.qpid.server.queue.AMQMessage;
024
025 public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
026 {
027
028 private volatile long _bytesCreditLimit;
029 private volatile long _messageCreditLimit;
030
031 private volatile long _bytesCredit;
032 private volatile long _messageCredit;
033
034 public Pre0_10CreditManager(long bytesCreditLimit, long messageCreditLimit)
035 {
036 _bytesCreditLimit = bytesCreditLimit;
037 _messageCreditLimit = messageCreditLimit;
038 _bytesCredit = bytesCreditLimit;
039 _messageCredit = messageCreditLimit;
040 }
041
042
043 public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
044 {
045 long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit;
046 long messageCreditChange = messageCreditLimit - _messageCreditLimit;
047
048
049
050 if(bytesCreditChange != 0L)
051 {
052 if(bytesCreditLimit == 0L)
053 {
054 _bytesCredit = 0;
055 }
056 else
057 {
058 _bytesCredit += bytesCreditChange;
059 }
060 }
061
062
063 if(messageCreditChange != 0L)
064 {
065 if(messageCreditLimit == 0L)
066 {
067 _messageCredit = 0;
068 }
069 else
070 {
071 _messageCredit += messageCreditChange;
072 }
073 }
074
075
076 _bytesCreditLimit = bytesCreditLimit;
077 _messageCreditLimit = messageCreditLimit;
078
079 setSuspended(!hasCredit());
080
081 }
082
083
084 public synchronized void addCredit(final long messageCredit, final long bytesCredit)
085 {
086 final long messageCreditLimit = _messageCreditLimit;
087 boolean notifyIncrease = true;
088 if(messageCreditLimit != 0L)
089 {
090 notifyIncrease = (_messageCredit != 0);
091 long newCredit = _messageCredit + messageCredit;
092 _messageCredit = newCredit > messageCreditLimit ? messageCreditLimit : newCredit;
093 }
094
095
096 final long bytesCreditLimit = _bytesCreditLimit;
097 if(bytesCreditLimit != 0L)
098 {
099 long newCredit = _bytesCredit + bytesCredit;
100 _bytesCredit = newCredit > bytesCreditLimit ? bytesCreditLimit : newCredit;
101 if(notifyIncrease && bytesCredit>0)
102 {
103 notifyIncreaseBytesCredit();
104 }
105 }
106
107
108
109 setSuspended(!hasCredit());
110
111 }
112
113 public synchronized void removeAllCredit()
114 {
115 _bytesCredit = 0L;
116 _messageCredit = 0L;
117 setSuspended(!hasCredit());
118 }
119
120 public synchronized boolean hasCredit()
121 {
122 return (_bytesCreditLimit == 0L || _bytesCredit > 0)
123 && (_messageCreditLimit == 0L || _messageCredit > 0);
124 }
125
126 public synchronized boolean useCreditForMessage(final AMQMessage msg)
127 {
128 if(_messageCreditLimit != 0L)
129 {
130 if(_messageCredit != 0L)
131 {
132 if(_bytesCreditLimit == 0L)
133 {
134 _messageCredit--;
135
136 return true;
137 }
138 else
139 {
140 if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
141 {
142 _messageCredit--;
143 _bytesCredit -= msg.getSize();
144
145 return true;
146 }
147 else
148 {
149 //setSuspended(true);
150 return false;
151 }
152 }
153 }
154 else
155 {
156 setSuspended(true);
157 return false;
158 }
159 }
160 else
161 {
162 if(_bytesCreditLimit == 0L)
163 {
164
165 return true;
166 }
167 else
168 {
169 if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
170 {
171 _bytesCredit -= msg.getSize();
172
173 return true;
174 }
175 else
176 {
177 //setSuspended(true);
178 return false;
179 }
180 }
181
182 }
183
184 }
185 }
|