IOCPModel.cpp 28 KB


  1. #include "StdAfx.h"
  2. #include "IOCPModel.h"
  3. //#include "MainDlg.h"
  4. // 每一个处理器上产生多少个线程(为了最大限度的提升服务器性能,详见配套文档)
  5. #define WORKER_THREADS_PER_PROCESSOR 2
  6. // 同时投递的Accept请求的数量(这个要根据实际的情况灵活设置)
  7. #define MAX_POST_ACCEPT 10
  8. // 传递给Worker线程的退出信号
  9. #define EXIT_CODE NULL
  10. // 释放指针和句柄资源的宏
  11. // 释放指针宏
  12. #define RELEASE(x) {if(x != NULL ){delete x;x=NULL;}}
  13. // 释放句柄宏
  14. #define RELEASE_HANDLE(x) {if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}
  15. // 释放Socket宏
  16. #define RELEASE_SOCKET(x) {if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}}
  17. CIOCPModel::CIOCPModel(void):
  18. m_nThreads(0),
  19. m_hShutdownEvent(NULL),
  20. m_hIOCompletionPort(NULL),
  21. m_phWorkerThreads(NULL),
  22. m_strIP(DEFAULT_IP),
  23. m_nPort(DEFAULT_PORT),
  24. m_pMain(NULL),
  25. m_lpfnAcceptEx( NULL ),
  26. m_pListenContext( NULL )
  27. {
  28. }
  29. CIOCPModel::~CIOCPModel(void)
  30. {
  31. // 确保资源彻底释放
  32. this->Stop();
  33. }
  34. ///////////////////////////////////////////////////////////////////
  35. // 工作者线程: 为IOCP请求服务的工作者线程
  36. // 也就是每当完成端口上出现了完成数据包,就将之取出来进行处理的线程
  37. ///////////////////////////////////////////////////////////////////
  38. DWORD WINAPI CIOCPModel::_WorkerThread(LPVOID lpParam)
  39. {
  40. THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam;
  41. CIOCPModel* pIOCPModel = (CIOCPModel*)pParam->pIOCPModel;
  42. int nThreadNo = (int)pParam->nThreadNo;
  43. pIOCPModel->_ShowMessage(_T("工作者线程启动,ID: %d."),nThreadNo);
  44. OVERLAPPED *pOverlapped = NULL;
  45. PER_SOCKET_CONTEXT *pSocketContext = NULL;
  46. DWORD dwBytesTransfered = 0;
  47. // 循环处理请求,知道接收到Shutdown信息为止
  48. while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0))
  49. {
  50. BOOL bReturn = GetQueuedCompletionStatus(
  51. pIOCPModel->m_hIOCompletionPort,
  52. &dwBytesTransfered,
  53. (PULONG_PTR)&pSocketContext,
  54. &pOverlapped,
  55. INFINITE);
  56. // 如果收到的是退出标志,则直接退出
  57. if ( EXIT_CODE==(DWORD)pSocketContext )
  58. {
  59. break;
  60. }
  61. // 判断是否出现了错误
  62. if( !bReturn )
  63. {
  64. DWORD dwErr = GetLastError();
  65. // 显示一下提示信息
  66. if( !pIOCPModel->HandleError( pSocketContext,dwErr ) )
  67. {
  68. break;
  69. }
  70. continue;
  71. }
  72. else
  73. {
  74. // 读取传入的参数
  75. PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped);
  76. // 判断是否有客户端断开了
  77. if((0 == dwBytesTransfered) && ( RECV_POSTED==pIoContext->m_OpType || SEND_POSTED==pIoContext->m_OpType))
  78. {
  79. pIOCPModel->_ShowMessage( _T("客户端 %s:%d 断开连接."),
  80. inet_ntoa(pSocketContext->m_ClientAddr.sin_addr),
  81. ntohs(pSocketContext->m_ClientAddr.sin_port) );
  82. // 释放掉对应的资源
  83. pIOCPModel->_RemoveContext( pSocketContext );
  84. continue;
  85. }
  86. else
  87. {
  88. switch( pIoContext->m_OpType )
  89. {
  90. // Accept
  91. case ACCEPT_POSTED:
  92. {
  93. // 为了增加代码可读性,这里用专门的_DoAccept函数进行处理连入请求
  94. pIOCPModel->_DoAccpet( pSocketContext, pIoContext );
  95. }
  96. break;
  97. // RECV
  98. case RECV_POSTED:
  99. {
  100. // 为了增加代码可读性,这里用专门的_DoRecv函数进行处理接收请求
  101. pIOCPModel->_DoRecv( pSocketContext,pIoContext );
  102. }
  103. break;
  104. // SEND
  105. // 这里略过不写了,要不代码太多了,不容易理解,Send操作相对来讲简单一些
  106. case SEND_POSTED:
  107. {
  108. }
  109. break;
  110. default:
  111. // 不应该执行到这里
  112. TRACE(_T("_WorkThread中的 pIoContext->m_OpType 参数异常.\n"));
  113. break;
  114. } //switch
  115. }//if
  116. }//if
  117. }//while
  118. TRACE(_T("工作者线程 %d 号退出.\n"),nThreadNo);
  119. // 释放线程参数
  120. RELEASE(lpParam);
  121. return 0;
  122. }
  123. //====================================================================================
  124. //
  125. // 系统初始化和终止
  126. //
  127. //====================================================================================
  128. ////////////////////////////////////////////////////////////////////
  129. // 初始化WinSock 2.2
  130. bool CIOCPModel::LoadSocketLib()
  131. {
  132. WSADATA wsaData;
  133. int nResult;
  134. nResult = WSAStartup(MAKEWORD(2,2), &wsaData);
  135. // 错误(一般都不可能出现)
  136. if (NO_ERROR != nResult)
  137. {
  138. this->_ShowMessage(_T("初始化WinSock 2.2失败!\n"));
  139. return false;
  140. }
  141. return true;
  142. }
  143. //////////////////////////////////////////////////////////////////
  144. // 启动服务器
  145. bool CIOCPModel::Start(unsigned int port)
  146. {
  147. // 初始化线程互斥量
  148. InitializeCriticalSection(&m_csContextList);
  149. // 建立系统退出的事件通知
  150. m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
  151. // 初始化IOCP
  152. if (false == _InitializeIOCP())
  153. {
  154. this->_ShowMessage(_T("初始化IOCP失败!\n"));
  155. return false;
  156. }
  157. else
  158. {
  159. this->_ShowMessage(_T("\nIOCP初始化完毕\n."));
  160. }
  161. // 初始化Socket
  162. if( false==_InitializeListenSocket(port) )
  163. {
  164. this->_ShowMessage(_T("Listen Socket初始化失败!\n"));
  165. this->_DeInitialize();
  166. return false;
  167. }
  168. else
  169. {
  170. this->_ShowMessage(_T("Listen Socket初始化完毕."));
  171. }
  172. this->_ShowMessage(_T("系统准备就绪,等候连接....\n"));
  173. return true;
  174. }
  175. ////////////////////////////////////////////////////////////////////
  176. // 开始发送系统退出消息,退出完成端口和线程资源
  177. void CIOCPModel::Stop()
  178. {
  179. if( m_pListenContext!=NULL && m_pListenContext->m_Socket!=INVALID_SOCKET )
  180. {
  181. // 激活关闭消息通知
  182. SetEvent(m_hShutdownEvent);
  183. for (int i = 0; i < m_nThreads; i++)
  184. {
  185. // 通知所有的完成端口操作退出
  186. PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)EXIT_CODE, NULL);
  187. }
  188. // 等待所有的客户端资源退出
  189. WaitForMultipleObjects(m_nThreads, m_phWorkerThreads, TRUE, INFINITE);
  190. // 清除客户端列表信息
  191. this->_ClearContextList();
  192. // 释放其他资源
  193. this->_DeInitialize();
  194. this->_ShowMessage(_T("停止监听\n"));
  195. }
  196. }
  197. ////////////////////////////////
  198. // 初始化完成端口
  199. bool CIOCPModel::_InitializeIOCP()
  200. {
  201. // 建立第一个完成端口
  202. m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
  203. if ( NULL == m_hIOCompletionPort)
  204. {
  205. this->_ShowMessage(_T("建立完成端口失败!错误代码: %d!\n"), WSAGetLastError());
  206. return false;
  207. }
  208. // 根据本机中的处理器数量,建立对应的线程数
  209. m_nThreads = WORKER_THREADS_PER_PROCESSOR * _GetNoOfProcessors();
  210. // 为工作者线程初始化句柄
  211. m_phWorkerThreads = new HANDLE[m_nThreads];
  212. // 根据计算出来的数量建立工作者线程
  213. DWORD nThreadID;
  214. for (int i = 0; i < m_nThreads; i++)
  215. {
  216. THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;
  217. pThreadParams->pIOCPModel = this;
  218. pThreadParams->nThreadNo = i+1;
  219. m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread, (void *)pThreadParams, 0, &nThreadID);
  220. }
  221. TRACE(" 建立 _WorkerThread %d 个.\n", m_nThreads );
  222. return true;
  223. }
  224. /////////////////////////////////////////////////////////////////
  225. // 初始化Socket
  226. bool CIOCPModel::_InitializeListenSocket(unsigned int port)
  227. {
  228. // AcceptEx 和 GetAcceptExSockaddrs 的GUID,用于导出函数指针
  229. GUID GuidAcceptEx = WSAID_ACCEPTEX;
  230. GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
  231. // 服务器地址信息,用于绑定Socket
  232. struct sockaddr_in ServerAddress;
  233. // 生成用于监听的Socket的信息
  234. m_pListenContext = new PER_SOCKET_CONTEXT;
  235. // 需要使用重叠IO,必须得使用WSASocket来建立Socket,才可以支持重叠IO操作
  236. m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  237. if (INVALID_SOCKET == m_pListenContext->m_Socket)
  238. {
  239. this->_ShowMessage(_T("初始化Socket失败,错误代码: %d.\n"), WSAGetLastError());
  240. return false;
  241. }
  242. else
  243. {
  244. TRACE("WSASocket() 完成.\n");
  245. }
  246. // 将Listen Socket绑定至完成端口中
  247. if( NULL== CreateIoCompletionPort( (HANDLE)m_pListenContext->m_Socket, m_hIOCompletionPort,(DWORD)m_pListenContext, 0))
  248. {
  249. this->_ShowMessage(_T("绑定 Listen Socket至完成端口失败!错误代码: %d/n"), WSAGetLastError());
  250. RELEASE_SOCKET( m_pListenContext->m_Socket );
  251. return false;
  252. }
  253. else
  254. {
  255. TRACE("Listen Socket绑定完成端口 完成.\n");
  256. }
  257. // 填充地址信息
  258. ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));
  259. ServerAddress.sin_family = AF_INET;
  260. // 这里可以绑定任何可用的IP地址,或者绑定一个指定的IP地址
  261. ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);
  262. //ServerAddress.sin_addr.s_addr = inet_addr(m_strIP.GetString());
  263. ServerAddress.sin_port = htons(port);
  264. // 绑定地址和端口
  265. if (SOCKET_ERROR == ::bind(m_pListenContext->m_Socket, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress)))
  266. {
  267. this->_ShowMessage(_T("bind()函数执行错误! 错误代码: %d/n"), WSAGetLastError());
  268. return false;
  269. }
  270. else
  271. {
  272. TRACE("bind() 完成.\n");
  273. }
  274. // 开始进行监听
  275. if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN))
  276. {
  277. this->_ShowMessage(_T("Listen()函数执行出现错误; 错误代码: %d/n"), WSAGetLastError());
  278. return false;
  279. }
  280. else
  281. {
  282. TRACE("Listen() 完成.\n");
  283. }
  284. // 使用AcceptEx函数,因为这个是属于WinSock2规范之外的微软另外提供的扩展函数
  285. // 所以需要额外获取一下函数的指针,
  286. // 获取AcceptEx函数指针
  287. DWORD dwBytes = 0;
  288. if(SOCKET_ERROR == WSAIoctl(
  289. m_pListenContext->m_Socket,
  290. SIO_GET_EXTENSION_FUNCTION_POINTER,
  291. &GuidAcceptEx,
  292. sizeof(GuidAcceptEx),
  293. &m_lpfnAcceptEx,
  294. sizeof(m_lpfnAcceptEx),
  295. &dwBytes,
  296. NULL,
  297. NULL))
  298. {
  299. this->_ShowMessage(_T("WSAIoctl 未能获取AcceptEx函数指针。错误代码: %d\n"), WSAGetLastError());
  300. this->_DeInitialize();
  301. return false;
  302. }
  303. // 获取GetAcceptExSockAddrs函数指针,也是同理
  304. if(SOCKET_ERROR == WSAIoctl(
  305. m_pListenContext->m_Socket,
  306. SIO_GET_EXTENSION_FUNCTION_POINTER,
  307. &GuidGetAcceptExSockAddrs,
  308. sizeof(GuidGetAcceptExSockAddrs),
  309. &m_lpfnGetAcceptExSockAddrs,
  310. sizeof(m_lpfnGetAcceptExSockAddrs),
  311. &dwBytes,
  312. NULL,
  313. NULL))
  314. {
  315. this->_ShowMessage(_T("WSAIoctl 未能获取GuidGetAcceptExSockAddrs函数指针。错误代码: %d\n"), WSAGetLastError());
  316. this->_DeInitialize();
  317. return false;
  318. }
  319. // 为AcceptEx 准备参数,然后投递AcceptEx I/O请求
  320. for( int i=0;i<MAX_POST_ACCEPT;i++ )
  321. {
  322. // 新建一个IO_CONTEXT
  323. PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();
  324. if( false==this->_PostAccept( pAcceptIoContext ) )
  325. {
  326. m_pListenContext->RemoveContext(pAcceptIoContext);
  327. return false;
  328. }
  329. }
  330. this->_ShowMessage( _T("投递 %d 个AcceptEx请求完毕"),MAX_POST_ACCEPT );
  331. return true;
  332. }
  333. ////////////////////////////////////////////////////////////
  334. // 最后释放掉所有资源
  335. void CIOCPModel::_DeInitialize()
  336. {
  337. // 删除客户端列表的互斥量
  338. DeleteCriticalSection(&m_csContextList);
  339. // 关闭系统退出事件句柄
  340. RELEASE_HANDLE(m_hShutdownEvent);
  341. // 释放工作者线程句柄指针
  342. for( int i=0;i<m_nThreads;i++ )
  343. {
  344. RELEASE_HANDLE(m_phWorkerThreads[i]);
  345. }
  346. RELEASE(m_phWorkerThreads);
  347. // 关闭IOCP句柄
  348. RELEASE_HANDLE(m_hIOCompletionPort);
  349. // 关闭监听Socket
  350. RELEASE(m_pListenContext);
  351. this->_ShowMessage(_T("释放资源完毕.\n"));
  352. }
  353. //====================================================================================
  354. //
  355. // 投递完成端口请求
  356. //
  357. //====================================================================================
  358. //////////////////////////////////////////////////////////////////
  359. // 投递Accept请求
  360. bool CIOCPModel::_PostAccept( PER_IO_CONTEXT* pAcceptIoContext )
  361. {
  362. ASSERT( INVALID_SOCKET!=m_pListenContext->m_Socket );
  363. // 准备参数
  364. DWORD dwBytes = 0;
  365. pAcceptIoContext->m_OpType = ACCEPT_POSTED;
  366. WSABUF *p_wbuf = &pAcceptIoContext->m_wsaBuf;
  367. OVERLAPPED *p_ol = &pAcceptIoContext->m_Overlapped;
  368. // 为以后新连入的客户端先准备好Socket( 这个是与传统accept最大的区别 )
  369. pAcceptIoContext->m_sockAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
  370. if( INVALID_SOCKET==pAcceptIoContext->m_sockAccept )
  371. {
  372. _ShowMessage(_T("创建用于Accept的Socket失败!错误代码: %d"), WSAGetLastError());
  373. return false;
  374. }
  375. // 投递AcceptEx
  376. if(FALSE == m_lpfnAcceptEx( m_pListenContext->m_Socket, pAcceptIoContext->m_sockAccept, p_wbuf->buf, p_wbuf->len - ((sizeof(SOCKADDR_IN)+16)*2),
  377. sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, &dwBytes, p_ol))
  378. {
  379. if(WSA_IO_PENDING != WSAGetLastError())
  380. {
  381. _ShowMessage(_T("投递 AcceptEx 请求失败,错误代码: %d"), WSAGetLastError());
  382. return false;
  383. }
  384. }
  385. return true;
  386. }
  387. ////////////////////////////////////////////////////////////
  388. // 在有客户端连入的时候,进行处理
  389. // 流程有点复杂,你要是看不懂的话,就看配套的文档吧....
  390. // 如果能理解这里的话,完成端口的机制你就消化了一大半了
  391. // 总之你要知道,传入的是ListenSocket的Context,我们需要复制一份出来给新连入的Socket用
  392. // 原来的Context还是要在上面继续投递下一个Accept请求
  393. //
  394. bool CIOCPModel::_DoAccpet( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext )
  395. {
  396. SOCKADDR_IN* ClientAddr = NULL;
  397. SOCKADDR_IN* LocalAddr = NULL;
  398. int remoteLen = sizeof(SOCKADDR_IN), localLen = sizeof(SOCKADDR_IN);
  399. ///////////////////////////////////////////////////////////////////////////
  400. // 1. 首先取得连入客户端的地址信息
  401. // 这个 m_lpfnGetAcceptExSockAddrs 不得了啊~~~~~~
  402. // 不但可以取得客户端和本地端的地址信息,还能顺便取出客户端发来的第一组数据
  403. this->m_lpfnGetAcceptExSockAddrs(pIoContext->m_wsaBuf.buf, pIoContext->m_wsaBuf.len - ((sizeof(SOCKADDR_IN)+16)*2),
  404. sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, (LPSOCKADDR*)&LocalAddr, &localLen, (LPSOCKADDR*)&ClientAddr, &remoteLen);
  405. _RecvProcess(pSocketContext, pIoContext);
  406. this->_ShowMessage( _T("客户端 %s:%d 连入."), inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port) );
  407. //this->_ShowMessage( _T("客户额 %s:%d 信息:%s."),inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port),pIoContext->m_wsaBuf.buf );
  408. //if ( Global::g_bEnableLog )
  409. //{
  410. // // 解析Json字符串;
  411. // Global::TLog tlog;
  412. // cJSON *pJson = cJSON_Parse(Global::DeCode_URLUNICODE(pIoContext->m_wsaBuf.buf).c_str());
  413. // if ( pJson )
  414. // {
  415. // tlog.report_type = cJSON_GetObjectItem(pJson, _T("ReportType")) ? cJSON_GetObjectItem(pJson, _T("ReportType"))->valuestring : "";
  416. // tlog.report_data = cJSON_GetObjectItem(pJson, _T("prinMsg")) ? cJSON_GetObjectItem(pJson, _T("prinMsg"))->valuestring : "";
  417. // if ( _tcscmp(tlog.report_type.c_str(), _T("printLog")) == 0 )
  418. // {
  419. // Global::WritePythonLog(tlog.report_data.c_str());
  420. // Global::g_time = time(NULL);
  421. // Global::g_lastTime = COleDateTime::GetCurrentTime();
  422. // }
  423. // cJSON_Delete(pJson);
  424. // pJson = NULL;
  425. // }
  426. //}
  427. //////////////////////////////////////////////////////////////////////////////////////////////////////
  428. // 2. 这里需要注意,这里传入的这个是ListenSocket上的Context,这个Context我们还需要用于监听下一个连接
  429. // 所以我还得要将ListenSocket上的Context复制出来一份为新连入的Socket新建一个SocketContext
  430. PER_SOCKET_CONTEXT* pNewSocketContext = new PER_SOCKET_CONTEXT;
  431. pNewSocketContext->m_Socket = pIoContext->m_sockAccept;
  432. memcpy(&(pNewSocketContext->m_ClientAddr), ClientAddr, sizeof(SOCKADDR_IN));
  433. // 参数设置完毕,将这个Socket和完成端口绑定(这也是一个关键步骤)
  434. if( false==this->_AssociateWithIOCP( pNewSocketContext ) )
  435. {
  436. RELEASE( pNewSocketContext );
  437. return false;
  438. }
  439. ///////////////////////////////////////////////////////////////////////////////////////////////////
  440. // 3. 继续,建立其下的IoContext,用于在这个Socket上投递第一个Recv数据请求
  441. PER_IO_CONTEXT* pNewIoContext = pNewSocketContext->GetNewIoContext();
  442. pNewIoContext->m_OpType = RECV_POSTED;
  443. pNewIoContext->m_sockAccept = pNewSocketContext->m_Socket;
  444. // 如果Buffer需要保留,就自己拷贝一份出来
  445. //memcpy( pNewIoContext->m_szBuffer,pIoContext->m_szBuffer,MAX_BUFFER_LEN );
  446. // 绑定完毕之后,就可以开始在这个Socket上投递完成请求了
  447. if( false==this->_PostRecv( pNewIoContext) )
  448. {
  449. pNewSocketContext->RemoveContext( pNewIoContext );
  450. return false;
  451. }
  452. /////////////////////////////////////////////////////////////////////////////////////////////////
  453. // 4. 如果投递成功,那么就把这个有效的客户端信息,加入到ContextList中去(需要统一管理,方便释放资源)
  454. this->_AddToContextList( pNewSocketContext );
  455. ////////////////////////////////////////////////////////////////////////////////////////////////
  456. // 5. 使用完毕之后,把Listen Socket的那个IoContext重置,然后准备投递新的AcceptEx
  457. pIoContext->ResetBuffer();
  458. return this->_PostAccept( pIoContext );
  459. }
  460. ////////////////////////////////////////////////////////////////////
  461. // 投递接收数据请求
  462. bool CIOCPModel::_PostRecv( PER_IO_CONTEXT* pIoContext )
  463. {
  464. // 初始化变量
  465. DWORD dwFlags = 0;
  466. DWORD dwBytes = 0;
  467. WSABUF *p_wbuf = &pIoContext->m_wsaBuf;
  468. OVERLAPPED *p_ol = &pIoContext->m_Overlapped;
  469. pIoContext->ResetBuffer();
  470. pIoContext->m_OpType = RECV_POSTED;
  471. // 初始化完成后,,投递WSARecv请求
  472. int nBytesRecv = WSARecv( pIoContext->m_sockAccept, p_wbuf, 1, &dwBytes, &dwFlags, p_ol, NULL );
  473. // 如果返回值错误,并且错误的代码并非是Pending的话,那就说明这个重叠请求失败了
  474. if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError()))
  475. {
  476. this->_ShowMessage(_T("投递第一个WSARecv失败!"));
  477. return false;
  478. }
  479. return true;
  480. }
  481. /////////////////////////////////////////////////////////////////
  482. // 在有接收的数据到达的时候,进行处理
  483. bool CIOCPModel::_DoRecv( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext )
  484. {
  485. // 先把上一次的数据显示出现,然后就重置状态,发出下一个Recv请求
  486. SOCKADDR_IN* ClientAddr = &pSocketContext->m_ClientAddr;
  487. //this->_ShowMessage( _T("收到 %s:%d 信息:%s"),inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port), pIoContext->m_wsaBuf.buf );
  488. _RecvProcess(pSocketContext, pIoContext);
  489. //if ( Global::g_bEnableLog )
  490. //{
  491. // // 解析Json字符串;
  492. // Global::TLog tlog;
  493. // cJSON *pJson = cJSON_Parse(Global::DeCode_URLUNICODE(pIoContext->m_wsaBuf.buf).c_str());
  494. // if ( pJson )
  495. // {
  496. // tlog.report_type = cJSON_GetObjectItem(pJson, _T("ReportType")) ? cJSON_GetObjectItem(pJson, _T("ReportType"))->valuestring : "";
  497. // tlog.report_data = cJSON_GetObjectItem(pJson, _T("prinMsg")) ? cJSON_GetObjectItem(pJson, _T("prinMsg"))->valuestring : "";
  498. // if ( _tcscmp(tlog.report_type.c_str(), _T("printLog")) == 0 )
  499. // {
  500. // Global::WritePythonLog(tlog.report_data.c_str());
  501. // Global::g_time = time(NULL);
  502. // Global::g_lastTime = COleDateTime::GetCurrentTime();
  503. // }
  504. // cJSON_Delete(pJson);
  505. // pJson = NULL;
  506. // }
  507. //
  508. //}
  509. // 然后开始投递下一个WSARecv请求
  510. return _PostRecv( pIoContext );
  511. }
  512. /////////////////////////////////////////////////////
  513. // 将句柄(Socket)绑定到完成端口中
  514. bool CIOCPModel::_AssociateWithIOCP( PER_SOCKET_CONTEXT *pContext )
  515. {
  516. // 将用于和客户端通信的SOCKET绑定到完成端口中
  517. HANDLE hTemp = CreateIoCompletionPort((HANDLE)pContext->m_Socket, m_hIOCompletionPort, (DWORD)pContext, 0);
  518. if (NULL == hTemp)
  519. {
  520. this->_ShowMessage(_T("执行CreateIoCompletionPort()出现错误.错误代码:%d"),GetLastError());
  521. return false;
  522. }
  523. return true;
  524. }
  525. //====================================================================================
  526. //
  527. // ContextList 相关操作
  528. //
  529. //====================================================================================
  530. //////////////////////////////////////////////////////////////
  531. // 将客户端的相关信息存储到数组中
  532. void CIOCPModel::_AddToContextList( PER_SOCKET_CONTEXT *pHandleData )
  533. {
  534. EnterCriticalSection(&m_csContextList);
  535. m_arrayClientContext.Add(pHandleData);
  536. LeaveCriticalSection(&m_csContextList);
  537. }
  538. ////////////////////////////////////////////////////////////////
  539. // 移除某个特定的Context
  540. void CIOCPModel::_RemoveContext( PER_SOCKET_CONTEXT *pSocketContext )
  541. {
  542. EnterCriticalSection(&m_csContextList);
  543. for( int i=0;i<m_arrayClientContext.GetCount();i++ )
  544. {
  545. if( pSocketContext==m_arrayClientContext.GetAt(i) )
  546. {
  547. RELEASE( pSocketContext );
  548. m_arrayClientContext.RemoveAt(i);
  549. break;
  550. }
  551. }
  552. LeaveCriticalSection(&m_csContextList);
  553. }
  554. ////////////////////////////////////////////////////////////////
  555. // 清空客户端信息
  556. void CIOCPModel::_ClearContextList()
  557. {
  558. EnterCriticalSection(&m_csContextList);
  559. for( int i=0;i<m_arrayClientContext.GetCount();i++ )
  560. {
  561. delete m_arrayClientContext.GetAt(i);
  562. }
  563. m_arrayClientContext.RemoveAll();
  564. LeaveCriticalSection(&m_csContextList);
  565. }
  566. //====================================================================================
  567. //
  568. // 其他辅助函数定义
  569. //
  570. //====================================================================================
  571. ////////////////////////////////////////////////////////////////////
  572. // 获得本机的IP地址
  573. CString CIOCPModel::GetLocalIP()
  574. {
  575. // 获得本机主机名
  576. char hostname[MAX_PATH] = {0};
  577. gethostname(hostname,MAX_PATH);
  578. struct hostent FAR* lpHostEnt = gethostbyname(hostname);
  579. if(lpHostEnt == NULL)
  580. {
  581. return DEFAULT_IP;
  582. }
  583. // 取得IP地址列表中的第一个为返回的IP(因为一台主机可能会绑定多个IP)
  584. LPSTR lpAddr = lpHostEnt->h_addr_list[0];
  585. // 将IP地址转化成字符串形式
  586. struct in_addr inAddr;
  587. memmove(&inAddr,lpAddr,4);
  588. m_strIP = CString( inet_ntoa(inAddr) );
  589. return m_strIP;
  590. }
  591. ///////////////////////////////////////////////////////////////////
  592. // 获得本机中处理器的数量
  593. int CIOCPModel::_GetNoOfProcessors()
  594. {
  595. SYSTEM_INFO si;
  596. GetSystemInfo(&si);
  597. return si.dwNumberOfProcessors;
  598. }
  599. /////////////////////////////////////////////////////////////////////
  600. // 在主界面中显示提示信息
  601. void CIOCPModel::_ShowMessage(const CString szFormat,...) const
  602. {
  603. // 根据传入的参数格式化字符串
  604. CString strMessage;
  605. va_list arglist;
  606. // 处理变长参数
  607. va_start(arglist, szFormat);
  608. strMessage.FormatV(szFormat,arglist);
  609. va_end(arglist);
  610. #if 0
  611. // 在主界面中显示
  612. CMainDlg* pMain = (CMainDlg*)m_pMain;
  613. if( m_pMain!=NULL )
  614. {
  615. pMain->AddInformation(strMessage);
  616. TRACE( strMessage+_T("\n") );
  617. }
  618. #else
  619. Global::WriteTextLog(strMessage);
  620. #endif
  621. }
  622. #define PAK_LEN sizeof(ProHeader)
  623. void CIOCPModel::_RecvProcess(PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext)
  624. {
  625. // 小于包头;
  626. ProHeader* phead = NULL;
  627. if (pSocketContext->lastData.size() == 0)
  628. {
  629. // 不足包头;
  630. if (PAK_LEN > pIoContext->m_Overlapped.InternalHigh)
  631. {
  632. OutputDebugString("A:不足包头;\n");
  633. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  634. }
  635. else
  636. {
  637. phead = (ProHeader*)pIoContext->m_wsaBuf.buf;
  638. // 完整的包;
  639. if (phead->len == pIoContext->m_Overlapped.InternalHigh)
  640. {
  641. OutputDebugString("A:完整的包;\n");
  642. _DeviceProc(pIoContext, (Package*)pIoContext->m_wsaBuf.buf);
  643. }
  644. // 小包;
  645. else if (phead->len > pIoContext->m_Overlapped.InternalHigh)
  646. {
  647. OutputDebugString("A:小包;\n");
  648. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  649. }
  650. // 超包;
  651. else if (phead->len < pIoContext->m_Overlapped.InternalHigh)
  652. {
  653. OutputDebugString("A:超包;\n");
  654. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + phead->len, pIoContext->m_Overlapped.InternalHigh - phead->len);
  655. _DeviceProc(pIoContext, (Package*)pIoContext->m_wsaBuf.buf);
  656. }
  657. }
  658. }
  659. else
  660. {
  661. int lastlen = pIoContext->m_Overlapped.InternalHigh;
  662. if (pSocketContext->lastData.size() >= PAK_LEN)
  663. {
  664. phead = (ProHeader*)pSocketContext->lastData.data();
  665. if (phead->len <= pSocketContext->lastData.size() + pIoContext->m_Overlapped.InternalHigh)
  666. {
  667. if ( phead->len <= pSocketContext->lastData.size() )
  668. {
  669. OutputDebugString("C:超包;\n");
  670. // 完整包;
  671. _DeviceProc(pIoContext, (Package*)pSocketContext->lastData.substr(0, phead->len).data());
  672. pSocketContext->lastData = pSocketContext->lastData.substr(phead->len);
  673. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  674. }
  675. else
  676. {
  677. OutputDebugString("D:超包;\n");
  678. lastlen = pSocketContext->lastData.size() + pIoContext->m_Overlapped.InternalHigh - phead->len;
  679. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh - lastlen);
  680. // 完整包;
  681. _DeviceProc(pIoContext, (Package*)pSocketContext->lastData.data());
  682. // 剩余包;
  683. pSocketContext->lastData.clear();
  684. if (lastlen)
  685. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + pIoContext->m_Overlapped.InternalHigh - lastlen, lastlen);
  686. }
  687. }
  688. else
  689. {
  690. OutputDebugString("C:仍不足一个包;\n");
  691. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  692. }
  693. }
  694. else
  695. {
  696. // 包头剩余长度;
  697. int diflen = PAK_LEN - pSocketContext->lastData.size();
  698. // 仍不足一个包头;
  699. if ( diflen > pIoContext->m_Overlapped.InternalHigh )
  700. {
  701. OutputDebugString("B:仍不足一个包头;\n");
  702. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  703. }
  704. else
  705. {
  706. // 拼成完整包头;
  707. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, diflen);
  708. phead = (ProHeader*)pSocketContext->lastData.data();
  709. // 完整包;
  710. if ( phead->len == PAK_LEN + pIoContext->m_Overlapped.InternalHigh - diflen )
  711. {
  712. OutputDebugString("B:完整包;\n");
  713. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + diflen, pIoContext->m_Overlapped.InternalHigh - diflen);
  714. _DeviceProc(pIoContext, (Package*)pSocketContext->lastData.data());
  715. pSocketContext->lastData.clear();
  716. }
  717. // 小包;
  718. else if ( phead->len > PAK_LEN + pIoContext->m_Overlapped.InternalHigh - diflen)
  719. {
  720. OutputDebugString("B:小包;\n");
  721. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + diflen, pIoContext->m_Overlapped.InternalHigh - diflen);
  722. }
  723. // 超包;
  724. else if (phead->len < PAK_LEN + pIoContext->m_Overlapped.InternalHigh - diflen)
  725. {
  726. OutputDebugString("B:超包;\n");
  727. // 组完成包;
  728. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + diflen, phead->len - PAK_LEN);
  729. _DeviceProc(pIoContext, (Package*)pSocketContext->lastData.data());
  730. pSocketContext->lastData.clear();
  731. int last = pIoContext->m_Overlapped.InternalHigh - diflen - phead->len + PAK_LEN;
  732. if (last)
  733. {
  734. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + pIoContext->m_Overlapped.InternalHigh - last, last);
  735. }
  736. }
  737. }
  738. }
  739. }
  740. }
  741. void CIOCPModel::_DeviceProc(PER_IO_CONTEXT* pIoContext, Package* pak)
  742. {
  743. if ( pak->header.version != 0xAA )
  744. {
  745. return;
  746. }
  747. cJSON* pJson = cJSON_Parse((const char*)&pak->buf);
  748. if ( pJson == NULL )
  749. {
  750. return;
  751. }
  752. RequesJson reqj;
  753. reqj.device_id = cJSON_GetObjectItem(pJson, "device_id") ? cJSON_GetObjectItem(pJson, "device_id")->valueint : 0;
  754. reqj.device_name = cJSON_GetObjectItem(pJson, "device_name") ? cJSON_GetObjectItem(pJson, "device_name")->valuestring : "";
  755. reqj.device_cmd = cJSON_GetObjectItem(pJson, "device_cmd") ? cJSON_GetObjectItem(pJson, "device_cmd")->valuestring : "";
  756. if ( pJson )
  757. {
  758. cJSON_Delete(pJson);
  759. pJson = NULL;
  760. }
  761. auto iter = g_dmap.find(reqj.device_id);
  762. if ( iter == g_dmap.end())
  763. {
  764. return;
  765. }
  766. // 向串口发送指令;
  767. auto dev = iter->second;
  768. std::string readdata = dev->SendCommond(reqj.device_cmd);
  769. ResponseJson repj;
  770. repj.device_id = reqj.device_id;
  771. repj.device_name = reqj.device_name;
  772. repj.device_cmd_result = readdata;
  773. pJson = cJSON_CreateObject();
  774. cJSON_AddNumberToObject(pJson, "device_id", repj.device_id);
  775. cJSON_AddStringToObject(pJson, "device_name", repj.device_name.c_str());
  776. cJSON_AddStringToObject(pJson, "device_cmd_result", repj.device_cmd_result.c_str());
  777. char* pjdata = cJSON_Print(pJson);
  778. byte* sdata = new byte[strlen(pjdata) + PAK_LEN];
  779. Package* reponse_pak = (Package*)sdata;
  780. reponse_pak->header.version = 0xAB;
  781. reponse_pak->header.len = strlen(pjdata) + PAK_LEN;
  782. memcpy(reponse_pak->buf, pjdata, strlen(pjdata));
  783. if (pjdata)
  784. free(pjdata);
  785. pjdata = NULL;
  786. cJSON_Delete(pJson);
  787. int ret = send(pIoContext->m_sockAccept, (const char*)sdata, reponse_pak->header.len, 0);
  788. delete[]sdata;
  789. sdata = NULL;
  790. if ( ret == -1 )
  791. {
  792. DWORD dwEr = GetLastError();
  793. }
  794. }
  795. /////////////////////////////////////////////////////////////////////
  796. // 判断客户端Socket是否已经断开,否则在一个无效的Socket上投递WSARecv操作会出现异常
  797. // 使用的方法是尝试向这个socket发送数据,判断这个socket调用的返回值
  798. // 因为如果客户端网络异常断开(例如客户端崩溃或者拔掉网线等)的时候,服务器端是无法收到客户端断开的通知的
  799. bool CIOCPModel::_IsSocketAlive(SOCKET s)
  800. {
  801. int nByteSent=send(s,"",0,0);
  802. if (-1 == nByteSent) return false;
  803. return true;
  804. }
  805. ///////////////////////////////////////////////////////////////////
  806. // 显示并处理完成端口上的错误
  807. bool CIOCPModel::HandleError( PER_SOCKET_CONTEXT *pContext,const DWORD& dwErr )
  808. {
  809. // 如果是超时了,就再继续等吧
  810. if(WAIT_TIMEOUT == dwErr)
  811. {
  812. // 确认客户端是否还活着...
  813. if( !_IsSocketAlive( pContext->m_Socket) )
  814. {
  815. this->_ShowMessage( _T("检测到客户端异常退出!") );
  816. this->_RemoveContext( pContext );
  817. return true;
  818. }
  819. else
  820. {
  821. this->_ShowMessage( _T("网络操作超时!重试中...") );
  822. return true;
  823. }
  824. }
  825. // 可能是客户端异常退出了
  826. else if( ERROR_NETNAME_DELETED==dwErr )
  827. {
  828. this->_ShowMessage( _T("检测到客户端异常退出!") );
  829. this->_RemoveContext( pContext );
  830. return true;
  831. }
  832. else
  833. {
  834. this->_ShowMessage( _T("完成端口操作出现错误,线程退出。错误代码:%d"),dwErr );
  835. return false;
  836. }
  837. }