123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431 |
- #include "StdAfx.h"
- #include "PipeService.h"
- //#include "MainDlg.h"
- #define PIPENAME _T("\\\\.\\pipe\\Assist")
- // 每一个处理器上产生多少个线程(为了最大限度的提升服务器性能,详见配套文档)
- #define WORKER_THREADS_PER_PROCESSOR 2
- // 同时投递的Accept请求的数量(这个要根据实际的情况灵活设置)
- #define MAX_POST_ACCEPT 10
- // 传递给Worker线程的退出信号
- #define EXIT_CODE NULL
- // 释放指针和句柄资源的宏
- // 释放指针宏
- #define RELEASE(x) {if(x != NULL ){delete x;x=NULL;}}
- // 释放句柄宏
- #define RELEASE_HANDLE(x) {if(x != NULL && x!=INVALID_HANDLE_VALUE){ CloseHandle(x);x = NULL;}}
- // 释放Socket宏
- #define RELEASE_SOCKET(x) {if(x !=INVALID_SOCKET) { closesocket(x);x=INVALID_SOCKET;}}
- CIOCPPipe::CIOCPPipe(void)
- {
- m_nThreads = 0;
- m_hShutdownEvent = NULL;
- m_hIOCompletionPort = NULL;
- m_phWorkerThreads= NULL;
- m_pMain = NULL;
- }
- CIOCPPipe::~CIOCPPipe(void)
- {
- // 确保资源彻底释放
- this->Stop();
- }
- DWORD WINAPI CIOCPPipe::_WorkerThread(LPVOID lpParam)
- {
- THREADPARAMS_WORKER* pParam = (THREADPARAMS_WORKER*)lpParam;
- CIOCPPipe* pIOCPModel = (CIOCPPipe*)pParam->pIOCPModel;
- int nThreadNo = (int)pParam->nThreadNo;
- pIOCPModel->_ShowMessage(_T("工作者线程启动,ID: %d."),nThreadNo);
- OVERLAPPED *pOverlapped = NULL;
- ULONG_PTR pCompletionKey = NULL;
- DWORD dwBytesTransfered = 0;
- // 循环处理请求,知道接收到Shutdown信息为止
- while (WAIT_OBJECT_0 != WaitForSingleObject(pIOCPModel->m_hShutdownEvent, 0))
- {
- BOOL bReturn = GetQueuedCompletionStatus(
- pIOCPModel->m_hIOCompletionPort,
- &dwBytesTransfered,
- &pCompletionKey,
- &pOverlapped,
- INFINITE);
- dprintf(_T("IOCP有消息"));
- // 如果收到的是退出标志,则直接退出
- if ( EXIT_CODE==(DWORD)pCompletionKey )
- {
- //break;
- continue;
- }
- // 判断是否出现了错误
- if( !bReturn )
- {
- DWORD dwErr = GetLastError();
- // 显示一下提示信息
- // 读取传入的参数
- PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped);
- if( !pIOCPModel->HandleError( pIoContext,dwErr ) )
- {
- //break;
- continue;
- }
- continue;
- }
- else
- {
- // 读取传入的参数
- PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped);
- // 判断是否有客户端断开了
- if((0 == dwBytesTransfered) && ( OP_RECV==pIoContext->m_OpType || OP_SEND==pIoContext->m_OpType))
- {
- pIOCPModel->_ShowMessage( _T("客户端 %s:%d 断开连接."),pIoContext->chClientName, pIoContext->dwProcessId );
- // 释放掉对应的资源
- pIOCPModel->_RemoveContext( pIoContext );
- continue;
- }
- else
- {
- switch( pIoContext->m_OpType )
- {
- case OP_ACCEPT:
- {
- pIOCPModel->_DoAccpet( pIoContext );
- dprintf(_T("客户端连接成功"));
- }
- break;
- case OP_RECV:
- {
- //pIOCPModel->_DoRecv( pSocketContext,pIoContext );
- }
- break;
- case OP_SEND:
- break;
- default:
- // 不应该执行到这里
- TRACE(_T("_WorkThread中的 pIoContext->m_OpType 参数异常.\n"));
- break;
- }
- }
- }
- }
- TRACE(_T("工作者线程 %d 号退出.\n"),nThreadNo);
- // 释放线程参数
- RELEASE(lpParam);
- return 0;
- }
- bool CIOCPPipe::Start()
- {
- // 初始化线程互斥量
- InitializeCriticalSection(&m_csContextList);
- // 建立系统退出的事件通知
- m_hShutdownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
- // 初始化IOCP
- if (false == _InitializeIOCP())
- {
- this->_ShowMessage(_T("初始化IOCP失败!\n"));
- return false;
- }
- else
- {
- this->_ShowMessage(_T("\nIOCP初始化完毕\n."));
- }
- this->_ShowMessage(_T("系统准备就绪,等候连接....\n"));
- return true;
- }
- void CIOCPPipe::Stop()
- {
- // 激活关闭消息通知
- SetEvent(m_hShutdownEvent);
- for (int i = 0; i < m_nThreads; i++)
- {
- // 通知所有的完成端口操作退出
- PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)EXIT_CODE, NULL);
- }
- // 等待所有的客户端资源退出
- WaitForMultipleObjects(m_nThreads, m_phWorkerThreads, TRUE, INFINITE);
- // 清除客户端列表信息
- this->_ClearContextList();
- // 释放其他资源
- this->_DeInitialize();
- this->_ShowMessage(_T("停止监听\n"));
- }
- bool CIOCPPipe::_InitializeIOCP()
- {
- // 建立第一个完成端口
- m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
- if ( NULL == m_hIOCompletionPort)
- {
- this->_ShowMessage(_T("建立完成端口失败!错误代码: %d!\n"), WSAGetLastError());
- return false;
- }
- // 根据本机中的处理器数量,建立对应的线程数
- #ifdef _DEBUG
- m_nThreads = 1;
- #else
- m_nThreads = WORKER_THREADS_PER_PROCESSOR * _GetNoOfProcessors();
- #endif
- // 为工作者线程初始化句柄
- m_phWorkerThreads = new HANDLE[m_nThreads];
-
- // 根据计算出来的数量建立工作者线程
- DWORD nThreadID;
- for (int i = 0; i < m_nThreads; i++)
- {
- THREADPARAMS_WORKER* pThreadParams = new THREADPARAMS_WORKER;
- pThreadParams->pIOCPModel = this;
- pThreadParams->nThreadNo = i+1;
- m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread, (void *)pThreadParams, 0, &nThreadID);
- }
- _PostAccept();
- TRACE(" 建立 _WorkerThread %d 个.\n", m_nThreads );
- return true;
- }
- void CIOCPPipe::_DeInitialize()
- {
- // 删除客户端列表的互斥量
- DeleteCriticalSection(&m_csContextList);
- // 关闭系统退出事件句柄
- RELEASE_HANDLE(m_hShutdownEvent);
- // 释放工作者线程句柄指针
- for( int i=0;i<m_nThreads;i++ )
- {
- RELEASE_HANDLE(m_phWorkerThreads[i]);
- }
-
- RELEASE(m_phWorkerThreads);
- // 关闭IOCP句柄
- RELEASE_HANDLE(m_hIOCompletionPort);
- _ShowMessage(_T("释放资源完毕.\n"));
- }
- bool CIOCPPipe::_PostAccept()
- {
- HANDLE hPipeAccept = CreateNamedPipe(
- PIPENAME, // pipe name
- PIPE_ACCESS_DUPLEX | // read/write access
- FILE_FLAG_OVERLAPPED, // overlapped mode
- PIPE_TYPE_MESSAGE | // message-type pipe
- PIPE_READMODE_MESSAGE | // message-read mode
- PIPE_WAIT, // blocking mode
- PIPE_UNLIMITED_INSTANCES, // number of instances
- 0, // output buffer size
- 0, // input buffer size
- NMPWAIT_WAIT_FOREVER, // client time-out
- NULL); // default security attributes
- if ( hPipeAccept == INVALID_HANDLE_VALUE )
- return false;
- HANDLE hIOCP = CreateIoCompletionPort(hPipeAccept, m_hIOCompletionPort, 10001, 0);
- if ( hIOCP == NULL )
- {
- CloseHandle(hPipeAccept);
- return false;
- }
- PER_IO_CONTEXT *pMyOverlapped = new PER_IO_CONTEXT;
- memset(pMyOverlapped, 0, sizeof(PER_IO_CONTEXT));
- pMyOverlapped->m_OpType = OP_ACCEPT;
- pMyOverlapped->m_PipeAccept = hPipeAccept;
- BOOL bRet = ConnectNamedPipe(hPipeAccept, (LPOVERLAPPED)pMyOverlapped);
- if( bRet )
- {// 重叠IO或完成端口,返回的是Fase;
- CloseHandle(hPipeAccept);
- CloseHandle(hIOCP);
- delete pMyOverlapped;
- pMyOverlapped = NULL;
- _ShowMessage(_T("创建用于Accept的Pipe失败!错误代码: %d"), GetLastError());
- return false;
- }
- return true;
- }
- bool CIOCPPipe::_DoAccpet( PER_IO_CONTEXT* pIoContext )
- {
- // 客户端连接后,发出指令:获取客户端注入进程信息;
- pIoContext->m_OpType = OP_SEND;
- _stprintf_s(pIoContext->chReply, _T("%s"), _T("1001\n"));
- pIoContext->cbToWrite = sizeof(pIoContext->chReply);
-
- BOOL fWrite = WriteFile(
- pIoContext->m_PipeAccept,
- pIoContext->chReply,
- pIoContext->cbToWrite,
- &pIoContext->cbToWrite,
- (LPOVERLAPPED)pIoContext);
-
- if (!fWrite) {
- dprintf("WriteFie: dwWritten=%d,GLE=%d\n", pIoContext->cbToWrite, GetLastError());
- }
- dprintf(_T("向客户端发消息"));
- ////////////////////////////////////////////////////////////////////////////////////////////////
- // 5. 使用完毕之后,把Listen Socket的那个IoContext重置,然后准备投递新的AcceptEx
- pIoContext->ResetBuffer();
- return this->_PostAccept( );
- }
- bool CIOCPPipe::_PostRecv( PER_IO_CONTEXT* pIoContext )
- {
- // 初始化变量
- DWORD dwFlags = 0;
- DWORD dwBytes = 0;
- OVERLAPPED *p_ol = &pIoContext->m_Overlapped;
- pIoContext->ResetBuffer();
- pIoContext->m_OpType = OP_RECV;
- return true;
- }
- bool CIOCPPipe::_DoRecv( PER_IO_CONTEXT* pIoContext )
- {
- // 先把上一次的数据显示出现,然后就重置状态,发出下一个Recv请求
- // 然后开始投递下一个WSARecv请求
- return _PostRecv( pIoContext );
- }
- bool CIOCPPipe::_AssociateWithIOCP( PER_IO_CONTEXT* pIoContext )
- {
- // 将用于和客户端通信的SOCKET绑定到完成端口中
-
- return true;
- }
- void CIOCPPipe::_AddToContextList( PER_IO_CONTEXT* pIoContext )
- {
- EnterCriticalSection(&m_csContextList);
- //m_arrayClientContext.Add(pHandleData);
-
- LeaveCriticalSection(&m_csContextList);
- }
- void CIOCPPipe::_RemoveContext( PER_IO_CONTEXT* pIoContext )
- {
- EnterCriticalSection(&m_csContextList);
-
- LeaveCriticalSection(&m_csContextList);
- }
- void CIOCPPipe::_ClearContextList()
- {
- EnterCriticalSection(&m_csContextList);
- for( int i=0;i<m_arrayClientContext.GetCount();i++ )
- {
- delete m_arrayClientContext.GetAt(i);
- }
- m_arrayClientContext.RemoveAll();
- LeaveCriticalSection(&m_csContextList);
- }
- int CIOCPPipe::_GetNoOfProcessors()
- {
- SYSTEM_INFO si;
- GetSystemInfo(&si);
- return si.dwNumberOfProcessors;
- }
- void CIOCPPipe::_ShowMessage(const CString szFormat,...) const
- {
- // 根据传入的参数格式化字符串
- CString strMessage;
- va_list arglist;
- // 处理变长参数
- va_start(arglist, szFormat);
- strMessage.FormatV(szFormat,arglist);
- va_end(arglist);
- }
- /////////////////////////////////////////////////////////////////////
- // 判断客户端Socket是否已经断开,否则在一个无效的Socket上投递WSARecv操作会出现异常
- // 使用的方法是尝试向这个socket发送数据,判断这个socket调用的返回值
- // 因为如果客户端网络异常断开(例如客户端崩溃或者拔掉网线等)的时候,服务器端是无法收到客户端断开的通知的
- bool CIOCPPipe::_IsSocketAlive(SOCKET s)
- {
- int nByteSent=send(s,"",0,0);
- if (-1 == nByteSent) return false;
- return true;
- }
- bool CIOCPPipe::HandleError( PER_IO_CONTEXT *pIoContext,const DWORD& dwErr )
- {
- // 如果是超时了,就再继续等吧
- if(WAIT_TIMEOUT == dwErr)
- {
- }
- // 可能是客户端异常退出了
- else if( ERROR_NETNAME_DELETED==dwErr )
- {
- this->_ShowMessage( _T("检测到客户端异常退出!") );
- return true;
- }
- else
- {
- this->_ShowMessage( _T("完成端口操作出现错误,线程退出。错误代码:%d"),dwErr );
- return false;
- }
- }
|