#include "StdAfx.h"
#include "PipeClient.h"
#include "Utility.h"

// Overlapped I/O contexts used for pipe reads and writes. They are static
// members, so every CPipeClient object in the process shares the same two
// contexts (buffer + OVERLAPPED).
PER_IO_CONTEXT CPipeClient::m_IoRead;
PER_IO_CONTEXT CPipeClient::m_IoWrite;

namespace
{
    // Finishes an overlapped ReadFile/WriteFile call and returns its final
    // Win32 status (ERROR_SUCCESS when the transfer completed).
    //
    // hPipe           handle the operation was issued on.
    // pOverlapped     OVERLAPPED structure that was passed to ReadFile/WriteFile.
    // bStarted        return value of that ReadFile/WriteFile call. This helper
    //                 must be called immediately afterwards, because it reads
    //                 the thread's last-error value when bStarted is FALSE.
    // pcbTransferred  receives the number of bytes transferred (0 on failure).
    DWORD FinishOverlappedIo(HANDLE hPipe, OVERLAPPED *pOverlapped, BOOL bStarted, DWORD *pcbTransferred)
    {
        *pcbTransferred = 0;

        DWORD dwError = bStarted ? ERROR_SUCCESS : GetLastError();

        // Only a started (or partially completed) operation has a result to
        // collect; any other error means the call failed outright and waiting
        // for it could block forever.
        if ( dwError == ERROR_SUCCESS || dwError == ERROR_IO_PENDING || dwError == ERROR_MORE_DATA )
        {
            // bWait = TRUE blocks until the operation is done.
            if ( GetOverlappedResult(hPipe, pOverlapped, pcbTransferred, TRUE) )
                dwError = ERROR_SUCCESS;
            else
                dwError = GetLastError();
        }

        return dwError;
    }
}

// Stores the pipe name and the pipe mode and clears the internal buffers.
// No connection is made here; see StartWork().
//
// lpPipeName  name of the pipe to connect to; may be NULL, in which case the
//             stored name stays empty.
// dwMode      mode later passed to SetNamedPipeHandleState() once connected.
CPipeClient::CPipeClient(LPCTSTR lpPipeName, DWORD dwMode)
{
    m_hReadInst = INVALID_HANDLE_VALUE;
    m_bClientStop = FALSE;
    m_dwMode = dwMode;

    // sizeof() keeps the cleared range in step with the real array sizes.
    memset(m_szPipeName, 0, sizeof(m_szPipeName));
    memset(m_szWriteBuff, 0, sizeof(m_szWriteBuff));
    memset(m_szReceiveBuff, 0, sizeof(m_szReceiveBuff));

    // Truncating copy: an over-long name must not trigger the CRT
    // invalid-parameter handler.
    if ( lpPipeName )
        _tcsncpy_s(m_szPipeName, lpPipeName, _TRUNCATE);
}

// Calls StopWork() and closes the pipe handle if one is open.
// NOTE(review): the worker threads are not joined in this file (their handles
// are closed right after creation in StartWork) - confirm that StopWork()
// guarantees they have exited before the object goes away.
CPipeClient::~CPipeClient(void)
{
    StopWork();

    if ( m_hReadInst != INVALID_HANDLE_VALUE )
    {
        CloseHandle(m_hReadInst);
        m_hReadInst = INVALID_HANDLE_VALUE;
    }
}

// Starts the connect thread and the read thread.
//
// Returns FALSE only when the connect thread could not be created. Returns
// TRUE without starting anything when the client has already been stopped.
// A failure to create the read thread is not reported (the original check is
// disabled), so TRUE does not guarantee that messages will be received.
BOOL CPipeClient::StartWork()
{
    if ( m_bClientStop )
        return TRUE;

    HANDLE hConnect = CreateThread(NULL, 0, ConnectThread, this, 0, NULL);
    HANDLE hReadMsg = CreateThread(NULL, 0, ReadMsgThread, this, 0, NULL);

    const BOOL bRet = ( hConnect != NULL /*&& hReadMsg != NULL */) ? TRUE : FALSE;

    // The threads keep running; only our references to them are released.
    if ( hConnect )
        CloseHandle(hConnect);
    if ( hReadMsg )
        CloseHandle(hReadMsg);

    return bRet;
}

