Browse Source

客户端Pipe也要异步。

Jeff 3 years ago
parent
commit
316c475786

+ 1 - 1
Source/OGCAssist/OGCAssist/OGCAssist.vcproj

@@ -166,7 +166,7 @@
 			OutputDirectory="..\..\..\..\bin\$(SolutionName)"
 			IntermediateDirectory="$(OutDir)\$(ProjectName)\$(ConfigurationName)"
 			ConfigurationType="1"
-			CharacterSet="1"
+			CharacterSet="2"
 			>
 			<Tool
 				Name="VCPreBuildEventTool"

+ 79 - 25
Source/OGCAssist/OGCAssist/PipeClient.cpp

@@ -2,8 +2,6 @@
 #include "PipeClient.h"
 #include "Utility.h"
 
-#define BUFSIZE 512
-
 CPipeClient::CPipeClient(LPCTSTR lpPipeName, DWORD dwMode)
 {
     m_hReadInst = INVALID_HANDLE_VALUE;
@@ -31,7 +29,7 @@ BOOL CPipeClient::StartWork()
         HANDLE hConnect = CreateThread(NULL, 0, ConnectThread, this, 0, NULL);
         HANDLE hReadMsg = CreateThread(NULL, 0, ReadMsgThread, this, 0, NULL);
 
-        if ( hConnect == NULL || hReadMsg == NULL )
+        if ( hConnect == NULL /*|| hReadMsg == NULL */)
             bRet = FALSE;
 
         if ( hConnect )
@@ -75,7 +73,7 @@ DWORD CPipeClient::ConnectThread(LPVOID lpParam)
             0,                              // no sharing 
             NULL,                           // default security attributes
             OPEN_EXISTING,                  // opens existing pipe 
-            0,                              // default attributes 
+            FILE_FLAG_OVERLAPPED,			// default attributes 
             NULL);                          // no template file 
 
         // 创建成功,退出;
@@ -92,16 +90,6 @@ DWORD CPipeClient::ConnectThread(LPVOID lpParam)
                 Utility::dprintf(_T("SetNamedPipeHandleState failed. GLE=%d\n"), GetLastError() ); 
                 CloseHandle(pInstance->m_hReadInst);
             }
-
-			// 初始化写管道;
-			pInstance->m_hWriteInst = CreateFile(
-				pInstance->m_szPipeName,        // pipe name 
-				GENERIC_READ | GENERIC_WRITE,   // read and write access 
-				0,                              // no sharing 
-				NULL,                           // default security attributes
-				OPEN_EXISTING,                  // opens existing pipe 
-				0,                              // default attributes 
-				NULL);                          // no template file 
         }
         else
         {
@@ -130,6 +118,7 @@ DWORD CPipeClient::ReadMsgThread(LPVOID lpParam)
     if ( !pInstance ) 
         return 0L;
 
+	int i = 0;
     while(!pInstance->m_bClientStop)
     {
         if ( pInstance->m_hReadInst == INVALID_HANDLE_VALUE ) {
@@ -140,16 +129,22 @@ DWORD CPipeClient::ReadMsgThread(LPVOID lpParam)
         do 
         { 
             bSuccess = ReadFile( 
-                pInstance->m_hReadInst,     // pipe handle 
-                chBuf,                      // buffer to receive reply 
-                BUFSIZE*sizeof(TCHAR),      // size of buffer 
-                &cbRead,                    // number of bytes read 
-                NULL);                      // not overlapped 
+                pInstance->m_hReadInst,						// pipe handle 
+                pInstance->m_IoRead.szBuffer,				// buffer to receive reply 
+                BUFSIZE,									// size of buffer 
+                &cbRead,									// number of bytes read 
+                (OVERLAPPED*)&pInstance->m_IoRead);			// not overlapped 
+
+			if ( !bSuccess && (dwError = GetLastError()) != ERROR_MORE_DATA )
+				break; 
 
-            if ( !bSuccess && (dwError = GetLastError()) != ERROR_MORE_DATA )
-                break; 
 
-            Utility::dprintf(_T("读取数据:%ld, %ld, %s"), dwError, cbRead, chBuf);
+			if ( WaitFinish(pInstance->m_hReadInst, &pInstance->m_IoRead) )
+			{
+
+			}
+            
+            Utility::dprintf(_T("读取数据%d:%ld, %ld, %s"), i++, dwError, pInstance->m_IoRead.m_Overlapped.InternalHigh, pInstance->m_IoRead.szBuffer);
             //Utility::dprintf(_T("读取数据:%ld, %ld"), dwError, cbRead);
 #if 0
             TCHAR szMsg[8912] = {0};
@@ -159,7 +154,6 @@ DWORD CPipeClient::ReadMsgThread(LPVOID lpParam)
             // 追回数据;
             memcpy(pInstance->m_szReceiveBuff + dwDataIndex, chBuf, cbRead);
             dwDataIndex += cbRead;
-            Sleep(50);
         } while ( !bSuccess );  // repeat loop if ERROR_MORE_DATA 
 
         // 清空缓存数据;
@@ -169,7 +163,7 @@ DWORD CPipeClient::ReadMsgThread(LPVOID lpParam)
         if ( bSuccess )
         {
             Utility::dprintf(_T("读取到的消息=%d"), sizeof(pInstance->m_szReceiveBuff));
-            Sleep(3000);
+            //Sleep(3000);
             Utility::dprintf(_T("读取到的消息=%s"), pInstance->m_szReceiveBuff);
 
             // 消息处理;
@@ -185,7 +179,7 @@ DWORD CPipeClient::ReadMsgThread(LPVOID lpParam)
             }
 
 #ifdef _DEBUG
-            Sleep(10000);
+            //Sleep(10000);
 #endif
         }
 
@@ -197,6 +191,47 @@ DWORD CPipeClient::ReadMsgThread(LPVOID lpParam)
     return 0;
 }
 
