RingBuffer.h 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164
  1. /*
  2. * Copyright: JessMA Open Source (ldcsaa@gmail.com)
  3. *
  4. * Version : 2.3.15
  5. * Author : Bruce Liang
  6. * Website : http://www.jessma.org
  7. * Project : https://github.com/ldcsaa
  8. * Blog : http://www.cnblogs.com/ldcsaa
  9. * Wiki : http://www.oschina.net/p/hp-socket
  10. * QQ Group : 75375912
  11. *
  12. * Licensed under the Apache License, Version 2.0 (the "License");
  13. * you may not use this file except in compliance with the License.
  14. * You may obtain a copy of the License at
  15. *
  16. * http://www.apache.org/licenses/LICENSE-2.0
  17. *
  18. * Unless required by applicable law or agreed to in writing, software
  19. * distributed under the License is distributed on an "AS IS" BASIS,
  20. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  21. * See the License for the specific language governing permissions and
  22. * limitations under the License.
  23. */
  24. #pragma once
  25. #include "STLHelper.h"
  26. #include "RWLock.h"
  27. #include "CriticalSection.h"
  28. #define CACHE_LINE 64
  29. #define PACK_SIZE_OF(T) (CACHE_LINE - sizeof(T) % CACHE_LINE)
  30. #if !defined (_WIN64)
  31. #pragma pack(push, 4)
  32. #endif
  33. template <class T, class _PutGuard = CCriSec, class _GetGuard = CCriSec> class CRingBuffer
  34. {
  35. public:
  36. static const UINT DEFAULT_EXPECT = 4096;
  37. public:
  38. BOOL Put(T* pElement)
  39. {
  40. ASSERT(pElement != nullptr);
  41. {
  42. CLocalLock<_PutGuard> locallock(m_csPut);
  43. ULONGLONG seqPut = m_seqPut;
  44. WaitForPut(seqPut);
  45. if(!IsValid()) return FALSE;
  46. DoPut(pElement, seqPut);
  47. }
  48. return TRUE;
  49. }
  50. BOOL TryPut(T* pElement)
  51. {
  52. ASSERT(pElement != nullptr);
  53. if(!IsValid() || !HasPutSpace(m_seqPut))
  54. return FALSE;
  55. {
  56. CLocalLock<_PutGuard> locallock(m_csPut);
  57. ULONGLONG seqPut = m_seqPut;
  58. if(!IsValid() || !HasPutSpace(seqPut))
  59. return FALSE;
  60. DoPut(pElement, seqPut);
  61. }
  62. return TRUE;
  63. }
  64. BOOL PutBatch(T* pElements[], int& iCount)
  65. {
  66. ASSERT(pElements != nullptr && iCount > 0);
  67. {
  68. CLocalLock<_PutGuard> locallock(m_csPut);
  69. ULONGLONG seqPut = m_seqPut;
  70. for(int i = 0; i < iCount; ++i)
  71. {
  72. WaitForPut(seqPut);
  73. if(!IsValid())
  74. {
  75. iCount = i;
  76. return FALSE;
  77. }
  78. DoPut(*(pElements + i), seqPut);
  79. }
  80. }
  81. return TRUE;
  82. }
  83. BOOL TryPutBatch(T* pElements[], int& iCount)
  84. {
  85. ASSERT(pElements != nullptr && iCount > 0);
  86. if(!IsValid() || !HasPutSpace(m_seqPut))
  87. {
  88. iCount = 0;
  89. return FALSE;
  90. }
  91. {
  92. CLocalLock<_PutGuard> locallock(m_csPut);
  93. ULONGLONG seqPut = m_seqPut;
  94. for(int i = 0; i < iCount; ++i)
  95. {
  96. if(!IsValid() || !HasPutSpace(seqPut))
  97. {
  98. iCount = i;
  99. return FALSE;
  100. }
  101. DoPut(*(pElements + i), seqPut);
  102. }
  103. }
  104. return TRUE;
  105. }
  106. BOOL Get(T** pElement)
  107. {
  108. ASSERT(pElement != nullptr);
  109. {
  110. CLocalLock<_GetGuard> locallock(m_csGet);
  111. ULONGLONG seqGet = m_seqGet;
  112. WaitForGet(seqGet);
  113. if(!IsValid()) return FALSE;
  114. DoGet(pElement, seqGet);
  115. }
  116. return TRUE;
  117. }
  118. BOOL TryGet(T** pElement)
  119. {
  120. ASSERT(pElement != nullptr);
  121. if(!IsValid() || !HasGetSpace(m_seqGet))
  122. return FALSE;
  123. {
  124. CLocalLock<_GetGuard> locallock(m_csGet);
  125. ULONGLONG seqGet = m_seqGet;
  126. if(!IsValid() || !HasGetSpace(seqGet))
  127. return FALSE;
  128. DoGet(pElement, seqGet);
  129. }
  130. return TRUE;
  131. }
  132. BOOL GetBatch(T* pElements[], int& iCount)
  133. {
  134. ASSERT(pElements != nullptr && iCount > 0);
  135. {
  136. CLocalLock<_GetGuard> locallock(m_csGet);
  137. ULONGLONG seqGet = m_seqGet;
  138. for(int i = 0; i < iCount; ++i)
  139. {
  140. WaitForGet(seqGet);
  141. if(!IsValid())
  142. {
  143. iCount = i;
  144. return FALSE;
  145. }
  146. DoGet(pElements + i, seqGet);
  147. }
  148. }
  149. return TRUE;
  150. }
  151. BOOL TryGetBatch(T* pElements[], int& iCount)
  152. {
  153. ASSERT(pElements != nullptr && iCount > 0);
  154. if(!IsValid() || !HasGetSpace(m_seqGet))
  155. {
  156. iCount = 0;
  157. return FALSE;
  158. }
  159. {
  160. CLocalLock<_GetGuard> locallock(m_csGet);
  161. ULONGLONG seqGet = m_seqGet;
  162. for(int i = 0; i < iCount; ++i)
  163. {
  164. if(!IsValid() || !HasGetSpace(seqGet))
  165. {
  166. iCount = i;
  167. return FALSE;
  168. }
  169. DoGet(pElements + i, seqGet);
  170. }
  171. }
  172. return TRUE;
  173. }
  174. BOOL Peek(T** pElement)
  175. {
  176. ASSERT(pElement != nullptr);
  177. {
  178. //CLocalLock<_GetGuard> locallock(m_csGet);
  179. ULONGLONG seqGet = m_seqGet;
  180. if(!IsValid() || !HasGetSpace(seqGet))
  181. return FALSE;
  182. DoPeek(pElement, seqGet);
  183. }
  184. return TRUE;
  185. }
  186. BOOL Create(DWORD dwExpect = DEFAULT_EXPECT)
  187. {
  188. ASSERT(!IsValid() && dwExpect > 0);
  189. if(IsValid()) return FALSE;
  190. m_seqPut = 0;
  191. m_seqGet = 0;
  192. m_dwReal = Revise(dwExpect);
  193. m_pv = (T**)malloc(m_dwReal * sizeof(T*));
  194. m_bValid = (m_pv != nullptr);
  195. return IsValid();
  196. }
  197. BOOL Destroy()
  198. {
  199. if(IsValid())
  200. {
  201. m_bValid = FALSE;
  202. CLocalLock<_PutGuard> locallock1(m_csPut);
  203. CLocalLock<_GetGuard> locallock2(m_csGet);
  204. free((void*)m_pv);
  205. m_pv = nullptr;
  206. m_dwReal = 0;
  207. m_seqPut = 0;
  208. m_seqGet = 0;
  209. return TRUE;
  210. }
  211. return FALSE;
  212. }
  213. private:
  214. void DoPut(T* pElement, ULONGLONG& seqPut)
  215. {
  216. DWORD index = seqPut & (m_dwReal - 1);
  217. *(m_pv + index) = pElement;
  218. ++seqPut;
  219. m_seqPut = seqPut;
  220. }
  221. void DoGet(T** pElement, ULONGLONG& seqGet)
  222. {
  223. DWORD index = seqGet & (m_dwReal - 1);
  224. *(pElement) = *(m_pv + index);
  225. ++seqGet;
  226. m_seqGet = seqGet;
  227. }
  228. void DoPeek(T** pElement, ULONGLONG& seqGet)
  229. {
  230. DWORD index = seqGet & (m_dwReal - 1);
  231. *(pElement) = *(m_pv + index);
  232. }
  233. BOOL HasPutSpace(ULONGLONG seqPut)
  234. {
  235. return (seqPut - m_seqGet < m_dwReal);
  236. }
  237. void WaitForPut(ULONGLONG seqPut)
  238. {
  239. for(DWORD w = 0; IsValid(); ++w)
  240. {
  241. if(HasPutSpace(seqPut))
  242. break;
  243. ::YieldThread(w);
  244. }
  245. }
  246. BOOL HasGetSpace(ULONGLONG seqGet)
  247. {
  248. return (m_seqPut - seqGet > 0);
  249. }
  250. void WaitForGet(ULONGLONG seqGet)
  251. {
  252. for(DWORD w = 0; IsValid(); ++w)
  253. {
  254. if(HasGetSpace(seqGet))
  255. break;
  256. ::YieldThread(w);
  257. }
  258. }
  259. DWORD Revise(DWORD dwExpect)
  260. {
  261. int index = 0;
  262. int shift = sizeof(DWORD) * 8 - 1;
  263. for(int i = shift; i >= 0; i--)
  264. {
  265. if(index == 0)
  266. {
  267. if(dwExpect & (1 << i))
  268. {
  269. index = i;
  270. if(index == shift)
  271. break;
  272. }
  273. }
  274. else
  275. {
  276. if(dwExpect & (1 << i))
  277. ++index;
  278. break;
  279. }
  280. }
  281. return 1 << index;
  282. }
  283. public:
  284. CRingBuffer(BOOL bCreate = FALSE, DWORD uiExpect = DEFAULT_EXPECT)
  285. : m_pv(nullptr)
  286. , m_bValid(FALSE)
  287. , m_dwReal(0)
  288. , m_seqPut(0)
  289. , m_seqGet(0)
  290. {
  291. ASSERT(uiExpect > 0);
  292. if(bCreate)
  293. {
  294. Create(uiExpect);
  295. ASSERT(IsValid());
  296. }
  297. }
  298. ~CRingBuffer()
  299. {
  300. Destroy();
  301. }
  302. BOOL IsValid() {return m_bValid;}
  303. private:
  304. CRingBuffer(const CRingBuffer&);
  305. CRingBuffer operator = (const CRingBuffer&);
  306. private:
  307. BOOL m_bValid;
  308. DWORD m_dwReal;
  309. T** m_pv;
  310. char pack1[PACK_SIZE_OF(T**)];
  311. volatile ULONGLONG m_seqPut;
  312. char pack4[PACK_SIZE_OF(ULONGLONG)];
  313. volatile ULONGLONG m_seqGet;
  314. char pack5[PACK_SIZE_OF(ULONGLONG)];
  315. _PutGuard m_csPut;
  316. char pack2[PACK_SIZE_OF(_PutGuard)];
  317. _GetGuard m_csGet;
  318. char pack3[PACK_SIZE_OF(_GetGuard)];
  319. };
  320. typedef CRingBuffer<void, CCriSec, CCriSec> CCSRingBuffer;
  321. typedef CRingBuffer<void, CInterCriSec, CInterCriSec> CICSRingBuffer;
  322. typedef CRingBuffer<void, CSpinGuard, CSpinGuard> CSGRingBuffer;
  323. typedef CRingBuffer<void, CFakeGuard, CFakeGuard> CFKRingBuffer;
  324. template <class T, class index_type = DWORD, bool adjust_index = false> class CRingCache
  325. {
  326. public:
  327. typedef T* TPTR;
  328. typedef volatile T* VTPTR;
  329. typedef unordered_set<index_type> IndexSet;
  330. typedef typename IndexSet::const_iterator IndexSetCI;
  331. typedef typename IndexSet::iterator IndexSetI;
  332. static TPTR const E_EMPTY;
  333. static TPTR const E_LOCKED;
  334. static TPTR const E_MAX_STATUS;
  335. public:
  336. static index_type& INDEX_INC(index_type& dwIndex) {if(adjust_index) ++dwIndex; return dwIndex;}
  337. static index_type& INDEX_DEC(index_type& dwIndex) {if(adjust_index) --dwIndex; return dwIndex;}
  338. private:
  339. VTPTR& INDEX_VAL(index_type dwIndex) {return *(m_pv + dwIndex);}
  340. public:
  341. BOOL Put(TPTR pElement, index_type& dwIndex)
  342. {
  343. ASSERT(pElement != nullptr);
  344. BOOL isOK = FALSE;
  345. while(true)
  346. {
  347. if(!HasSpace())
  348. break;
  349. DWORD dwCurSeq = m_dwCurSeq;
  350. index_type dwCurIndex = dwCurSeq % m_dwSize;
  351. VTPTR& pValue = INDEX_VAL(dwCurIndex);
  352. if(pValue == E_EMPTY)
  353. {
  354. if(::InterlockedCompareExchangePointer((volatile PVOID*)&pValue, pElement, E_EMPTY) == E_EMPTY)
  355. {
  356. ::InterlockedIncrement(&m_dwCount);
  357. ::InterlockedCompareExchange(&m_dwCurSeq, dwCurSeq + 1, dwCurSeq);
  358. if(pElement != E_LOCKED)
  359. EmplaceIndex(dwCurIndex);
  360. dwIndex = INDEX_INC(dwCurIndex);
  361. isOK = TRUE;
  362. break;
  363. }
  364. }
  365. ::InterlockedCompareExchange(&m_dwCurSeq, dwCurSeq + 1, dwCurSeq);
  366. }
  367. return isOK;
  368. }
  369. BOOL Get(index_type dwIndex, TPTR* ppElement)
  370. {
  371. INDEX_DEC(dwIndex);
  372. ASSERT(dwIndex < m_dwSize);
  373. ASSERT(ppElement != nullptr);
  374. if(dwIndex >= m_dwSize)
  375. {
  376. *ppElement = nullptr;
  377. return FALSE;
  378. }
  379. *ppElement = (TPTR)INDEX_VAL(dwIndex);
  380. return IsValidElement(*ppElement);
  381. }
  382. BOOL Set(index_type dwIndex, TPTR pElement, TPTR* ppOldElement = nullptr)
  383. {
  384. TPTR pElement2 = nullptr;
  385. Get(dwIndex, &pElement2);
  386. if(ppOldElement != nullptr)
  387. *ppOldElement = pElement2;
  388. if(pElement == pElement2)
  389. return FALSE;
  390. int f1 = 0;
  391. int f2 = 0;
  392. if(pElement == E_EMPTY)
  393. {
  394. if(pElement2 == E_LOCKED)
  395. f1 = -1;
  396. else
  397. f1 = f2 = -1;
  398. }
  399. else if(pElement == E_LOCKED)
  400. {
  401. if(pElement2 == E_EMPTY)
  402. f1 = 1;
  403. else
  404. f2 = -1;
  405. }
  406. else
  407. {
  408. if(pElement2 == E_EMPTY)
  409. f1 = f2 = 1;
  410. else if(pElement2 == E_LOCKED)
  411. f2 = 1;
  412. }
  413. INDEX_DEC(dwIndex);
  414. BOOL bSetValueFirst = (f1 + f2 >= 0);
  415. if(bSetValueFirst) INDEX_VAL(dwIndex) = pElement;
  416. if(f1 > 0) ::InterlockedIncrement(&m_dwCount);
  417. if(f2 != 0) (f2 > 0) ? EmplaceIndex(dwIndex) : EraseIndex(dwIndex);
  418. if(f1 < 0) ::InterlockedDecrement(&m_dwCount);
  419. if(!bSetValueFirst) INDEX_VAL(dwIndex) = pElement;
  420. ASSERT(Spaces() <= Size());
  421. return TRUE;
  422. }
  423. BOOL Remove(index_type dwIndex, TPTR* ppElement = nullptr)
  424. {
  425. return Set(dwIndex, E_EMPTY, ppElement);
  426. }
  427. BOOL AcquireLock(index_type& dwIndex)
  428. {
  429. return Put(E_LOCKED, dwIndex);
  430. }
  431. BOOL ReleaseLock(index_type dwIndex, TPTR pElement)
  432. {
  433. ASSERT(pElement == nullptr || IsValidElement(pElement));
  434. TPTR pElement2 = nullptr;
  435. Get(dwIndex, &pElement2);
  436. if(pElement2 != E_LOCKED)
  437. return FALSE;
  438. Set(dwIndex, pElement);
  439. return TRUE;
  440. }
  441. public:
  442. void Reset(DWORD dwSize = 0)
  443. {
  444. if(IsValid())
  445. Destroy();
  446. if(dwSize > 0)
  447. Create(dwSize);
  448. }
  449. BOOL GetAllElementIndexes(index_type ids[], DWORD& dwCount, BOOL bCopy = TRUE)
  450. {
  451. if(ids == nullptr || dwCount == 0)
  452. {
  453. dwCount = Elements();
  454. return FALSE;
  455. }
  456. IndexSet* pIndexes = nullptr;
  457. IndexSet indexes;
  458. if(bCopy)
  459. pIndexes = &CopyIndexes(indexes);
  460. else
  461. pIndexes = &m_indexes;
  462. BOOL isOK = FALSE;
  463. DWORD dwSize = (DWORD)pIndexes->size();
  464. if(dwSize > 0 && dwSize <= dwCount)
  465. {
  466. IndexSetCI it = pIndexes->begin();
  467. IndexSetCI end = pIndexes->end();
  468. for(int i = 0; it != end; ++it, ++i)
  469. {
  470. index_type index = *it;
  471. ids[i] = INDEX_INC(index);
  472. }
  473. isOK = TRUE;
  474. }
  475. dwCount = dwSize;
  476. return isOK;
  477. }
  478. unique_ptr<index_type[]> GetAllElementIndexes(DWORD& dwCount, BOOL bCopy = TRUE)
  479. {
  480. IndexSet* pIndexes = nullptr;
  481. IndexSet indexes;
  482. if(bCopy)
  483. pIndexes = &CopyIndexes(indexes);
  484. else
  485. pIndexes = &m_indexes;
  486. unique_ptr<index_type[]> ids;
  487. dwCount = (DWORD)pIndexes->size();
  488. if(dwCount > 0)
  489. {
  490. ids.reset(new index_type[dwCount]);
  491. IndexSetCI it = pIndexes->begin();
  492. IndexSetCI end = pIndexes->end();
  493. for(int i = 0; it != end; ++it, ++i)
  494. {
  495. index_type index = *it;
  496. ids[i] = INDEX_INC(index);
  497. }
  498. }
  499. return ids;
  500. }
  501. static BOOL IsValidElement(TPTR pElement) {return pElement > E_MAX_STATUS;}
  502. DWORD Size () {return m_dwSize;}
  503. DWORD Elements () {return (DWORD)m_indexes.size();}
  504. DWORD Spaces () {return m_dwSize - m_dwCount;}
  505. BOOL HasSpace () {return m_dwCount < m_dwSize;}
  506. BOOL IsEmpty () {return m_dwCount == 0;}
  507. BOOL IsValid () {return m_pv != nullptr;}
  508. private:
  509. void Create(DWORD dwSize)
  510. {
  511. ASSERT(!IsValid() && dwSize > 0);
  512. m_dwCurSeq = 0;
  513. m_dwCount = 0;
  514. m_dwSize = dwSize;
  515. m_pv = (VTPTR*)malloc(m_dwSize * sizeof(TPTR));
  516. ::ZeroMemory(m_pv, m_dwSize * sizeof(TPTR));
  517. }
  518. void Destroy()
  519. {
  520. ASSERT(IsValid());
  521. m_indexes.clear();
  522. free((void*)m_pv);
  523. m_pv = nullptr;
  524. m_dwSize = 0;
  525. m_dwCount = 0;
  526. m_dwCurSeq = 0;
  527. }
  528. IndexSet& CopyIndexes(IndexSet& indexes)
  529. {
  530. {
  531. CReadLock locallock(m_cs);
  532. indexes = m_indexes;
  533. }
  534. return indexes;
  535. }
  536. void EmplaceIndex(index_type dwIndex)
  537. {
  538. CWriteLock locallock(m_cs);
  539. m_indexes.emplace(dwIndex);
  540. }
  541. void EraseIndex(index_type dwIndex)
  542. {
  543. CWriteLock locallock(m_cs);
  544. m_indexes.erase(dwIndex);
  545. }
  546. public:
  547. CRingCache (DWORD dwSize = 0)
  548. : m_pv (nullptr)
  549. , m_dwSize (0)
  550. , m_dwCount (0)
  551. , m_dwCurSeq(0)
  552. {
  553. Reset(dwSize);
  554. }
  555. ~CRingCache()
  556. {
  557. Reset(0);
  558. }
  559. private:
  560. CRingCache(const CRingCache&);
  561. CRingCache operator = (const CRingCache&);
  562. private:
  563. DWORD m_dwSize;
  564. VTPTR* m_pv;
  565. char pack1[PACK_SIZE_OF(VTPTR*)];
  566. volatile DWORD m_dwCurSeq;
  567. char pack2[PACK_SIZE_OF(DWORD)];
  568. volatile DWORD m_dwCount;
  569. char pack3[PACK_SIZE_OF(DWORD)];
  570. CSimpleRWLock m_cs;
  571. IndexSet m_indexes;
  572. };
  573. template <class T, class index_type, bool adjust_index> T* const CRingCache<T, index_type, adjust_index>::E_EMPTY = (T*)0x00;
  574. template <class T, class index_type, bool adjust_index> T* const CRingCache<T, index_type, adjust_index>::E_LOCKED = (T*)0x01;
  575. template <class T, class index_type, bool adjust_index> T* const CRingCache<T, index_type, adjust_index>::E_MAX_STATUS = (T*)0x0F;
  576. template <class T> class CRingPool
  577. {
  578. private:
  579. typedef T* TPTR;
  580. typedef volatile T* VTPTR;
  581. static TPTR const E_EMPTY;
  582. static TPTR const E_LOCKED;
  583. static TPTR const E_RELEASED;
  584. static TPTR const E_OCCUPIED;
  585. static TPTR const E_MAX_STATUS;
  586. private:
  587. VTPTR& INDEX_VAL(DWORD dwIndex) {return *(m_pv + dwIndex);}
  588. public:
  589. BOOL TryPut(TPTR pElement)
  590. {
  591. ASSERT(pElement != nullptr);
  592. BOOL isOK = FALSE;
  593. while(true)
  594. {
  595. BOOL bOccupy = FALSE;
  596. DWORD seqPut = m_seqPut;
  597. if(!HasPutSpace(seqPut))
  598. break;
  599. DWORD dwIndex = seqPut % m_dwSize;
  600. VTPTR& pValue = INDEX_VAL(dwIndex);
  601. if(pValue == E_RELEASED)
  602. {
  603. if(::InterlockedCompareExchangePointer((volatile PVOID*)&pValue, E_OCCUPIED, E_RELEASED) == E_RELEASED)
  604. bOccupy = TRUE;
  605. else
  606. continue;
  607. }
  608. if(pValue == E_EMPTY || bOccupy)
  609. {
  610. if(::InterlockedCompareExchange(&m_seqPut, seqPut + 1, seqPut) == seqPut)
  611. {
  612. pValue = pElement;
  613. isOK = TRUE;
  614. break;
  615. }
  616. }
  617. else if(pValue == E_LOCKED)
  618. break;
  619. }
  620. return isOK;
  621. }
  622. BOOL TryGet(TPTR* ppElement)
  623. {
  624. ASSERT(ppElement != nullptr);
  625. BOOL isOK = FALSE;
  626. while(true)
  627. {
  628. DWORD seqGet = m_seqGet;
  629. if(!HasGetSpace(seqGet))
  630. break;
  631. DWORD dwIndex = seqGet % m_dwSize;
  632. VTPTR& pValue = INDEX_VAL(dwIndex);
  633. if(pValue == E_LOCKED)
  634. break;
  635. else if(pValue != E_EMPTY && pValue != E_RELEASED && pValue != E_OCCUPIED)
  636. {
  637. if(::InterlockedCompareExchange(&m_seqGet, seqGet + 1, seqGet) == seqGet)
  638. {
  639. ASSERT(pValue > E_MAX_STATUS);
  640. *(ppElement) = (TPTR)pValue;
  641. pValue = E_EMPTY;
  642. isOK = TRUE;
  643. break;
  644. }
  645. }
  646. }
  647. return isOK;
  648. }
  649. BOOL TryLock(TPTR* ppElement, DWORD& dwIndex)
  650. {
  651. ASSERT(ppElement != nullptr);
  652. BOOL isOK = FALSE;
  653. while(true)
  654. {
  655. DWORD seqGet = m_seqGet;
  656. if(!HasGetSpace(seqGet))
  657. break;
  658. dwIndex = seqGet % m_dwSize;
  659. VTPTR& pValue = INDEX_VAL(dwIndex);
  660. if(pValue == E_LOCKED)
  661. break;
  662. else if(pValue != E_EMPTY && pValue != E_RELEASED && pValue != E_OCCUPIED)
  663. {
  664. if(::InterlockedCompareExchange(&m_seqGet, seqGet + 1, seqGet) == seqGet)
  665. {
  666. ASSERT(pValue > E_MAX_STATUS);
  667. *(ppElement) = (TPTR)pValue;
  668. pValue = E_LOCKED;
  669. isOK = TRUE;
  670. break;
  671. }
  672. }
  673. }
  674. return isOK;
  675. }
  676. void ReleaseLock(TPTR pElement, DWORD dwIndex)
  677. {
  678. ASSERT(dwIndex < m_dwSize);
  679. ASSERT(pElement == nullptr || pElement > E_MAX_STATUS);
  680. VTPTR& pValue = INDEX_VAL(dwIndex);
  681. VERIFY(pValue == E_LOCKED);
  682. if(pElement != nullptr)
  683. {
  684. for(DWORD i = 0; ; i++)
  685. {
  686. if(TryPut(pElement))
  687. break;
  688. DWORD dwPutIndex = m_seqPut % m_dwSize;
  689. if(dwIndex == dwPutIndex)
  690. {
  691. pValue = pElement;
  692. ::InterlockedIncrement(&m_seqPut);
  693. return;
  694. }
  695. ::YieldThread(i);
  696. }
  697. }
  698. pValue = E_RELEASED;
  699. }
  700. public:
  701. void Reset(DWORD dwSize = 0)
  702. {
  703. if(IsValid())
  704. Destroy();
  705. if(dwSize > 0)
  706. Create(dwSize);
  707. }
  708. DWORD Size() {return m_dwSize;}
  709. DWORD Elements() {return m_seqPut - m_seqGet;}
  710. BOOL IsFull() {return Elements() == Size();}
  711. BOOL IsEmpty() {return Elements() == 0;}
  712. BOOL IsValid() {return m_pv != nullptr;}
  713. private:
  714. BOOL HasPutSpace(DWORD seqPut)
  715. {
  716. return (seqPut - m_seqGet < m_dwSize);
  717. }
  718. BOOL HasGetSpace(DWORD seqGet)
  719. {
  720. return (m_seqPut - seqGet > 0);
  721. }
  722. void Create(DWORD dwSize)
  723. {
  724. ASSERT(!IsValid() && dwSize > 0);
  725. m_seqPut = 0;
  726. m_seqGet = 0;
  727. m_dwSize = dwSize;
  728. m_pv = (VTPTR*)malloc(m_dwSize * sizeof(TPTR));
  729. ::ZeroMemory(m_pv, m_dwSize * sizeof(TPTR));
  730. }
  731. void Destroy()
  732. {
  733. ASSERT(IsValid());
  734. free((void*)m_pv);
  735. m_pv = nullptr;
  736. m_dwSize = 0;
  737. m_seqPut = 0;
  738. m_seqGet = 0;
  739. }
  740. public:
  741. CRingPool(DWORD dwSize = 0)
  742. : m_pv(nullptr)
  743. , m_dwSize(0)
  744. , m_seqPut(0)
  745. , m_seqGet(0)
  746. {
  747. Reset(dwSize);
  748. }
  749. ~CRingPool()
  750. {
  751. Reset(0);
  752. }
  753. private:
  754. CRingPool(const CRingPool&);
  755. CRingPool operator = (const CRingPool&);
  756. private:
  757. DWORD m_dwSize;
  758. VTPTR* m_pv;
  759. char pack1[PACK_SIZE_OF(VTPTR*)];
  760. volatile DWORD m_seqPut;
  761. char pack2[PACK_SIZE_OF(DWORD)];
  762. volatile DWORD m_seqGet;
  763. char pack3[PACK_SIZE_OF(DWORD)];
  764. };
  765. template <class T> T* const CRingPool<T>::E_EMPTY = (T*)0x00;
  766. template <class T> T* const CRingPool<T>::E_LOCKED = (T*)0x01;
  767. template <class T> T* const CRingPool<T>::E_RELEASED = (T*)0x02;
  768. template <class T> T* const CRingPool<T>::E_OCCUPIED = (T*)0x03;
  769. template <class T> T* const CRingPool<T>::E_MAX_STATUS = (T*)0x0F;
  770. template <class T> class CCASQueue
  771. {
  772. private:
  773. struct Node;
  774. typedef Node* NPTR;
  775. typedef volatile Node* VNPTR;
  776. typedef volatile ULONG VLONG;
  777. struct Node
  778. {
  779. T* pValue;
  780. VNPTR pNext;
  781. Node(T* val, NPTR next = nullptr)
  782. : pValue(val), pNext(next)
  783. {
  784. }
  785. };
  786. public:
  787. void PushBack(T* pVal)
  788. {
  789. ASSERT(pVal != nullptr);
  790. VNPTR pTail = nullptr;
  791. NPTR pNode = new Node(pVal);
  792. while(true)
  793. {
  794. pTail = m_pTail;
  795. if(::InterlockedCompareExchangePointer((volatile PVOID*)&m_pTail, (PVOID)pNode, (PVOID)pTail) == pTail)
  796. {
  797. pTail->pNext = pNode;
  798. break;
  799. }
  800. }
  801. ::InterlockedIncrement(&m_lSize);
  802. }
  803. void UnsafePushBack(T* pVal)
  804. {
  805. ASSERT(pVal != nullptr);
  806. NPTR pNode = new Node(pVal);
  807. m_pTail->pNext = pNode;
  808. m_pTail = pNode;
  809. ::InterlockedIncrement(&m_lSize);
  810. }
  811. BOOL PopFront(T** ppVal)
  812. {
  813. ASSERT(ppVal != nullptr);
  814. if(IsEmpty())
  815. return FALSE;
  816. BOOL isOK = FALSE;
  817. NPTR pHead = nullptr;
  818. NPTR pNext = nullptr;
  819. T* pVal = nullptr;
  820. while(true)
  821. {
  822. while(::InterlockedCompareExchange(&m_lLock, 1, 0) != 0)
  823. ::YieldProcessor();
  824. pHead = (NPTR)m_pHead;
  825. pNext = (NPTR)pHead->pNext;
  826. if(pNext == nullptr)
  827. {
  828. m_lLock = 0;
  829. break;
  830. }
  831. *ppVal = pNext->pValue;
  832. m_pHead = pNext;
  833. m_lLock = 0;
  834. isOK = TRUE;
  835. ::InterlockedDecrement(&m_lSize);
  836. delete pHead;
  837. break;
  838. }
  839. return isOK;
  840. }
  841. BOOL UnsafePopFront(T** ppVal)
  842. {
  843. if(!UnsafePeekFront(ppVal))
  844. return FALSE;
  845. NPTR pHead = (NPTR)m_pHead;
  846. NPTR pNext = (NPTR)pHead->pNext;
  847. m_pHead = pNext;
  848. ::InterlockedDecrement(&m_lSize);
  849. delete pHead;
  850. return TRUE;
  851. }
  852. BOOL UnsafePeekFront(T** ppVal)
  853. {
  854. ASSERT(ppVal != nullptr);
  855. NPTR pNext = (NPTR)m_pHead->pNext;
  856. if(pNext == nullptr)
  857. return FALSE;
  858. *ppVal = pNext->pValue;
  859. return TRUE;
  860. }
  861. public:
  862. ULONG Size() {return m_lSize;}
  863. BOOL IsEmpty() {return m_lSize == 0;}
  864. public:
  865. CCASQueue() : m_lLock(0), m_lSize(0)
  866. {
  867. NPTR pHead = new Node(nullptr);
  868. m_pHead = m_pTail = pHead;
  869. }
  870. ~CCASQueue()
  871. {
  872. ASSERT(m_lLock == 0);
  873. ASSERT(m_lSize == 0);
  874. ASSERT(m_pHead != nullptr);
  875. ASSERT(m_pHead->pNext == nullptr);
  876. while(m_pHead != nullptr)
  877. {
  878. VNPTR pNode = m_pHead->pNext;
  879. delete m_pHead;
  880. m_pHead = pNode;
  881. }
  882. }
  883. private:
  884. VLONG m_lLock;
  885. VLONG m_lSize;
  886. VNPTR m_pHead;
  887. VNPTR m_pTail;
  888. };
  889. #if !defined (_WIN64)
  890. #pragma pack(pop)
  891. #endif