松花皮蛋的黑板報
  • 分享在京東工作的技術感悟,還有JAVA技術和業內最佳實踐,大部分都是務實的、能看懂的、可復現的

掃一掃
關注公眾號

深度解析NIO底層

博客首頁文章列表 松花皮蛋me 2019-04-11 00:39

一、BIO、NIO的介紹

我們可以分別啟動高版本tomcat-8和低版本tomcat-6,然后模擬連接 Socket socket = new Socket(“localhost”,8080)會發現BIO和NIO最明顯的區別

  1. 1、BIO的應用每來一個請求都會起一個線程去接待。
  2. 2、NIO的應用會有一個accepter線程專門負責接待。

所以NIO在做高并發和高吞吐量服務時比BIO更加適合,并且在長鏈接的情況下,同一時間對CPU的利用率也更高。

傳統的BIO也叫”同步阻塞IO”,還有一種叫做”同步非阻塞I/O”

那么問題來了?我們的NIO 底層用的是那種I/O模型呢?其實是IO多路復用

這里提到了兩個概念:

1、select/poll 多路由復用器(這里可以先簡單的理解成一個隊列,有一個單線程在不斷的輪詢處理隊列里的事件)

2、fd 類似于一種請求事件(Socket描述符).有一點很重要,這里的select輪詢是阻塞的。

剛剛說的I/O復用模型可以說是奠定了NIO的模型基礎,但是我們的UNIX對這個模型做了進一步的優化,剛剛的圖我們可以發現一個明顯的問題,什么問題呢?select/poll線程像傻瓜一樣的順序輪詢fd列表,而且處理的fd也是有限的。默認是1024個。這個時候 epoll(Event Poll 基于事件驅動的select/poll模型)模型就呼之欲出了。

這里的select輪詢同樣也是阻塞的。在就緒隊列里的數據為空的時候,select內部的監聽線程就會阻塞。

所以我們說NIO是一個典型的同步非阻塞的I/O。而底層的IO模型是采用的epoll方式的多路復用模型。(在UNIX和Linux下)

總結一下,epoll模型相比select/poll模型的一些優點

select/poll模型

  1. 1、每次調用select,都需要把fd集合從用戶態拷貝到內核態。
  2. 2、每次調用select都需要在內核遍歷傳遞進來的所有fd。
  3. 3、select支持的文件描述符數量太小了,默認是1024。

epoll模型

  1. 1、初次調用select時,會掛載所有的fd進來,并且沒有從用戶態到內核態的內存復制,而是通過內核和用戶空間mmap同一塊內存來實現的。
  2. 2、epoll在事件就緒時的觸發并沒有遍歷所有fd,而是遍歷就緒態的fd鏈表,節省了大量的CPU時間。
  3. 3、所支持的fd的上限是操作系統的最大文件句柄數,簡單理解,也就是可以支持的連接數。一般來說1GB內存的機器上大約是10W個句柄左右

二、NIO的模型介紹和實現原理

NIO這個概念,早在jdk1.4的時候就支持了,我們完全可以通過jdk中的nio功能模塊去實現一個NIO框架的服務。下面給出一個簡單的例子

