一.Selector(选择器,多路复用器) 1.1 多路复用的概念 多路:多个服务器分别去监听多个端口
如果不使用”多路复用”,每个服务器都需要开许多线程处理每个端口的请求,如果在高并发下,会造成系统性能下降。 如果使用”多路复用”,可以把多个服务器注册到一个Selector,只需要开启一个线程即可处理服务器(在高并发下性能较高)
1.2 选择器Selector
什么是Selector Selector也称为选择器,也叫多路复用器,可以让多个Channel注册到Selector上,然后监听各个Channel上发生的事件
Selector创建API 创建Selector的方式: Selector selector = Selector.open();
将要交给选择器管理的通道注册到选择器: Channel.configureBlocking(false);//channel是一个通道,并且必须是非阻塞通道 SelectionKey key = channel.register(selector,SelectionKey.OP_ACCEPT);
参数1:该通道注册到的选择器 参数2:选择器对何种事件感兴趣(服务器通道只能写SelectionKey.OP_ACCEPT,表示有客户端连接) 返回值:SelectionKey 是一个对象,该对象中包含了注册到选择器的通道
可以监听4中不同类型的事件,使用SelectionKey常量表示: 连接就绪–常量:SelectionKey.OP_CONNECT 接收就绪–常量:SelectionKey.OP_ACCEPT(ServerSocketChannel在注册时只能使用此项) 读就绪–常量:SelectionKey.OP_READ 写就绪–常量:SelectionKey.OP_WRITE
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class SelectorDemo { public static void main (String[] args) throws IOException { ServerSocketChannel server1 = ServerSocketChannel.open(); server1.configureBlocking(false ); server1.bind(new InetSocketAddress (8888 )); ServerSocketChannel server2 = ServerSocketChannel.open(); server2.configureBlocking(false ); server2.bind(new InetSocketAddress (9999 )); Selector selector = Selector.open(); server1.register(selector, SelectionKey.OP_ACCEPT); server2.register(selector, SelectionKey.OP_ACCEPT); } }
1.3 Selector中的常用方法
Channel注册Selector的API •获取所有已经成功注册到当前选择器的通道集合 public Set keys();
•表示所有已有客户端连接的通道的集合 public Set selectedKeys();
•如果目前没有客户端连接,该方法会阻塞。如果有客户端连接会返回本次连接的客户端数量 public int select();
代码演示 1.4 Selector实现多路连接(上)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class SocketChannelSelector_Demo { public static void main (String[] args) { new Thread (() -> { try (SocketChannel socket = SocketChannel.open()) { socket.connect(new InetSocketAddress ("127.0.0.1" , 8888 )); } catch (IOException e) { e.printStackTrace(); } }).start(); new Thread (() -> { try (SocketChannel socket = SocketChannel.open()) { socket.connect(new InetSocketAddress ("127.0.0.1" , 9999 )); } catch (IOException e) { e.printStackTrace(); } }).start(); } } public class ServerSelector_SelectionKey { public static void main (String[] args) throws IOException { ServerSocketChannel server1 = ServerSocketChannel.open(); server1.configureBlocking(false ); server1.bind(new InetSocketAddress (8888 )); ServerSocketChannel server2 = ServerSocketChannel.open(); server2.configureBlocking(false ); server2.bind(new InetSocketAddress (9999 )); Selector selector = Selector.open(); server1.register(selector, SelectionKey.OP_ACCEPT); server2.register(selector, SelectionKey.OP_ACCEPT); Set<SelectionKey> keys = selector.keys(); System.out.println(keys.size()); Set<SelectionKey> selectionKeys = selector.selectedKeys(); System.out.println(selectionKeys.size()); int selectedCount = selector.select(); System.out.println(selectedCount); } }
1.5 Selector实现多路连接(下)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public class SocketChannelSelector_Demo { public static void main (String[] args) { new Thread (() -> { try (SocketChannel socket = SocketChannel.open()) { socket.connect(new InetSocketAddress ("127.0.0.1" , 8888 )); } catch (IOException e) { e.printStackTrace(); } }).start(); new Thread (() -> { try (SocketChannel socket = SocketChannel.open()) { socket.connect(new InetSocketAddress ("127.0.0.1" , 9999 )); } catch (IOException e) { e.printStackTrace(); } }).start(); } } public class ServerSelector_SelectionKey { public static void main (String[] args) throws IOException, InterruptedException { ServerSocketChannel server1 = ServerSocketChannel.open(); server1.configureBlocking(false ); server1.bind(new InetSocketAddress (8888 )); ServerSocketChannel server2 = ServerSocketChannel.open(); server2.configureBlocking(false ); server2.bind(new InetSocketAddress (9999 )); Selector selector = Selector.open(); server1.register(selector, SelectionKey.OP_ACCEPT); server2.register(selector, SelectionKey.OP_ACCEPT); Set<SelectionKey> keys = selector.keys(); System.out.println(keys.size()); while (true ) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); System.out.println(selectionKeys.size()); int selectedCount = selector.select(); System.out.println(selectedCount); Iterator<SelectionKey> it = selectionKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); ServerSocketChannel channel = (ServerSocketChannel) key.channel(); System.out.println(channel.getLocalAddress()); SocketChannel accept = channel.accept(); System.out.println(accept); it.remove(); } Thread.sleep(1000 * 5 ); System.out.println(); } } }
1.6 Selecto多路信息接收测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class ServerSelector_SelectionKey { public static void main (String[] args) throws IOException, InterruptedException { ServerSocketChannel server1 = ServerSocketChannel.open(); server1.configureBlocking(false ); server1.bind(new InetSocketAddress (8888 )); ServerSocketChannel server2 = ServerSocketChannel.open(); server2.configureBlocking(false ); server2.bind(new InetSocketAddress (9999 )); Selector selector = Selector.open(); server1.register(selector, SelectionKey.OP_ACCEPT); server2.register(selector, SelectionKey.OP_ACCEPT); Set<SelectionKey> keys = selector.keys(); System.out.println(keys.size()); while (true ) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); System.out.println(selectionKeys.size()); int selectedCount = selector.select(); System.out.println(selectedCount); Iterator<SelectionKey> it = selectionKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); ServerSocketChannel channel = (ServerSocketChannel) key.channel(); System.out.println(channel.getLocalAddress()); SocketChannel accept = channel.accept(); System.out.println(accept); ByteBuffer inBuf = ByteBuffer.allocate(1024 ); accept.read(inBuf); inBuf.flip(); String msg = new String (inBuf.array(),0 ,inBuf.limit()); System.out.println(channel.getLocalAddress()+msg); it.remove(); } Thread.sleep(1000 * 5 ); System.out.println(); } } }
二.NIO2-AIO(异步非阻塞) 2.1 AIO概述和分类 什么是AIO:ASynchronized 异步非阻塞的IO
AIO的分类: 异步文件通道•AsynchronousFileChannel 异步客户端通道•AsynchronousSocketChannel 异步服务器通道•AsynchronousServerSocketChannel 异步UDP协议通道•AsynchronousDatagramChannel
AIO的异步: a.建立连接时可以使用异步(调用连接时的方法,非阻塞,连接成功后会以方法的回调机制通知我们) b.读取数据时,可以使用异步(调用read方法时,非阻塞,等数据接收到之后以方法调用的机制通知我们)
2.2 AIO异步非阻塞连接的建立
异步非阻塞的客户端通道
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class AIO_SocketChannel { public static void main (String[] args) throws IOException { AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(); socketChannel.connect(new InetSocketAddress ("127.0.0.1" ,8888 ),null , new CompletionHandler <Void, Object>() { @Override public void completed (Void result, Object attachment) { System.out.println("Successful" ); } @Override public void failed (Throwable exc, Object attachment) { System.out.println("Failed" ); } }); System.out.println("Continue" ); while (true ){ } } } 异步非阻塞的服务器通道 public class AIO_ServerSocketChannel { public static void main (String[] args) throws IOException { AsynchronousServerSocketChannel serversocketChannel = AsynchronousServerSocketChannel.open(); serversocketChannel.bind(new InetSocketAddress (8888 )); serversocketChannel.accept(null , new CompletionHandler <AsynchronousSocketChannel, Object>() { @Override public void completed (AsynchronousSocketChannel result, Object attachment) { System.out.println("Connect Client Successful" ); } @Override public void failed (Throwable exc, Object attachment) { System.out.println("Connect Client Failed" ); } }); System.out.println("Server Continue" ); while (true ) { } } }
异步非阻塞建立连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class AIO_SocketChannel { public static void main (String[] args) throws IOException { AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(); socketChannel.connect(new InetSocketAddress ("127.0.0.1" , 8888 ), null , new CompletionHandler <Void, Object>() { @Override public void completed (Void result, Object attachment) { System.out.println("Connect Server Successful" ); } @Override public void failed (Throwable exc, Object attachment) { System.out.println("Connect Server Failed" ); } }); System.out.println("Client Continue" ); while (true ) { } } } public class AIO_ServerSocketChannel { public static void main (String[] args) throws IOException { AsynchronousServerSocketChannel serversocketChannel = AsynchronousServerSocketChannel.open(); serversocketChannel.bind(new InetSocketAddress (8888 )); serversocketChannel.accept(null , new CompletionHandler <AsynchronousSocketChannel, Object>() { @Override public void completed (AsynchronousSocketChannel result, Object attachment) { System.out.println("Connect Client Successful" ); try { System.out.println(result.getLocalAddress()); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed (Throwable exc, Object attachment) { System.out.println("Connect Client Failed" ); } }); System.out.println("Server Continue" ); while (true ) { } } }
2.3 AIO同步读写数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 public class AIO_SocketChannel { public static void main (String[] args) throws IOException { AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(); socketChannel.connect(new InetSocketAddress ("127.0.0.1" , 8888 ), null , new CompletionHandler <Void, Object>() { @Override public void completed (Void result, Object attachment) { System.out.println("Connect Server Successful" ); ByteBuffer buffer = ByteBuffer.allocate(1024 ); buffer.put("I am AIO Client" .getBytes()); buffer.flip(); socketChannel.write(buffer); try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed (Throwable exc, Object attachment) { System.out.println("Connect Server Failed" ); } }); System.out.println("Client Continue" ); while (true ) { } } } public class AIO_ServerSocketChannel { public static void main (String[] args) throws IOException { AsynchronousServerSocketChannel serversocketChannel = AsynchronousServerSocketChannel.open(); serversocketChannel.bind(new InetSocketAddress (8888 )); serversocketChannel.accept(null , new CompletionHandler <AsynchronousSocketChannel, Object>() { @Override public void completed (AsynchronousSocketChannel result, Object attachment) { System.out.println("Connect Client Successful" ); ByteBuffer buffer = ByteBuffer.allocate(1024 ); Future<Integer> future = result.read(buffer); try { byte [] array = buffer.array(); System.out.println(Arrays.toString(array)); Integer len = future.get(); System.out.println(len); buffer.flip(); } catch (Exception e) { e.printStackTrace(); } } @Override public void failed (Throwable exc, Object attachment) { System.out.println("Connect Client Failed" ); } }); System.out.println("Server Continue" ); while (true ) { } } }
2.4 AIO异步读写数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 public class AIOSocketChannelDemo01 { public static void main (String[] args) throws IOException, InterruptedException { AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(); socketChannel.connect(new InetSocketAddress ("127.0.0.1" , 8888 ), null , new CompletionHandler <Void, Object>() { @Override public void completed (Void result, Object attachment) { System.out.println("连接服务器成功..." ); ByteBuffer buffer = ByteBuffer.allocate(1000 ); buffer.put("你好我是AIO客户端.." .getBytes()); buffer.flip(); socketChannel.write(buffer, 10 , TimeUnit.SECONDS, null , new CompletionHandler <Integer, Object>() { @Override public void completed (Integer result, Object attachment) { System.out.println("数据成功发送..." ); try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed (Throwable exc, Object attachment) { System.out.println("数据发送失败..." ); } }); } @Override public void failed (Throwable exc, Object attachment) { System.out.println("连接服务器失败..." ); } }); System.out.println("程序继续执行...." ); Thread.sleep(5000 ); } } public class AIOServerSocketChannelDemo01 { public static void main (String[] args) throws IOException, InterruptedException { AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress (8888 )); serverSocketChannel.accept(null , new CompletionHandler <AsynchronousSocketChannel, Object>() { @Override public void completed (AsynchronousSocketChannel socketChannel, Object attachment) { System.out.println("有客户端连接...." ); ByteBuffer buffer = ByteBuffer.allocate(1000 ); socketChannel.read(buffer, 10 , TimeUnit.SECONDS, null , new CompletionHandler <Integer, Object>() { @Override public void completed (Integer result, Object attachment) { System.out.println("数据读取完毕.." ); System.out.println("接收到数据的长度:" +result); System.out.println("接收到的数据是:" + new String (buffer.array(), 0 , result)); try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } @Override public void failed (Throwable exc, Object attachment) { System.out.println("读取数据失败..." ); } }); } @Override public void failed (Throwable exc, Object attachment) { System.out.println("客户端连接失败..." ); } }); System.out.println("程序继续执行.." ); while (true ) { Thread.sleep(1000 ); } } }
总结: Selector作用: Selector可以让多个服务器注册到它上,完成多路复用功能
使用Selector选择器 注册: channel.register(selector,SelectionKey.OP_ACCEPT); 方法: //表示所有被连接到服务器通道的集合 public Set selectedKeys(); Set keys = selector.selectedKeys();
//获取所有已经成功注册到选择器的服务器通道集合 public Set keys(); Set keys = selector.keys();
//如果目前没有客户端连接,该方法会阻塞。如果有客户端连接会返回本次连接的客户端数量 public int select(); int count = selector.select();
AIO特点: a.支持异步非阻塞连接 connect accept b.支持异步非阻塞到读写数据 socketChannel.write(…); socketChannel.read(…);