// Thread procedure: keeps the client connected to the named pipe.
//
// While the client is not stopped it waits for the pipe to become available,
// opens it for overlapped read/write access, applies m_dwMode and then
// publishes the handle in m_hReadInst. When a handle is already published it
// just re-checks once a minute.
//
// lpParam  the owning CPipeClient; the thread exits at once when it is NULL.
// Returns 0.
DWORD CPipeClient::ConnectThread(LPVOID lpParam)
{
    CPipeClient *pInstance = static_cast<CPipeClient*>(lpParam);
    if ( !pInstance )
        return 0L;

    while ( !pInstance->m_bClientStop )
    {
        if ( pInstance->m_hReadInst != INVALID_HANDLE_VALUE )
        {
            // Already connected: check again in one minute.
            Sleep(60000);
            Utility::dprintf(_T("m_hReadInst 已存在\n"));
            continue;
        }

        // Wait up to 10 seconds for a pipe instance.
        if ( !WaitNamedPipe(pInstance->m_szPipeName, 10000) )
        {
            // If the pipe does not exist the call returns immediately,
            // ignoring the timeout, so an explicit Sleep is still needed.
            Utility::dprintf(_T("<%ld> WaitNamedPipe 失败\n"), Utility::g_WndInfo.dwProcessId);
            Sleep(10000);
            continue;
        }

        // Open into a local handle first: the read thread starts using
        // m_hReadInst as soon as it is valid, so it must only ever see a
        // fully configured handle.
        HANDLE hPipe = CreateFile(
            pInstance->m_szPipeName,        // pipe name
            GENERIC_READ | GENERIC_WRITE,   // read and write access
            0,                              // no sharing
            NULL,                           // default security attributes
            OPEN_EXISTING,                  // opens existing pipe
            FILE_FLAG_OVERLAPPED,           // overlapped (asynchronous) I/O
            NULL);                          // no template file

        if ( hPipe == INVALID_HANDLE_VALUE )
        {
            const DWORD dwError = GetLastError();

            // ERROR_PIPE_BUSY just means another client took the instance;
            // go straight back to WaitNamedPipe. Anything else is logged and
            // retried after a pause so a persistent failure cannot spin.
            if ( dwError != ERROR_PIPE_BUSY )
            {
                Utility::dprintf(_T("Could not open pipe. GLE=%d\n"), dwError );
                Sleep(10000);
            }
            continue;
        }

        // Connected: switch the pipe to the requested mode.
        if ( !SetNamedPipeHandleState(
                hPipe,                  // pipe handle
                &pInstance->m_dwMode,   // new pipe mode
                NULL,                   // don't set maximum bytes
                NULL) )                 // don't set maximum time
        {
            Utility::dprintf(_T("SetNamedPipeHandleState failed. GLE=%d\n"), GetLastError() );

            // The handle was never published, so nobody else can be using
            // it; close it and retry later.
            CloseHandle(hPipe);
            Sleep(10000);
            continue;
        }

        pInstance->m_hReadInst = hPipe;
    }

    Utility::dprintf(_T("<%ld> ConnectThread 退出\n"), Utility::g_WndInfo.dwProcessId);
    return 0;
}