public class NioDemo {


public static void main(String[] args)
{
    try {
        initServer(9999);
        listenSelector();
    } catch (IOException e) {
        e.printStackTrace();
    }

}

private static Selector selector;

public static void initServer(int port) throws IOException {
    //init一個通道,并且打開對連接的接收
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    //設置為非阻塞
    serverSocketChannel.configureBlocking(false);
    //綁定端口
    serverSocketChannel.socket().bind(new InetSocketAddress(port));
    //打開多路復用器,監控監聽fd
    selector = Selector.open();
    //注冊監聽器,SelectionKey.OP_ACCEPT OP_CONNECT OP_READ OP_WRITE
    serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
    System.out.println("服務端啟動成功");
}

public static void listenSelector() throws IOException {
    while (true) {
        System.out.println("select開始阻塞");
        selector.select();
        Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
        System.out.println("獲取到新的key");
        while (keyIterator.hasNext()) {
            SelectionKey selectionKey = keyIterator.next();
            //刪除已選的key,防止重復處理
            keyIterator.remove();
            try {
                handler(selectionKey,keyIterator);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

public static void  handler(SelectionKey selectionKey,Iterator<SelectionKey> keyIterator) throws IOException {

    if(selectionKey.isAcceptable()) {
        System.out.println("新的客戶端連接");
        //有新的客戶端連接則注冊到讀就就緒事件
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
        SocketChannel channel = serverSocketChannel.accept();
        channel.configureBlocking(false);
        channel.register(selector,SelectionKey.OP_READ);
    } else if(selectionKey.isReadable()) {
        //通道可讀說明可以從buffer里取數據
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int readData = socketChannel.read(buffer);
        if(readData>0) {
            String msg = new String(buffer.array(),"GBK").trim();
            System.out.println("服務端收到消息"+msg);
            ByteBuffer byteBuffer = ByteBuffer.wrap("我收到你的消費了".getBytes("UTF-8"));
            socketChannel.write(byteBuffer);
        } else {
            System.out.println("客戶端關閉");
            selectionKey.cancel();
        }
    } else {
        System.out.println(selectionKey.isValid());
    }
}
}

當我telnet 192.168.0.101 9999時,會打印

服務端啟動成功
select開始阻塞
獲取到新的key
新的客戶端連接
select開始阻塞  

當我通過瀏覽器訪問時,會打印

服務端啟動成功
select開始阻塞
獲取到新的key
新的客戶端連接
select開始阻塞
獲取到新的key
新的客戶端連接
select開始阻塞
獲取到新的key
新的客戶端連接
服務端收到消息GET / HTTP/1.1
Host: 192.168.0.101:9999
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
select開始阻塞
獲取到新的key
客戶端關閉
select開始阻塞

演示完了上面的代碼,可以大概的看到他的執行思路是優先注冊鏈接事件,然后監聽這個事件,收到事件后處理完成后,又向select注冊接下來的讀取就緒、寫入就緒事件。我們稱這種開發模型為Reactor模型,也叫反應堆模型

事件驅動模型,有一個或多個并發輸入源,有一個Service Handler,有多個Request Handlers

三、基于NIO實現的Nettty通信框架

Netty是基于NIO實現的網絡通信框架,在rpc框架中被廣為應用(dubbo、jsf),同時Netty可以支持多種協議的開發,非常適合高并發的網絡編程(彈幕、游戲服務器等)
下面是Netty對JDK原生NIO的一些增強

  1. 1、實現了事件分發,和業務執行的線程池隔離。(也就是我們說的IO線程、工作線程職責剝離)
  2. 2、一個NIO服務端處理網絡的閃斷、客戶端的重復接入、客戶端的安全認證、消息的編解碼、半包讀寫等情況。而這些在Netty中都得到了很好的解決。
  3. 3、代碼編寫較復雜,缺少封裝,每增加一層處理需要修改的地方有很多,且很難調試。而Netty實現了PipeLine來實現不同的上下行的Handler。
  4. 4、需要具備其他的額外技能做鋪墊,例如熟悉Java多線程編程。這是因為NIO編程涉及到Reactor模式,你必須對多線程和網路編程非常熟悉,才能編寫出高質量的NIO程序。
  5. 5、Netty在健壯性、功能、性能、可定制性和可擴展性在同類框架中都是首屈一指的。

四、Netty案例演示

這里以彈幕為例,介紹一下Netty在實際項目中的應用。

這里我們使用WebSocket+Netty的方式來實踐彈幕的推送,設想一下,如果我們不用Netty能不能實現彈幕系統的功能?肯定是可以實現的:

簡單暴力的方案:Ajax輪詢去服務器取消息。客戶端按照某個時間間隔不斷地向服務端發送請求,請求服務端的最新數據然后更新客戶端顯示。這種方式實際上浪費了大量流量并且對服務端造成了很大壓力。

以上方案的弊端:

  1. 1、Http為半雙工超文本協議,也就是說同一時刻,只有一個方向的數據傳送。
  2. 2、Http消息冗長,包含請求行、請求頭、請求體。占用很多的帶寬和服務器資源。
  3. 3、空輪詢問題。

使用WebSocket,可以很好的解決http協議帶來的問題。

webSocket特點如下:

  1. 1、單一TCP長連接,采用全雙工通信模式。這是一個二進制的協議
  2. 2、對代理、防火墻透明。
  3. 3、無頭部信息、消息更精簡。
  4. 4、通過ping/pong 來保活。
  5. 5、服務器可以主動推送消息給客戶端,不在需要客戶輪詢

直接看代碼吧

WebsocketDanmuServer.clss

/**
 * 基于Websocket的彈幕服務
 */
public class WebsocketDanmuServer {

private static final Integer port = 7777;

public static void main(String[] args) {

    ServerBootstrap bootstrap = new ServerBootstrap();

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new WebsocketDanmuChannelInitializer());

    try {
        ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", port).sync();
        System.out.println("彈幕服務器啟動,網址是 : " + "http://127.0.0.1:" + port);
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
}

WebsocketDanmuChannelInitializer

    /**
 * 彈幕服務的上下行handler
 */
    public class WebsocketDanmuChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline channelPipeline = socketChannel.pipeline();
        channelPipeline.addLast("http-decode", new HttpRequestDecoder());//解碼
        channelPipeline.addLast("http-encode", new HttpResponseEncoder());//編碼
        channelPipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
        channelPipeline.addLast("http-chunked", new ChunkedWriteHandler());

        channelPipeline.addLast("http-request",new HttpRequestHandler("/ws"));
        channelPipeline.addLast("WebSocket-protocol",new WebSocketServerProtocolHandler("/ws"));
        channelPipeline.addLast("WebSocket-request",new TextWebSocketFrameHandler());
    }

}

HttpRequestHandler

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

private final String wsUri;
private static final File INDEX;

static {
    URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
    try {
        String path = location.toURI() + "WebsocketDanMu.html";
        path = !path.contains("file:") ? path : path.substring(5);
        INDEX = new File(path);
    } catch (URISyntaxException e) {
        throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
    }
}

public HttpRequestHandler(String wsUri) {
    this.wsUri = wsUri;
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
    if (wsUri.equalsIgnoreCase(fullHttpRequest.getUri())) {
        channelHandlerContext.fireChannelRead(fullHttpRequest.retain()); 
    } else {
        if (HttpHeaders.is100ContinueExpected(fullHttpRequest)) {
            send100Continue(channelHandlerContext);                               
        }

        RandomAccessFile file = new RandomAccessFile(INDEX, "r");

        HttpResponse response = new DefaultHttpResponse(fullHttpRequest.getProtocolVersion(), HttpResponseStatus.OK);
        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");

        boolean keepAlive = HttpHeaders.isKeepAlive(fullHttpRequest);

        if (keepAlive) {          
            response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
            response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
        }
        channelHandlerContext.write(response);                    

        if (channelHandlerContext.pipeline().get(SslHandler.class) == null) {     
            channelHandlerContext.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
        } else {
            channelHandlerContext.write(new ChunkedNioFile(file.getChannel()));
        }
        ChannelFuture future = channelHandlerContext.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);          
        if (!keepAlive) {
            future.addListener(ChannelFutureListener.CLOSE);        
        }

        file.close();
    }
}

private static void send100Continue(ChannelHandlerContext ctx) {
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
    ctx.writeAndFlush(response);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    Channel incoming = ctx.channel();
    System.out.println("Client:" + incoming.remoteAddress() + "異常");
    // 當出現異常就關閉連接
    cause.printStackTrace();
    ctx.close();
}

TextWebSocketFrameHandler

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
    Channel incoming = channelHandlerContext.channel();
    for (Channel channel : channels) {
        if (channel != incoming) {
            channel.writeAndFlush(new TextWebSocketFrame(textWebSocketFrame.text()));
        } else {
            channel.writeAndFlush(new TextWebSocketFrame(textWebSocketFrame.text()));
        }
    }
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();
    channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
    channels.add(incoming);
    System.out.println("Client:" + incoming.remoteAddress() + "加入");
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();
    channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 離開"));
    System.err.println("Client:" + incoming.remoteAddress() + "離開");
    // 不需要手動remove"channels.remove(ctx.channel());"
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();
    System.out.println("Client:" + incoming.remoteAddress() + "在線");
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    Channel incoming = ctx.channel();
    System.out.println("Client:" + incoming.remoteAddress() + "掉線");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    Channel incoming = ctx.channel();
    System.err.println("Client:" + incoming.remoteAddress() + "異常");
    // 當出現異常就關閉連接
    cause.printStackTrace();
    ctx.close();
}
}

黑龙江6+1开奖结果查询