+BOOL CPipeClient::WaitFinish(HANDLE hPipe, PER_IO_CONTEXT *pIoContext)
+{
+	bool bPendingIO = false;
+	switch(GetLastError())
+	{
+		// 正在连接中;
+	case ERROR_IO_PENDING:
+		bPendingIO = true;
+		break;
+		// 已经连接;
+	case ERROR_PIPE_CONNECTED:
+		SetEvent(pIoContext->m_Overlapped.hEvent);
+		break;
+	}
+
+	DWORD dwWait = -1;
+	DWORD dwTransBytes = -1;
+
+	// 等待读写操作完成;
+	dwWait = WaitForSingleObject(pIoContext->m_Overlapped.hEvent,INFINITE);
+	switch(dwWait)
+	{
+	case 0:
+		if (bPendingIO)
+		{
+			// 获取Overlapped结果;
+			if( GetOverlappedResult(hPipe, &pIoContext->m_Overlapped, &dwTransBytes, FALSE) == FALSE)
+			{
+				printf("ConnectNamedPipe  failed   %d\n",GetLastError());
+				return -1;
+			}
+		}
+		break;
+		// 读写完成;
+	case WAIT_IO_COMPLETION:
+		break;
+	}
+
+	return 0;
+}
+
 BOOL CPipeClient::SendMessage(PACKAGE &pak)
 {
 	// 是否连接了服务端;
@@ -205,4 +240,23 @@ BOOL CPipeClient::SendMessage(PACKAGE &pak)
 
 	// 是否初始化了句柄;
 
+	return FALSE;
+}
+
+BOOL CPipeClient::SendData(const TCHAR *lpszMsg, DWORD dwDataLen)
+{
+	if ( m_hReadInst == INVALID_HANDLE_VALUE ) 
+		return FALSE;
+
+	static int i = 0;
+	DWORD dwNumberOfBytesWritten = 0;
+	char szMsg[255] = {0};//"你好----001";
+	sprintf(szMsg, "发送内容:%d,%d", ::GetCurrentProcessId(), i++);
+	BOOL fWrite = WriteFile(m_hReadInst,szMsg,strlen(szMsg),&dwNumberOfBytesWritten,NULL);
+	if ( fWrite )
+	{
+		Utility::dprintf(_T("SendData:%s\n"),lpszMsg);
+	}
+
+	return TRUE;
 }

+ 31 - 0
Source/OGCAssist/OGCAssist/PipeClient.h

@@ -1,6 +1,31 @@
 #pragma once
 #include "Protocol.h"
 
+#define BUFSIZE 4096
+typedef struct _PER_IO_CONTEXT
+{
+	OVERLAPPED	m_Overlapped;
+	BYTE		szBuffer[BUFSIZE];
+	DWORD		dwBufferSize;
+
+	_PER_IO_CONTEXT()
+	{
+		dwBufferSize = 0;
+		memset(&m_Overlapped, 0, sizeof(OVERLAPPED));
+		memset(szBuffer, 0, BUFSIZE);
+		m_Overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+	}
+
+	~_PER_IO_CONTEXT()
+	{
+		if ( m_Overlapped.hEvent != NULL )
+		{
+			SetEvent(m_Overlapped.hEvent);
+			CloseHandle(m_Overlapped.hEvent);
+		}
+	}
+}PER_IO_CONTEXT, *PPER_IO_CONTEXT;
+
 class CPipeClient
 {
 public:
@@ -22,6 +47,9 @@ private:
     // 缓存区;
     TCHAR m_szWriteBuff[1024];
     TCHAR m_szReceiveBuff[1024];
+
+	PER_IO_CONTEXT m_IoRead;
+	PER_IO_CONTEXT m_IoWrite;
 public:
     BOOL StartWork();
     void StopWork() { m_bClientStop = TRUE;Sleep(100000);}
@@ -29,6 +57,9 @@ public:
     static DWORD WINAPI ConnectThread(LPVOID lpParam);
     // 读取管道消息线程;
     static DWORD WINAPI ReadMsgThread(LPVOID lpParam);
+	// 等待重叠IO完成;
+	static BOOL WaitFinish(HANDLE hPipe, PER_IO_CONTEXT *pIoContext);
 	// 发送消息给服务端;
 	BOOL SendMessage(PACKAGE &pak);
+	BOOL SendData(const TCHAR *lpszMsg, DWORD dwDataLen);
 };

+ 8 - 0
Source/OGCAssist/OGCAssist/dllmain.cpp

@@ -46,6 +46,14 @@ int _tmain(int argc, TCHAR* argv[], TCHAR* envp[])
 		Utility::g_pPipeClient->StartWork();
 	}
 	
+	int i = 0;
+	TCHAR szData[36] = {0};
+	while (true)
+	{
+		_stprintf(szData, _T("·¢ËÍÄÚÈÝ£º%d,%d"), ::GetCurrentProcessId(), i++);
+		Utility::g_pPipeClient->SendData(szData, _tcslen(szData));
+		Sleep(500);
+	}
 
 	system("pause");
 }