#include "StdAfx.h" #include "IClientImpl.h" #include #include #include "ThreadPool.hpp" const int AF_IPV4 = 0; const int AF_IPV6 = 1; const int SOCK_TCP = SOCK_STREAM-1; const int SOCK_UDP = SOCK_DGRAM-1; //IClientImpl* IClientImpl::m_pTcpClient[TCPCLIENTNUM] = {NULL}; IClientImpl::IClientImpl():m_nMode(AF_IPV4) ,m_nSockType(SOCK_TCP) { m_bSocket = FALSE; m_bStopbeat = FALSE; m_SocketClient.SetInterface(this); } IClientImpl::~IClientImpl() { //m_SocketClient.Terminate(); DisConnectServer(); } void IClientImpl::GetAddress(const SockAddrIn& addrIn, CString& rString) const { TCHAR szIPAddr[MAX_PATH] = { 0 }; CSocketHandle::FormatIP(szIPAddr, MAX_PATH, addrIn); rString.Format(_T("%s : %d"), szIPAddr, static_cast(static_cast(ntohs(addrIn.GetPort()))) ); } void IClientImpl::AppendText(LPCTSTR lpszFormat, ...) { // if ( !::IsWindow(m_ctlMsgList.GetSafeHwnd()) ) return; // TCHAR szBuffer[512]; // HWND hWnd = m_ctlMsgList.GetSafeHwnd(); // DWORD dwResult = 0; // if (SendMessageTimeout(hWnd, WM_GETTEXTLENGTH, 0, 0, SMTO_NORMAL, 500L, &dwResult) != 0) // { // int nLen = (int) dwResult; // if (SendMessageTimeout(hWnd, EM_SETSEL, nLen, nLen, SMTO_NORMAL, 500L, &dwResult) != 0) // { // size_t cb = 0; // va_list args; // va_start(args, lpszFormat); // ::StringCchVPrintfEx(szBuffer, 512, NULL, &cb, 0, lpszFormat, args); // va_end(args); // SendMessageTimeout(hWnd, EM_REPLACESEL, FALSE, reinterpret_cast(szBuffer), SMTO_NORMAL, 500L, &dwResult); // } // } } bool IClientImpl::GetDestination(SockAddrIn& addrIn) const { CString strPort; int nFamily = (m_nMode == AF_IPV4) ? AF_INET : AF_INET6; return addrIn.CreateFrom(NULL, strPort, nFamily); } bool IClientImpl::SetupMCAST() { const TCHAR szIPv4MCAST[] = TEXT("239.121.1.2"); const TCHAR szIPv6MCAST[] = TEXT("FF02:0:0:0:0:0:0:1"); // All Nodes local address bool result = false; if ( m_nSockType == SOCK_UDP ) { if ( m_nMode == AF_IPV4 ) { result = m_SocketClient->AddMembership(szIPv4MCAST, NULL); } else { result = m_SocketClient->AddMembership(szIPv6MCAST, NULL); HRESULT hr = HRESULT_FROM_WIN32(GetLastError()); hr = hr; } } return result; } /////////////////////////////////////////////////////////////////////////////// void IClientImpl::OnThreadBegin(CSocketHandle* pSH) { ASSERT( pSH == m_SocketClient ); (pSH); CString strAddr; SockAddrIn sockAddr; m_SocketClient->GetSockName(sockAddr); GetAddress( sockAddr, strAddr ); InitializeCriticalSection(&pSH->m_hClient2SrvSection); } void IClientImpl::OnThreadExit(CSocketHandle* pSH) { ASSERT( pSH == m_SocketClient ); DeleteCriticalSection( &pSH->m_hClient2SrvSection ); (pSH); } void IClientImpl::OnConnectionDropped(CSocketHandle* pSH) { ASSERT( pSH == m_SocketClient ); (pSH); AppendText( _T("Connection lost with client.\r\n") ); } void IClientImpl::OnConnectionError(CSocketHandle* pSH, DWORD dwError) { ASSERT( pSH == m_SocketClient ); (pSH); _com_error err(dwError); AppendText( _T("Communication Error:\r\n%s\r\n"), err.ErrorMessage() ); } void IClientImpl::OnDataReceived(CSocketHandle* pSH, const BYTE* pbData, DWORD dwCount, const SockAddrIn& addr) { ASSERT( pSH == m_SocketClient ); (pSH); if( !m_SocketClient->IsOpen() ) return; // 处理接收回来的数据; ToprocessRecivebuf(pSH,pbData,dwCount); } void IClientImpl::ToprocessRecivebuf(IN CSocketHandle *pSockHandle, IN const BYTE* pReceivebuf, IN DWORD dwReceiveSize) { DWORD dwIndexOfProcessed = 0; // 当前接收的包被处理的长度; DWORD dwOnceProcessedLen = 0; // 循环一次处理了多少当前包; STProtocolheader *pstProtocolheader; while( dwIndexOfProcessed < dwReceiveSize) { pstProtocolheader = NULL; dwOnceProcessedLen = 0; if ( pSockHandle->m_npendingSize == 0) //----------------------------------------------// 1.第一次接收或完整组包后接收剩余包; { pstProtocolheader = (STProtocolheader *)&((unsigned char *)pReceivebuf)[dwIndexOfProcessed]; if( dwReceiveSize - dwIndexOfProcessed < sizeof(STProtocolheader) ) // 1.1.第一次接收或剩余数据不足一个包头; { // 如果第一次接包就没有收够包头,认为是非法包,扔掉,就是说已处理的长度dwOnceProcessedLen = 0; pSockHandle->m_npendingSize = dwReceiveSize - dwIndexOfProcessed; memcpy(pSockHandle->m_szpendingbuf, pstProtocolheader, pSockHandle->m_npendingSize); } else // 1.2.第一次接收或剩余数据大于一个包头; { // dwOnceProcessedLen = pstProtocolheader->nDataLen; if( (int)pstProtocolheader->nDataLen > dwReceiveSize - dwIndexOfProcessed ) // 1.2.1.第一次接收或剩余数据不足一个完整的协议包; { memcpy(pSockHandle->m_szpendingbuf, pstProtocolheader, dwReceiveSize - dwIndexOfProcessed); //如果第一次接包,pstProtocolheader->nDataLen大于当前包的总长,认为是非法包,扔掉(不处理这部分,不一定是非法包) if( dwIndexOfProcessed == 0 ) { //组包错误,则扔掉当前包 //LOG4C((LOG_NOTICE, "第一次接包,服务器pstProtocolheader->nDataLen大于当前包的总长,认为是非法包,扔掉\r\n")); } pSockHandle->m_npendingSize = dwReceiveSize - dwIndexOfProcessed; dwOnceProcessedLen = dwReceiveSize - dwIndexOfProcessed; // 该次处理的数据长度; } else // ---------------------------------------------------------------------// 1.2.2.第一次接收或剩余数据大于一个完整的协议包; { //LOG4C((LOG_NOTICE, "正常包")); pSockHandle->m_npendingSize = 0; dwOnceProcessedLen = pstProtocolheader->nDataLen; // 该次处理的数据长度; } } } else //------------------------------------------------------------------------------// 2.第n+1次接收数据包; { pstProtocolheader = (STProtocolheader *)pSockHandle->m_szpendingbuf; if( pSockHandle->m_npendingSize < sizeof(STProtocolheader) ) // 2.1.pengingbuf数据小于包头 { DWORD dwRestheader = sizeof(STProtocolheader) - pSockHandle->m_npendingSize; if( dwRestheader < dwReceiveSize ) // 2.1.1.当前包大小 > 剩下包头长度,可以组成一个完整包头; { // -1.先收完一个完整的包头; memcpy( &pSockHandle->m_szpendingbuf[pSockHandle->m_npendingSize], pReceivebuf, dwRestheader); // -2.处理剩余数据; dwOnceProcessedLen = pstProtocolheader->nDataLen - pSockHandle->m_npendingSize; if( dwOnceProcessedLen <= dwReceiveSize ) // 可以组成一个完整的协议包; { memcpy( &pSockHandle->m_szpendingbuf[sizeof(STProtocolheader)], &((char *)pReceivebuf)[dwRestheader], pstProtocolheader->nDataLen - sizeof(STProtocolheader)); } else // 未能组成一个完整的协议包,断续接收等待; { //int nTemp = dwReceiveSize - dwRestheader; //除去头剩余部分的长度 //if ( nTemp > 0 ) //刚好是Header的长度,不用拷贝内存,所以这里加了>0的判断 //{ //memcpy( &pSockHandle->m_szpendingbuf[sizeof(STProtocolheader)],&((char *)pReceivebuf)[dwRestheader],nTemp ); //} memcpy( &pSockHandle->m_szpendingbuf[sizeof(STProtocolheader)],&((char *)pReceivebuf)[dwRestheader],dwReceiveSize - dwRestheader); pSockHandle->m_npendingSize += dwReceiveSize; } } else //------------------------------------------------------// 2.1.2.当前包大小 <= 剩下包头长度,未能或刚好组成一个完整的包头; { memcpy( &pSockHandle->m_szpendingbuf[pSockHandle->m_npendingSize], pReceivebuf, dwReceiveSize ); pSockHandle->m_npendingSize += dwReceiveSize; dwOnceProcessedLen = dwReceiveSize; } } else // --------------------------------------------------------// 2.2.pengingbuf数据大于包头; { dwOnceProcessedLen = pstProtocolheader->nDataLen - pSockHandle->m_npendingSize; if ( dwOnceProcessedLen <= dwReceiveSize ) // 可以组成一个完整的协议包; { memcpy( &pSockHandle->m_szpendingbuf[pSockHandle->m_npendingSize], pReceivebuf, dwOnceProcessedLen ); pSockHandle->m_npendingSize = 0; } else // 未能组成一个完整协议包; { memcpy( &pSockHandle->m_szpendingbuf[pSockHandle->m_npendingSize], pReceivebuf, dwReceiveSize ); pSockHandle->m_npendingSize += dwReceiveSize; } } } if ( dwOnceProcessedLen == 0 ) { // 没有收够包头,认为是非法包,扔掉 //LOG4C((LOG_NOTICE, "没有收够包头,认为是非法包,扔掉")); break; } if ( pSockHandle->m_npendingSize == 0 ) { if ( pstProtocolheader->nDataLen > SOCKET_BUFFSIZE ) { // 包长度超过限制,暂时不处理; //LOG4C((LOG_NOTICE, "pstProtocolheader->nDataLen超过限制")); } if(-1 == OnIntegrityPacket(pSockHandle, pstProtocolheader)) { LOG4C((LOG_NOTICE, "Error OnIntegrityPacket")); break; } } dwIndexOfProcessed += dwOnceProcessedLen; } } int IClientImpl::OnIntegrityPacket(IN CSocketHandle *pSockHandle, IN void *pIntegrityPacket) { STProtocolheader *pHeader = (STProtocolheader *)pIntegrityPacket; if( pHeader == NULL || pHeader->bof != PBOF) { LOG4C((LOG_NOTICE,"协议头标识错误")); return -1; } if( pHeader->nDataLen < 0 || pHeader->nDataLen > 65535 ) { LOG4C((LOG_NOTICE,"协议长度越界错误")); return -1; } unsigned int tmp = VerifyIntegrityPacket(pHeader, pHeader->nDataLen); if(tmp != pHeader->nVerify) { LOG4C((LOG_NOTICE,"协议校验值错误:len=%d,%d != %d",pHeader->nDataLen,tmp,pHeader->nVerify)); return -1; } switch (pHeader->nCmd) { case CMD_HEART: // 公共命令:心跳包 { // 心跳包,不处理任务事务; //LOG4C((LOG_NOTICE,"接收到的心跳包")); } break; case CMD_TOCHAT: // 公共命令:文字聊天; { STChatbody *tChatbody = (STChatbody*)pIntegrityPacket; //TRACE("------------------"); //AfxMessageBox(tChatbody->szChat); //LOG4C((LOG_NOTICE,"聊天内容:%s",tChatbody->szChat)); } break; case CMD_DATABASEINFO: { // 1.接到服务器返回的数据库连接信息; // 2.连接数据库; // 同时,此处做为全局开关,只有获取到了数据库信息才断续其他操作; // 定义一个全局标识 g_bSuccess;才能启动重连线程的心跳机制等等; STDatabaseInfobody *pDatabaseInfobody = (STDatabaseInfobody*)pIntegrityPacket; LOG4C((LOG_NOTICE,"数据库源:%s;数据库源端口:%s;数据库用户:%s;数据库密码:%s;数据库名称:%s", pDatabaseInfobody->szDatabaseServer, pDatabaseInfobody->szDatabaseTCPPort, pDatabaseInfobody->szDatabaseAccount, pDatabaseInfobody->szDatabasePassword, pDatabaseInfobody->szDatabaseName)); } break; default: return 0; } return 0; } //unsigned int IClientImpl::VerityIntegrityPacket(IN void *pIntegrityPacket,unsigned int nPacketSize) //{ // unsigned int checksum = 0; // //if ( nPacketSize <= sizeof(STProtocolheader) ) // //{ // // return 0; // //} // unsigned char *pBody = &((unsigned char*)pIntegrityPacket)[sizeof(STProtocolheader)]; // // if( pBody ) // checksum = crc32( 0, pBody, nPacketSize-sizeof(STProtocolheader) ); // // return checksum; //} BOOL IClientImpl::Initialize() { TCHAR szIPAddr[MAX_PATH] = { 0 }; CSocketHandle::GetLocalAddress(szIPAddr, MAX_PATH, AF_INET); AppendText(_T("Local Address (IPv4): %s\r\n"), szIPAddr); CSocketHandle::GetLocalAddress(szIPAddr, MAX_PATH, AF_INET6); AppendText(_T("Local Address (IPv6): %s\r\n"), szIPAddr); return TRUE; } void IClientImpl::StartReConnectSrvThread() { // Jeff.启用重连服务端线程.------------------- m_hRunObject = CreateEvent( NULL, TRUE, FALSE, "ClientThreadRun" ); if ( m_hRunObject == NULL ) { LOG4C((LOG_NOTICE,"创建事件失败")); } m_hReConnectSrvThreadHandle = CreateThread(NULL,0,ReConnectSrvThread,this,0,NULL); if ( m_hReConnectSrvThreadHandle == NULL ) { LOG4C((LOG_NOTICE,"创建线程失败")); } } BOOL IClientImpl::ConnectServer(LPCTSTR strAddr, LPCTSTR strPort) { int nFamily = (m_nMode == AF_IPV4) ? AF_INET : AF_INET6; if ( !m_SocketClient.StartClient(NULL, strAddr, strPort, nFamily, (m_nSockType+1) ) ) { m_bSocket = FALSE; LOG4C((LOG_NOTICE, "连接失败 [%s, %s, %d, %d]",strAddr, strPort, nFamily, (m_nSockType+1))); return FALSE; } else { m_bSocket = TRUE; CSocketHandle* pSH = (CSocketHandle *)m_SocketClient; pSH->m_npendingSize = 0; memset(pSH->m_szpendingbuf, 0, SOCKET_BUFFSIZE); SetupMCAST(); LOG4C((LOG_NOTICE, "连接成功 [%s, %s, %d, %d]",strAddr, strPort, nFamily, (m_nSockType+1))); return TRUE; } } void IClientImpl::DisConnectServer() { if(m_hRunObject) SetEvent(m_hRunObject); if( m_hReConnectSrvThreadHandle ) { if (WaitForSingleObject(m_hReConnectSrvThreadHandle,INFINITE) != WAIT_FAILED) { CloseHandle(m_hReConnectSrvThreadHandle); m_hReConnectSrvThreadHandle = NULL; } } CloseHandle( m_hRunObject ); m_hRunObject = NULL; m_SocketClient.Terminate(); } void IClientImpl::SendMsg(void *pMsg,const int nLen) { if ( m_SocketClient.IsOpen() ) { USES_CONVERSION; if (m_nSockType == SOCK_TCP) { m_SocketClient.Write((const LPBYTE)(pMsg), nLen, NULL); } else { SockAddrIn sockAddr; GetDestination(sockAddr); m_SocketClient.Write((const LPBYTE)(pMsg), nLen, sockAddr); } } else { AfxMessageBox(_T("Socket is not connected")); } } // 静态成员函数,提供全局访问的接口 //IClientImpl* IClientImpl::GetInstancePtr( int iTCPIndex ) //{ // if( NULL == m_pTcpClient[iTCPIndex] ) // { // m_pTcpClient[iTCPIndex] = new IClientImpl(); // } // // return m_pTcpClient[iTCPIndex]; //} DWORD WINAPI IClientImpl::ReConnectSrvThread(LPVOID pInstance) { LOG4C((LOG_NOTICE,"重连服务器线程")); IClientImpl *pClientImpl = (IClientImpl*)pInstance; #if 0 STProtocolheader tProtocolheader; tProtocolheader.nCmd = CMD_HEART; tProtocolheader.nCmdType = 1; tProtocolheader.nDataLen = sizeof(STProtocolheader); //tProtocolheader.nVerify = crc32(0, reinterpret_cast(&tProtocolheader), sizeof(STProtocolheader)); tProtocolheader.nVerify = VerityIntegrityPacket(&tProtocolheader, sizeof(STProtocolheader)); #else STChatbody tChatbody; tChatbody.tPHeader.nCmd = CMD_TOCHAT; tChatbody.tPHeader.nCmdType = 0; tChatbody.tPHeader.nDataLen = sizeof(STChatbody); memset(tChatbody.szChat,99,MAX_CHATLENGTH); tChatbody.szChat[MAX_CHATLENGTH-1] = '\0'; //tChatbody.tPHeader.nVerify = VerifyIntegrityPacket(&tChatbody, sizeof(STChatbody)); tChatbody.GetVerify(); #endif do { #if 0 // 心跳包; if( !pClientImpl->m_bStopbeat && pClientImpl->m_bSocket ) { //LOG4C((LOG_NOTICE,"发送心跳包")); USES_CONVERSION; if( pClientImpl->m_SocketClient->Write((const LPBYTE)&tProtocolheader,sizeof(STProtocolheader)) == -1) pClientImpl->m_bSocket = FALSE; } #else if( !pClientImpl->m_bStopbeat && pClientImpl->m_bSocket ) { //LOG4C((LOG_NOTICE,"发送心跳包")); //USES_CONVERSION; if( pClientImpl->m_SocketClient->Write((const LPBYTE)&tChatbody,sizeof(STChatbody)) == -1) pClientImpl->m_bSocket = FALSE; } #endif // 检测连接状态; if ( pClientImpl->m_bSocket == FALSE ) { if ( pClientImpl->m_SocketClient->IsOpen() == TRUE ) { pClientImpl->m_SocketClient.Terminate(); } if ( FALSE == pClientImpl->ConnectServer(_T(g_szServerIP),_T(g_szCmdPort))) { LOG4C((LOG_NOTICE,"重连服务器失败")); pClientImpl->m_bSocket = FALSE; } else { LOG4C((LOG_NOTICE,"重连服务器成功")); pClientImpl->m_bSocket = TRUE; } } } while( WaitForSingleObject(pClientImpl->m_hRunObject,200L) == WAIT_TIMEOUT ); return 0; } #if 1 void IClientImpl::SendFile(LPCTSTR lpzFileName,LPCTSTR strPort) { CFile file; CFileException e; file.Open(lpzFileName, CFile::modeRead|CFile::shareDenyNone, &e); if (e.m_cause != 0) { LOG4C((LOG_NOTICE,"读取文件失败")); file.Close(); return; } // 1 发送文件信息; LOG4C((LOG_NOTICE,"发送文件信息")); STFileInfobody tfInfo; GetFileName(lpzFileName,tfInfo.szFileName); tfInfo.nFileLength = file.GetLength(); //tfInfo.tPHeader.nCmd = CMD_FILEINFO_TRANSFER; //tfInfo.tPHeader.nCmdType = 1; CFileStatus FileStatus ; if (file.GetStatus(FileStatus) != FALSE) { tfInfo.tCreateDate = FileStatus.m_ctime.GetTime(); tfInfo.tModifyDate = FileStatus.m_mtime.GetTime(); } //tfInfo.tPHeader.nDataLen = sizeof(STFileInfobody);// - sizeof(STProtocolheader); //tfInfo.tPHeader.nVerify = crc32(0,reinterpret_cast(&tfInfo),sizeof(STFileInfobody)); //tfInfo.tPHeader.nVerify = VerifyIntegrityPacket(&tfInfo,sizeof(STFileInfobody)); tfInfo.GetVerify(); LOG4C((LOG_NOTICE,"文件信息:协议标识:%d,协议命令:%d,协议类型:%d,协议长度:%d,协议校验:%d,文件大小:%d", tfInfo.tPHeader.bof, tfInfo.tPHeader.nCmd, tfInfo.tPHeader.nCmdType, tfInfo.tPHeader.nDataLen, tfInfo.tPHeader.nVerify, tfInfo.nFileLength)); m_bStopbeat = TRUE; if( m_bSocket ) { USES_CONVERSION; if ( m_SocketClient->Write((const LPBYTE)&tfInfo,sizeof(STFileInfobody)) == -1) { LOG4C((LOG_NOTICE,"发送文件信息失败")); m_bStopbeat = FALSE; return; } } //return ; Sleep(100); // 异步传输时这里的文件传输会出现发包乱序; // 2 发送文件数据; LOG4C((LOG_NOTICE,"发送文件数据")); STFileContextbody tFileContextbody; //tFileContextbody.tPHeader.nCmd = CMD_FILECONT_TRANSFER; //tFileContextbody.tPHeader.nCmdType = 0; //tFileContextbody.tPHeader.nDataLen = sizeof(STFileContextbody); do { USES_CONVERSION; //memset(tFileContextbody.szFileContext,0,MAX_FILETRANSFERLENGTH); tFileContextbody.nFileContextLen = file.Read(tFileContextbody.szFileContext,MAX_FILETRANSFERLENGTH); //tFileContextbody.tPHeader.nVerify = VerifyIntegrityPacket(&tFileContextbody,sizeof(STFileContextbody)); tFileContextbody.GetVerify(); LOG4C((LOG_NOTICE,"文件内容:协议标识:%d,协议命令:%d,协议类型:%d,协议长度:%d,协议校验:%d", tFileContextbody.tPHeader.bof, tFileContextbody.tPHeader.nCmd, tFileContextbody.tPHeader.nCmdType, tFileContextbody.tPHeader.nDataLen, tFileContextbody.tPHeader.nVerify)); if ( m_SocketClient->Write((const LPBYTE)&tFileContextbody,sizeof(STFileContextbody)) == -1) { LOG4C((LOG_NOTICE,"文件内容传输出错")); break; } if ( tFileContextbody.nFileContextLen < MAX_FILETRANSFERLENGTH ) { break; } tfInfo.nFileLength -= MAX_FILETRANSFERLENGTH; //Sleep(200); } while (tfInfo.nFileLength); LOG4C((LOG_NOTICE,"发送文件结束")); m_bStopbeat = FALSE; } #endif