IO线程模型

阻塞I/O

title

  • 每一条连接都需要建立一个独立的线程来处理,机器耗费线程资源。
  • 当没有数据读写时,线程还会阻塞。

Reactor模式

IO多路复用+线程池来实现
I/O多路复用来解决会有多个线程阻塞的问题,IO多路复用只会造成一个线程阻塞。
线程池不必为每个连接都建立一个新的线程。

Reactor模型,是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。 服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor模式也叫Dispatcher模式,即I/O多路复用统一监听事件,收到事件后分发(Dispatch给某进程)。

Reactor两个关键组成:

  • Reactor
    负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。
  • Handler
    处理程序执行I/O事件要完成的实际事件.
    title

    单Reactor

    title
  • Reactor对象通过select不断轮询监控客户端请求事件,收到事件后通过dispatch进行分发
  • 如果是建立连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理
  • 如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应
  • Handler会完成read->业务处理->send的完整业务流程

    单Reactor多线程

    主要通过建立一个线程池。
    title
    Worker线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给Handler进行处理。

主从Reactor多线程

title

  • Reactor主线程MainReactor对象通过select监控建立连接事件,收到事件后通过Acceptor接收,处理建立连接事件。
  • Accepto处理建立连接事件后,MainReactor将Socket分配Reactor子线程给SubReactor进行处理
  • SubReactor将Socket加入连接队列进行监听,并创建一个Handler用于处理各种连接事件,例如读写操作。
  • 当有新的事件发生时,SubReactor会调用连接对应的Handler进行响应
  • Handler通过read读取数据后,会分发给后面的Worker线程池进行业务处理
  • Worker线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给Handler进行处理
  • Handler收到响应结果后通过send将响应结果返回给client

    NIO代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package nioDemo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;


/*服务器端,:接收客户端发送过来的数据并显示,
*服务器把上接收到的数据加上"echo from service:"再发送回去*/
public class ServiceSocketChannelDemo {

public static class TCPEchoServer implements Runnable{
/*服务器地址*/
private InetSocketAddress localAddress;
public TCPEchoServer(int port) throws IOException{
this.localAddress = new InetSocketAddress(port);
}

@Override
public void run(){
ServerSocketChannel ssc = null;
Selector selector = null;
Random rnd = new Random();
try {
/*创建选择器*/
selector = Selector.open();
/*创建服务器通道*/
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
/*设置监听服务器的端口,设置最大连接缓冲数为100*/
ssc.bind(localAddress, 100);
/*服务器通道只能对tcp链接事件感兴趣*/
ssc.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e1) {
System.out.println("server start failed");
return;
}
System.out.println("server start with address : " + localAddress);
/*服务器线程被中断后会退出*/
try{
while(!Thread.currentThread().isInterrupted()){
int n = selector.select();
if(n == 0){
continue;
}
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> it = keySet.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
/*防止下次select方法返回已处理过的通道*/
it.remove();
/*若发现异常,说明客户端连接出现问题,但服务器要保持正常*/
try{
/*ssc通道只能对链接事件感兴趣*/
if(key.isAcceptable()){
/*accept方法会返回一个普通通道,
每个通道在内核中都对应一个socket缓冲区*/
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
/*向选择器注册这个通道和普通通道感兴趣的事件,同时提供这个新通道相关的缓冲区*/
int interestSet = SelectionKey.OP_READ;
sc.register(selector, interestSet, new Buffers(256,256));
System.out.println("accept from"+ sc.getRemoteAddress());
}
/*(普通)通道感兴趣读事件且有数据可读*/
if(key.isReadable()){
/*通过SelectionKey获取通道对应的缓冲区*/
Buffers buffers = (Buffers)key.attachment();
ByteBuffer readBuffer = buffers.getReadBuffer();
ByteBuffer writeBuffer = buffers.gerWriteBuffer();

/*通过SelectionKey获取对应的通道*/
SocketChannel sc = (SocketChannel) key.channel();

/*从底层socket读缓冲区中读入数据*/
sc.read(readBuffer);
readBuffer.flip();

/*解码显示,客户端发送来的信息*/
CharBuffer cb = utf8.decode(readBuffer);
System.out.println(cb.array());
readBuffer.rewind();
/*准备好向客户端发送的信息*/
/*先写入"echo:",再写入收到的信息*/
writeBuffer.put("echo from service:".getBytes("UTF-8"));
writeBuffer.put(readBuffer);
readBuffer.clear();
/*设置通道写事件*/
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);

}

/*通道感兴趣写事件且底层缓冲区有空闲*/
if(key.isWritable()){
doSomething();
}
}catch(IOException e){
System.out.println("service encounter client error");
/*若客户端连接出现异常,从Seletcor中移除这个key*/
key.cancel();
key.channel().close();
}

}
Thread.sleep(rnd.nextInt(500));
}

}catch(InterruptedException e){
System.out.println("serverThread is interrupted");
} catch (IOException e1) {
System.out.println("serverThread selecotr error");
}finally{
try{
selector.close();
}catch(IOException e){
System.out.println("selector close failed");
}finally{
System.out.println("server close");
}
}

}
}
public static void main(String[] args) throws InterruptedException, IOException{
Thread thread = new Thread(new TCPEchoServer(8080));
thread.start();
Thread.sleep(100000);
/*结束服务器线程*/
thread.interrupt();
}

}

Proactor模型(异步)

主要的核心在于回调机制。
Reactor在接收事件后需要交给Reactor处理。而Proactor直接由操作系统来处理相关事件,然后返回结果。
title

理解高性能网络模型