TcpClient.cpp 14 KB


  1. /*
  2. * Copyright: JessMA Open Source (ldcsaa@gmail.com)
  3. *
  4. * Version : 3.6.1
  5. * Author : Bruce Liang
  6. * Website : http://www.jessma.org
  7. * Project : https://github.com/ldcsaa
  8. * Blog : http://www.cnblogs.com/ldcsaa
  9. * Wiki : http://www.oschina.net/p/hp-socket
  10. * QQ Group : 75375912
  11. *
  12. * Licensed under the Apache License, Version 2.0 (the "License");
  13. * you may not use this file except in compliance with the License.
  14. * You may obtain a copy of the License at
  15. *
  16. * http://www.apache.org/licenses/LICENSE-2.0
  17. *
  18. * Unless required by applicable law or agreed to in writing, software
  19. * distributed under the License is distributed on an "AS IS" BASIS,
  20. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  21. * See the License for the specific language governing permissions and
  22. * limitations under the License.
  23. */
  24. #include "stdafx.h"
  25. #include <atlfile.h>
  26. #include "TcpClient.h"
  27. #include "../../Common/Src/WaitFor.h"
  28. #include <process.h>
  29. BOOL CTcpClient::Start(LPCTSTR lpszRemoteAddress, USHORT usPort, BOOL bAsyncConnect, LPCTSTR lpszBindAddress)
  30. {
  31. if(!CheckParams() || !CheckStarting())
  32. return FALSE;
  33. PrepareStart();
  34. m_ccContext.Reset();
  35. BOOL isOK = FALSE;
  36. m_bAsyncConnect = bAsyncConnect;
  37. if(CreateClientSocket())
  38. {
  39. if(BindClientSocket(lpszBindAddress))
  40. {
  41. if(FirePrepareConnect(m_soClient) != HR_ERROR)
  42. {
  43. if(ConnectToServer(lpszRemoteAddress, usPort))
  44. {
  45. if(CreateWorkerThread())
  46. isOK = TRUE;
  47. else
  48. SetLastError(SE_WORKER_THREAD_CREATE, __FUNCTION__, ERROR_CREATE_FAILED);
  49. }
  50. else
  51. SetLastError(SE_CONNECT_SERVER, __FUNCTION__, ::WSAGetLastError());
  52. }
  53. else
  54. SetLastError(SE_SOCKET_PREPARE, __FUNCTION__, ERROR_CANCELLED);
  55. }
  56. else
  57. SetLastError(SE_SOCKET_BIND, __FUNCTION__, ::WSAGetLastError());
  58. }
  59. else
  60. SetLastError(SE_SOCKET_CREATE, __FUNCTION__, ::WSAGetLastError());
  61. if(!isOK)
  62. {
  63. m_ccContext.Reset(FALSE);
  64. Stop();
  65. }
  66. return isOK;
  67. }
  68. BOOL CTcpClient::CheckParams()
  69. {
  70. if (((int)m_dwSocketBufferSize > 0) &&
  71. ((int)m_dwFreeBufferPoolSize >= 0) &&
  72. ((int)m_dwFreeBufferPoolHold >= 0) &&
  73. ((int)m_dwKeepAliveTime >= 1000 || m_dwKeepAliveTime == 0) &&
  74. ((int)m_dwKeepAliveInterval >= 1000 || m_dwKeepAliveInterval == 0) )
  75. return TRUE;
  76. SetLastError(SE_INVALID_PARAM, __FUNCTION__, ERROR_INVALID_PARAMETER);
  77. return FALSE;
  78. }
  79. void CTcpClient::PrepareStart()
  80. {
  81. m_itPool.SetItemCapacity((int)m_dwSocketBufferSize);
  82. m_itPool.SetPoolSize((int)m_dwFreeBufferPoolSize);
  83. m_itPool.SetPoolHold((int)m_dwFreeBufferPoolHold);
  84. m_itPool.Prepare();
  85. }
  86. BOOL CTcpClient::CheckStarting()
  87. {
  88. CSpinLock locallock(m_csState);
  89. if(m_enState == SS_STOPPED)
  90. m_enState = SS_STARTING;
  91. else
  92. {
  93. SetLastError(SE_ILLEGAL_STATE, __FUNCTION__, ERROR_INVALID_OPERATION);
  94. return FALSE;
  95. }
  96. return TRUE;
  97. }
  98. BOOL CTcpClient::CheckStoping(DWORD dwCurrentThreadID)
  99. {
  100. if(m_enState == SS_STOPPED)
  101. return FALSE;
  102. CSpinLock locallock(m_csState);
  103. if(HasStarted())
  104. {
  105. m_enState = SS_STOPPING;
  106. return TRUE;
  107. }
  108. else if(m_enState == SS_STOPPING)
  109. {
  110. if(dwCurrentThreadID != m_dwWorkerID)
  111. {
  112. while(m_enState != SS_STOPPED)
  113. ::Sleep(30);
  114. }
  115. SetLastError(SE_ILLEGAL_STATE, __FUNCTION__, ERROR_INVALID_OPERATION);
  116. }
  117. return FALSE;
  118. }
  119. BOOL CTcpClient::CreateClientSocket()
  120. {
  121. m_soClient = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  122. if(m_soClient != INVALID_SOCKET)
  123. {
  124. BOOL bOnOff = (m_dwKeepAliveTime > 0 && m_dwKeepAliveInterval > 0);
  125. VERIFY(::SSO_KeepAliveVals(m_soClient, bOnOff, m_dwKeepAliveTime, m_dwKeepAliveInterval) == NO_ERROR);
  126. m_evSocket = ::WSACreateEvent();
  127. ASSERT(m_evSocket != WSA_INVALID_EVENT);
  128. return TRUE;
  129. }
  130. return FALSE;
  131. }
  132. BOOL CTcpClient::BindClientSocket(LPCTSTR lpszBindAddress)
  133. {
  134. if(lpszBindAddress)
  135. {
  136. SOCKADDR_IN bindAddr;
  137. if(!::sockaddr_A_2_IN(AF_INET, lpszBindAddress, 0, bindAddr))
  138. {
  139. ::WSASetLastError(WSAEADDRNOTAVAIL);
  140. return FALSE;
  141. }
  142. if(::bind(m_soClient, (struct sockaddr*)&bindAddr, sizeof(SOCKADDR_IN)) == SOCKET_ERROR)
  143. return FALSE;
  144. }
  145. m_dwConnID = ::GenerateConnectionID();
  146. return TRUE;
  147. }
  148. BOOL CTcpClient::ConnectToServer(LPCTSTR lpszRemoteAddress, USHORT usPort)
  149. {
  150. TCHAR szAddress[40];
  151. int iAddressLen = sizeof(szAddress) / sizeof(TCHAR);
  152. if(!::GetIPAddress(lpszRemoteAddress, szAddress, iAddressLen))
  153. {
  154. ::WSASetLastError(WSAEADDRNOTAVAIL);
  155. return FALSE;
  156. }
  157. SOCKADDR_IN addr;
  158. if(!::sockaddr_A_2_IN(AF_INET, szAddress, usPort, addr))
  159. {
  160. ::WSASetLastError(WSAEADDRNOTAVAIL);
  161. return FALSE;
  162. }
  163. BOOL isOK = FALSE;
  164. if(m_bAsyncConnect)
  165. {
  166. if(::WSAEventSelect(m_soClient, m_evSocket, FD_CONNECT | FD_CLOSE) != SOCKET_ERROR)
  167. {
  168. int rc = ::connect(m_soClient, (SOCKADDR*)&addr, sizeof(SOCKADDR_IN));
  169. isOK = (rc == NO_ERROR || (rc == SOCKET_ERROR && ::WSAGetLastError() == WSAEWOULDBLOCK));
  170. }
  171. }
  172. else
  173. {
  174. if(::connect(m_soClient, (SOCKADDR*)&addr, sizeof(SOCKADDR_IN)) != SOCKET_ERROR)
  175. {
  176. if(::WSAEventSelect(m_soClient, m_evSocket, FD_READ | FD_WRITE | FD_CLOSE) != SOCKET_ERROR)
  177. {
  178. if(FireConnect() != HR_ERROR)
  179. {
  180. m_enState = SS_STARTED;
  181. isOK = TRUE;
  182. }
  183. }
  184. }
  185. }
  186. return isOK;
  187. }
  188. BOOL CTcpClient::CreateWorkerThread()
  189. {
  190. m_hWorker = (HANDLE)_beginthreadex(nullptr, 0, WorkerThreadProc, (LPVOID)this, 0, &m_dwWorkerID);
  191. return m_hWorker != nullptr;
  192. }
  193. UINT WINAPI CTcpClient::WorkerThreadProc(LPVOID pv)
  194. {
  195. TRACE("---------------> Client Worker Thread 0x%08X started <---------------\n", ::GetCurrentThreadId());
  196. BOOL bCallStop = TRUE;
  197. CTcpClient* pClient = (CTcpClient*)pv;
  198. HANDLE hEvents[] = {pClient->m_evSocket, pClient->m_evBuffer, pClient->m_evWorker};
  199. pClient->m_rcBuffer.Malloc(pClient->m_dwSocketBufferSize);
  200. while(pClient->HasStarted())
  201. {
  202. DWORD retval = ::WSAWaitForMultipleEvents(3, hEvents, FALSE, WSA_INFINITE, FALSE);
  203. if(retval == WSA_WAIT_EVENT_0)
  204. {
  205. if(!pClient->ProcessNetworkEvent())
  206. break;
  207. }
  208. else if(retval == WSA_WAIT_EVENT_0 + 1)
  209. {
  210. if(!pClient->SendData())
  211. break;
  212. }
  213. else if(retval == WSA_WAIT_EVENT_0 + 2)
  214. {
  215. bCallStop = FALSE;
  216. break;
  217. }
  218. else
  219. ASSERT(FALSE);
  220. }
  221. pClient->OnWorkerThreadEnd(::GetCurrentThreadId());
  222. if(bCallStop && pClient->HasStarted())
  223. pClient->Stop();
  224. TRACE("---------------> Client Worker Thread 0x%08X stoped <---------------\n", ::GetCurrentThreadId());
  225. return 0;
  226. }
  227. BOOL CTcpClient::ProcessNetworkEvent()
  228. {
  229. BOOL bContinue = TRUE;
  230. WSANETWORKEVENTS events;
  231. int rc = ::WSAEnumNetworkEvents(m_soClient, m_evSocket, &events);
  232. if(rc == SOCKET_ERROR)
  233. bContinue = HandleError(events);
  234. if(m_bAsyncConnect && bContinue && events.lNetworkEvents & FD_CONNECT)
  235. bContinue = HandleConnect(events);
  236. if(bContinue && events.lNetworkEvents & FD_READ)
  237. bContinue = HandleRead(events);
  238. if(bContinue && events.lNetworkEvents & FD_WRITE)
  239. bContinue = HandleWrite(events);
  240. if(bContinue && events.lNetworkEvents & FD_CLOSE)
  241. bContinue = HandleClose(events);
  242. return bContinue;
  243. }
  244. BOOL CTcpClient::HandleError(WSANETWORKEVENTS& events)
  245. {
  246. int iCode = ::WSAGetLastError();
  247. EnSocketOperation enOperation = SO_UNKNOWN;
  248. if(events.lNetworkEvents & FD_CONNECT)
  249. enOperation = SO_CONNECT;
  250. else if(events.lNetworkEvents & FD_CLOSE)
  251. enOperation = SO_CLOSE;
  252. else if(events.lNetworkEvents & FD_READ)
  253. enOperation = SO_RECEIVE;
  254. else if(events.lNetworkEvents & FD_WRITE)
  255. enOperation = SO_SEND;
  256. VERIFY(::WSAResetEvent(m_evSocket));
  257. m_ccContext.Reset(TRUE, enOperation, iCode);
  258. return FALSE;
  259. }
  260. BOOL CTcpClient::HandleRead(WSANETWORKEVENTS& events)
  261. {
  262. BOOL bContinue = TRUE;
  263. int iCode = events.iErrorCode[FD_READ_BIT];
  264. if(iCode == 0)
  265. bContinue = ReadData();
  266. else
  267. {
  268. m_ccContext.Reset(TRUE, SO_RECEIVE, iCode);
  269. bContinue = FALSE;
  270. }
  271. return bContinue;
  272. }
  273. BOOL CTcpClient::HandleWrite(WSANETWORKEVENTS& events)
  274. {
  275. BOOL bContinue = TRUE;
  276. int iCode = events.iErrorCode[FD_WRITE_BIT];
  277. if(iCode == 0)
  278. bContinue = SendData();
  279. else
  280. {
  281. m_ccContext.Reset(TRUE, SO_SEND, iCode);
  282. bContinue = FALSE;
  283. }
  284. return bContinue;
  285. }
  286. BOOL CTcpClient::HandleConnect(WSANETWORKEVENTS& events)
  287. {
  288. BOOL bContinue = TRUE;
  289. int iCode = events.iErrorCode[FD_CONNECT_BIT];
  290. if(iCode == 0)
  291. {
  292. if(::WSAEventSelect(m_soClient, m_evSocket, FD_READ | FD_WRITE | FD_CLOSE) != SOCKET_ERROR)
  293. {
  294. if(FireConnect() != HR_ERROR)
  295. m_enState = SS_STARTED;
  296. else
  297. iCode = ERROR_CANCELLED;
  298. }
  299. else
  300. iCode = ::WSAGetLastError();
  301. }
  302. if(iCode != 0)
  303. {
  304. if(iCode != ERROR_CANCELLED)
  305. m_ccContext.Reset(TRUE, SO_CONNECT, iCode);
  306. else
  307. m_ccContext.Reset(FALSE);
  308. bContinue = FALSE;
  309. }
  310. return bContinue;
  311. }
  312. BOOL CTcpClient::HandleClose(WSANETWORKEVENTS& events)
  313. {
  314. int iCode = events.iErrorCode[FD_CLOSE_BIT];
  315. if(iCode == 0)
  316. m_ccContext.Reset(TRUE, SO_CLOSE, SE_OK);
  317. else
  318. m_ccContext.Reset(TRUE, SO_CLOSE, iCode);
  319. return FALSE;
  320. }
  321. BOOL CTcpClient::ReadData()
  322. {
  323. while(TRUE)
  324. {
  325. int rc = recv(m_soClient, (char*)(BYTE*)m_rcBuffer, m_dwSocketBufferSize, 0);
  326. if(rc > 0)
  327. {
  328. if(FireReceive(m_rcBuffer, rc) == HR_ERROR)
  329. {
  330. TRACE("<C-CNNID: %Iu> OnReceive() event return 'HR_ERROR', connection will be closed !\n", m_dwConnID);
  331. m_ccContext.Reset(TRUE, SO_RECEIVE, ERROR_CANCELLED);
  332. return FALSE;
  333. }
  334. }
  335. else if(rc == SOCKET_ERROR)
  336. {
  337. int code = ::WSAGetLastError();
  338. if(code == WSAEWOULDBLOCK)
  339. break;
  340. else
  341. {
  342. m_ccContext.Reset(TRUE, SO_RECEIVE, code);
  343. return FALSE;
  344. }
  345. }
  346. else if(rc == 0)
  347. {
  348. m_ccContext.Reset(TRUE, SO_CLOSE, SE_OK);
  349. return FALSE;
  350. }
  351. else
  352. ASSERT(FALSE);
  353. }
  354. return TRUE;
  355. }
  356. BOOL CTcpClient::SendData()
  357. {
  358. BOOL isOK = TRUE;
  359. while(TRUE)
  360. {
  361. TItemPtr itPtr(m_itPool, GetSendBuffer());
  362. if(itPtr.IsValid())
  363. {
  364. ASSERT(!itPtr->IsEmpty());
  365. isOK = DoSendData(itPtr);
  366. if(isOK)
  367. {
  368. if(!itPtr->IsEmpty())
  369. {
  370. CCriSecLock locallock(m_csSend);
  371. m_lsSend.PushFront(itPtr.Detach());
  372. break;
  373. }
  374. }
  375. else
  376. break;
  377. }
  378. else
  379. break;
  380. }
  381. return isOK;
  382. }
  383. TItem* CTcpClient::GetSendBuffer()
  384. {
  385. TItem* pItem = nullptr;
  386. if(m_lsSend.Size() > 0)
  387. {
  388. CCriSecLock locallock(m_csSend);
  389. if(m_lsSend.Size() > 0)
  390. pItem = m_lsSend.PopFront();
  391. }
  392. return pItem;
  393. }
  394. BOOL CTcpClient::DoSendData(TItem* pItem)
  395. {
  396. while(!pItem->IsEmpty())
  397. {
  398. int rc = 0;
  399. {
  400. CCriSecLock locallock(m_csSend);
  401. rc = send(m_soClient, (char*)pItem->Ptr(), min(pItem->Size(), (int)m_dwSocketBufferSize), 0);
  402. if(rc > 0) m_iPending -= rc;
  403. }
  404. if(rc > 0)
  405. {
  406. if(FireSend(pItem->Ptr(), rc) == HR_ERROR)
  407. {
  408. TRACE("<C-CNNID: %Iu> OnSend() event should not return 'HR_ERROR' !!\n", m_dwConnID);
  409. ASSERT(FALSE);
  410. }
  411. pItem->Reduce(rc);
  412. }
  413. else if(rc == SOCKET_ERROR)
  414. {
  415. int code = ::WSAGetLastError();
  416. if(code == WSAEWOULDBLOCK)
  417. break;
  418. else
  419. {
  420. m_ccContext.Reset(TRUE, SO_SEND, code);
  421. return FALSE;
  422. }
  423. }
  424. else
  425. ASSERT(FALSE);
  426. }
  427. return TRUE;
  428. }
  429. BOOL CTcpClient::Stop()
  430. {
  431. DWORD dwCurrentThreadID = ::GetCurrentThreadId();
  432. if(!CheckStoping(dwCurrentThreadID))
  433. return FALSE;
  434. WaitForWorkerThreadEnd(dwCurrentThreadID);
  435. if(m_ccContext.bFireOnClose)
  436. FireClose(m_ccContext.enOperation, m_ccContext.iErrorCode);
  437. if(m_evSocket != nullptr)
  438. {
  439. ::WSACloseEvent(m_evSocket);
  440. m_evSocket = nullptr;
  441. }
  442. if(m_soClient != INVALID_SOCKET)
  443. {
  444. shutdown(m_soClient, SD_SEND);
  445. closesocket(m_soClient);
  446. m_soClient = INVALID_SOCKET;
  447. }
  448. Reset();
  449. return TRUE;
  450. }
  451. void CTcpClient::Reset()
  452. {
  453. CCriSecLock locallock(m_csSend);
  454. m_rcBuffer.Free();
  455. m_evBuffer.Reset();
  456. m_evWorker.Reset();
  457. m_lsSend.Clear();
  458. m_itPool.Clear();
  459. m_iPending = 0;
  460. m_enState = SS_STOPPED;
  461. }
  462. void CTcpClient::WaitForWorkerThreadEnd(DWORD dwCurrentThreadID)
  463. {
  464. if(m_hWorker != nullptr)
  465. {
  466. if(dwCurrentThreadID != m_dwWorkerID)
  467. {
  468. m_evWorker.Set();
  469. VERIFY(::WaitForSingleObject(m_hWorker, INFINITE) == WAIT_OBJECT_0);
  470. }
  471. ::CloseHandle(m_hWorker);
  472. m_hWorker = nullptr;
  473. m_dwWorkerID = 0;
  474. }
  475. }
  476. BOOL CTcpClient::Send(const BYTE* pBuffer, int iLength, int iOffset)
  477. {
  478. ASSERT(pBuffer && iLength > 0);
  479. if(iOffset != 0) pBuffer += iOffset;
  480. WSABUF buffer;
  481. buffer.len = iLength;
  482. buffer.buf = (char*)pBuffer;
  483. return SendPackets(&buffer, 1);
  484. }
  485. BOOL CTcpClient::DoSendPackets(const WSABUF pBuffers[], int iCount)
  486. {
  487. ASSERT(pBuffers && iCount > 0);
  488. int result = NO_ERROR;
  489. if(pBuffers && iCount > 0)
  490. {
  491. if(HasStarted())
  492. {
  493. CCriSecLock locallock(m_csSend);
  494. if(HasStarted())
  495. result = SendInternal(pBuffers, iCount);
  496. else
  497. result = ERROR_INVALID_STATE;
  498. }
  499. else
  500. result = ERROR_INVALID_STATE;
  501. }
  502. else
  503. result = ERROR_INVALID_PARAMETER;
  504. if(result != NO_ERROR)
  505. ::SetLastError(result);
  506. return (result == NO_ERROR);
  507. }
  508. BOOL CTcpClient::SendInternal(const WSABUF pBuffers[], int iCount)
  509. {
  510. int result = NO_ERROR;
  511. ASSERT(m_iPending >= 0);
  512. int iPending = m_iPending;
  513. BOOL isPending = m_iPending > 0;
  514. for(int i = 0; i < iCount; i++)
  515. {
  516. int iBufLen = pBuffers[i].len;
  517. if(iBufLen > 0)
  518. {
  519. BYTE* pBuffer = (BYTE*)pBuffers[i].buf;
  520. ASSERT(pBuffer);
  521. m_lsSend.Cat(pBuffer, iBufLen);
  522. m_iPending += iBufLen;
  523. }
  524. }
  525. if(!isPending && m_iPending > iPending) m_evBuffer.Set();
  526. return result;
  527. }
  528. BOOL CTcpClient::SendSmallFile(LPCTSTR lpszFileName, const LPWSABUF pHead, const LPWSABUF pTail)
  529. {
  530. ASSERT(lpszFileName != nullptr);
  531. CAtlFile file;
  532. HRESULT hr = file.Create(lpszFileName, GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING);
  533. if(SUCCEEDED(hr))
  534. {
  535. ULONGLONG ullLen;
  536. hr = file.GetSize(ullLen);
  537. if(SUCCEEDED(hr))
  538. {
  539. ULONGLONG ullTotal = ullLen + (pHead ? pHead->len : 0) + (pTail ? pTail->len : 0);
  540. if(ullLen > 0 && ullTotal <= MAX_SMALL_FILE_SIZE)
  541. {
  542. CAtlFileMapping<> fmap;
  543. hr = fmap.MapFile(file);
  544. if(SUCCEEDED(hr))
  545. {
  546. WSABUF bufs[3] = {0};
  547. bufs[1].len = (ULONG)ullLen;
  548. bufs[1].buf = fmap;
  549. if(pHead) memcpy(&bufs[0], pHead, sizeof(WSABUF));
  550. if(pTail) memcpy(&bufs[2], pTail, sizeof(WSABUF));
  551. return SendPackets(bufs, 3);
  552. }
  553. }
  554. else if(ullLen == 0)
  555. hr = HRESULT_FROM_WIN32(ERROR_FILE_INVALID);
  556. else
  557. hr = HRESULT_FROM_WIN32(ERROR_FILE_TOO_LARGE);
  558. }
  559. }
  560. ::SetLastError(hr & 0x0000FFFF);
  561. return FALSE;
  562. }
  563. void CTcpClient::SetLastError(EnSocketError code, LPCSTR func, int ec)
  564. {
  565. TRACE("%s --> Error: %d, EC: %d\n", func, code, ec);
  566. m_enLastError = code;
  567. ::SetLastError(ec);
  568. }
  569. BOOL CTcpClient::GetLocalAddress(TCHAR lpszAddress[], int& iAddressLen, USHORT& usPort)
  570. {
  571. ASSERT(lpszAddress != nullptr && iAddressLen > 0);
  572. return ::GetSocketLocalAddress(m_soClient, lpszAddress, iAddressLen, usPort);
  573. }