XAResourceImpl.java
001 /* Licensed to the Apache Software Foundation (ASF) under one
002  * or more contributor license agreements.  See the NOTICE file
003  * distributed with this work for additional information
004  * regarding copyright ownership.  The ASF licenses this file
005  * to you under the Apache License, Version 2.0 (the
006  * "License"); you may not use this file except in compliance
007  * with the License.  You may obtain a copy of the License at
008  *
009  *   http://www.apache.org/licenses/LICENSE-2.0
010  *
011  * Unless required by applicable law or agreed to in writing,
012  * software distributed under the License is distributed on an
013  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
014  * KIND, either express or implied.  See the License for the
015  * specific language governing permissions and limitations
016  * under the License.
017  */
018 package org.apache.qpid.client;
019 
020 import javax.transaction.xa.XAException;
021 import javax.transaction.xa.XAResource;
022 import javax.transaction.xa.Xid;
023 
024 import org.apache.qpid.QpidException;
025 import org.apache.qpid.dtx.XidImpl;
026 import org.apache.qpid.transport.*;
027 
028 import org.slf4j.Logger;
029 import org.slf4j.LoggerFactory;
030 
031 /**
032  * This is an implementation of javax.njms.XAResource.
033  */
034 public class XAResourceImpl implements XAResource
035 {
036     /**
037      * this XAResourceImpl's logger
038      */
039     private static final Logger _logger = LoggerFactory.getLogger(XAResourceImpl.class);
040 
041     /**
042      * Reference to the associated XASession
043      */
044     private XASessionImpl _xaSession = null;
045 
046     /**
047      * The XID of this resource
048      */
049     private Xid _xid;
050 
051     //--- constructor
052 
053     /**
054      * Create an XAResource associated with a XASession
055      *
056      @param xaSession The session XAresource
057      */
058     protected XAResourceImpl(XASessionImpl xaSession)
059     {
060         _xaSession = xaSession;
061     }
062 
063     //--- The XAResource
064     /**
065      * Commits the global transaction specified by xid.
066      *
067      @param xid A global transaction identifier
068      @param b   If true, use a one-phase commit protocol to commit the work done on behalf of xid.
069      @throws XAException An error has occurred. An error has occurred. Possible XAExceptions are XA_HEURHAZ,
070      *                     XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
071      */
072     public void commit(Xid xid, boolean bthrows XAException
073     {
074         if (_logger.isDebugEnabled())
075         {
076             _logger.debug("commit tx branch with xid:  ", xid);
077         }
078         Future<XaResult> future =
079                 _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE);
080 
081         // now wait on the future for the result
082         XaResult result = null;
083         try
084         {
085             result = future.get();
086         }
087         catch (SessionException e)
088         {
089             // we need to restore the qpid session that has been closed
090             _xaSession.createSession();
091             convertExecutionErrorToXAErr(e.getException().getErrorCode());
092         }
093         checkStatus(result.getStatus());
094     }
095 
096     /**
097      * Ends the work performed on behalf of a transaction branch.
098      * The resource manager disassociates the XA resource from the transaction branch specified
099      * and lets the transaction complete.
100      <ul>
101      <li> If TMSUSPEND is specified in the flags, the transaction branch is temporarily suspended in an incomplete state.
102      * The transaction context is in a suspended state and must be resumed via the start method with TMRESUME specified.
103      <li> If TMFAIL is specified, the portion of work has failed. The resource manager may mark the transaction as rollback-only
104      <li> If TMSUCCESS is specified, the portion of work has completed successfully.
105      * /ul>
106      *
107      @param xid  A global transaction identifier that is the same as the identifier used previously in the start method
108      @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND.
109      @throws XAException An error has occurred. An error has occurred. Possible XAException values are XAER_RMERR,
110      *                     XAER_RMFAILED, XAER_NOTA, XAER_INVAL, XAER_PROTO, or XA_RB*.
111      */
112     public void end(Xid xid, int flagthrows XAException
113     {
114         if (_logger.isDebugEnabled())
115         {
116             _logger.debug("end tx branch with xid: ", xid);
117         }
118         switch (flag)
119         {
120             case(XAResource.TMSUCCESS):
121                 break;
122             case(XAResource.TMFAIL):
123                 break;
124             case(XAResource.TMSUSPEND):
125                 break;
126             default:
127                  throw new XAException(XAException.XAER_INVAL);
128         }
129         _xaSession.flushAcknowledgments();
130         Future<XaResult> future = _xaSession.getQpidSession()
131                 .dtxEnd(convertXid(xid),
132                         flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE,
133                         flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NONE);
134         // now wait on the future for the result
135         XaResult result = null;
136         try
137         {
138             result = future.get();
139         }
140         catch (SessionException e)
141         {
142             // we need to restore the qpid session that has been closed
143             _xaSession.createSession();
144             convertExecutionErrorToXAErr(e.getException().getErrorCode());
145         }
146         checkStatus(result.getStatus());
147     }
148 
149 
150     /**
151      * Tells the resource manager to forget about a heuristically completed transaction branch.
152      *
153      @param xid String(xid.getGlobalTransactionId() A global transaction identifier
154      @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL,
155      *                     XAER_NOTA, XAER_INVAL, or XAER_PROTO.
156      */
157     public void forget(Xid xidthrows XAException
158     {
159         if (_logger.isDebugEnabled())
160         {
161             _logger.debug("forget tx branch with xid: ", xid);
162         }
163         _xaSession.getQpidSession().dtxForget(convertXid(xid));
164         try
165         {
166             _xaSession.getQpidSession().sync();
167         }
168         catch (SessionException e)
169         {
170             // we need to restore the qpid session that has been closed
171             _xaSession.createSession();
172             convertExecutionErrorToXAErr(e.getException().getErrorCode());
173         }
174     }
175 
176 
177     /**
178      * Obtains the current transaction timeout value set for this XAResource instance.
179      * If XAResource.setTransactionTimeout was not used prior to invoking this method,
180      * the return value is the default timeout i.e. 0;
181      * otherwise, the value used in the previous setTransactionTimeout call is returned.
182      *
183      @return The transaction timeout value in seconds.
184      @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
185      */
186     public int getTransactionTimeout() throws XAException
187     {
188         int result = 0;
189         if (_xid != null)
190         {
191             Future<GetTimeoutResult> future =
192                     _xaSession.getQpidSession().dtxGetTimeout(convertXid(_xid));
193             try
194             {
195                 result = (intfuture.get().getTimeout();
196             }
197             catch (SessionException e)
198             {
199                 // we need to restore the qpid session that has been closed
200                 _xaSession.createSession();
201                 convertExecutionErrorToXAErr(e.getException().getErrorCode());
202             }
203         }
204         return result;
205     }
206 
207     /**
208      * This method is called to determine if the resource manager instance represented
209      * by the target object is the same as the resouce manager instance represented by
210      * the parameter xaResource.
211      *
212      @param xaResource An XAResource object whose resource manager instance is to
213      *                   be compared with the resource manager instance of the target object
214      @return <code>true</code> if it's the same RM instance; otherwise <code>false</code>.
215      @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
216      */
217     public boolean isSameRM(XAResource xaResourcethrows XAException
218     {
219         // TODO : get the server identity of xaResource and compare it with our own one
220         return false;
221     }
222 
223     /**
224      * Prepare for a transaction commit of the transaction specified in <code>Xid</code>.
225      *
226      @param xid A global transaction identifier.
227      @return A value indicating the resource manager's vote on the outcome of the transaction.
228      *         The possible values are: XA_RDONLY or XA_OK.
229      @throws XAException An error has occurred. Possible exception values are: XAER_RMERR or XAER_NOTA
230      */
231     public int prepare(Xid xidthrows XAException
232     {
233         if (_logger.isDebugEnabled())
234         {
235             _logger.debug("prepare ", xid);
236         }
237         Future<XaResult> future = _xaSession.getQpidSession().dtxPrepare(convertXid(xid));
238         XaResult result = null;
239         try
240         {
241             result = future.get();
242         }
243         catch (SessionException e)
244         {
245             // we need to restore the qpid session that has been closed
246             _xaSession.createSession();
247             convertExecutionErrorToXAErr(e.getException().getErrorCode());
248         }
249         DtxXaStatus status = result.getStatus();
250         int outcome = XAResource.XA_OK;
251         switch (status)
252         {
253             case XA_OK:
254                 break;
255             case XA_RDONLY:
256                 outcome = XAResource.XA_RDONLY;
257                 break;
258             default:
259                 checkStatus(status);
260         }
261         return outcome;
262     }
263 
264     /**
265      * Obtains a list of prepared transaction branches.
266      <p/>
267      * The transaction manager calls this method during recovery to obtain the list of transaction branches
268      * that are currently in prepared or heuristically completed states.
269      *
270      @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS.
271      *             TMNOFLAGS must be used when no other flags are set in the parameter.
272      @return zero or more XIDs of the transaction branches that are currently in a prepared or heuristically
273      *         completed state.
274      @throws XAException An error has occurred. Possible value is XAER_INVAL.
275      */
276     public Xid[] recover(int flagthrows XAException
277     {
278         // the flag is ignored
279         Future<RecoverResult> future = _xaSession.getQpidSession().dtxRecover();
280         RecoverResult res = null;
281         try
282         {
283             res = future.get();
284         }
285         catch (SessionException e)
286         {
287             // we need to restore the qpid session that has been closed
288             _xaSession.createSession();
289             convertExecutionErrorToXAErre.getException().getErrorCode());
290         }
291         Xid[] result = new Xid[res.getInDoubt().size()];
292         int i = 0;
293         for (Object obj : res.getInDoubt())
294         {
295             org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xidobj;
296             result[inew XidImpl(xid.getBranchId()(intxid.getFormat(), xid.getGlobalId());
297             i++;
298         }
299         return result;
300     }
301 
302     /**
303      * Informs the resource manager to roll back work done on behalf of a transaction branch
304      *
305      @param xid A global transaction identifier.
306      @throws XAException An error has occurred.
307      */
308     public void rollback(Xid xidthrows XAException
309     {
310         if (_logger.isDebugEnabled())
311         {
312             _logger.debug("rollback tx branch with xid: ", xid);
313         }
314 
315         Future<XaResult> future = _xaSession.getQpidSession().dtxRollback(convertXid(xid));
316         // now wait on the future for the result
317         XaResult result = null;
318         try
319         {
320             result = future.get();
321         }
322         catch (SessionException e)
323         {
324             // we need to restore the qpid session that has been closed
325             _xaSession.createSession();
326             convertExecutionErrorToXAErre.getException().getErrorCode());
327         }
328         checkStatus(result.getStatus());
329     }
330 
331     /**
332      * Sets the current transaction timeout value for this XAResource instance.
333      * Once set, this timeout value is effective until setTransactionTimeout is
334      * invoked again with a different value.
335      * To reset the timeout value to the default value used by the resource manager, set the value to zero.
336      *
337      @param timeout The transaction timeout value in seconds.
338      @return true if transaction timeout value is set successfully; otherwise false.
339      @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL, or XAER_INVAL.
340      */
341     public boolean setTransactionTimeout(int timeoutthrows XAException
342     {
343         boolean result = false;
344         if (_xid != null)
345         {
346             try
347             {
348                 _xaSession.getQpidSession()
349                         .dtxSetTimeout(XidImpl.convert(_xid), timeout);
350             }
351             catch (QpidException e)
352             {
353                 if (_logger.isDebugEnabled())
354                 {
355                     _logger.debug("Cannot convert Xid into String format ", e);
356                 }
357                 throw new XAException(XAException.XAER_PROTO);
358             }
359             result = true;
360         }
361         return result;
362     }
363 
364     /**
365      * Starts work on behalf of a transaction branch specified in xid.
366      <ul>
367      <li> If TMJOIN is specified, an exception is thrown as it is not supported
368      <li> If TMRESUME is specified, the start applies to resuming a suspended transaction specified in the parameter xid.
369      <li> If neither TMJOIN nor TMRESUME is specified and the transaction specified by xid has previously been seen by the
370      * resource manager, the resource manager throws the XAException exception with XAER_DUPID error code.
371      </ul>
372      *
373      @param xid  A global transaction identifier to be associated with the resource
374      @param flag One of TMNOFLAGS, TMJOIN, or TMRESUME
375      @throws XAException An error has occurred. Possible exceptions
376      *                     are XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_DUPID, XAER_OUTSIDE, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
377      */
378     public void start(Xid xid, int flagthrows XAException
379     {
380         if (_logger.isDebugEnabled())
381         {
382             _logger.debug("start tx branch with xid: ", xid);
383         }
384         switch (flag)
385         {
386             case(XAResource.TMNOFLAGS):
387                 break;
388             case(XAResource.TMJOIN):
389                 break;
390             case(XAResource.TMRESUME):
391                 break;
392             default:
393                  throw new XAException(XAException.XAER_INVAL);
394         }
395         Future<XaResult> future = _xaSession.getQpidSession()
396                 .dtxStart(convertXid(xid),
397                         flag == XAResource.TMJOIN ? Option.JOIN : Option.NONE,
398                         flag == XAResource.TMRESUME ? Option.RESUME : Option.NONE);
399         // now wait on the future for the result
400         XaResult result = null;
401         try
402         {
403             result = future.get();
404         }
405         catch (SessionException e)
406         {
407             // we need to restore the qpid session that has been closed
408             _xaSession.createSession();
409             convertExecutionErrorToXAErr(e.getException().getErrorCode());
410             // TODO: The amqp spec does not allow to make the difference
411             // between an already known XID and a wrong arguments (join and resume are set)
412             // TODO: make sure amqp addresses that
413         }
414         checkStatus(result.getStatus());
415         _xid = xid;
416     }
417 
418     //------------------------------------------------------------------------
419     // Private methods
420     //------------------------------------------------------------------------
421 
422     /**
423      * Check xa method outcome and, when required, convert the status into the corresponding xa exception
424      @param status method status code
425      @throws XAException corresponding XA Exception when required
426      */
427     private void checkStatus(DtxXaStatus statusthrows XAException
428     {
429         switch (status)
430         {
431             case XA_OK:
432                 // Do nothing this ok
433                 break;
434             case XA_RBROLLBACK:
435                 // The tx has been rolled back for an unspecified reason.
436                 throw new XAException(XAException.XA_RBROLLBACK);
437             case XA_RBTIMEOUT:
438                 // The transaction branch took too long.
439                 throw new XAException(XAException.XA_RBTIMEOUT);
440             case XA_HEURHAZ:
441                 // The transaction branch may have been heuristically completed.
442                 throw new XAException(XAException.XA_HEURHAZ);
443             case XA_HEURCOM:
444                 // The transaction branch has been heuristically committed.
445                 throw new XAException(XAException.XA_HEURCOM);
446             case XA_HEURRB:
447                 // The transaction branch has been heuristically rolled back.
448                 throw new XAException(XAException.XA_HEURRB);
449             case XA_HEURMIX:
450                 // The transaction branch has been heuristically committed and rolled back.
451                 throw new XAException(XAException.XA_HEURMIX);
452             case XA_RDONLY:
453                 // The transaction branch was read-only and has been committed.
454                 throw new XAException(XAException.XA_RDONLY);
455             default:
456                 // this should not happen
457                 if (_logger.isDebugEnabled())
458                 {
459                     _logger.debug("got unexpected status value: ", status);
460                 }
461                 //A resource manager error has occured in the transaction branch.
462                 throw new XAException(XAException.XAER_RMERR);
463         }
464     }
465 
466     /**
467      * Convert execution error to xa exception.
468      @param error the execution error code
469      @throws XAException
470      */
471     private void convertExecutionErrorToXAErr(ExecutionErrorCode errorthrows XAException
472     {
473         switch (error)
474         {
475             case NOT_ALLOWED:
476                 // The XID already exists.
477                 throw new XAException(XAException.XAER_DUPID);
478             case NOT_FOUND:
479                 // The XID is not valid.
480                 throw new XAException(XAException.XAER_NOTA);
481             case ILLEGAL_STATE:
482                 // Routine was invoked in an inproper context.
483                 throw new XAException(XAException.XAER_PROTO);
484             case NOT_IMPLEMENTED:
485                 // the command is not implemented
486                 throw new XAException(XAException.XAER_RMERR);
487             case COMMAND_INVALID:
488                 // Invalid call
489                 throw new XAException(XAException.XAER_INVAL);
490             default:
491                 // this should not happen
492                 if (_logger.isDebugEnabled())
493                 {
494                     _logger.debug("Got unexpected error: " + error);
495                 }
496                 //A resource manager error has occured in the transaction branch.
497                 throw new XAException(XAException.XAER_RMERR);
498         }
499     }
500 
501     /**
502      * convert a generic xid into qpid format
503      @param xid xid to be converted
504      @return the qpid formated xid
505      @throws XAException when xid is null or when it cannot be converted. 
506      */
507     private org.apache.qpid.transport.Xid convertXid(Xid xidthrows XAException
508     {
509         if (xid == null)
510         {
511             // Invalid arguments were given.
512             throw new XAException(XAException.XAER_INVAL);
513         }
514         try
515         {
516             return XidImpl.convert(xid);
517         }
518         catch (QpidException e)
519         {
520             if (_logger.isDebugEnabled())
521             {
522                 _logger.debug("Cannot convert Xid into String format ", e);
523             }
524             //A resource manager error has occured in the transaction branch.
525             throw new XAException(XAException.XAER_RMERR);
526         }
527     }
528 
529 }