原文地址:
参考文档:
AIO(也称 NIO.2)是 JDK 1.7 引入的新特性。代码写起来比 NIO 更舒服,但性能似乎并不比 NIO 更好。
import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousChannelGroup;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.apache.log4j.Logger;public class AioServer implements Runnable{ final static Logger logger = Logger.getLogger(AioServer.class); Object lock = new Object(); InetSocketAddress serverAddress = null; int backlog = 0; int buff_size = 1024; int threadPoolSize = 0; public AioServer(int port){ this.serverAddress = new InetSocketAddress(port); initialization(); } public AioServer(String ip,int port){ this.serverAddress = new InetSocketAddress(ip,port); initialization(); } void initialization(){ threadPoolSize = threadPoolSize>0? threadPoolSize: Runtime.getRuntime().availableProcessors(); } @Override public void run() { try { logger.info("aioserver threadPoolSize:"+this.threadPoolSize); ExecutorService threadPool = Executors.newFixedThreadPool(this.threadPoolSize); AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withThreadPool(threadPool); final AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open(channelGroup); if(this.backlog>0){ assc.bind(serverAddress,this.backlog); } else { assc.bind(serverAddress); } logger.info("aioserver listen:"+this.serverAddress); assc.accept(null, new CompletionHandler(){ @Override public void completed(AsynchronousSocketChannel result, Object attachment) { assc.accept(null, this); handler(result,attachment); } @Override public void failed(Throwable exc, Object attachment) { exc.printStackTrace(); } }); synchronized(lock){ lock.wait(); } channelGroup.shutdownNow(); logger.info("aioserver shutdownC."); } catch (Exception e) { e.printStackTrace(); } } static byte[] echo = "done.".getBytes(); static int connCount = 1; void handler(AsynchronousSocketChannel 
conn,Object att){ try{// logger.info("connect server :"+connCount++); ByteBuffer buff = ByteBuffer.allocate(this.buff_size); buff.clear(); int rl = conn.read(buff).get(); buff.flip(); logger.info("recv "+rl+": "+new String(buff.array(),0,rl)); buff.clear(); //清空buff数据 buff.put(echo); buff.flip(); int wl = conn.write(buff).get(); logger.info("send "+wl); conn.close(); }catch(Exception ex){ ex.printStackTrace(); } } public void setThreadPoolSize(int threadPoolSize){ this.threadPoolSize = threadPoolSize; } public void setBacklog(int backlog){ this.backlog = backlog; } public void shutdown(){ //logger.info("call shutdown()"); synchronized(lock){ lock.notifyAll(); } }}
AioTest1.java
static void t3(){ AioServer aiose = new AioServer(9777); //线程模式启动 new Thread(aiose).start();; //非线程模式启动// aiose.run(); try { Thread.sleep(1000*60*5); //3秒后关闭 aiose.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); }}