// Thread procedure: reads messages from the pipe into m_szReceiveBuff.
//
// While the client is not stopped it issues overlapped reads into the shared
// m_IoRead context, waits for each one to complete and appends the received
// bytes to m_szReceiveBuff. ERROR_MORE_DATA chunks are accumulated until the
// message is complete. When the pipe is reported as broken or not connected
// the handle is closed and reset, so ConnectThread can reconnect.
//
// lpParam  the owning CPipeClient; the thread exits at once when it is NULL.
// Returns 0.
DWORD CPipeClient::ReadMsgThread(LPVOID lpParam)
{
    CPipeClient *pInstance = static_cast<CPipeClient*>(lpParam);
    if ( !pInstance )
        return 0L;

    // The receive buffer is filled by byte offset; one TCHAR is always kept
    // free so the content stays null-terminated for logging.
    BYTE *pbReceive = reinterpret_cast<BYTE*>(pInstance->m_szReceiveBuff);
    const DWORD cbCapacity = static_cast<DWORD>(sizeof(pInstance->m_szReceiveBuff) - sizeof(TCHAR));
    DWORD dwDataIndex = 0;      // bytes of the current message received so far

    while ( !pInstance->m_bClientStop )
    {
        const HANDLE hPipe = pInstance->m_hReadInst;
        if ( hPipe == INVALID_HANDLE_VALUE )
        {
            // Not connected yet; ConnectThread will publish a handle.
            Sleep(2000);
            continue;
        }

        // The byte-count argument is NULL because the handle is overlapped;
        // the count is collected when the operation completes.
        DWORD cbRead = 0;
        const BOOL bStarted = ReadFile(
            hPipe,                      // pipe handle
            m_IoRead.szBuffer,          // buffer to receive data
            BUFSIZE,                    // size of buffer
            NULL,                       // byte count comes from the overlapped result
            &m_IoRead.m_Overlapped);    // overlapped read
        const DWORD dwError = FinishOverlappedIo(hPipe, &m_IoRead.m_Overlapped, bStarted, &cbRead);

        const BOOL bSuccess  = ( dwError == ERROR_SUCCESS );
        const BOOL bMoreData = ( dwError == ERROR_MORE_DATA );

        if ( bSuccess || bMoreData )
        {
            m_IoRead.dwBufferSize = cbRead;

            // Append the chunk, truncating anything that does not fit.
            DWORD cbCopy = 0;
            if ( dwDataIndex < cbCapacity )
            {
                cbCopy = cbRead;
                if ( cbCopy > cbCapacity - dwDataIndex )
                    cbCopy = cbCapacity - dwDataIndex;
            }
            memcpy(pbReceive + dwDataIndex, m_IoRead.szBuffer, cbCopy);
            dwDataIndex += cbCopy;

            if ( cbCopy < cbRead )
                Utility::dprintf(_T("ReadMsgThread: message truncated, %lu bytes dropped\n"), cbRead - cbCopy);
        }

        Utility::dprintf(_T("读取数据:Error=%ld, Len=%ld, Data=%s\n"), dwError, cbRead, pInstance->m_szReceiveBuff);

        // Clear the I/O buffer for the next read.
        memset(m_IoRead.szBuffer, 0, BUFSIZE);

        if ( bMoreData )
        {
            // The rest of this message is delivered by the next read.
            continue;
        }

        if ( bSuccess )
        {
            Utility::dprintf(_T("读取到的消息=%d"), static_cast<int>(dwDataIndex));
            Utility::dprintf(_T("读取到的消息=%s"), pInstance->m_szReceiveBuff);
            // Message handling goes here.
            // ...
        }
        else if ( dwError == ERROR_PIPE_NOT_CONNECTED || dwError == ERROR_BROKEN_PIPE )
        {
            // The server went away: drop the handle so ConnectThread reconnects.
            Utility::dprintf(_T("CloseHandle\n"));
            CloseHandle(hPipe);
            pInstance->m_hReadInst = INVALID_HANDLE_VALUE;
        }
        else
        {
            // Unexpected failure: log it and pause so a persistent error
            // cannot turn this loop into a busy spin.
            Utility::dprintf(_T("ReadFile from pipe failed. GLE=%d\n"), dwError );
            Sleep(1000);
        }

        // Start the next message with an empty receive buffer.
        dwDataIndex = 0;
        memset(pInstance->m_szReceiveBuff, 0, sizeof(pInstance->m_szReceiveBuff));
    }

    Utility::dprintf(_T("<%ld> ReadMsgThread 退出\n"),Utility::g_WndInfo.dwProcessId);
    return 0;
}

