NioSender.java
001 package org.apache.qpid.transport.network.nio;
002 /*
003  
004  * Licensed to the Apache Software Foundation (ASF) under one
005  * or more contributor license agreements.  See the NOTICE file
006  * distributed with this work for additional information
007  * regarding copyright ownership.  The ASF licenses this file
008  * to you under the Apache License, Version 2.0 (the
009  * "License"); you may not use this file except in compliance
010  * with the License.  You may obtain a copy of the License at
011  
012  *   http://www.apache.org/licenses/LICENSE-2.0
013  
014  * Unless required by applicable law or agreed to in writing,
015  * software distributed under the License is distributed on an
016  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017  * KIND, either express or implied.  See the License for the
018  * specific language governing permissions and limitations
019  * under the License.
020  
021  */
022 
023 
024 import java.nio.ByteBuffer;
025 import java.nio.channels.SocketChannel;
026 
027 import org.apache.qpid.transport.Sender;
028 
029 public class NioSender implements Sender<java.nio.ByteBuffer>
030 {
031     private final Object lock = new Object();
032     private SocketChannel _ch;
033     private boolean _batch =  false;
034     private ByteBuffer _batcher;
035 
036     public NioSender(SocketChannel ch)
037     {
038         this._ch = ch;
039     }
040 
041     public void send(java.nio.ByteBuffer buf)
042     {
043         if (_batch)
044         {
045             //System.out.println(_batcher.position() + " , " +  buf.remaining() + " , " + buf.position() + ","+_batcher.capacity());
046             if (_batcher.position() + buf.remaining() >= _batcher.capacity())
047             {
048                 _batcher.flip();
049                 write(_batcher);
050                 _batcher.clear();
051                 if (buf.remaining() > _batcher.capacity())
052                 {
053                     write(buf);
054                 }
055                 else
056                 {
057                     _batcher.put(buf);
058                 }
059             }
060             else
061             {
062                 _batcher.put(buf);
063             }
064         }
065         else
066         {
067             write(buf);
068         }
069     }
070 
071     public void flush()
072     {
073         // pass
074     }
075 
076     private void write(java.nio.ByteBuffer buf)
077     {
078         synchronized (lock)
079         {
080             if_ch.isConnected() && _ch.isOpen())
081             {
082                 try
083                 {
084                     _ch.write(buf);
085                 }
086                 catch(Exception e)
087                 {
088                     e.fillInStackTrace();
089                 }
090             }
091             else
092             {
093                 throw new RuntimeException("Trying to write on a closed socket");
094             }
095 
096         }
097     }
098 
099     public void setStartBatching()
100     {
101         _batch = true;
102         _batcher = ByteBuffer.allocate(1024);
103     }
104 
105     public void close()
106     {
107         // MINA will sometimes throw away in-progress writes when you
108         // ask it to close
109         synchronized (lock)
110         {
111             try
112             {
113                 _ch.close();
114             }
115             catch(Exception e)
116             {
117                 e.printStackTrace();
118             }
119         }
120     }
121     
122     public void setIdleTimeout(long l)
123     {
124       //noop
125     }
126 }