001 /* Licensed to the Apache Software Foundation (ASF) under one
002 * or more contributor license agreements. See the NOTICE file
003 * distributed with this work for additional information
004 * regarding copyright ownership. The ASF licenses this file
005 * to you under the Apache License, Version 2.0 (the
006 * "License"); you may not use this file except in compliance
007 * with the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing,
012 * software distributed under the License is distributed on an
013 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
014 * KIND, either express or implied. See the License for the
015 * specific language governing permissions and limitations
016 * under the License.
017 */
018 package org.apache.qpid.client;
019
020 import javax.transaction.xa.XAException;
021 import javax.transaction.xa.XAResource;
022 import javax.transaction.xa.Xid;
023
024 import org.apache.qpid.QpidException;
025 import org.apache.qpid.dtx.XidImpl;
026 import org.apache.qpid.transport.*;
027
028 import org.slf4j.Logger;
029 import org.slf4j.LoggerFactory;
030
031 /**
032 * This is an implementation of javax.njms.XAResource.
033 */
034 public class XAResourceImpl implements XAResource
035 {
036 /**
037 * this XAResourceImpl's logger
038 */
039 private static final Logger _logger = LoggerFactory.getLogger(XAResourceImpl.class);
040
041 /**
042 * Reference to the associated XASession
043 */
044 private XASessionImpl _xaSession = null;
045
046 /**
047 * The XID of this resource
048 */
049 private Xid _xid;
050
051 //--- constructor
052
053 /**
054 * Create an XAResource associated with a XASession
055 *
056 * @param xaSession The session XAresource
057 */
058 protected XAResourceImpl(XASessionImpl xaSession)
059 {
060 _xaSession = xaSession;
061 }
062
063 //--- The XAResource
064 /**
065 * Commits the global transaction specified by xid.
066 *
067 * @param xid A global transaction identifier
068 * @param b If true, use a one-phase commit protocol to commit the work done on behalf of xid.
069 * @throws XAException An error has occurred. An error has occurred. Possible XAExceptions are XA_HEURHAZ,
070 * XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
071 */
072 public void commit(Xid xid, boolean b) throws XAException
073 {
074 if (_logger.isDebugEnabled())
075 {
076 _logger.debug("commit tx branch with xid: ", xid);
077 }
078 Future<XaResult> future =
079 _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE);
080
081 // now wait on the future for the result
082 XaResult result = null;
083 try
084 {
085 result = future.get();
086 }
087 catch (SessionException e)
088 {
089 // we need to restore the qpid session that has been closed
090 _xaSession.createSession();
091 convertExecutionErrorToXAErr(e.getException().getErrorCode());
092 }
093 checkStatus(result.getStatus());
094 }
095
096 /**
097 * Ends the work performed on behalf of a transaction branch.
098 * The resource manager disassociates the XA resource from the transaction branch specified
099 * and lets the transaction complete.
100 * <ul>
101 * <li> If TMSUSPEND is specified in the flags, the transaction branch is temporarily suspended in an incomplete state.
102 * The transaction context is in a suspended state and must be resumed via the start method with TMRESUME specified.
103 * <li> If TMFAIL is specified, the portion of work has failed. The resource manager may mark the transaction as rollback-only
104 * <li> If TMSUCCESS is specified, the portion of work has completed successfully.
105 * /ul>
106 *
107 * @param xid A global transaction identifier that is the same as the identifier used previously in the start method
108 * @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND.
109 * @throws XAException An error has occurred. An error has occurred. Possible XAException values are XAER_RMERR,
110 * XAER_RMFAILED, XAER_NOTA, XAER_INVAL, XAER_PROTO, or XA_RB*.
111 */
112 public void end(Xid xid, int flag) throws XAException
113 {
114 if (_logger.isDebugEnabled())
115 {
116 _logger.debug("end tx branch with xid: ", xid);
117 }
118 switch (flag)
119 {
120 case(XAResource.TMSUCCESS):
121 break;
122 case(XAResource.TMFAIL):
123 break;
124 case(XAResource.TMSUSPEND):
125 break;
126 default:
127 throw new XAException(XAException.XAER_INVAL);
128 }
129 _xaSession.flushAcknowledgments();
130 Future<XaResult> future = _xaSession.getQpidSession()
131 .dtxEnd(convertXid(xid),
132 flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE,
133 flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NONE);
134 // now wait on the future for the result
135 XaResult result = null;
136 try
137 {
138 result = future.get();
139 }
140 catch (SessionException e)
141 {
142 // we need to restore the qpid session that has been closed
143 _xaSession.createSession();
144 convertExecutionErrorToXAErr(e.getException().getErrorCode());
145 }
146 checkStatus(result.getStatus());
147 }
148
149
150 /**
151 * Tells the resource manager to forget about a heuristically completed transaction branch.
152 *
153 * @param xid String(xid.getGlobalTransactionId() A global transaction identifier
154 * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL,
155 * XAER_NOTA, XAER_INVAL, or XAER_PROTO.
156 */
157 public void forget(Xid xid) throws XAException
158 {
159 if (_logger.isDebugEnabled())
160 {
161 _logger.debug("forget tx branch with xid: ", xid);
162 }
163 _xaSession.getQpidSession().dtxForget(convertXid(xid));
164 try
165 {
166 _xaSession.getQpidSession().sync();
167 }
168 catch (SessionException e)
169 {
170 // we need to restore the qpid session that has been closed
171 _xaSession.createSession();
172 convertExecutionErrorToXAErr(e.getException().getErrorCode());
173 }
174 }
175
176
177 /**
178 * Obtains the current transaction timeout value set for this XAResource instance.
179 * If XAResource.setTransactionTimeout was not used prior to invoking this method,
180 * the return value is the default timeout i.e. 0;
181 * otherwise, the value used in the previous setTransactionTimeout call is returned.
182 *
183 * @return The transaction timeout value in seconds.
184 * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
185 */
186 public int getTransactionTimeout() throws XAException
187 {
188 int result = 0;
189 if (_xid != null)
190 {
191 Future<GetTimeoutResult> future =
192 _xaSession.getQpidSession().dtxGetTimeout(convertXid(_xid));
193 try
194 {
195 result = (int) future.get().getTimeout();
196 }
197 catch (SessionException e)
198 {
199 // we need to restore the qpid session that has been closed
200 _xaSession.createSession();
201 convertExecutionErrorToXAErr(e.getException().getErrorCode());
202 }
203 }
204 return result;
205 }
206
207 /**
208 * This method is called to determine if the resource manager instance represented
209 * by the target object is the same as the resouce manager instance represented by
210 * the parameter xaResource.
211 *
212 * @param xaResource An XAResource object whose resource manager instance is to
213 * be compared with the resource manager instance of the target object
214 * @return <code>true</code> if it's the same RM instance; otherwise <code>false</code>.
215 * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
216 */
217 public boolean isSameRM(XAResource xaResource) throws XAException
218 {
219 // TODO : get the server identity of xaResource and compare it with our own one
220 return false;
221 }
222
223 /**
224 * Prepare for a transaction commit of the transaction specified in <code>Xid</code>.
225 *
226 * @param xid A global transaction identifier.
227 * @return A value indicating the resource manager's vote on the outcome of the transaction.
228 * The possible values are: XA_RDONLY or XA_OK.
229 * @throws XAException An error has occurred. Possible exception values are: XAER_RMERR or XAER_NOTA
230 */
231 public int prepare(Xid xid) throws XAException
232 {
233 if (_logger.isDebugEnabled())
234 {
235 _logger.debug("prepare ", xid);
236 }
237 Future<XaResult> future = _xaSession.getQpidSession().dtxPrepare(convertXid(xid));
238 XaResult result = null;
239 try
240 {
241 result = future.get();
242 }
243 catch (SessionException e)
244 {
245 // we need to restore the qpid session that has been closed
246 _xaSession.createSession();
247 convertExecutionErrorToXAErr(e.getException().getErrorCode());
248 }
249 DtxXaStatus status = result.getStatus();
250 int outcome = XAResource.XA_OK;
251 switch (status)
252 {
253 case XA_OK:
254 break;
255 case XA_RDONLY:
256 outcome = XAResource.XA_RDONLY;
257 break;
258 default:
259 checkStatus(status);
260 }
261 return outcome;
262 }
263
264 /**
265 * Obtains a list of prepared transaction branches.
266 * <p/>
267 * The transaction manager calls this method during recovery to obtain the list of transaction branches
268 * that are currently in prepared or heuristically completed states.
269 *
270 * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS.
271 * TMNOFLAGS must be used when no other flags are set in the parameter.
272 * @return zero or more XIDs of the transaction branches that are currently in a prepared or heuristically
273 * completed state.
274 * @throws XAException An error has occurred. Possible value is XAER_INVAL.
275 */
276 public Xid[] recover(int flag) throws XAException
277 {
278 // the flag is ignored
279 Future<RecoverResult> future = _xaSession.getQpidSession().dtxRecover();
280 RecoverResult res = null;
281 try
282 {
283 res = future.get();
284 }
285 catch (SessionException e)
286 {
287 // we need to restore the qpid session that has been closed
288 _xaSession.createSession();
289 convertExecutionErrorToXAErr( e.getException().getErrorCode());
290 }
291 Xid[] result = new Xid[res.getInDoubt().size()];
292 int i = 0;
293 for (Object obj : res.getInDoubt())
294 {
295 org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj;
296 result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId());
297 i++;
298 }
299 return result;
300 }
301
302 /**
303 * Informs the resource manager to roll back work done on behalf of a transaction branch
304 *
305 * @param xid A global transaction identifier.
306 * @throws XAException An error has occurred.
307 */
308 public void rollback(Xid xid) throws XAException
309 {
310 if (_logger.isDebugEnabled())
311 {
312 _logger.debug("rollback tx branch with xid: ", xid);
313 }
314
315 Future<XaResult> future = _xaSession.getQpidSession().dtxRollback(convertXid(xid));
316 // now wait on the future for the result
317 XaResult result = null;
318 try
319 {
320 result = future.get();
321 }
322 catch (SessionException e)
323 {
324 // we need to restore the qpid session that has been closed
325 _xaSession.createSession();
326 convertExecutionErrorToXAErr( e.getException().getErrorCode());
327 }
328 checkStatus(result.getStatus());
329 }
330
331 /**
332 * Sets the current transaction timeout value for this XAResource instance.
333 * Once set, this timeout value is effective until setTransactionTimeout is
334 * invoked again with a different value.
335 * To reset the timeout value to the default value used by the resource manager, set the value to zero.
336 *
337 * @param timeout The transaction timeout value in seconds.
338 * @return true if transaction timeout value is set successfully; otherwise false.
339 * @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL, or XAER_INVAL.
340 */
341 public boolean setTransactionTimeout(int timeout) throws XAException
342 {
343 boolean result = false;
344 if (_xid != null)
345 {
346 try
347 {
348 _xaSession.getQpidSession()
349 .dtxSetTimeout(XidImpl.convert(_xid), timeout);
350 }
351 catch (QpidException e)
352 {
353 if (_logger.isDebugEnabled())
354 {
355 _logger.debug("Cannot convert Xid into String format ", e);
356 }
357 throw new XAException(XAException.XAER_PROTO);
358 }
359 result = true;
360 }
361 return result;
362 }
363
364 /**
365 * Starts work on behalf of a transaction branch specified in xid.
366 * <ul>
367 * <li> If TMJOIN is specified, an exception is thrown as it is not supported
368 * <li> If TMRESUME is specified, the start applies to resuming a suspended transaction specified in the parameter xid.
369 * <li> If neither TMJOIN nor TMRESUME is specified and the transaction specified by xid has previously been seen by the
370 * resource manager, the resource manager throws the XAException exception with XAER_DUPID error code.
371 * </ul>
372 *
373 * @param xid A global transaction identifier to be associated with the resource
374 * @param flag One of TMNOFLAGS, TMJOIN, or TMRESUME
375 * @throws XAException An error has occurred. Possible exceptions
376 * are XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_DUPID, XAER_OUTSIDE, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
377 */
378 public void start(Xid xid, int flag) throws XAException
379 {
380 if (_logger.isDebugEnabled())
381 {
382 _logger.debug("start tx branch with xid: ", xid);
383 }
384 switch (flag)
385 {
386 case(XAResource.TMNOFLAGS):
387 break;
388 case(XAResource.TMJOIN):
389 break;
390 case(XAResource.TMRESUME):
391 break;
392 default:
393 throw new XAException(XAException.XAER_INVAL);
394 }
395 Future<XaResult> future = _xaSession.getQpidSession()
396 .dtxStart(convertXid(xid),
397 flag == XAResource.TMJOIN ? Option.JOIN : Option.NONE,
398 flag == XAResource.TMRESUME ? Option.RESUME : Option.NONE);
399 // now wait on the future for the result
400 XaResult result = null;
401 try
402 {
403 result = future.get();
404 }
405 catch (SessionException e)
406 {
407 // we need to restore the qpid session that has been closed
408 _xaSession.createSession();
409 convertExecutionErrorToXAErr(e.getException().getErrorCode());
410 // TODO: The amqp spec does not allow to make the difference
411 // between an already known XID and a wrong arguments (join and resume are set)
412 // TODO: make sure amqp addresses that
413 }
414 checkStatus(result.getStatus());
415 _xid = xid;
416 }
417
418 //------------------------------------------------------------------------
419 // Private methods
420 //------------------------------------------------------------------------
421
422 /**
423 * Check xa method outcome and, when required, convert the status into the corresponding xa exception
424 * @param status method status code
425 * @throws XAException corresponding XA Exception when required
426 */
427 private void checkStatus(DtxXaStatus status) throws XAException
428 {
429 switch (status)
430 {
431 case XA_OK:
432 // Do nothing this ok
433 break;
434 case XA_RBROLLBACK:
435 // The tx has been rolled back for an unspecified reason.
436 throw new XAException(XAException.XA_RBROLLBACK);
437 case XA_RBTIMEOUT:
438 // The transaction branch took too long.
439 throw new XAException(XAException.XA_RBTIMEOUT);
440 case XA_HEURHAZ:
441 // The transaction branch may have been heuristically completed.
442 throw new XAException(XAException.XA_HEURHAZ);
443 case XA_HEURCOM:
444 // The transaction branch has been heuristically committed.
445 throw new XAException(XAException.XA_HEURCOM);
446 case XA_HEURRB:
447 // The transaction branch has been heuristically rolled back.
448 throw new XAException(XAException.XA_HEURRB);
449 case XA_HEURMIX:
450 // The transaction branch has been heuristically committed and rolled back.
451 throw new XAException(XAException.XA_HEURMIX);
452 case XA_RDONLY:
453 // The transaction branch was read-only and has been committed.
454 throw new XAException(XAException.XA_RDONLY);
455 default:
456 // this should not happen
457 if (_logger.isDebugEnabled())
458 {
459 _logger.debug("got unexpected status value: ", status);
460 }
461 //A resource manager error has occured in the transaction branch.
462 throw new XAException(XAException.XAER_RMERR);
463 }
464 }
465
466 /**
467 * Convert execution error to xa exception.
468 * @param error the execution error code
469 * @throws XAException
470 */
471 private void convertExecutionErrorToXAErr(ExecutionErrorCode error) throws XAException
472 {
473 switch (error)
474 {
475 case NOT_ALLOWED:
476 // The XID already exists.
477 throw new XAException(XAException.XAER_DUPID);
478 case NOT_FOUND:
479 // The XID is not valid.
480 throw new XAException(XAException.XAER_NOTA);
481 case ILLEGAL_STATE:
482 // Routine was invoked in an inproper context.
483 throw new XAException(XAException.XAER_PROTO);
484 case NOT_IMPLEMENTED:
485 // the command is not implemented
486 throw new XAException(XAException.XAER_RMERR);
487 case COMMAND_INVALID:
488 // Invalid call
489 throw new XAException(XAException.XAER_INVAL);
490 default:
491 // this should not happen
492 if (_logger.isDebugEnabled())
493 {
494 _logger.debug("Got unexpected error: " + error);
495 }
496 //A resource manager error has occured in the transaction branch.
497 throw new XAException(XAException.XAER_RMERR);
498 }
499 }
500
501 /**
502 * convert a generic xid into qpid format
503 * @param xid xid to be converted
504 * @return the qpid formated xid
505 * @throws XAException when xid is null or when it cannot be converted.
506 */
507 private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException
508 {
509 if (xid == null)
510 {
511 // Invalid arguments were given.
512 throw new XAException(XAException.XAER_INVAL);
513 }
514 try
515 {
516 return XidImpl.convert(xid);
517 }
518 catch (QpidException e)
519 {
520 if (_logger.isDebugEnabled())
521 {
522 _logger.debug("Cannot convert Xid into String format ", e);
523 }
524 //A resource manager error has occured in the transaction branch.
525 throw new XAException(XAException.XAER_RMERR);
526 }
527 }
528
529 }
|