Pre0_10CreditManager.java
001 /*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *   http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing,
014 * software distributed under the License is distributed on an
015 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
016 * KIND, either express or implied.  See the License for the
017 * specific language governing permissions and limitations
018 * under the License.
019 *
020 */
021 package org.apache.qpid.server.flow;
022 
023 import org.apache.qpid.server.queue.AMQMessage;
024 
025 public class Pre0_10CreditManager extends AbstractFlowCreditManager implements FlowCreditManager
026 {
027 
028     private volatile long _bytesCreditLimit;
029     private volatile long _messageCreditLimit;
030 
031     private volatile long _bytesCredit;
032     private volatile long _messageCredit;
033 
034     public Pre0_10CreditManager(long bytesCreditLimit, long messageCreditLimit)
035     {
036         _bytesCreditLimit = bytesCreditLimit;
037         _messageCreditLimit = messageCreditLimit;
038         _bytesCredit = bytesCreditLimit;
039         _messageCredit = messageCreditLimit;
040     }
041 
042 
043     public synchronized void setCreditLimits(final long bytesCreditLimit, final long messageCreditLimit)
044     {
045         long bytesCreditChange = bytesCreditLimit - _bytesCreditLimit;
046         long messageCreditChange = messageCreditLimit - _messageCreditLimit;
047 
048 
049 
050         if(bytesCreditChange != 0L)
051         {
052             if(bytesCreditLimit == 0L)
053             {
054                 _bytesCredit = 0;
055             }
056             else
057             {
058                 _bytesCredit += bytesCreditChange;
059             }
060         }
061 
062 
063         if(messageCreditChange != 0L)
064         {
065             if(messageCreditLimit == 0L)
066             {
067                 _messageCredit = 0;
068             }
069             else
070             {
071                 _messageCredit += messageCreditChange;
072             }
073         }
074 
075 
076         _bytesCreditLimit = bytesCreditLimit;
077         _messageCreditLimit = messageCreditLimit;
078 
079         setSuspended(!hasCredit());
080 
081     }
082 
083 
084     public synchronized void addCredit(final long messageCredit, final long bytesCredit)
085     {
086         final long messageCreditLimit = _messageCreditLimit;
087         boolean notifyIncrease = true;
088         if(messageCreditLimit != 0L)
089         {
090             notifyIncrease = (_messageCredit != 0);
091             long newCredit = _messageCredit + messageCredit;
092             _messageCredit = newCredit > messageCreditLimit ? messageCreditLimit : newCredit;
093         }
094 
095 
096         final long bytesCreditLimit = _bytesCreditLimit;
097         if(bytesCreditLimit != 0L)
098         {
099             long newCredit = _bytesCredit + bytesCredit;
100             _bytesCredit = newCredit > bytesCreditLimit ? bytesCreditLimit : newCredit;
101             if(notifyIncrease && bytesCredit>0)
102             {
103                 notifyIncreaseBytesCredit();
104             }
105         }
106 
107 
108 
109         setSuspended(!hasCredit());
110 
111     }
112 
113     public synchronized void removeAllCredit()
114     {
115         _bytesCredit = 0L;
116         _messageCredit = 0L;
117         setSuspended(!hasCredit());
118     }
119 
120     public synchronized boolean hasCredit()
121     {
122         return (_bytesCreditLimit == 0L || _bytesCredit > 0)
123                 && (_messageCreditLimit == 0L || _messageCredit > 0);
124     }
125 
126     public synchronized boolean useCreditForMessage(final AMQMessage msg)
127     {
128         if(_messageCreditLimit != 0L)
129         {
130             if(_messageCredit != 0L)
131             {
132                 if(_bytesCreditLimit == 0L)
133                 {
134                     _messageCredit--;
135 
136                     return true;
137                 }
138                 else
139                 {
140                     if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
141                     {
142                         _messageCredit--;
143                         _bytesCredit -= msg.getSize();
144 
145                         return true;
146                     }
147                     else
148                     {
149                         //setSuspended(true);
150                         return false;
151                     }
152                 }
153             }
154             else
155             {
156                 setSuspended(true);
157                 return false;
158             }
159         }
160         else
161         {
162             if(_bytesCreditLimit == 0L)
163             {
164 
165                 return true;
166             }
167             else
168             {
169                 if((_bytesCredit >= msg.getSize()) || (_bytesCredit == _bytesCreditLimit))
170                 {
171                     _bytesCredit -= msg.getSize();
172 
173                     return true;
174                 }
175                 else
176                 {
177                     //setSuspended(true);
178                     return false;
179                 }
180             }
181 
182         }
183 
184     }
185 }