001 package org.apache.qpid.transport.network.nio;
002 /*
003 *
004 * Licensed to the Apache Software Foundation (ASF) under one
005 * or more contributor license agreements. See the NOTICE file
006 * distributed with this work for additional information
007 * regarding copyright ownership. The ASF licenses this file
008 * to you under the Apache License, Version 2.0 (the
009 * "License"); you may not use this file except in compliance
010 * with the License. You may obtain a copy of the License at
011 *
012 * http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing,
015 * software distributed under the License is distributed on an
016 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
017 * KIND, either express or implied. See the License for the
018 * specific language governing permissions and limitations
019 * under the License.
020 *
021 */
022
023
024 import java.nio.ByteBuffer;
025 import java.nio.channels.SocketChannel;
026
027 import org.apache.qpid.transport.Sender;
028
029 public class NioSender implements Sender<java.nio.ByteBuffer>
030 {
031 private final Object lock = new Object();
032 private SocketChannel _ch;
033 private boolean _batch = false;
034 private ByteBuffer _batcher;
035
036 public NioSender(SocketChannel ch)
037 {
038 this._ch = ch;
039 }
040
041 public void send(java.nio.ByteBuffer buf)
042 {
043 if (_batch)
044 {
045 //System.out.println(_batcher.position() + " , " + buf.remaining() + " , " + buf.position() + ","+_batcher.capacity());
046 if (_batcher.position() + buf.remaining() >= _batcher.capacity())
047 {
048 _batcher.flip();
049 write(_batcher);
050 _batcher.clear();
051 if (buf.remaining() > _batcher.capacity())
052 {
053 write(buf);
054 }
055 else
056 {
057 _batcher.put(buf);
058 }
059 }
060 else
061 {
062 _batcher.put(buf);
063 }
064 }
065 else
066 {
067 write(buf);
068 }
069 }
070
071 public void flush()
072 {
073 // pass
074 }
075
076 private void write(java.nio.ByteBuffer buf)
077 {
078 synchronized (lock)
079 {
080 if( _ch.isConnected() && _ch.isOpen())
081 {
082 try
083 {
084 _ch.write(buf);
085 }
086 catch(Exception e)
087 {
088 e.fillInStackTrace();
089 }
090 }
091 else
092 {
093 throw new RuntimeException("Trying to write on a closed socket");
094 }
095
096 }
097 }
098
099 public void setStartBatching()
100 {
101 _batch = true;
102 _batcher = ByteBuffer.allocate(1024);
103 }
104
105 public void close()
106 {
107 // MINA will sometimes throw away in-progress writes when you
108 // ask it to close
109 synchronized (lock)
110 {
111 try
112 {
113 _ch.close();
114 }
115 catch(Exception e)
116 {
117 e.printStackTrace();
118 }
119 }
120 }
121
122 public void setIdleTimeout(long l)
123 {
124 //noop
125 }
126 }
|