IOCPModel.cpp 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097
  1. #include "StdAfx.h"
  2. #include "IOCPModel.h"
  3. //#include "MainDlg.h"
  4. #include "MemoryDef.h"
  5. // 每一个处理器上产生多少个线程(为了最大限度的提升服务器性能,详见配套文档)
  6. #define WORKER_THREADS_PER_PROCESSOR 2
  7. // 同时投递的Accept请求的数量(这个要根据实际的情况灵活设置)
  8. #define MAX_POST_ACCEPT 10
  9. // 传递给Worker线程的退出信号
  10. #define EXIT_CODE NULL
  11. // 释放指针和句柄资源的宏
  12. // 释放指针宏
  13. #define RELEASE(x) {if(x != NULL ){delete x;x=NULL;}}
  14. // 释放句柄宏
  15. #define RELEASE_HANDLE(x) {if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}
  16. // 释放Socket宏
  17. #define RELEASE_SOCKET(x) {if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}}
  18. CIOCPModel::CIOCPModel(void):
  19. m_nThreads(0),
  20. m_hShutdownEvent(NULL),
  21. m_hIOCompletionPort(NULL),
  22. m_phWorkerThreads(NULL),
  23. m_strIP(DEFAULT_IP),
  24. m_nPort(DEFAULT_PORT),
  25. m_pMain(NULL),
  26. m_lpfnAcceptEx( NULL ),
  27. m_pListenContext( NULL )
  28. {
  29. }
  30. CIOCPModel::~CIOCPModel(void)
  31. {
  32. // 确保资源彻底释放
  33. this->Stop();
  34. }
  35. ///////////////////////////////////////////////////////////////////
  36. // 工作者线程: 为IOCP请求服务的工作者线程
  37. // 也就是每当完成端口上出现了完成数据包,就将之取出来进行处理的线程
  38. ///////////////////////////////////////////////////////////////////
  39. DWORD WINAPI CIOCPModel::_WorkerThread(LPVOID lpParam)
  40. {
  41. THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam;
  42. CIOCPModel* pIOCPModel = (CIOCPModel*)pParam->pIOCPModel;
  43. int nThreadNo = (int)pParam->nThreadNo;
  44. pIOCPModel->_ShowMessage(_T("工作者线程启动,ID: %d."),nThreadNo);
  45. OVERLAPPED *pOverlapped = NULL;
  46. PER_SOCKET_CONTEXT *pSocketContext = NULL;
  47. DWORD dwBytesTransfered = 0;
  48. // 循环处理请求,知道接收到Shutdown信息为止
  49. while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0))
  50. {
  51. BOOL bReturn = GetQueuedCompletionStatus(
  52. pIOCPModel->m_hIOCompletionPort,
  53. &dwBytesTransfered,
  54. (PULONG_PTR)&pSocketContext,
  55. &pOverlapped,
  56. INFINITE);
  57. // 如果收到的是退出标志,则直接退出
  58. if ( EXIT_CODE==(DWORD)pSocketContext )
  59. {
  60. break;
  61. }
  62. // 判断是否出现了错误
  63. if( !bReturn )
  64. {
  65. DWORD dwErr = GetLastError();
  66. // 显示一下提示信息
  67. if( !pIOCPModel->HandleError( pSocketContext,dwErr ) )
  68. {
  69. break;
  70. }
  71. continue;
  72. }
  73. else
  74. {
  75. // 读取传入的参数
  76. PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped);
  77. // 判断是否有客户端断开了
  78. if((0 == dwBytesTransfered) && ( RECV_POSTED==pIoContext->m_OpType || SEND_POSTED==pIoContext->m_OpType))
  79. {
  80. pIOCPModel->_ShowMessage( _T("客户端 %s:%d 断开连接."),
  81. inet_ntoa(pSocketContext->m_ClientAddr.sin_addr),
  82. ntohs(pSocketContext->m_ClientAddr.sin_port) );
  83. // 释放掉对应的资源
  84. pIOCPModel->_RemoveContext( pSocketContext );
  85. continue;
  86. }
  87. else
  88. {
  89. switch( pIoContext->m_OpType )
  90. {
  91. // Accept
  92. case ACCEPT_POSTED:
  93. {
  94. // 为了增加代码可读性,这里用专门的_DoAccept函数进行处理连入请求
  95. pIOCPModel->_DoAccpet( pSocketContext, pIoContext );
  96. }
  97. break;
  98. // RECV
  99. case RECV_POSTED:
  100. {
  101. // 为了增加代码可读性,这里用专门的_DoRecv函数进行处理接收请求
  102. pIOCPModel->_DoRecv( pSocketContext,pIoContext );
  103. }
  104. break;
  105. // SEND
  106. // 这里略过不写了,要不代码太多了,不容易理解,Send操作相对来讲简单一些
  107. case SEND_POSTED:
  108. {
  109. }
  110. break;
  111. default:
  112. // 不应该执行到这里
  113. TRACE(_T("_WorkThread中的 pIoContext->m_OpType 参数异常.\n"));
  114. break;
  115. } //switch
  116. }//if
  117. }//if
  118. }//while
  119. TRACE(_T("工作者线程 %d 号退出.\n"),nThreadNo);
  120. // 释放线程参数
  121. RELEASE(lpParam);
  122. return 0;
  123. }
  124. //====================================================================================
  125. //
  126. // 系统初始化和终止
  127. //
  128. //====================================================================================
  129. ////////////////////////////////////////////////////////////////////
  130. // 初始化WinSock 2.2
  131. bool CIOCPModel::LoadSocketLib()
  132. {
  133. WSADATA wsaData;
  134. int nResult;
  135. nResult = WSAStartup(MAKEWORD(2,2), &wsaData);
  136. // 错误(一般都不可能出现)
  137. if (NO_ERROR != nResult)
  138. {
  139. this->_ShowMessage(_T("初始化WinSock 2.2失败!\n"));
  140. return false;
  141. }
  142. return true;
  143. }
  144. //////////////////////////////////////////////////////////////////
  145. // 启动服务器
  146. bool CIOCPModel::Start(unsigned int port)
  147. {
  148. // 初始化线程互斥量
  149. InitializeCriticalSection(&m_csContextList);
  150. // 建立系统退出的事件通知
  151. m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
  152. // 初始化IOCP
  153. if (false == _InitializeIOCP())
  154. {
  155. this->_ShowMessage(_T("初始化IOCP失败!\n"));
  156. return false;
  157. }
  158. else
  159. {
  160. this->_ShowMessage(_T("\nIOCP初始化完毕\n."));
  161. }
  162. // 初始化Socket
  163. if( false==_InitializeListenSocket(port) )
  164. {
  165. this->_ShowMessage(_T("Listen Socket初始化失败!\n"));
  166. this->_DeInitialize();
  167. return false;
  168. }
  169. else
  170. {
  171. this->_ShowMessage(_T("Listen Socket初始化完毕."));
  172. }
  173. this->_ShowMessage(_T("系统准备就绪,等候连接....\n"));
  174. return true;
  175. }
  176. ////////////////////////////////////////////////////////////////////
  177. // 开始发送系统退出消息,退出完成端口和线程资源
  178. void CIOCPModel::Stop()
  179. {
  180. if( m_pListenContext!=NULL && m_pListenContext->m_Socket!=INVALID_SOCKET )
  181. {
  182. // 激活关闭消息通知
  183. SetEvent(m_hShutdownEvent);
  184. for (int i = 0; i < m_nThreads; i++)
  185. {
  186. // 通知所有的完成端口操作退出
  187. PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)EXIT_CODE, NULL);
  188. }
  189. // 等待所有的客户端资源退出
  190. WaitForMultipleObjects(m_nThreads, m_phWorkerThreads, TRUE, INFINITE);
  191. // 清除客户端列表信息
  192. this->_ClearContextList();
  193. // 释放其他资源
  194. this->_DeInitialize();
  195. this->_ShowMessage(_T("停止监听\n"));
  196. }
  197. }
  198. ////////////////////////////////
  199. // 初始化完成端口
  200. bool CIOCPModel::_InitializeIOCP()
  201. {
  202. // 建立第一个完成端口
  203. m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
  204. if ( NULL == m_hIOCompletionPort)
  205. {
  206. this->_ShowMessage(_T("建立完成端口失败!错误代码: %d!\n"), WSAGetLastError());
  207. return false;
  208. }
  209. // 根据本机中的处理器数量,建立对应的线程数
  210. m_nThreads = WORKER_THREADS_PER_PROCESSOR * _GetNoOfProcessors();
  211. // 为工作者线程初始化句柄
  212. m_phWorkerThreads = new HANDLE[m_nThreads];
  213. // 根据计算出来的数量建立工作者线程
  214. DWORD nThreadID;
  215. for (int i = 0; i < m_nThreads; i++)
  216. {
  217. THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;
  218. pThreadParams->pIOCPModel = this;
  219. pThreadParams->nThreadNo = i+1;
  220. m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread, (void *)pThreadParams, 0, &nThreadID);
  221. }
  222. TRACE(" 建立 _WorkerThread %d 个.\n", m_nThreads );
  223. return true;
  224. }
  225. /////////////////////////////////////////////////////////////////
  226. // 初始化Socket
  227. bool CIOCPModel::_InitializeListenSocket(unsigned int port)
  228. {
  229. // AcceptEx 和 GetAcceptExSockaddrs 的GUID,用于导出函数指针
  230. GUID GuidAcceptEx = WSAID_ACCEPTEX;
  231. GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
  232. // 服务器地址信息,用于绑定Socket
  233. struct sockaddr_in ServerAddress;
  234. // 生成用于监听的Socket的信息
  235. m_pListenContext = new PER_SOCKET_CONTEXT;
  236. // 需要使用重叠IO,必须得使用WSASocket来建立Socket,才可以支持重叠IO操作
  237. m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  238. if (INVALID_SOCKET == m_pListenContext->m_Socket)
  239. {
  240. this->_ShowMessage(_T("初始化Socket失败,错误代码: %d.\n"), WSAGetLastError());
  241. return false;
  242. }
  243. else
  244. {
  245. TRACE("WSASocket() 完成.\n");
  246. }
  247. // 将Listen Socket绑定至完成端口中
  248. if( NULL== CreateIoCompletionPort( (HANDLE)m_pListenContext->m_Socket, m_hIOCompletionPort,(DWORD)m_pListenContext, 0))
  249. {
  250. this->_ShowMessage(_T("绑定 Listen Socket至完成端口失败!错误代码: %d/n"), WSAGetLastError());
  251. RELEASE_SOCKET( m_pListenContext->m_Socket );
  252. return false;
  253. }
  254. else
  255. {
  256. TRACE("Listen Socket绑定完成端口 完成.\n");
  257. }
  258. // 填充地址信息
  259. ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));
  260. ServerAddress.sin_family = AF_INET;
  261. // 这里可以绑定任何可用的IP地址,或者绑定一个指定的IP地址
  262. ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);
  263. //ServerAddress.sin_addr.s_addr = inet_addr(m_strIP.GetString());
  264. ServerAddress.sin_port = htons(port);
  265. // 绑定地址和端口
  266. if (SOCKET_ERROR == ::bind(m_pListenContext->m_Socket, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress)))
  267. {
  268. this->_ShowMessage(_T("bind()函数执行错误! 错误代码: %d/n"), WSAGetLastError());
  269. return false;
  270. }
  271. else
  272. {
  273. TRACE("bind() 完成.\n");
  274. }
  275. // 开始进行监听
  276. if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN))
  277. {
  278. this->_ShowMessage(_T("Listen()函数执行出现错误; 错误代码: %d/n"), WSAGetLastError());
  279. return false;
  280. }
  281. else
  282. {
  283. TRACE("Listen() 完成.\n");
  284. }
  285. // 使用AcceptEx函数,因为这个是属于WinSock2规范之外的微软另外提供的扩展函数
  286. // 所以需要额外获取一下函数的指针,
  287. // 获取AcceptEx函数指针
  288. DWORD dwBytes = 0;
  289. if(SOCKET_ERROR == WSAIoctl(
  290. m_pListenContext->m_Socket,
  291. SIO_GET_EXTENSION_FUNCTION_POINTER,
  292. &GuidAcceptEx,
  293. sizeof(GuidAcceptEx),
  294. &m_lpfnAcceptEx,
  295. sizeof(m_lpfnAcceptEx),
  296. &dwBytes,
  297. NULL,
  298. NULL))
  299. {
  300. this->_ShowMessage(_T("WSAIoctl 未能获取AcceptEx函数指针。错误代码: %d\n"), WSAGetLastError());
  301. this->_DeInitialize();
  302. return false;
  303. }
  304. // 获取GetAcceptExSockAddrs函数指针,也是同理
  305. if(SOCKET_ERROR == WSAIoctl(
  306. m_pListenContext->m_Socket,
  307. SIO_GET_EXTENSION_FUNCTION_POINTER,
  308. &GuidGetAcceptExSockAddrs,
  309. sizeof(GuidGetAcceptExSockAddrs),
  310. &m_lpfnGetAcceptExSockAddrs,
  311. sizeof(m_lpfnGetAcceptExSockAddrs),
  312. &dwBytes,
  313. NULL,
  314. NULL))
  315. {
  316. this->_ShowMessage(_T("WSAIoctl 未能获取GuidGetAcceptExSockAddrs函数指针。错误代码: %d\n"), WSAGetLastError());
  317. this->_DeInitialize();
  318. return false;
  319. }
  320. // 为AcceptEx 准备参数,然后投递AcceptEx I/O请求
  321. for( int i=0;i<MAX_POST_ACCEPT;i++ )
  322. {
  323. // 新建一个IO_CONTEXT
  324. PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();
  325. if( false==this->_PostAccept( pAcceptIoContext ) )
  326. {
  327. m_pListenContext->RemoveContext(pAcceptIoContext);
  328. return false;
  329. }
  330. }
  331. this->_ShowMessage( _T("投递 %d 个AcceptEx请求完毕"),MAX_POST_ACCEPT );
  332. return true;
  333. }
  334. ////////////////////////////////////////////////////////////
  335. // 最后释放掉所有资源
  336. void CIOCPModel::_DeInitialize()
  337. {
  338. // 删除客户端列表的互斥量
  339. DeleteCriticalSection(&m_csContextList);
  340. // 关闭系统退出事件句柄
  341. RELEASE_HANDLE(m_hShutdownEvent);
  342. // 释放工作者线程句柄指针
  343. for( int i=0;i<m_nThreads;i++ )
  344. {
  345. RELEASE_HANDLE(m_phWorkerThreads[i]);
  346. }
  347. RELEASE(m_phWorkerThreads);
  348. // 关闭IOCP句柄
  349. RELEASE_HANDLE(m_hIOCompletionPort);
  350. // 关闭监听Socket
  351. RELEASE(m_pListenContext);
  352. this->_ShowMessage(_T("释放资源完毕.\n"));
  353. }
  354. //====================================================================================
  355. //
  356. // 投递完成端口请求
  357. //
  358. //====================================================================================
  359. //////////////////////////////////////////////////////////////////
  360. // 投递Accept请求
  361. bool CIOCPModel::_PostAccept( PER_IO_CONTEXT* pAcceptIoContext )
  362. {
  363. ASSERT( INVALID_SOCKET!=m_pListenContext->m_Socket );
  364. // 准备参数
  365. DWORD dwBytes = 0;
  366. pAcceptIoContext->m_OpType = ACCEPT_POSTED;
  367. WSABUF *p_wbuf = &pAcceptIoContext->m_wsaBuf;
  368. OVERLAPPED *p_ol = &pAcceptIoContext->m_Overlapped;
  369. // 为以后新连入的客户端先准备好Socket( 这个是与传统accept最大的区别 )
  370. pAcceptIoContext->m_sockAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
  371. if( INVALID_SOCKET==pAcceptIoContext->m_sockAccept )
  372. {
  373. _ShowMessage(_T("创建用于Accept的Socket失败!错误代码: %d"), WSAGetLastError());
  374. return false;
  375. }
  376. // 投递AcceptEx
  377. if(FALSE == m_lpfnAcceptEx( m_pListenContext->m_Socket, pAcceptIoContext->m_sockAccept, p_wbuf->buf, p_wbuf->len - ((sizeof(SOCKADDR_IN)+16)*2),
  378. sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, &dwBytes, p_ol))
  379. {
  380. if(WSA_IO_PENDING != WSAGetLastError())
  381. {
  382. _ShowMessage(_T("投递 AcceptEx 请求失败,错误代码: %d"), WSAGetLastError());
  383. return false;
  384. }
  385. }
  386. return true;
  387. }
  388. ////////////////////////////////////////////////////////////
  389. // 在有客户端连入的时候,进行处理
  390. // 流程有点复杂,你要是看不懂的话,就看配套的文档吧....
  391. // 如果能理解这里的话,完成端口的机制你就消化了一大半了
  392. // 总之你要知道,传入的是ListenSocket的Context,我们需要复制一份出来给新连入的Socket用
  393. // 原来的Context还是要在上面继续投递下一个Accept请求
  394. //
  395. bool CIOCPModel::_DoAccpet( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext )
  396. {
  397. SOCKADDR_IN* ClientAddr = NULL;
  398. SOCKADDR_IN* LocalAddr = NULL;
  399. int remoteLen = sizeof(SOCKADDR_IN), localLen = sizeof(SOCKADDR_IN);
  400. ///////////////////////////////////////////////////////////////////////////
  401. // 1. 首先取得连入客户端的地址信息
  402. // 这个 m_lpfnGetAcceptExSockAddrs 不得了啊~~~~~~
  403. // 不但可以取得客户端和本地端的地址信息,还能顺便取出客户端发来的第一组数据
  404. this->m_lpfnGetAcceptExSockAddrs(pIoContext->m_wsaBuf.buf, pIoContext->m_wsaBuf.len - ((sizeof(SOCKADDR_IN)+16)*2),
  405. sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, (LPSOCKADDR*)&LocalAddr, &localLen, (LPSOCKADDR*)&ClientAddr, &remoteLen);
  406. _RecvProcess(pSocketContext, pIoContext);
  407. this->_ShowMessage( _T("客户端 %s:%d 连入."), inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port) );
  408. //this->_ShowMessage( _T("客户额 %s:%d 信息:%s."),inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port),pIoContext->m_wsaBuf.buf );
  409. //if ( Global::g_bEnableLog )
  410. //{
  411. // // 解析Json字符串;
  412. // Global::TLog tlog;
  413. // cJSON *pJson = cJSON_Parse(Global::DeCode_URLUNICODE(pIoContext->m_wsaBuf.buf).c_str());
  414. // if ( pJson )
  415. // {
  416. // tlog.report_type = cJSON_GetObjectItem(pJson, _T("ReportType")) ? cJSON_GetObjectItem(pJson, _T("ReportType"))->valuestring : "";
  417. // tlog.report_data = cJSON_GetObjectItem(pJson, _T("prinMsg")) ? cJSON_GetObjectItem(pJson, _T("prinMsg"))->valuestring : "";
  418. // if ( _tcscmp(tlog.report_type.c_str(), _T("printLog")) == 0 )
  419. // {
  420. // Global::WritePythonLog(tlog.report_data.c_str());
  421. // Global::g_time = time(NULL);
  422. // Global::g_lastTime = COleDateTime::GetCurrentTime();
  423. // }
  424. // cJSON_Delete(pJson);
  425. // pJson = NULL;
  426. // }
  427. //}
  428. //////////////////////////////////////////////////////////////////////////////////////////////////////
  429. // 2. 这里需要注意,这里传入的这个是ListenSocket上的Context,这个Context我们还需要用于监听下一个连接
  430. // 所以我还得要将ListenSocket上的Context复制出来一份为新连入的Socket新建一个SocketContext
  431. PER_SOCKET_CONTEXT* pNewSocketContext = new PER_SOCKET_CONTEXT;
  432. pNewSocketContext->m_Socket = pIoContext->m_sockAccept;
  433. memcpy(&(pNewSocketContext->m_ClientAddr), ClientAddr, sizeof(SOCKADDR_IN));
  434. // 参数设置完毕,将这个Socket和完成端口绑定(这也是一个关键步骤)
  435. if( false==this->_AssociateWithIOCP( pNewSocketContext ) )
  436. {
  437. RELEASE( pNewSocketContext );
  438. return false;
  439. }
  440. ///////////////////////////////////////////////////////////////////////////////////////////////////
  441. // 3. 继续,建立其下的IoContext,用于在这个Socket上投递第一个Recv数据请求
  442. PER_IO_CONTEXT* pNewIoContext = pNewSocketContext->GetNewIoContext();
  443. pNewIoContext->m_OpType = RECV_POSTED;
  444. pNewIoContext->m_sockAccept = pNewSocketContext->m_Socket;
  445. // 如果Buffer需要保留,就自己拷贝一份出来
  446. //memcpy( pNewIoContext->m_szBuffer,pIoContext->m_szBuffer,MAX_BUFFER_LEN );
  447. // 绑定完毕之后,就可以开始在这个Socket上投递完成请求了
  448. if( false==this->_PostRecv( pNewIoContext) )
  449. {
  450. pNewSocketContext->RemoveContext( pNewIoContext );
  451. return false;
  452. }
  453. /////////////////////////////////////////////////////////////////////////////////////////////////
  454. // 4. 如果投递成功,那么就把这个有效的客户端信息,加入到ContextList中去(需要统一管理,方便释放资源)
  455. this->_AddToContextList( pNewSocketContext );
  456. ////////////////////////////////////////////////////////////////////////////////////////////////
  457. // 5. 使用完毕之后,把Listen Socket的那个IoContext重置,然后准备投递新的AcceptEx
  458. pIoContext->ResetBuffer();
  459. return this->_PostAccept( pIoContext );
  460. }
  461. ////////////////////////////////////////////////////////////////////
  462. // 投递接收数据请求
  463. bool CIOCPModel::_PostRecv( PER_IO_CONTEXT* pIoContext )
  464. {
  465. // 初始化变量
  466. DWORD dwFlags = 0;
  467. DWORD dwBytes = 0;
  468. WSABUF *p_wbuf = &pIoContext->m_wsaBuf;
  469. OVERLAPPED *p_ol = &pIoContext->m_Overlapped;
  470. pIoContext->ResetBuffer();
  471. pIoContext->m_OpType = RECV_POSTED;
  472. // 初始化完成后,,投递WSARecv请求
  473. int nBytesRecv = WSARecv( pIoContext->m_sockAccept, p_wbuf, 1, &dwBytes, &dwFlags, p_ol, NULL );
  474. // 如果返回值错误,并且错误的代码并非是Pending的话,那就说明这个重叠请求失败了
  475. if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError()))
  476. {
  477. this->_ShowMessage(_T("投递第一个WSARecv失败!"));
  478. return false;
  479. }
  480. return true;
  481. }
  482. /////////////////////////////////////////////////////////////////
  483. // 在有接收的数据到达的时候,进行处理
  484. bool CIOCPModel::_DoRecv( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext )
  485. {
  486. // 先把上一次的数据显示出现,然后就重置状态,发出下一个Recv请求
  487. SOCKADDR_IN* ClientAddr = &pSocketContext->m_ClientAddr;
  488. //this->_ShowMessage( _T("收到 %s:%d 信息:%s"),inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port), pIoContext->m_wsaBuf.buf );
  489. _RecvProcess(pSocketContext, pIoContext);
  490. //if ( Global::g_bEnableLog )
  491. //{
  492. // // 解析Json字符串;
  493. // Global::TLog tlog;
  494. // cJSON *pJson = cJSON_Parse(Global::DeCode_URLUNICODE(pIoContext->m_wsaBuf.buf).c_str());
  495. // if ( pJson )
  496. // {
  497. // tlog.report_type = cJSON_GetObjectItem(pJson, _T("ReportType")) ? cJSON_GetObjectItem(pJson, _T("ReportType"))->valuestring : "";
  498. // tlog.report_data = cJSON_GetObjectItem(pJson, _T("prinMsg")) ? cJSON_GetObjectItem(pJson, _T("prinMsg"))->valuestring : "";
  499. // if ( _tcscmp(tlog.report_type.c_str(), _T("printLog")) == 0 )
  500. // {
  501. // Global::WritePythonLog(tlog.report_data.c_str());
  502. // Global::g_time = time(NULL);
  503. // Global::g_lastTime = COleDateTime::GetCurrentTime();
  504. // }
  505. // cJSON_Delete(pJson);
  506. // pJson = NULL;
  507. // }
  508. //
  509. //}
  510. // 然后开始投递下一个WSARecv请求
  511. return _PostRecv( pIoContext );
  512. }
  513. /////////////////////////////////////////////////////
  514. // 将句柄(Socket)绑定到完成端口中
  515. bool CIOCPModel::_AssociateWithIOCP( PER_SOCKET_CONTEXT *pContext )
  516. {
  517. // 将用于和客户端通信的SOCKET绑定到完成端口中
  518. HANDLE hTemp = CreateIoCompletionPort((HANDLE)pContext->m_Socket, m_hIOCompletionPort, (DWORD)pContext, 0);
  519. if (NULL == hTemp)
  520. {
  521. this->_ShowMessage(_T("执行CreateIoCompletionPort()出现错误.错误代码:%d"),GetLastError());
  522. return false;
  523. }
  524. return true;
  525. }
  526. //====================================================================================
  527. //
  528. // ContextList 相关操作
  529. //
  530. //====================================================================================
  531. //////////////////////////////////////////////////////////////
  532. // 将客户端的相关信息存储到数组中
  533. void CIOCPModel::_AddToContextList( PER_SOCKET_CONTEXT *pHandleData )
  534. {
  535. EnterCriticalSection(&m_csContextList);
  536. m_arrayClientContext.Add(pHandleData);
  537. LeaveCriticalSection(&m_csContextList);
  538. }
  539. ////////////////////////////////////////////////////////////////
  540. // 移除某个特定的Context
  541. void CIOCPModel::_RemoveContext( PER_SOCKET_CONTEXT *pSocketContext )
  542. {
  543. EnterCriticalSection(&m_csContextList);
  544. for( int i=0;i<m_arrayClientContext.GetCount();i++ )
  545. {
  546. if( pSocketContext==m_arrayClientContext.GetAt(i) )
  547. {
  548. RELEASE( pSocketContext );
  549. m_arrayClientContext.RemoveAt(i);
  550. break;
  551. }
  552. }
  553. LeaveCriticalSection(&m_csContextList);
  554. }
  555. ////////////////////////////////////////////////////////////////
  556. // 清空客户端信息
  557. void CIOCPModel::_ClearContextList()
  558. {
  559. EnterCriticalSection(&m_csContextList);
  560. for( int i=0;i<m_arrayClientContext.GetCount();i++ )
  561. {
  562. delete m_arrayClientContext.GetAt(i);
  563. }
  564. m_arrayClientContext.RemoveAll();
  565. LeaveCriticalSection(&m_csContextList);
  566. }
  567. //====================================================================================
  568. //
  569. // 其他辅助函数定义
  570. //
  571. //====================================================================================
  572. ////////////////////////////////////////////////////////////////////
  573. // 获得本机的IP地址
  574. CString CIOCPModel::GetLocalIP()
  575. {
  576. // 获得本机主机名
  577. char hostname[MAX_PATH] = {0};
  578. gethostname(hostname,MAX_PATH);
  579. struct hostent FAR* lpHostEnt = gethostbyname(hostname);
  580. if(lpHostEnt == NULL)
  581. {
  582. return DEFAULT_IP;
  583. }
  584. // 取得IP地址列表中的第一个为返回的IP(因为一台主机可能会绑定多个IP)
  585. LPSTR lpAddr = lpHostEnt->h_addr_list[0];
  586. // 将IP地址转化成字符串形式
  587. struct in_addr inAddr;
  588. memmove(&inAddr,lpAddr,4);
  589. m_strIP = CString( inet_ntoa(inAddr) );
  590. return m_strIP;
  591. }
  592. ///////////////////////////////////////////////////////////////////
  593. // 获得本机中处理器的数量
  594. int CIOCPModel::_GetNoOfProcessors()
  595. {
  596. SYSTEM_INFO si;
  597. GetSystemInfo(&si);
  598. return si.dwNumberOfProcessors;
  599. }
  600. /////////////////////////////////////////////////////////////////////
  601. // 在主界面中显示提示信息
  602. void CIOCPModel::_ShowMessage(const CString szFormat,...) const
  603. {
  604. // 根据传入的参数格式化字符串
  605. CString strMessage;
  606. va_list arglist;
  607. // 处理变长参数
  608. va_start(arglist, szFormat);
  609. strMessage.FormatV(szFormat,arglist);
  610. va_end(arglist);
  611. #if 0
  612. // 在主界面中显示
  613. CMainDlg* pMain = (CMainDlg*)m_pMain;
  614. if( m_pMain!=NULL )
  615. {
  616. pMain->AddInformation(strMessage);
  617. TRACE( strMessage+_T("\n") );
  618. }
  619. #else
  620. //Global::WriteTextLog(strMessage);
  621. #endif
  622. }
  623. #define PAK_LEN sizeof(ProHeader)
  624. void CIOCPModel::_RecvProcess(PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext)
  625. {
  626. // 小于包头;
  627. ProHeader* phead = NULL;
  628. if (pSocketContext->lastData.size() == 0)
  629. {
  630. // 不足包头;
  631. if (PAK_LEN > pIoContext->m_Overlapped.InternalHigh)
  632. {
  633. //OutputDebugString("A:不足包头;\n");
  634. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  635. }
  636. else
  637. {
  638. phead = (ProHeader*)pIoContext->m_wsaBuf.buf;
  639. // 完整的包;
  640. if (phead->len == pIoContext->m_Overlapped.InternalHigh)
  641. {
  642. //OutputDebugString("A:完整的包;\n");
  643. _TaskProcess(pIoContext, (ProPackage*)pIoContext->m_wsaBuf.buf);
  644. }
  645. // 小包;
  646. else if (phead->len > pIoContext->m_Overlapped.InternalHigh)
  647. {
  648. //OutputDebugString("A:小包;\n");
  649. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  650. }
  651. // 超包;
  652. else if (phead->len < pIoContext->m_Overlapped.InternalHigh)
  653. {
  654. //OutputDebugString("A:超包;\n");
  655. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + phead->len, pIoContext->m_Overlapped.InternalHigh - phead->len);
  656. _TaskProcess(pIoContext, (ProPackage*)pIoContext->m_wsaBuf.buf);
  657. }
  658. }
  659. }
  660. else
  661. {
  662. int lastlen = pIoContext->m_Overlapped.InternalHigh;
  663. if (pSocketContext->lastData.size() >= PAK_LEN)
  664. {
  665. phead = (ProHeader*)pSocketContext->lastData.data();
  666. if (phead->len <= pSocketContext->lastData.size() + pIoContext->m_Overlapped.InternalHigh)
  667. {
  668. if ( phead->len <= pSocketContext->lastData.size() )
  669. {
  670. //OutputDebugString("C:超包;\n");
  671. // 完整包;
  672. _TaskProcess(pIoContext, (ProPackage*)pSocketContext->lastData.substr(0, phead->len).data());
  673. pSocketContext->lastData = pSocketContext->lastData.substr(phead->len);
  674. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  675. }
  676. else
  677. {
  678. //OutputDebugString("D:超包;\n");
  679. lastlen = pSocketContext->lastData.size() + pIoContext->m_Overlapped.InternalHigh - phead->len;
  680. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh - lastlen);
  681. // 完整包;
  682. _TaskProcess(pIoContext, (ProPackage*)pSocketContext->lastData.data());
  683. // 剩余包;
  684. pSocketContext->lastData.clear();
  685. if (lastlen)
  686. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + pIoContext->m_Overlapped.InternalHigh - lastlen, lastlen);
  687. }
  688. }
  689. else
  690. {
  691. //OutputDebugString("C:仍不足一个包;\n");
  692. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  693. }
  694. }
  695. else
  696. {
  697. // 包头剩余长度;
  698. int diflen = PAK_LEN - pSocketContext->lastData.size();
  699. // 仍不足一个包头;
  700. if ( diflen > pIoContext->m_Overlapped.InternalHigh )
  701. {
  702. //OutputDebugString("B:仍不足一个包头;\n");
  703. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, pIoContext->m_Overlapped.InternalHigh);
  704. }
  705. else
  706. {
  707. // 拼成完整包头;
  708. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf, diflen);
  709. phead = (ProHeader*)pSocketContext->lastData.data();
  710. // 完整包;
  711. if ( phead->len == PAK_LEN + pIoContext->m_Overlapped.InternalHigh - diflen )
  712. {
  713. //OutputDebugString("B:完整包;\n");
  714. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + diflen, pIoContext->m_Overlapped.InternalHigh - diflen);
  715. _TaskProcess(pIoContext, (ProPackage*)pSocketContext->lastData.data());
  716. pSocketContext->lastData.clear();
  717. }
  718. // 小包;
  719. else if ( phead->len > PAK_LEN + pIoContext->m_Overlapped.InternalHigh - diflen)
  720. {
  721. //OutputDebugString("B:小包;\n");
  722. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + diflen, pIoContext->m_Overlapped.InternalHigh - diflen);
  723. }
  724. // 超包;
  725. else if (phead->len < PAK_LEN + pIoContext->m_Overlapped.InternalHigh - diflen)
  726. {
  727. //OutputDebugString("B:超包;\n");
  728. // 组完成包;
  729. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + diflen, phead->len - PAK_LEN);
  730. _TaskProcess(pIoContext, (ProPackage*)pSocketContext->lastData.data());
  731. pSocketContext->lastData.clear();
  732. int last = pIoContext->m_Overlapped.InternalHigh - diflen - phead->len + PAK_LEN;
  733. if (last)
  734. {
  735. pSocketContext->lastData.append(pIoContext->m_wsaBuf.buf + pIoContext->m_Overlapped.InternalHigh - last, last);
  736. }
  737. }
  738. }
  739. }
  740. }
  741. }
  742. void CIOCPModel::_TaskProcess(PER_IO_CONTEXT* pIoContext, ProPackage* pak)
  743. {
  744. if (pak->header.version == 0xAA)
  745. {
  746. _DeviceProc(pIoContext, pak);
  747. }
  748. else if (pak->header.version == 0xAB )
  749. {
  750. _CaptureProc(pIoContext, pak);
  751. }
  752. else if (pak->header.version == 0xAC)
  753. {
  754. // 测试精灵;
  755. _TestWizardProc(pIoContext, pak);
  756. }
  757. else if (pak->header.version == 0xAD)
  758. {
  759. // TV串口;
  760. _TVWatchProc(pIoContext, pak);
  761. }
  762. }
  763. void CIOCPModel::_DeviceProc(PER_IO_CONTEXT* pIoContext, ProPackage* pak)
  764. {
  765. cJSON* pJson = cJSON_Parse((const char*)&pak->buf);
  766. if ( pJson == NULL )
  767. {
  768. return;
  769. }
  770. RequesJson reqj;
  771. reqj.device_id = cJSON_GetObjectItem(pJson, "device_id") ? cJSON_GetObjectItem(pJson, "device_id")->valueint : 0;
  772. reqj.device_name = cJSON_GetObjectItem(pJson, "device_name") ? cJSON_GetObjectItem(pJson, "device_name")->valuestring : "";
  773. reqj.device_cmd = cJSON_GetObjectItem(pJson, "device_cmd") ? cJSON_GetObjectItem(pJson, "device_cmd")->valuestring : "";
  774. reqj.device_timeout = cJSON_GetObjectItem(pJson, "device_timeout") ? cJSON_GetObjectItem(pJson, "device_timeout")->valueint : 300;
  775. if ( pJson )
  776. {
  777. cJSON_Delete(pJson);
  778. pJson = NULL;
  779. }
  780. auto iter = g_dmap.find(reqj.device_id);
  781. if ( iter == g_dmap.end())
  782. {
  783. return;
  784. }
  785. // 向串口发送指令;
  786. auto dev = iter->second;
  787. std::string readdata = dev->SendCommond(reqj.device_cmd);
  788. ResponseJson repj;
  789. repj.device_id = reqj.device_id;
  790. repj.device_name = reqj.device_name;
  791. repj.device_cmd_result = readdata;
  792. pJson = cJSON_CreateObject();
  793. cJSON_AddNumberToObject(pJson, "device_id", repj.device_id);
  794. cJSON_AddStringToObject(pJson, "device_name", repj.device_name.c_str());
  795. cJSON_AddStringToObject(pJson, "device_cmd_result", repj.device_cmd_result.c_str());
  796. char* pjdata = cJSON_Print(pJson);
  797. byte* sdata = new byte[strlen(pjdata) + PAK_LEN];
  798. ProPackage* reponse_pak = (ProPackage*)sdata;
  799. reponse_pak->header.version = 0xAB;
  800. reponse_pak->header.len = strlen(pjdata) + PAK_LEN;
  801. memcpy(reponse_pak->buf, pjdata, strlen(pjdata));
  802. if (pjdata)
  803. free(pjdata);
  804. pjdata = NULL;
  805. cJSON_Delete(pJson);
  806. int ret = send(pIoContext->m_sockAccept, (const char*)sdata, reponse_pak->header.len, 0);
  807. delete[]sdata;
  808. sdata = NULL;
  809. if ( ret == -1 )
  810. {
  811. DWORD dwEr = GetLastError();
  812. }
  813. }
  814. void CIOCPModel::_CaptureProc(PER_IO_CONTEXT* pIoContext, ProPackage* pak)
  815. {
  816. }
  817. void CIOCPModel::_TestWizardProc(PER_IO_CONTEXT* pIoContext, ProPackage* pak)
  818. {
  819. bool bRet = false;
  820. TCHAR szType[8] = {0};
  821. TCHAR szCmd[256] = {0};
  822. int nRet = sscanf_s((char*)&pak->buf, _T("%[^>]>%s"), szType, 8, szCmd, 256);
  823. if (nRet == 2) {
  824. if ( _tcsicmp(szType, _T("cmd")) == 0 && TW::SendCmd )
  825. bRet = TW::SendCmd((char*)&pak->buf[4]);
  826. if (_tcsicmp(szType, _T("ir")) == 0 && TW::SendKey)
  827. bRet = TW::SendKey((char*)&pak->buf[3]);
  828. if (_tcsicmp(szType, _T("usb")) == 0 && TW::SendSwitch)
  829. bRet = TW::SendSwitch(_tstoi((char*)&pak->buf[4]));
  830. if (_tcsicmp(szType, _T("loadxml")) == 0 && TW::LoadXml)
  831. bRet = !TW::LoadXml(szCmd);
  832. if (_tcsicmp(szType, _T("getkeys")) == 0 && TW::GetSignalsName)
  833. {
  834. std::string keys = TW::GetSignalsName();
  835. byte* sdata = new byte[keys.size() + PAK_LEN];
  836. ProPackage* reponse_pak = (ProPackage*)sdata;
  837. reponse_pak->header.version = 0xAC;
  838. reponse_pak->header.len = keys.size() + PAK_LEN;
  839. memcpy(reponse_pak->buf, keys.c_str(), keys.size());
  840. send(pIoContext->m_sockAccept, (const char*)sdata, reponse_pak->header.len, 0);
  841. if (sdata)
  842. delete[] sdata;
  843. sdata = NULL;
  844. return;
  845. }
  846. // 电视串口;
  847. if (_tcsicmp(szType, _T("tv")) == 0 && USBUPGRADE::RTK_USBUpgrade)
  848. {
  849. bRet = TRUE;
  850. if (pak->buf[4] == 0)
  851. USBUPGRADE::RTKUpgrade(0x09); // VK_TAB
  852. else
  853. USBUPGRADE::RTKUpgrade(0x1B); // VK_ESC
  854. }
  855. }
  856. char* retdata = bRet ? "1" : "0";
  857. byte* sdata = new byte[strlen(retdata) + PAK_LEN];
  858. ProPackage* reponse_pak = (ProPackage*)sdata;
  859. reponse_pak->header.version = 0xAC;
  860. reponse_pak->header.len = strlen(retdata) + PAK_LEN;
  861. memcpy(reponse_pak->buf, retdata, strlen(retdata));
  862. int ret = send(pIoContext->m_sockAccept, (const char*)sdata, reponse_pak->header.len, 0);
  863. if (sdata)
  864. delete[] sdata;
  865. sdata = NULL;
  866. }
  867. void CIOCPModel::_TVWatchProc(PER_IO_CONTEXT* pIoContext, ProPackage* pak)
  868. {
  869. // 串口启动时,需要同时启动监听功能;
  870. // SendCommand:// 指令,循环次数,每次循环Sleep时间;
  871. // WatchWord // 要监听的关键字,监听时长;
  872. // RTKUpgrade // 升级方式:VK_TAB、VK_ESC;
  873. }
  874. /////////////////////////////////////////////////////////////////////
  875. // 判断客户端Socket是否已经断开,否则在一个无效的Socket上投递WSARecv操作会出现异常
  876. // 使用的方法是尝试向这个socket发送数据,判断这个socket调用的返回值
  877. // 因为如果客户端网络异常断开(例如客户端崩溃或者拔掉网线等)的时候,服务器端是无法收到客户端断开的通知的
  878. bool CIOCPModel::_IsSocketAlive(SOCKET s)
  879. {
  880. int nByteSent=send(s,"",0,0);
  881. if (-1 == nByteSent) return false;
  882. return true;
  883. }
  884. ///////////////////////////////////////////////////////////////////
  885. // 显示并处理完成端口上的错误
  886. bool CIOCPModel::HandleError( PER_SOCKET_CONTEXT *pContext,const DWORD& dwErr )
  887. {
  888. // 如果是超时了,就再继续等吧
  889. if(WAIT_TIMEOUT == dwErr)
  890. {
  891. // 确认客户端是否还活着...
  892. if( !_IsSocketAlive( pContext->m_Socket) )
  893. {
  894. this->_ShowMessage( _T("检测到客户端异常退出!") );
  895. this->_RemoveContext( pContext );
  896. return true;
  897. }
  898. else
  899. {
  900. this->_ShowMessage( _T("网络操作超时!重试中...") );
  901. return true;
  902. }
  903. }
  904. // 可能是客户端异常退出了
  905. else if( ERROR_NETNAME_DELETED==dwErr )
  906. {
  907. this->_ShowMessage( _T("检测到客户端异常退出!") );
  908. this->_RemoveContext( pContext );
  909. return true;
  910. }
  911. else
  912. {
  913. this->_ShowMessage( _T("完成端口操作出现错误,线程退出。错误代码:%d"),dwErr );
  914. return false;
  915. }
  916. }