|
- #include "StdAfx.h"
- #include "IOCPModel.h"
- // Multiplier applied to the processor count when sizing the worker thread pool.
- #define WORKER_THREADS_PER_PROCESSOR 2
- // Number of AcceptEx requests posted up front on the listen socket.
- #define MAX_POST_ACCEPT 10
- // Completion key posted to the completion port to ask a worker thread to exit.
- #define EXIT_CODE NULL
- // Deletes x with scalar `delete` (not `delete[]`) when it is non-NULL, then sets it to NULL.
- #define RELEASE(x) {if(x != NULL ){delete x;x=NULL;}}
- // Closes x with CloseHandle unless it is NULL or INVALID_HANDLE_VALUE, then sets it to NULL.
- #define RELEASE_HANDLE(x) {if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}
- // Closes x with closesocket unless it is INVALID_SOCKET, then sets it to INVALID_SOCKET.
- #define RELEASE_SOCKET(x) {if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}}
- // Constructs an idle server object: no threads, no handles, default IP/port.
- // Both extension-function pointers start out NULL so that neither is ever
- // left indeterminate before _InitializeListenSocket() fills them in.
- CIOCPModel::CIOCPModel(void):
-     m_nThreads(0),
-     m_hShutdownEvent(NULL),
-     m_hIOCompletionPort(NULL),
-     m_phWorkerThreads(NULL),
-     m_strIP(DEFAULT_IP),
-     m_nPort(DEFAULT_PORT),
-     m_pMain(NULL),
-     m_lpfnAcceptEx( NULL ),
-     m_lpfnGetAcceptExSockAddrs( NULL ),
-     m_pListenContext( NULL )
- {
- }
- // Destructor: runs Stop() so the server is shut down before the object goes away.
- CIOCPModel::~CIOCPModel(void)
- {
-     Stop();
- }
- // Worker thread entry point.
- // lpParam is a heap-allocated THREADPARAMS_WORKER (created in _InitializeIOCP)
- // that this thread owns and frees on exit. The thread loops on the completion
- // port until the shutdown event is signalled, an exit packet (completion key
- // EXIT_CODE) is dequeued, or HandleError() reports an unrecoverable error.
- // Always returns 0.
- DWORD WINAPI CIOCPModel::_WorkerThread(LPVOID lpParam)
- {
-     THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam;
-     CIOCPModel* pIOCPModel = (CIOCPModel*)pParam->pIOCPModel;
-     int nThreadNo = (int)pParam->nThreadNo;
-     pIOCPModel->_ShowMessage(_T("工作者线程启动,ID: %d."),nThreadNo);
-     while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0))
-     {
-         // Re-initialise the outputs on every pass: when GetQueuedCompletionStatus
-         // fails without dequeuing a packet it does not write them, and values
-         // left over from the previous iteration may point at freed contexts.
-         OVERLAPPED *pOverlapped = NULL;
-         PER_SOCKET_CONTEXT *pSocketContext = NULL;
-         DWORD dwBytesTransfered = 0;
-         BOOL bReturn = GetQueuedCompletionStatus(
-             pIOCPModel->m_hIOCompletionPort,
-             &dwBytesTransfered,
-             (PULONG_PTR)&pSocketContext,
-             &pOverlapped,
-             INFINITE);
-         // Compare the full pointer-sized key; a DWORD cast would truncate it on 64-bit.
-         // A NULL key means either an exit packet or a failed wait with nothing dequeued.
-         if ( (ULONG_PTR)EXIT_CODE == (ULONG_PTR)pSocketContext )
-         {
-             break;
-         }
-         if( !bReturn )
-         {
-             DWORD dwErr = GetLastError();
-             if( !pIOCPModel->HandleError( pSocketContext,dwErr ) )
-             {
-                 break;
-             }
-             continue;
-         }
-         // Recover the per-I/O context that embeds the OVERLAPPED we were handed.
-         PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped);
-         // A zero-byte send/recv completion is treated as the peer disconnecting.
-         if((0 == dwBytesTransfered) && ( RECV_POSTED==pIoContext->m_OpType || SEND_POSTED==pIoContext->m_OpType))
-         {
-             pIOCPModel->_ShowMessage( _T("客户端 %s:%d 断开连接."),inet_ntoa(pSocketContext->m_ClientAddr.sin_addr), ntohs(pSocketContext->m_ClientAddr.sin_port) );
-             pIOCPModel->_RemoveContext( pSocketContext );
-             continue;
-         }
-         switch( pIoContext->m_OpType )
-         {
-         case ACCEPT_POSTED:
-             pIOCPModel->_DoAccpet( pSocketContext, pIoContext );
-             break;
-         case RECV_POSTED:
-             pIOCPModel->_DoRecv( pSocketContext,pIoContext );
-             break;
-         case SEND_POSTED:
-             // Send completions need no further work here.
-             break;
-         default:
-             TRACE(_T("_WorkThread中的 pIoContext->m_OpType 参数异常.\n"));
-             break;
-         }
-     }
-     TRACE(_T("工作者线程 %d 号退出.\n"),nThreadNo);
-     // Delete through the typed pointer; deleting a void* is undefined behaviour.
-     delete pParam;
-     return 0;
- }
- // Initialises WinSock 2.2 via WSAStartup.
- // Returns true on success; logs a message and returns false otherwise.
- bool CIOCPModel::LoadSocketLib()
- {
-     WSADATA startupInfo;
-     const int rc = WSAStartup(MAKEWORD(2,2), &startupInfo);
-     if (NO_ERROR == rc)
-     {
-         return true;
-     }
-     _ShowMessage(_T("初始化WinSock 2.2失败!\n"));
-     return false;
- }
- // Starts the server on the given TCP port.
- // Creates the context-list lock and the shutdown event, then sets up the
- // completion port (with its worker threads) and the listen socket.
- // Returns true when the server is ready to accept connections; on failure
- // whatever this call created is released and false is returned.
- bool CIOCPModel::Start(unsigned int port)
- {
-     InitializeCriticalSection(&m_csContextList);
-     // Manual-reset, initially non-signalled event that tells the workers to exit.
-     m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
-     if (NULL == m_hShutdownEvent)
-     {
-         this->_ShowMessage(_T("创建关闭事件失败!错误代码: %d\n"), GetLastError());
-         DeleteCriticalSection(&m_csContextList);
-         return false;
-     }
-     if (false == _InitializeIOCP())
-     {
-         this->_ShowMessage(_T("初始化IOCP失败!\n"));
-         // Only the event and the lock exist at this point; release them here
-         // instead of leaking them.
-         RELEASE_HANDLE(m_hShutdownEvent);
-         DeleteCriticalSection(&m_csContextList);
-         return false;
-     }
-     else
-     {
-         this->_ShowMessage(_T("\nIOCP初始化完毕\n."));
-     }
-     if( false==_InitializeListenSocket(port) )
-     {
-         this->_ShowMessage(_T("Listen Socket初始化失败!\n"));
-         this->_DeInitialize();
-         return false;
-     }
-     else
-     {
-         this->_ShowMessage(_T("Listen Socket初始化完毕."));
-     }
-     this->_ShowMessage(_T("系统准备就绪,等候连接....\n"));
-     return true;
- }
- // Stops the server. Does nothing unless a listen context with a valid socket exists.
- // Signals the shutdown event, posts one exit packet per worker thread, waits
- // for all workers to finish, then frees client contexts and remaining resources.
- void CIOCPModel::Stop()
- {
-     const bool bListening = (m_pListenContext != NULL) && (m_pListenContext->m_Socket != INVALID_SOCKET);
-     if (!bListening)
-     {
-         return;
-     }
-     SetEvent(m_hShutdownEvent);
-     // One exit packet per worker so that every blocked thread wakes up.
-     for (int nPosted = 0; nPosted < m_nThreads; ++nPosted)
-     {
-         PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)EXIT_CODE, NULL);
-     }
-     WaitForMultipleObjects(m_nThreads, m_phWorkerThreads, TRUE, INFINITE);
-     _ClearContextList();
-     _DeInitialize();
-     _ShowMessage(_T("停止监听\n"));
- }
- // Creates the I/O completion port and the worker thread pool
- // (WORKER_THREADS_PER_PROCESSOR threads per processor).
- // m_nThreads ends up as the number of threads that were actually created, so
- // later waits and handle clean-up never see a NULL handle.
- // Returns false if the port cannot be created or no worker thread could be
- // started; in the latter case the port and the handle array are released.
- bool CIOCPModel::_InitializeIOCP()
- {
-     m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
-     if ( NULL == m_hIOCompletionPort)
-     {
-         this->_ShowMessage(_T("建立完成端口失败!错误代码: %d!\n"), WSAGetLastError());
-         return false;
-     }
-     const int nWanted = WORKER_THREADS_PER_PROCESSOR * _GetNoOfProcessors();
-     m_phWorkerThreads = new HANDLE[nWanted];
-     m_nThreads = 0;
-     DWORD nThreadID;
-     for (int i = 0; i < nWanted; i++)
-     {
-         // Ownership of the parameter block passes to the thread, which frees it on exit.
-         THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;
-         pThreadParams->pIOCPModel = this;
-         pThreadParams->nThreadNo = i+1;
-         HANDLE hThread = ::CreateThread(0, 0, _WorkerThread, (void *)pThreadParams, 0, &nThreadID);
-         if (NULL == hThread)
-         {
-             // The thread never started, so nobody else will free its parameters.
-             this->_ShowMessage(_T("创建工作者线程失败!错误代码: %d\n"), GetLastError());
-             delete pThreadParams;
-             break;
-         }
-         m_phWorkerThreads[m_nThreads++] = hThread;
-     }
-     if (0 == m_nThreads)
-     {
-         delete[] m_phWorkerThreads;
-         m_phWorkerThreads = NULL;
-         RELEASE_HANDLE(m_hIOCompletionPort);
-         return false;
-     }
-     TRACE(" 建立 _WorkerThread %d 个.\n", m_nThreads );
-     return true;
- }
- // Creates the overlapped listen socket, associates it with the completion
- // port, binds it to INADDR_ANY:port, starts listening, fetches the AcceptEx /
- // GetAcceptExSockAddrs extension pointers and posts MAX_POST_ACCEPT accepts.
- // Returns false on the first failing step. Resource teardown is left to the
- // caller (Start() calls _DeInitialize() on failure); doing it here as well
- // would run the teardown twice.
- bool CIOCPModel::_InitializeListenSocket(unsigned int port)
- {
-     GUID GuidAcceptEx = WSAID_ACCEPTEX;
-     GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
-     struct sockaddr_in ServerAddress;
-     m_pListenContext = new PER_SOCKET_CONTEXT;
-     m_pListenContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
-     if (INVALID_SOCKET == m_pListenContext->m_Socket)
-     {
-         this->_ShowMessage(_T("初始化Socket失败,错误代码: %d.\n"), WSAGetLastError());
-         return false;
-     }
-     TRACE("WSASocket() 完成.\n");
-     // The completion key is pointer-sized; casting through DWORD would truncate it on 64-bit.
-     if( NULL== CreateIoCompletionPort( (HANDLE)m_pListenContext->m_Socket, m_hIOCompletionPort,(ULONG_PTR)m_pListenContext, 0))
-     {
-         this->_ShowMessage(_T("绑定 Listen Socket至完成端口失败!错误代码: %d\n"), WSAGetLastError());
-         RELEASE_SOCKET( m_pListenContext->m_Socket );
-         return false;
-     }
-     TRACE("Listen Socket绑定完成端口 完成.\n");
-     ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));
-     ServerAddress.sin_family = AF_INET;
-     ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);
-     ServerAddress.sin_port = htons((u_short)port);
-     if (SOCKET_ERROR == bind(m_pListenContext->m_Socket, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress)))
-     {
-         this->_ShowMessage(_T("bind()函数执行错误! 错误代码: %d\n"), WSAGetLastError());
-         return false;
-     }
-     TRACE("bind() 完成.\n");
-     if (SOCKET_ERROR == listen(m_pListenContext->m_Socket,SOMAXCONN))
-     {
-         this->_ShowMessage(_T("Listen()函数执行出现错误; 错误代码: %d\n"), WSAGetLastError());
-         return false;
-     }
-     TRACE("Listen() 完成.\n");
-     // Look up the extension functions at run time through WSAIoctl.
-     DWORD dwBytes = 0;
-     if(SOCKET_ERROR == WSAIoctl(
-         m_pListenContext->m_Socket,
-         SIO_GET_EXTENSION_FUNCTION_POINTER,
-         &GuidAcceptEx,
-         sizeof(GuidAcceptEx),
-         &m_lpfnAcceptEx,
-         sizeof(m_lpfnAcceptEx),
-         &dwBytes,
-         NULL,
-         NULL))
-     {
-         this->_ShowMessage(_T("WSAIoctl 未能获取AcceptEx函数指针。错误代码: %d\n"), WSAGetLastError());
-         return false;
-     }
-     if(SOCKET_ERROR == WSAIoctl(
-         m_pListenContext->m_Socket,
-         SIO_GET_EXTENSION_FUNCTION_POINTER,
-         &GuidGetAcceptExSockAddrs,
-         sizeof(GuidGetAcceptExSockAddrs),
-         &m_lpfnGetAcceptExSockAddrs,
-         sizeof(m_lpfnGetAcceptExSockAddrs),
-         &dwBytes,
-         NULL,
-         NULL))
-     {
-         this->_ShowMessage(_T("WSAIoctl 未能获取GuidGetAcceptExSockAddrs函数指针。错误代码: %d\n"), WSAGetLastError());
-         return false;
-     }
-     // Keep several accepts outstanding so incoming connections are picked up promptly.
-     for( int i=0;i<MAX_POST_ACCEPT;i++ )
-     {
-         PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();
-         if( false==this->_PostAccept( pAcceptIoContext ) )
-         {
-             m_pListenContext->RemoveContext(pAcceptIoContext);
-             return false;
-         }
-     }
-     this->_ShowMessage( _T("投递 %d 个AcceptEx请求完毕"),MAX_POST_ACCEPT );
-     return true;
- }
- // Releases everything Start() set up: the context-list lock, the shutdown
- // event, the worker thread handles and their array, the completion port and
- // the listen context. Handle/pointer members are reset so the thread-handle
- // part is safe to reach again with nothing left to free.
- void CIOCPModel::_DeInitialize()
- {
-     DeleteCriticalSection(&m_csContextList);
-     RELEASE_HANDLE(m_hShutdownEvent);
-     // Guard against a missing array (threads never created, or already released).
-     if( m_phWorkerThreads != NULL )
-     {
-         for( int i=0;i<m_nThreads;i++ )
-         {
-             RELEASE_HANDLE(m_phWorkerThreads[i]);
-         }
-         // Allocated with new HANDLE[...], so it must be freed with delete[].
-         delete[] m_phWorkerThreads;
-         m_phWorkerThreads = NULL;
-     }
-     m_nThreads = 0;
-     RELEASE_HANDLE(m_hIOCompletionPort);
-     RELEASE(m_pListenContext);
-     this->_ShowMessage(_T("释放资源完毕.\n"));
- }
- // Posts one overlapped AcceptEx on the listen socket using pAcceptIoContext.
- // A fresh socket is created up front and stored in m_sockAccept, as AcceptEx
- // requires the accepting socket to exist before the call.
- // Returns true when the accept is pending or already complete. On failure the
- // socket created here is closed again and false is returned.
- bool CIOCPModel::_PostAccept( PER_IO_CONTEXT* pAcceptIoContext )
- {
-     ASSERT( INVALID_SOCKET!=m_pListenContext->m_Socket );
-     DWORD dwBytes = 0;
-     pAcceptIoContext->m_OpType = ACCEPT_POSTED;
-     WSABUF *p_wbuf = &pAcceptIoContext->m_wsaBuf;
-     OVERLAPPED *p_ol = &pAcceptIoContext->m_Overlapped;
-     pAcceptIoContext->m_sockAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
-     if( INVALID_SOCKET==pAcceptIoContext->m_sockAccept )
-     {
-         _ShowMessage(_T("创建用于Accept的Socket失败!错误代码: %d"), WSAGetLastError());
-         return false;
-     }
-     // The tail of the buffer is reserved for the local and remote addresses
-     // (sizeof(SOCKADDR_IN)+16 bytes each); the rest receives the first data block.
-     if(FALSE == m_lpfnAcceptEx( m_pListenContext->m_Socket, pAcceptIoContext->m_sockAccept, p_wbuf->buf, p_wbuf->len - ((sizeof(SOCKADDR_IN)+16)*2),
-         sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, &dwBytes, p_ol))
-     {
-         // Read the error once, before any other call can overwrite it.
-         const int nErr = WSAGetLastError();
-         if(WSA_IO_PENDING != nErr)
-         {
-             _ShowMessage(_T("投递 AcceptEx 请求失败,错误代码: %d"), nErr);
-             // Nothing is pending on this socket, so close it rather than leak it.
-             RELEASE_SOCKET( pAcceptIoContext->m_sockAccept );
-             return false;
-         }
-     }
-     return true;
- }
- // Handles a completed AcceptEx.
- // Extracts the peer address, hands the first received data to
- // OnEventWriteLog(), wraps the accepted socket in a new PER_SOCKET_CONTEXT,
- // associates it with the completion port and posts its first receive.
- // Whether or not the new client could be set up, the accept I/O context is
- // re-posted so the number of outstanding accepts does not shrink.
- // pSocketContext is not used here.
- // Returns true only if the client was set up AND the accept was re-posted.
- bool CIOCPModel::_DoAccpet( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext )
- {
-     SOCKADDR_IN* ClientAddr = NULL;
-     SOCKADDR_IN* LocalAddr = NULL;
-     int remoteLen = sizeof(SOCKADDR_IN), localLen = sizeof(SOCKADDR_IN);
-     // Same buffer layout as used when the accept was posted in _PostAccept().
-     this->m_lpfnGetAcceptExSockAddrs(pIoContext->m_wsaBuf.buf, pIoContext->m_wsaBuf.len - ((sizeof(SOCKADDR_IN)+16)*2),
-         sizeof(SOCKADDR_IN)+16, sizeof(SOCKADDR_IN)+16, (LPSOCKADDR*)&LocalAddr, &localLen, (LPSOCKADDR*)&ClientAddr, &remoteLen);
-     this->_ShowMessage( _T("客户端 %s:%d 连入."), inet_ntoa(ClientAddr->sin_addr), ntohs(ClientAddr->sin_port) );
-     OnEventWriteLog(pIoContext);
-     // The accepted socket gets its own context; the listen context stays dedicated to accepts.
-     bool bClientReady = false;
-     PER_SOCKET_CONTEXT* pNewSocketContext = new PER_SOCKET_CONTEXT;
-     pNewSocketContext->m_Socket = pIoContext->m_sockAccept;
-     memcpy(&(pNewSocketContext->m_ClientAddr), ClientAddr, sizeof(SOCKADDR_IN));
-     if( this->_AssociateWithIOCP( pNewSocketContext ) )
-     {
-         PER_IO_CONTEXT* pNewIoContext = pNewSocketContext->GetNewIoContext();
-         pNewIoContext->m_OpType = RECV_POSTED;
-         pNewIoContext->m_sockAccept = pNewSocketContext->m_Socket;
-         if( this->_PostRecv( pNewIoContext) )
-         {
-             this->_AddToContextList( pNewSocketContext );
-             bClientReady = true;
-         }
-         else
-         {
-             pNewSocketContext->RemoveContext( pNewIoContext );
-         }
-     }
-     if( !bClientReady )
-     {
-         // The context never reached the client list, so nothing else will free it.
-         // NOTE(review): presumably ~PER_SOCKET_CONTEXT closes m_Socket - confirm.
-         RELEASE( pNewSocketContext );
-     }
-     // Always re-arm the accept; _PostAccept() creates a new accept socket.
-     pIoContext->ResetBuffer();
-     const bool bReposted = this->_PostAccept( pIoContext );
-     return bClientReady && bReposted;
- }
- // Posts an overlapped WSARecv on pIoContext->m_sockAccept using the context's
- // own buffer and OVERLAPPED. The buffer is reset and the operation type is
- // set to RECV_POSTED first.
- // Returns true if the receive completed immediately or is pending; false
- // (after logging) on any other error.
- bool CIOCPModel::_PostRecv( PER_IO_CONTEXT* pIoContext )
- {
-     pIoContext->ResetBuffer();
-     pIoContext->m_OpType = RECV_POSTED;
-     DWORD dwRecvFlags = 0;
-     DWORD dwReceived = 0;
-     const int nResult = WSARecv( pIoContext->m_sockAccept, &pIoContext->m_wsaBuf, 1, &dwReceived, &dwRecvFlags, &pIoContext->m_Overlapped, NULL );
-     if (SOCKET_ERROR != nResult)
-     {
-         return true;
-     }
-     if (WSA_IO_PENDING == WSAGetLastError())
-     {
-         return true;
-     }
-     _ShowMessage(_T("投递第一个WSARecv失败!"));
-     return false;
- }
- // Handles a completed receive: passes the received buffer to
- // OnEventWriteLog() and posts the next receive on the same I/O context.
- // Returns the result of _PostRecv().
- bool CIOCPModel::_DoRecv( PER_SOCKET_CONTEXT* pSocketContext, PER_IO_CONTEXT* pIoContext )
- {
-     OnEventWriteLog(pIoContext);
-     const bool bPosted = _PostRecv( pIoContext );
-     return bPosted;
- }
- // Associates pContext->m_Socket with the completion port, using the context
- // pointer itself as the completion key (the worker thread casts it back).
- // Returns false (after logging) if the association fails.
- bool CIOCPModel::_AssociateWithIOCP( PER_SOCKET_CONTEXT *pContext )
- {
-     // The key parameter is ULONG_PTR; a DWORD cast would drop the upper half
-     // of the pointer in a 64-bit build.
-     HANDLE hTemp = CreateIoCompletionPort((HANDLE)pContext->m_Socket, m_hIOCompletionPort, (ULONG_PTR)pContext, 0);
-     if (NULL == hTemp)
-     {
-         this->_ShowMessage(_T("执行CreateIoCompletionPort()出现错误.错误代码:%d"),GetLastError());
-         return false;
-     }
-     return true;
- }
- // Appends a client context to m_arrayClientContext while holding m_csContextList.
- void CIOCPModel::_AddToContextList( PER_SOCKET_CONTEXT *pHandleData )
- {
-     ::EnterCriticalSection( &m_csContextList );
-     {
-         m_arrayClientContext.Add( pHandleData );
-     }
-     ::LeaveCriticalSection( &m_csContextList );
- }
- // Finds pSocketContext in the client list and, if it is there, deletes it and
- // removes its entry. A context that is not in the list is left untouched.
- // The whole operation runs under m_csContextList.
- void CIOCPModel::_RemoveContext( PER_SOCKET_CONTEXT *pSocketContext )
- {
-     EnterCriticalSection(&m_csContextList);
-     int nIndex = 0;
-     while( nIndex < m_arrayClientContext.GetCount() )
-     {
-         if( m_arrayClientContext.GetAt(nIndex) != pSocketContext )
-         {
-             ++nIndex;
-             continue;
-         }
-         // First match: free the context, drop its slot and stop searching.
-         RELEASE( pSocketContext );
-         m_arrayClientContext.RemoveAt(nIndex);
-         break;
-     }
-     LeaveCriticalSection(&m_csContextList);
- }
- // Deletes every client context in list order and empties the list, under m_csContextList.
- void CIOCPModel::_ClearContextList()
- {
-     EnterCriticalSection(&m_csContextList);
-     int nIndex = 0;
-     while( nIndex < m_arrayClientContext.GetCount() )
-     {
-         delete m_arrayClientContext.GetAt(nIndex);
-         ++nIndex;
-     }
-     m_arrayClientContext.RemoveAll();
-     LeaveCriticalSection(&m_csContextList);
- }
- // Resolves the local host name and returns its first IPv4 address as text,
- // caching it in m_strIP. Returns DEFAULT_IP (leaving m_strIP unchanged) if
- // the host name cannot be obtained or resolves to no address.
- CString CIOCPModel::GetLocalIP()
- {
-     char hostname[MAX_PATH] = {0};
-     if( SOCKET_ERROR == gethostname(hostname,MAX_PATH) )
-     {
-         return DEFAULT_IP;
-     }
-     struct hostent FAR* lpHostEnt = gethostbyname(hostname);
-     // The address list is NULL-terminated and may be empty.
-     if( lpHostEnt == NULL || lpHostEnt->h_addr_list == NULL || lpHostEnt->h_addr_list[0] == NULL )
-     {
-         return DEFAULT_IP;
-     }
-     LPSTR lpAddr = lpHostEnt->h_addr_list[0];
-     // Copy the 4-byte IPv4 address out of the resolver's buffer before formatting it.
-     struct in_addr inAddr;
-     memmove(&inAddr,lpAddr,4);
-     m_strIP = CString( inet_ntoa(inAddr) );
-     return m_strIP;
- }
- // Returns the processor count reported by GetSystemInfo().
- int CIOCPModel::_GetNoOfProcessors()
- {
-     SYSTEM_INFO sysInfo;
-     ::GetSystemInfo( &sysInfo );
-     const int nProcessors = sysInfo.dwNumberOfProcessors;
-     return nProcessors;
- }
- // Formats a printf-style message and writes it with Global::WriteTextLog().
- // The dialog-based output path is compiled out (#if 0).
- void CIOCPModel::_ShowMessage(const CString szFormat,...) const
- {
-     CString strText;
-     va_list vaArgs;
-     va_start(vaArgs, szFormat);
-     strText.FormatV(szFormat, vaArgs);
-     va_end(vaArgs);
- #if 0
-     CMainDlg* pMain = (CMainDlg*)m_pMain;
-     if( m_pMain!=NULL )
-     {
-         pMain->AddInformation(strText);
-         TRACE( strText+_T("\n") );
-     }
- #else
-     Global::WriteTextLog(strText);
- #endif
- }
- // Interprets the data in pIoContext's buffer as a (URL/Unicode-decoded) JSON
- // message and acts on its "ReportType":
- //   "printLog"            - when logging is enabled, writes "prinMsg" to the
- //                           Python log and refreshes the last-activity times;
- //   "shutdown" / "reboot" - records a pending notification in Global::g_notify.
- // Input that is not valid JSON is ignored. The message arrives from the
- // network, so fields that are missing or of the wrong type are treated as
- // empty / 0 instead of being dereferenced.
- // NOTE(review): the buffer is read as a C string - confirm it is always NUL-terminated.
- void CIOCPModel::OnEventWriteLog(PER_IO_CONTEXT* pIoContext)
- {
-     if ( pIoContext == NULL)
-         return;
-     cJSON *pJson = cJSON_Parse(Global::DeCode_URLUNICODE(pIoContext->m_wsaBuf.buf).c_str());
-     if ( pJson == NULL )
-         return;
-     // valuestring is NULL for non-string items; std::string must not be built from NULL.
-     cJSON *pTypeItem = cJSON_GetObjectItem(pJson, _T("ReportType"));
-     std::string report_type = (pTypeItem != NULL && pTypeItem->valuestring != NULL) ? pTypeItem->valuestring : "";
-     cJSON *pMsgItem = cJSON_GetObjectItem(pJson, _T("prinMsg"));
-     if ( _tcsicmp(report_type.c_str(), _T("printLog")) == 0 )
-     {
-         if ( Global::g_bEnableLog )
-         {
-             std::string report_data = (pMsgItem != NULL && pMsgItem->valuestring != NULL) ? pMsgItem->valuestring : "";
-             Global::WritePythonLog(report_data.c_str());
-             Global::g_time = time(NULL);
-             Global::g_lastTime = COleDateTime::GetCurrentTime();
-         }
-     }
-     else if (_tcsicmp(report_type.c_str(), _T("shutdown")) == 0 || _tcsicmp(report_type.c_str(), _T("reboot")) == 0)
-     {
-         Global::g_notify.notify = true;
-         Global::g_notify.report_type = report_type;
-         // For these report types "prinMsg" carries a numeric value.
-         Global::g_notify.datetime = (pMsgItem != NULL) ? pMsgItem->valueint : 0;
- #ifdef _DEBUG
-         TRACE2("通知时间%ld, 通知类型%s\n\n", Global::g_notify.datetime, Global::g_notify.report_type.c_str());
- #endif
-     }
-     cJSON_Delete(pJson);
- }
- // Probes a socket with a zero-length send; returns false if send() reports -1.
- bool CIOCPModel::_IsSocketAlive(SOCKET s)
- {
-     const int nSent = send(s, "", 0, 0);
-     return nSent != -1;
- }
- // Decides how a worker thread reacts to a failed completion-port wait.
- //   ERROR_NETNAME_DELETED - the client is treated as gone: its context is removed.
- //   WAIT_TIMEOUT          - the socket is probed; a dead one is removed, a live
- //                           one only produces a "retrying" log line.
- //   anything else         - logged as a completion-port error.
- // Returns true if the worker should keep running, false if it should exit.
- bool CIOCPModel::HandleError( PER_SOCKET_CONTEXT *pContext,const DWORD& dwErr )
- {
-     if( ERROR_NETNAME_DELETED == dwErr )
-     {
-         _ShowMessage( _T("检测到客户端异常退出!") );
-         _RemoveContext( pContext );
-         return true;
-     }
-     if( WAIT_TIMEOUT != dwErr )
-     {
-         _ShowMessage( _T("完成端口操作出现错误,线程退出。错误代码:%d"),dwErr );
-         return false;
-     }
-     // Timed out: find out whether the peer is still reachable.
-     if( _IsSocketAlive( pContext->m_Socket ) )
-     {
-         _ShowMessage( _T("网络操作超时!重试中...") );
-         return true;
-     }
-     _ShowMessage( _T("检测到客户端异常退出!") );
-     _RemoveContext( pContext );
-     return true;
- }
|