// Waits for the overlapped operation described by pIoContext to finish.
//
// Must be called immediately after the ReadFile/WriteFile/ConnectNamedPipe
// call that started the operation, because it inspects the thread's
// last-error value left by that call.
//
// hPipe       handle the operation was issued on.
// pIoContext  context whose m_Overlapped was passed to the I/O call.
//
// Returns 0 when the wait completed and -1 on failure. Note that this is the
// opposite of the usual BOOL convention; it is kept for existing callers.
BOOL CPipeClient::WaitFinish(HANDLE hPipe, PER_IO_CONTEXT *pIoContext)
{
    // Capture first: any later API call may overwrite the last-error value.
    const DWORD dwLastError = GetLastError();

    if ( !pIoContext )
        return -1;

    bool bPendingIO = false;
    switch ( dwLastError )
    {
    case ERROR_IO_PENDING:
        // The operation is still in progress.
        bPendingIO = true;
        break;

    case ERROR_PIPE_CONNECTED:
        // Already connected: signal the event ourselves so the wait below
        // returns immediately.
        SetEvent(pIoContext->m_Overlapped.hEvent);
        break;
    }

    // Wait for the read/write operation to complete.
    const DWORD dwWait = WaitForSingleObject(pIoContext->m_Overlapped.hEvent, INFINITE);
    if ( dwWait != WAIT_OBJECT_0 )
    {
        printf("WaitForSingleObject failed %lu\n", GetLastError());
        return -1;
    }

    if ( bPendingIO )
    {
        // The event has already been waited on, so the result is fetched
        // without waiting again; waiting twice on the same event is not safe
        // when it is an auto-reset event.
        DWORD dwTransBytes = 0;
        if ( !GetOverlappedResult(hPipe, &pIoContext->m_Overlapped, &dwTransBytes, FALSE) )
        {
            printf("GetOverlappedResult failed %lu\n", GetLastError());
            return -1;
        }
    }

    return 0;
}

// Sends a PACKAGE to the server. Not implemented: always returns FALSE,
// whether or not the pipe is connected.
BOOL CPipeClient::SendMessage(PACKAGE &pak)
{
    // Is the client connected to the server?
    if ( m_hReadInst == INVALID_HANDLE_VALUE )
        return FALSE;

    // Sending is not implemented yet.
    return FALSE;
}

// Writes dwDataLen bytes starting at lpszMsg to the pipe and waits for the
// overlapped write to complete.
//
// lpszMsg    data to send; must not be NULL. It is also logged with "%s".
// dwDataLen  number of bytes to send; must not be 0.
//            NOTE(review): passed to WriteFile as a byte count - confirm that
//            callers do not pass a character count in UNICODE builds.
//
// Returns TRUE when every byte was written, FALSE when the pipe is not
// connected, the arguments are empty or the write failed.
// Uses the shared static m_IoWrite context; nothing in this file serializes
// concurrent calls.
BOOL CPipeClient::SendData(const TCHAR *lpszMsg, DWORD dwDataLen)
{
    const HANDLE hPipe = m_hReadInst;
    if ( hPipe == INVALID_HANDLE_VALUE )
        return FALSE;

    if ( lpszMsg == NULL || dwDataLen == 0 )
        return FALSE;

    // The handle was opened with FILE_FLAG_OVERLAPPED, so WriteFile needs an
    // OVERLAPPED structure and the byte count comes from the overlapped result.
    DWORD cbWritten = 0;
    const BOOL bStarted = WriteFile(hPipe, lpszMsg, dwDataLen, NULL, &m_IoWrite.m_Overlapped);
    const DWORD dwError = FinishOverlappedIo(hPipe, &m_IoWrite.m_Overlapped, bStarted, &cbWritten);

    if ( dwError != ERROR_SUCCESS || cbWritten != dwDataLen )
    {
        Utility::dprintf(_T("SendData failed. GLE=%d\n"), dwError);
        return FALSE;
    }

    Utility::dprintf(_T("SendData:%s\n"),lpszMsg);
    return TRUE;
}