网络模型之Event select
开始之前,再来看看流程图。
程序阻塞的主要有两部分,一个是等待数据到来,一个是将数据从内核复制到程序缓冲区。
事件选择模型,其实是在 select 模型的基础上更进一步地做了优化,这次优化把等待数据到来的这部分变成了非阻塞。而这主要是使用 WSAEventSelect
函数完成的,其原型如下:
int WSAEventSelect (
SOCKET s, // 套接字
WSAEVENT hEventObject, // 事件对象
long lNetworkEvents // 网络事件
);
事件选择给每个套接字绑定了一个事件,当发生指定网络事件的时候它会把绑定的事件变为 signaled 状态。
WSAEVENT
其实和之前在 Windows 线程同步中所说的内核对象事件是一样的,只是在这个函数中需要手动重置事件,为了清晰方便,所以弄了个 typedef
,并提供了一个 WSACreateEvent
函数来创建手动重置的事件。
WSAEVENT event = WSACreateEvent();
其实,这个 WSACreateEvent
函数就相当于调用以下语句:
CreateEvent(NULL, TRUE, FALSE, NULL);
lNetworkEvents
可以指定一些指定的网络事件,我们感兴趣的值如下:
FD_READ
FD_WRITE
FD_ACCEPT
FD_CONNECT
FD_CLOSE
当将套接字和事件绑定起来后,在网络事件发生的时候,OS就把事件设为signaled状态,然后我们就可以根据事件来进行处理。
那么如何验证是否发生了事件呢?
大家可能还记得在 Windows 线程中所说的 WaitForMultipleObjects
,它用于等待多个内核对象变为 signaled 状态。在这里,它同样提供了一个 WSAWaitForMultipleEvents
函数来验证是否发生事件,原型如下:
DWORD WSAWaitForMultipleEvents(
DWORD cEvents, // 对象个数
const WSAEVENT FAR *lphEvents, // 事件数组地址
BOOL fWaitAll, // 等待标志
DWORD dwTimeOUT, //超时时间
BOOL fAlertable
);
前四个参数的意义和 WSAForMultipleObjects
相同,最后一个参数 fAlerable
用于触发 alertable wait 状态,这个在重叠 IO 中会用到,现在只需填入 FALSE
即可。
若是超时,则会返回 WAIT_TIMEOUT
;否则会返回一个常量,使用这个常量减去 WSA_WAIT_EVENT_0
可以得到事件数组中变为 signaled 状态的事件索引,若有多个事件都为 signaled 状态,那么返回的便是最小的那个索引。知道最小索引也就知道全部的了,可以对后面的依次再调用一次 WSAWaitForMultipleEvents
来确认所有发生事件的索引,具体的操作可以稍后在代码中看。
因为这个函数最多可等待 64 个事件,所以一次就只能监视 64 个,但可以创建线程或扩展保存句柄的数组,然后多次调用这个函数以监视更多事件。
现在已经得到发生事件的索引了,接下来如何确认发生的事件类型呢?
这得使用以下函数:
int WSAEnumNetworkEvents (
SOCKET s,
WSAEVENT hEventObject,
LPWSANETWORKEVENTS lpNetworkEvents
);
前两个分别是发生事件的套接字和与之相连的事件,这两个参数通过上一步得到的索引便能得到了。lpNetworkEvents
是个指向 WSANETWORKEVENTS
的结构体,用于保存发生的事件类型或错误信息,原型如下:
typedef struct _WSANETWORKEVENTS {
long lNetworkEvents;
int iErrorCode[FD_MAX_EVENTS];
} WSANETWORKEVENTS, FAR * LPWSANETWORKEVENTS;
lNetworkEvents
就是用于保存发生的事件类型的,比如是 FD_READ
事件它就为 FD_READ
。若是有错误,iErrorCode
中就会记录,若是 FD_READ
, 则 iErrorCode[FD_READ_BIT]
中就为非 0,应该加以判断。
现在,创建一个 CEventSelect
类来实现上述知识。
class CEventSelect
{
public:
CEventSelect(int port);
~CEventSelect();
void Accept();
private:
void InitSock(); // 初始化相关操作
void RequestHandler(); // 处理请求
void CompressSocksAndEvents(int index); // 有客户端退出时,同步删除套接字和对应的事件
private:
WSADATA m_wsaData;
SOCKET m_servSock, m_clntSock;
SOCKADDR_IN m_servAddr, m_clntAddr;
SOCKET m_hSockAddr[WSA_MAXIMUM_WAIT_EVENTS]; // 套接字数组
WSAEVENT m_hEventAddr[WSA_MAXIMUM_WAIT_EVENTS]; // 事件数组
int m_nPort; // 端口
int m_nNumOfSock; // 记录当前的套接字数量
};
同样先从 InitSock
中来看。
void CEventSelect::InitSock()
{
// 初始化套按字库
int ret = WSAStartup(MAKEWORD(2, 2), &m_wsaData);
assert(ret == 0);
// 创建服务端套接字
m_servSock = socket(PF_INET, SOCK_STREAM, 0);
memset(&m_servAddr, 0, sizeof(m_servAddr));
m_servAddr.sin_family = AF_INET;
m_servAddr.sin_addr.s_addr = htonl(ADDR_ANY);
m_servAddr.sin_port = htons(m_nPort);
// 绑定地址
ret = bind(m_servSock, (SOCKADDR *)&m_servAddr, sizeof(m_servAddr));
assert(ret != SOCKET_ERROR);
//监听
ret = listen(m_servSock, 5);
assert(ret != SOCKET_ERROR);
// 创建一个事件
WSAEVENT newEvent = WSACreateEvent();
//将服务端套接字与新建的事件绑定起来,关注FD_ACCEPT和FD_CLOSE消息
ret = WSAEventSelect(m_servSock, newEvent, FD_ACCEPT | FD_CLOSE);
assert(ret != SOCKET_ERROR);
//将套接字与事件同时保存入数组
m_hSockAddr[m_nNumOfSock] = m_servSock;
m_hEventAddr[m_nNumOfSock] = newEvent;
m_nNumOfSock++; //套接字计数++
}
主要的就是 WSAEventSelect
函数,这里我们创建了一个事件来和服务端的套接字绑定起来,并监听 FD_ACCEPT
和 FD_CLOSE
消息,这样就把本来占用程序时间片的 accept
交给了 OS,由它去处理,有了消息再通知我们。此处就解决了等待数据到来的这部分阻塞,这也正是事件选择模型的改进效果。
我们把每一个套接字和与之相对应的事件同步保存到数组中,这步操作很重要,因为套接字要和事件一一对应,得能通过套接字找到事件,也能通过事件找到套接字。
同样在构造函数中去调用:
CEventSelect::CEventSelect(int port)
: m_nPort(port),
m_nNumOfSock(0)
{
InitSock();
}
现在的工作只剩下处理 OS 传递的通知,这也是关键所在。
void CEventSelect::RequestHandler()
{
int posInfo, startIndex;
while (true)
{
// 1.验证是否发生了事件
posInfo = WSAWaitForMultipleEvents(m_nNumOfSock, m_hEventAddr, FALSE, WSA_INFINITE, FALSE);
startIndex = posInfo - WSA_WAIT_EVENT_0; // 得到最小索引
for (int i = startIndex; i < m_nNumOfSock; ++i)
{
int sigEventIndex = WSAWaitForMultipleEvents(1, &m_hEventAddr[i], TRUE, 0, FALSE);
if (sigEventIndex == WSA_WAIT_FAILED || sigEventIndex == WSA_WAIT_TIMEOUT)
{
continue;
}
else
{
// 2.区分事件类型
WSANETWORKEVENTS netEvents;
WSAEnumNetworkEvents(m_hSockAddr[i], m_hEventAddr[i], &netEvents);
if (netEvents.lNetworkEvents & FD_ACCEPT) // 请求连接时
{
if (netEvents.iErrorCode[FD_ACCEPT_BIT] != 0) // Error
break;
int clntAddrSize = sizeof(m_clntAddr);
m_clntSock = accept(m_servSock, (SOCKADDR *)&m_clntAddr, &clntAddrSize);
WSAEVENT newEvent = WSACreateEvent();
WSAEventSelect(m_clntSock, newEvent, FD_READ | FD_CLOSE);
m_hSockAddr[m_nNumOfSock] = m_clntSock;
m_hEventAddr[m_nNumOfSock] = newEvent;
m_nNumOfSock++;
printf("connected new client:%d \n", m_clntSock);
}
if (netEvents.lNetworkEvents & FD_READ) // 接收数据时
{
if (netEvents.iErrorCode[FD_READ_BIT] != 0)
break;
char buf[BUF_SIZE] = "";
int recvLen = recv(m_hSockAddr[i], buf, sizeof(buf), 0);
send(m_hSockAddr[i], buf, recvLen, 0);
}
if (netEvents.lNetworkEvents & FD_CLOSE) // 断开连接时
{
if (netEvents.iErrorCode[FD_CLOSE_BIT] != 0)
break;
WSACloseEvent(m_hEventAddr[i]);
closesocket(m_hSockAddr[i]);
m_nNumOfSock--;
printf("closed client-%d...\n", m_hSockAddr[i]);
CompressSocksAndEvents(i);
}
}
}
}
}
先来看第一部分验证是否发生了事件,这主要在前 16 行。
int posInfo, startIndex;
while (true)
{
//验证是否发生了事件
posInfo = WSAWaitForMultipleEvents(m_nNumOfSock, m_hEventAddr, FALSE, WSA_INFINITE, FALSE);
startIndex = posInfo - WSA_WAIT_EVENT_0; //得到最小索引
for (int i = startIndex; i < m_nNumOfSock; ++i)
{
int sigEventIndex = WSAWaitForMultipleEvents(1, &m_hEventAddr[i], TRUE, 0, FALSE);
if (sigEventIndex == WSA_WAIT_FAILED || sigEventIndex == WSA_WAIT_TIMEOUT)
{
continue;
}
// ...
通过调用 WSAWaitForMultipleEvents
函数来等待系统通知我们,第一个参数是 m_nNumOfSock
,基中就保存着当前拥有的套接字数量,第二个参数是事件数组,这个的个数和 m_nNumOfSock
是同步的,所以也就是一样的。
此处无需等待所有的事件,只要有一个有消息了我们就返回,所以第三个参数传入 FALSE
。然后传入 WSA_INFINITE
来一直等待系统的通知。
有通知了后减去 WSA_WAIT_EVENT_0
便可得到对应的最小索引,我们再依次对最小索引后面的事件依次调用 WSAWaitForMultipleEvents
就能得到所有发生消息的事件,这里我们只监听一个事件,所以第一个参数只需传入 1 便可。接着就是一些错误检验。
现在来看第二部分,得到了发生的事件索引,现在就得区分事件类型:
WSANETWORKEVENTS netEvents;
WSAEnumNetworkEvents(m_hSockAddr[i], m_hEventAddr[i], &netEvents);
只需给它个 netEvents
用来保存类型或错误,很简单。
当有客户端请求连接时,我们处理 FD_ACCEPT
消息:
if (netEvents.lNetworkEvents & FD_ACCEPT) // 请求连接时
{
if (netEvents.iErrorCode[FD_ACCEPT_BIT] != 0) //Error
break;
int clntAddrSize = sizeof(m_clntAddr);
m_clntSock = accept(m_servSock, (SOCKADDR *)&m_clntAddr, &clntAddrSize);
WSAEVENT newEvent = WSACreateEvent();
WSAEventSelect(m_clntSock, newEvent, FD_READ | FD_CLOSE);
m_hSockAddr[m_nNumOfSock] = m_clntSock;
m_hEventAddr[m_nNumOfSock] = newEvent;
m_nNumOfSock++;
printf("connected new client:%d \n", m_clntSock);
}
只需和对应的消息进行与操作就能检验是否发生了该消息,接着再这里我们又看到了 accept
函数,因为现在肯定已经有消息了,所以 accept
就能立即处理,而不用一直傻傻地等待。
然后为每位连接进来的客户端也绑定上事件,关注 FD_READ
和 FD_CLOSE
消息,这样客户端发消息过来或退出我们就能知道了。
接收数据没什么特别的就不说了,现在来看退出消息:
if (netEvents.lNetworkEvents & FD_CLOSE) // 断开连接时
{
if (netEvents.iErrorCode[FD_CLOSE_BIT] != 0)
break;
WSACloseEvent(m_hEventAddr[i]);
closesocket(m_hSockAddr[i]);
m_nNumOfSock--;
printf("closed client-%d...\n", m_hSockAddr[i]);
CompressSocksAndEvents(i);
}
在关闭了对应的事件和套接字后,要对计数进行更新,也要在数组中删除对应的事件和套接字,这个主要是在 CompressSocksAndEvents
函数中完成的:
void CEventSelect::CompressSocksAndEvents(int index)
{
for (int i = index; i < m_nNumOfSock; ++i)
{
m_hSockAddr[i] = m_hSockAddr[i + 1];
m_hEventAddr[i] = m_hEventAddr[i + 1];
}
}
这里同步更新数组,这小算法没啥说的,现在就使用事件选择写好了数据库,我们在 Accept
函数中调用 RequestHanlder
以供用户使用就好。
可以看出事件选择主要是对套接字绑定一个事件,然后提供感兴趣的消息,交给 OS 去处理,只需经过一个短暂的阻塞(将数据从网卡缓冲区拷贝到程序的内存),就可以完成接收过程了。
这个模型稍微扩展便可以监听几百个事件,而且使用简单,对于这个规模的服务器可以使用其来开发。
但扩展太多管理可能有点麻烦,且还是存在第二部分阻塞的,所以想要支持更多的客户端还是得使用重叠 IO 和完成端口,这些下次来看。