rwmutex.d 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081
  1. /**
  2. * The read/write mutex module provides a primitive for maintaining shared read
  3. * access and mutually exclusive write access.
  4. *
  5. * Copyright: Copyright Sean Kelly 2005 - 2009.
  6. * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
  7. * Authors: Sean Kelly
  8. * Source: $(DRUNTIMESRC core/sync/_rwmutex.d)
  9. */
  10. /* Copyright Sean Kelly 2005 - 2009.
  11. * Distributed under the Boost Software License, Version 1.0.
  12. * (See accompanying file LICENSE or copy at
  13. * http://www.boost.org/LICENSE_1_0.txt)
  14. */
  15. module core.sync.rwmutex;
  16. public import core.sync.exception;
  17. import core.sync.condition;
  18. import core.sync.mutex;
  19. import core.memory;
  20. version (Posix)
  21. {
  22. import core.sys.posix.pthread;
  23. }
  24. ////////////////////////////////////////////////////////////////////////////////
  25. // ReadWriteMutex
  26. //
  27. // Reader reader();
  28. // Writer writer();
  29. ////////////////////////////////////////////////////////////////////////////////
  30. /**
  31. * This class represents a mutex that allows any number of readers to enter,
  32. * but when a writer enters, all other readers and writers are blocked.
  33. *
  34. * Please note that this mutex is not recursive and is intended to guard access
  35. * to data only. Also, no deadlock checking is in place because doing so would
  36. * require dynamic memory allocation, which would reduce performance by an
  37. * unacceptable amount. As a result, any attempt to recursively acquire this
  38. * mutex may well deadlock the caller, particularly if a write lock is acquired
  39. * while holding a read lock, or vice-versa. In practice, this should not be
  40. * an issue however, because it is uncommon to call deeply into unknown code
  41. * while holding a lock that simply protects data.
  42. */
  43. class ReadWriteMutex
  44. {
  /**
   * Selects how the mutex arbitrates between waiting readers and waiting
   * writers. Two policies are currently available.
   *
   * With PREFER_READERS, writers are held back until no reader owns the
   * mutex and are then let through one at a time. A reader that arrives
   * while writers are still waiting goes ahead of them.
   *
   * With PREFER_WRITERS, readers are held back whenever a writer is
   * waiting. Writers are let through one at a time, and once no writer
   * remains all waiting readers are woken.
   *
   * A policy that balances the two sides more evenly may be added later.
   */
  enum Policy
  {
      /// Readers get preference. This may starve writers.
      PREFER_READERS,
      /// Writers get preference. This may starve readers.
      PREFER_WRITERS
  }
  65. ////////////////////////////////////////////////////////////////////////////
  66. // Initialization
  67. ////////////////////////////////////////////////////////////////////////////
  /**
   * Initializes a read/write mutex object with the supplied policy.
   *
   * One internal Mutex is created together with two Condition objects bound
   * to it (one queue for readers, one for writers), followed by the Reader
   * and Writer proxy objects.
   *
   * Params:
   *  policy = The policy to use.
   *
   * Throws:
   *  Any SyncError raised while constructing the underlying mutex or
   *  condition objects propagates to the caller.
   */
  this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
  {
      // A class `new` expression never evaluates to null (allocation
      // failure is reported by throwing), so no null checks are needed.
      m_commonMutex = new Mutex;
      m_readerQueue = new Condition( m_commonMutex );
      m_writerQueue = new Condition( m_commonMutex );
      m_policy = policy;
      m_reader = new Reader;
      m_writer = new Writer;
  }

  /// ditto
  shared this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
  {
      // See the note in the unshared constructor: `new` cannot yield null.
      m_commonMutex = new shared Mutex;
      m_readerQueue = new shared Condition( m_commonMutex );
      m_writerQueue = new shared Condition( m_commonMutex );
      m_policy = policy;
      m_reader = new shared Reader;
      m_writer = new shared Writer;
  }
  108. ////////////////////////////////////////////////////////////////////////////
  109. // General Properties
  110. ////////////////////////////////////////////////////////////////////////////
  /**
   * Reports the arbitration policy this mutex was constructed with.
   *
   * Returns:
   *  The policy used by this mutex.
   */
  @property Policy policy() @safe nothrow
  {
      return this.m_policy;
  }

  /// ditto
  @property Policy policy() shared @safe nothrow
  {
      return this.m_policy;
  }
  126. ////////////////////////////////////////////////////////////////////////////
  127. // Reader/Writer Handles
  128. ////////////////////////////////////////////////////////////////////////////
  /**
   * Gives access to the proxy object through which read locks on this
   * mutex are taken and released.
   *
   * Returns:
   *  A reader sub-mutex.
   */
  @property Reader reader() @safe nothrow
  {
      return this.m_reader;
  }

  /// ditto
  @property shared(Reader) reader() shared @safe nothrow
  {
      return this.m_reader;
  }
  /**
   * Gives access to the proxy object through which write locks on this
   * mutex are taken and released.
   *
   * Returns:
   *  A writer sub-mutex.
   */
  @property Writer writer() @safe nothrow
  {
      return this.m_writer;
  }

  /// ditto
  @property shared(Writer) writer() shared @safe nothrow
  {
      return this.m_writer;
  }
  159. ////////////////////////////////////////////////////////////////////////////
  160. // Reader
  161. ////////////////////////////////////////////////////////////////////////////
  162. /**
  163. * This class can be considered a mutex in its own right, and is used to
  164. * negotiate a read lock for the enclosing mutex.
  165. */
  166. class Reader :
  167. Object.Monitor
  168. {
  /**
   * Initializes a read/write mutex reader proxy object.
   *
   * The object's `__monitor` slot is pointed at `m_proxy`, whose `link`
   * refers back to this Reader.
   */
  this(this Q)() @trusted nothrow
      if (is(Q == Reader) || is(Q == shared Reader))
  {
      this.__monitor = cast(void*) &m_proxy;
      m_proxy.link = this;
  }
  /**
   * Acquires a read lock on the enclosing mutex, waiting on the reader
   * queue for as long as `shouldQueueReader` says readers must be held back.
   */
  @trusted void lock()
  {
      synchronized( m_commonMutex )
      {
          // Counted as queued for the whole critical section; the count is
          // dropped again on every way out of it.
          ++m_numQueuedReaders;
          scope(exit) --m_numQueuedReaders;

          for (;;)
          {
              if ( !shouldQueueReader )
                  break;
              m_readerQueue.wait();
          }
          ++m_numActiveReaders;
      }
  }

  /// ditto
  @trusted void lock() shared
  {
      synchronized( m_commonMutex )
      {
          // The counters are only touched while m_commonMutex is held, so
          // `shared` is cast away for the increments and decrements.
          ++(cast()m_numQueuedReaders);
          scope(exit) --(cast()m_numQueuedReaders);

          for (;;)
          {
              if ( !shouldQueueReader )
                  break;
              m_readerQueue.wait();
          }
          ++(cast()m_numActiveReaders);
      }
  }
  /**
   * Releases a read lock on the enclosing mutex. When the last active
   * reader leaves and at least one writer is queued, one writer is notified.
   */
  @trusted void unlock()
  {
      synchronized( m_commonMutex )
      {
          // The decrement always happens; the writer check is only
          // evaluated once no reader remains active.
          if ( --m_numActiveReaders < 1 && m_numQueuedWriters > 0 )
              m_writerQueue.notify();
      }
  }

  /// ditto
  @trusted void unlock() shared
  {
      synchronized( m_commonMutex )
      {
          if ( --(cast()m_numActiveReaders) < 1 && m_numQueuedWriters > 0 )
              m_writerQueue.notify();
      }
  }
  /**
   * Tries to take a read lock on the enclosing mutex without waiting on
   * the reader queue. The lock is taken only if readers currently do not
   * have to queue.
   *
   * Returns:
   *  true if the lock was acquired and false if not.
   */
  @trusted bool tryLock()
  {
      synchronized( m_commonMutex )
      {
          const admitted = !shouldQueueReader;
          if ( admitted )
              ++m_numActiveReaders;
          return admitted;
      }
  }

  /// ditto
  @trusted bool tryLock() shared
  {
      synchronized( m_commonMutex )
      {
          const admitted = !shouldQueueReader;
          if ( admitted )
              ++(cast()m_numActiveReaders);
          return admitted;
      }
  }
  /**
   * Tries to take a read lock on the enclosing mutex, waiting at most
   * $(D_PARAM timeout) for it. If the lock is available straight away it
   * is taken and true is returned. Otherwise the caller waits on the
   * reader queue until either the lock can be taken (true) or the elapsed
   * time reaches $(D_PARAM timeout) (false).
   *
   * Params:
   *  timeout = maximum amount of time to wait for the lock
   * Returns:
   *  true if the lock was acquired and false if not.
   */
  @trusted bool tryLock(Duration timeout)
  {
      // Avoid problems calling wait with huge Duration.
      enum maxWaitPerCall = dur!"hours"(24 * 365);

      synchronized( m_commonMutex )
      {
          // Fast path: nothing forces readers to queue right now.
          if (!shouldQueueReader)
          {
              ++m_numActiveReaders;
              return true;
          }

          // A zero or negative timeout means "do not wait at all".
          if (timeout <= Duration.zero)
              return false;

          ++m_numQueuedReaders;
          scope(exit) --m_numQueuedReaders;

          const startedAt = MonoTime.currTime;
          auto slice = timeout;
          for (;;)
          {
              // Each wait is capped at maxWaitPerCall.
              m_readerQueue.wait(slice < maxWaitPerCall ? slice : maxWaitPerCall);
              if (!shouldQueueReader)
                  break;

              const waited = MonoTime.currTime - startedAt;
              if (waited >= timeout)
                  return false;
              slice = timeout - waited;
          }

          ++m_numActiveReaders;
          return true;
      }
  }

  /// ditto
  @trusted bool tryLock(Duration timeout) shared
  {
      // Avoid problems calling wait(Duration) with huge arguments.
      enum maxWaitPerCall = dur!"hours"(24 * 365);

      // The clock starts before m_commonMutex is entered.
      const startedAt = MonoTime.currTime;
      synchronized( m_commonMutex )
      {
          ++(cast()m_numQueuedReaders);
          scope(exit) --(cast()m_numQueuedReaders);

          for (;;)
          {
              if (!shouldQueueReader)
                  break;

              const waited = MonoTime.currTime - startedAt;
              if (waited >= timeout)
                  return false;

              const remaining = timeout - waited;
              m_readerQueue.wait(remaining < maxWaitPerCall ? remaining : maxWaitPerCall);
          }

          ++(cast()m_numActiveReaders);
          return true;
      }
  }
  322. private:
  /**
   * Tells whether a reader has to wait right now: always while a writer
   * is active, and additionally, under PREFER_WRITERS, while any writer
   * is queued.
   */
  @property bool shouldQueueReader(this Q)() nothrow @safe @nogc
      if (is(Q == Reader) || is(Q == shared Reader))
  {
      return m_numActiveWriters > 0
          || (m_policy == Policy.PREFER_WRITERS && m_numQueuedWriters > 0);
  }
  /// Wrapper around an Object.Monitor reference; the Reader constructor
  /// stores the address of an instance of it in `__monitor`.
  struct MonitorProxy
  {
      /// Set by the Reader constructor to the Reader itself.
      Object.Monitor link;
  }

  /// Proxy instance whose address the constructor installs as `__monitor`.
  MonitorProxy m_proxy;
  343. }
  344. ////////////////////////////////////////////////////////////////////////////
  345. // Writer
  346. ////////////////////////////////////////////////////////////////////////////
  347. /**
  348. * This class can be considered a mutex in its own right, and is used to
  349. * negotiate a write lock for the enclosing mutex.
  350. */
  351. class Writer :
  352. Object.Monitor
  353. {
  /**
   * Initializes a read/write mutex writer proxy object.
   *
   * The object's `__monitor` slot is pointed at `m_proxy`, whose `link`
   * refers back to this Writer.
   */
  this(this Q)() @trusted nothrow
      if (is(Q == Writer) || is(Q == shared Writer))
  {
      this.__monitor = cast(void*) &m_proxy;
      m_proxy.link = this;
  }
  /**
   * Acquires a write lock on the enclosing mutex, waiting on the writer
   * queue for as long as `shouldQueueWriter` says writers must be held back.
   */
  @trusted void lock()
  {
      synchronized( m_commonMutex )
      {
          // Counted as queued for the whole critical section; the count is
          // dropped again on every way out of it.
          ++m_numQueuedWriters;
          scope(exit) --m_numQueuedWriters;

          for (;;)
          {
              if ( !shouldQueueWriter )
                  break;
              m_writerQueue.wait();
          }
          ++m_numActiveWriters;
      }
  }

  /// ditto
  @trusted void lock() shared
  {
      synchronized( m_commonMutex )
      {
          // The counters are only touched while m_commonMutex is held, so
          // `shared` is cast away for the increments and decrements.
          ++(cast()m_numQueuedWriters);
          scope(exit) --(cast()m_numQueuedWriters);

          for (;;)
          {
              if ( !shouldQueueWriter )
                  break;
              m_writerQueue.wait();
          }
          ++(cast()m_numActiveWriters);
      }
  }
  /**
   * Releases a write lock on the enclosing mutex and, once no writer is
   * active any more, wakes whoever the policy favours: under
   * PREFER_WRITERS one queued writer, or else all queued readers; under
   * any other policy all queued readers, or else one queued writer.
   */
  @trusted void unlock()
  {
      synchronized( m_commonMutex )
      {
          if ( --m_numActiveWriters >= 1 )
              return;

          const readersWaiting = m_numQueuedReaders > 0;
          const writersWaiting = m_numQueuedWriters > 0;

          if ( m_policy == Policy.PREFER_WRITERS )
          {
              if ( writersWaiting )
                  m_writerQueue.notify();
              else if ( readersWaiting )
                  m_readerQueue.notifyAll();
          }
          else
          {
              // PREFER_READERS, and the fallback for any other value.
              if ( readersWaiting )
                  m_readerQueue.notifyAll();
              else if ( writersWaiting )
                  m_writerQueue.notify();
          }
      }
  }

  /// ditto
  @trusted void unlock() shared
  {
      synchronized( m_commonMutex )
      {
          if ( --(cast()m_numActiveWriters) >= 1 )
              return;

          const readersWaiting = m_numQueuedReaders > 0;
          const writersWaiting = m_numQueuedWriters > 0;

          if ( m_policy == Policy.PREFER_WRITERS )
          {
              if ( writersWaiting )
                  m_writerQueue.notify();
              else if ( readersWaiting )
                  m_readerQueue.notifyAll();
          }
          else
          {
              // PREFER_READERS, and the fallback for any other value.
              if ( readersWaiting )
                  m_readerQueue.notifyAll();
              else if ( writersWaiting )
                  m_writerQueue.notify();
          }
      }
  }
  /**
   * Tries to take a write lock on the enclosing mutex without waiting on
   * the writer queue. The lock is taken only if writers currently do not
   * have to queue.
   *
   * Returns:
   *  true if the lock was acquired and false if not.
   */
  @trusted bool tryLock()
  {
      synchronized( m_commonMutex )
      {
          const admitted = !shouldQueueWriter;
          if ( admitted )
              ++m_numActiveWriters;
          return admitted;
      }
  }

  /// ditto
  @trusted bool tryLock() shared
  {
      synchronized( m_commonMutex )
      {
          const admitted = !shouldQueueWriter;
          if ( admitted )
              ++(cast()m_numActiveWriters);
          return admitted;
      }
  }
  /**
   * Attempts to acquire a write lock on the enclosing mutex. If one can
   * be obtained without blocking, the lock is acquired and true is
   * returned. If not, the function blocks until either the lock can be
   * obtained or the time elapsed exceeds $(D_PARAM timeout), returning
   * true if the lock was acquired and false if the function timed out.
   *
   * While waiting, the caller is counted as a queued writer. Under
   * PREFER_WRITERS that count alone makes arriving readers queue, so when
   * the last queued writer gives up on timeout it wakes the queued readers;
   * otherwise they could stay asleep although only readers hold the lock.
   *
   * Params:
   *  timeout = maximum amount of time to wait for the lock
   * Returns:
   *  true if the lock was acquired and false if not.
   */
  @trusted bool tryLock(Duration timeout)
  {
      synchronized( m_commonMutex )
      {
          if (!shouldQueueWriter)
          {
              ++m_numActiveWriters;
              return true;
          }

          enum zero = Duration.zero();
          if (timeout <= zero)
              return false;

          ++m_numQueuedWriters;
          scope(exit) --m_numQueuedWriters;

          enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration.
          const initialTime = MonoTime.currTime;
          m_writerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall);
          while (shouldQueueWriter)
          {
              const timeElapsed = MonoTime.currTime - initialTime;
              if (timeElapsed >= timeout)
              {
                  // We are the only queued writer and are about to leave.
                  // Readers that queued because of us would otherwise get
                  // no wakeup: Reader.unlock only ever notifies writers.
                  // The woken readers re-check their condition after we
                  // release m_commonMutex, i.e. after the scope(exit)
                  // above has dropped our queue count.
                  if (m_policy == Policy.PREFER_WRITERS &&
                      m_numQueuedWriters == 1 &&
                      m_numQueuedReaders > 0)
                      m_readerQueue.notifyAll();
                  return false;
              }
              auto nextWait = timeout - timeElapsed;
              m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
          }
          ++m_numActiveWriters;
          return true;
      }
  }

  /// ditto
  @trusted bool tryLock(Duration timeout) shared
  {
      const initialTime = MonoTime.currTime;
      synchronized( m_commonMutex )
      {
          ++(cast()m_numQueuedWriters);
          scope(exit) --(cast()m_numQueuedWriters);

          while (shouldQueueWriter)
          {
              const timeElapsed = MonoTime.currTime - initialTime;
              if (timeElapsed >= timeout)
              {
                  // Same hand-off as in the unshared overload: the last
                  // queued writer wakes readers that queued behind it.
                  // If those readers are really waiting for an active
                  // writer they simply go back to waiting.
                  if (m_policy == Policy.PREFER_WRITERS &&
                      m_numQueuedWriters == 1 &&
                      m_numQueuedReaders > 0)
                      m_readerQueue.notifyAll();
                  return false;
              }
              auto nextWait = timeout - timeElapsed;
              // Avoid problems calling wait(Duration) with huge arguments.
              enum maxWaitPerCall = dur!"hours"(24 * 365);
              m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
          }
          ++(cast()m_numActiveWriters);
          return true;
      }
  }
  533. private:
  /**
   * Tells whether a writer has to wait right now: always while any
   * writer or reader is active, and additionally, under PREFER_READERS,
   * while any reader is queued.
   */
  @property bool shouldQueueWriter(this Q)()
      if (is(Q == Writer) || is(Q == shared Writer))
  {
      const lockHeld = m_numActiveWriters > 0 || m_numActiveReaders > 0;
      return lockHeld
          || (m_policy == Policy.PREFER_READERS && m_numQueuedReaders > 0);
  }
  /// Wrapper around an Object.Monitor reference; the Writer constructor
  /// stores the address of an instance of it in `__monitor`.
  struct MonitorProxy
  {
      /// Set by the Writer constructor to the Writer itself.
      Object.Monitor link;
  }

  /// Proxy instance whose address the constructor installs as `__monitor`.
  MonitorProxy m_proxy;
  555. }
  556. private:
  557. Policy m_policy;
  558. Reader m_reader;
  559. Writer m_writer;
  560. Mutex m_commonMutex;
  561. Condition m_readerQueue;
  562. Condition m_writerQueue;
  563. int m_numQueuedReaders;
  564. int m_numActiveReaders;
  565. int m_numQueuedWriters;
  566. int m_numActiveWriters;
  567. }
  568. ////////////////////////////////////////////////////////////////////////////////
  569. // Unit Tests
  570. ////////////////////////////////////////////////////////////////////////////////
  // Exercises reader/writer exclusion and policy ordering on an unshared
  // ReadWriteMutex, once per policy.
  unittest
  {
      import core.atomic, core.thread, core.sync.semaphore;

      static void runTest(ReadWriteMutex.Policy policy)
      {
          scope rw = new ReadWriteMutex(policy);
          // "In" semaphores: a worker reports that it is inside its lock.
          // "Go" semaphores: the test allows a worker to leave its lock.
          scope readerIn = new Semaphore, readerGo = new Semaphore,
                writerIn = new Semaphore, writerGo = new Semaphore;
          shared size_t activeReaders, activeWriters;

          void readerBody()
          {
              synchronized (rw.reader)
              {
                  atomicOp!"+="(activeReaders, 1);
                  readerIn.notify();
                  readerGo.wait();
                  atomicOp!"-="(activeReaders, 1);
              }
          }

          void writerBody()
          {
              synchronized (rw.writer)
              {
                  atomicOp!"+="(activeWriters, 1);
                  writerIn.notify();
                  writerGo.wait();
                  atomicOp!"-="(activeWriters, 1);
              }
          }

          // Spins until the mutex reports exactly the given queue depths.
          void awaitQueueDepth(size_t queuedReaders, size_t queuedWriters)
          {
              bool reached;
              do
              {
                  synchronized (rw.m_commonMutex)
                  {
                      reached = rw.m_numQueuedReaders == queuedReaders &&
                                rw.m_numQueuedWriters == queuedWriters;
                  }
                  if (!reached)
                      Thread.yield();
              } while (!reached);
          }

          scope threads = new ThreadGroup;

          // 2 simultaneous readers
          threads.create(&readerBody);
          threads.create(&readerBody);
          readerIn.wait();
          readerIn.wait();
          assert(activeReaders == 2);
          readerGo.notify();
          readerGo.notify();
          threads.joinAll();
          assert(activeReaders == 0);
          foreach (t; threads) threads.remove(t);

          // 1 writer at a time
          threads.create(&writerBody);
          threads.create(&writerBody);
          writerIn.wait();
          assert(!writerIn.tryWait());
          assert(activeWriters == 1);
          writerGo.notify();
          writerIn.wait();
          assert(activeWriters == 1);
          writerGo.notify();
          threads.joinAll();
          assert(activeWriters == 0);
          foreach (t; threads) threads.remove(t);

          // reader and writer are mutually exclusive
          threads.create(&readerBody);
          readerIn.wait();
          threads.create(&writerBody);
          awaitQueueDepth(0, 1);
          assert(!writerIn.tryWait());
          assert(activeReaders == 1 && activeWriters == 0);
          readerGo.notify();
          writerIn.wait();
          assert(activeReaders == 0 && activeWriters == 1);
          writerGo.notify();
          threads.joinAll();
          assert(activeReaders == 0 && activeWriters == 0);
          foreach (t; threads) threads.remove(t);

          // writer and reader are mutually exclusive
          threads.create(&writerBody);
          writerIn.wait();
          threads.create(&readerBody);
          awaitQueueDepth(1, 0);
          assert(!readerIn.tryWait());
          assert(activeReaders == 0 && activeWriters == 1);
          writerGo.notify();
          readerIn.wait();
          assert(activeReaders == 1 && activeWriters == 0);
          readerGo.notify();
          threads.joinAll();
          assert(activeReaders == 0 && activeWriters == 0);
          foreach (t; threads) threads.remove(t);

          // policy determines whether queued reader or writers progress first
          threads.create(&writerBody);
          writerIn.wait();
          threads.create(&readerBody);
          threads.create(&writerBody);
          awaitQueueDepth(1, 1);
          assert(activeReaders == 0 && activeWriters == 1);
          writerGo.notify();

          if (policy == ReadWriteMutex.Policy.PREFER_READERS)
          {
              readerIn.wait();
              assert(activeReaders == 1 && activeWriters == 0);
              readerGo.notify();
              writerIn.wait();
              assert(activeReaders == 0 && activeWriters == 1);
              writerGo.notify();
          }
          else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
          {
              writerIn.wait();
              assert(activeReaders == 0 && activeWriters == 1);
              writerGo.notify();
              readerIn.wait();
              assert(activeReaders == 1 && activeWriters == 0);
              readerGo.notify();
          }

          threads.joinAll();
          assert(activeReaders == 0 && activeWriters == 0);
          foreach (t; threads) threads.remove(t);
      }

      runTest(ReadWriteMutex.Policy.PREFER_READERS);
      runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
  }
  // Exercises Reader.tryLock(Duration) and Writer.tryLock(Duration) on an
  // unshared ReadWriteMutex while the opposite kind of lock is held.
  unittest
  {
      import core.atomic, core.thread;

      __gshared ReadWriteMutex lockUnderTest;
      shared static bool firstAttemptDone;
      shared static bool secondAttemptDone;

      lockUnderTest = new ReadWriteMutex();
      atomicFence();
      const testBudget = dur!"seconds"(20);

      // Test ReadWriteMutex.Reader.tryLock(Duration).
      {
          static void testReaderTryLock()
          {
              assert(!lockUnderTest.reader.tryLock(Duration.min));
              firstAttemptDone.atomicStore(true);
              assert(lockUnderTest.reader.tryLock(Duration.max));
              secondAttemptDone.atomicStore(true);
              lockUnderTest.reader.unlock();
          }

          assert(lockUnderTest.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
          auto worker = new Thread(&testReaderTryLock).start();
          const deadline = MonoTime.currTime + testBudget;
          Thread.yield();

          // The worker was started while the writer lock was held, so its
          // first reader.tryLock, with timeout Duration.min, should fail.
          while (!firstAttemptDone.atomicLoad())
          {
              assert(MonoTime.currTime < deadline, "timed out");
              Thread.yield();
          }

          lockUnderTest.writer.unlock();

          // Soon after the writer lock is released the worker's second
          // reader.tryLock, with timeout Duration.max, should succeed.
          while (!secondAttemptDone.atomicLoad())
          {
              assert(MonoTime.currTime < deadline, "timed out");
              Thread.yield();
          }

          worker.join();
      }

      firstAttemptDone.atomicStore(false); // Reset.
      secondAttemptDone.atomicStore(false); // Reset.

      // Test ReadWriteMutex.Writer.tryLock(Duration).
      {
          static void testWriterTryLock()
          {
              assert(!lockUnderTest.writer.tryLock(Duration.min));
              firstAttemptDone.atomicStore(true);
              assert(lockUnderTest.writer.tryLock(Duration.max));
              secondAttemptDone.atomicStore(true);
              lockUnderTest.writer.unlock();
          }

          assert(lockUnderTest.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
          auto worker = new Thread(&testWriterTryLock).start();
          const deadline = MonoTime.currTime + testBudget;
          Thread.yield();

          // The worker was started while the reader lock was held, so its
          // first writer.tryLock, with timeout Duration.min, should fail.
          while (!firstAttemptDone.atomicLoad())
          {
              assert(MonoTime.currTime < deadline, "timed out");
              Thread.yield();
          }

          lockUnderTest.reader.unlock();

          // Soon after the reader lock is released the worker's second
          // writer.tryLock, with timeout Duration.max, should succeed.
          while (!secondAttemptDone.atomicLoad())
          {
              assert(MonoTime.currTime < deadline, "timed out");
              Thread.yield();
          }

          worker.join();
      }
  }
  // Exercises reader/writer exclusion and policy ordering on a shared
  // ReadWriteMutex, once per policy.
  unittest
  {
      import core.atomic, core.thread, core.sync.semaphore;

      static void runTest(ReadWriteMutex.Policy policy)
      {
          shared scope rw = new shared ReadWriteMutex(policy);
          // "In" semaphores: a worker reports that it is inside its lock.
          // "Go" semaphores: the test allows a worker to leave its lock.
          scope readerIn = new Semaphore, readerGo = new Semaphore,
                writerIn = new Semaphore, writerGo = new Semaphore;
          shared size_t activeReaders, activeWriters;

          void readerBody()
          {
              synchronized (rw.reader)
              {
                  atomicOp!"+="(activeReaders, 1);
                  readerIn.notify();
                  readerGo.wait();
                  atomicOp!"-="(activeReaders, 1);
              }
          }

          void writerBody()
          {
              synchronized (rw.writer)
              {
                  atomicOp!"+="(activeWriters, 1);
                  writerIn.notify();
                  writerGo.wait();
                  atomicOp!"-="(activeWriters, 1);
              }
          }

          // Spins until the mutex reports exactly the given queue depths.
          void awaitQueueDepth(size_t queuedReaders, size_t queuedWriters)
          {
              bool reached;
              do
              {
                  synchronized (rw.m_commonMutex)
                  {
                      reached = rw.m_numQueuedReaders == queuedReaders &&
                                rw.m_numQueuedWriters == queuedWriters;
                  }
                  if (!reached)
                      Thread.yield();
              } while (!reached);
          }

          scope threads = new ThreadGroup;

          // 2 simultaneous readers
          threads.create(&readerBody);
          threads.create(&readerBody);
          readerIn.wait();
          readerIn.wait();
          assert(activeReaders == 2);
          readerGo.notify();
          readerGo.notify();
          threads.joinAll();
          assert(activeReaders == 0);
          foreach (t; threads) threads.remove(t);

          // 1 writer at a time
          threads.create(&writerBody);
          threads.create(&writerBody);
          writerIn.wait();
          assert(!writerIn.tryWait());
          assert(activeWriters == 1);
          writerGo.notify();
          writerIn.wait();
          assert(activeWriters == 1);
          writerGo.notify();
          threads.joinAll();
          assert(activeWriters == 0);
          foreach (t; threads) threads.remove(t);

          // reader and writer are mutually exclusive
          threads.create(&readerBody);
          readerIn.wait();
          threads.create(&writerBody);
          awaitQueueDepth(0, 1);
          assert(!writerIn.tryWait());
          assert(activeReaders == 1 && activeWriters == 0);
          readerGo.notify();
          writerIn.wait();
          assert(activeReaders == 0 && activeWriters == 1);
          writerGo.notify();
          threads.joinAll();
          assert(activeReaders == 0 && activeWriters == 0);
          foreach (t; threads) threads.remove(t);

          // writer and reader are mutually exclusive
          threads.create(&writerBody);
          writerIn.wait();
          threads.create(&readerBody);
          awaitQueueDepth(1, 0);
          assert(!readerIn.tryWait());
          assert(activeReaders == 0 && activeWriters == 1);
          writerGo.notify();
          readerIn.wait();
          assert(activeReaders == 1 && activeWriters == 0);
          readerGo.notify();
          threads.joinAll();
          assert(activeReaders == 0 && activeWriters == 0);
          foreach (t; threads) threads.remove(t);

          // policy determines whether queued reader or writers progress first
          threads.create(&writerBody);
          writerIn.wait();
          threads.create(&readerBody);
          threads.create(&writerBody);
          awaitQueueDepth(1, 1);
          assert(activeReaders == 0 && activeWriters == 1);
          writerGo.notify();

          if (policy == ReadWriteMutex.Policy.PREFER_READERS)
          {
              readerIn.wait();
              assert(activeReaders == 1 && activeWriters == 0);
              readerGo.notify();
              writerIn.wait();
              assert(activeReaders == 0 && activeWriters == 1);
              writerGo.notify();
          }
          else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
          {
              writerIn.wait();
              assert(activeReaders == 0 && activeWriters == 1);
              writerGo.notify();
              readerIn.wait();
              assert(activeReaders == 1 && activeWriters == 0);
              readerGo.notify();
          }

          threads.joinAll();
          assert(activeReaders == 0 && activeWriters == 0);
          foreach (t; threads) threads.remove(t);
      }

      runTest(ReadWriteMutex.Policy.PREFER_READERS);
      runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
  }
  // Exercises Reader.tryLock(Duration) and Writer.tryLock(Duration) on a
  // shared ReadWriteMutex while the opposite kind of lock is held.
  unittest
  {
      import core.atomic, core.thread;

      shared static ReadWriteMutex lockUnderTest;
      shared static bool firstAttemptDone;
      shared static bool secondAttemptDone;

      lockUnderTest = new shared ReadWriteMutex();
      atomicFence();
      const testBudget = dur!"seconds"(20);

      // Test ReadWriteMutex.Reader.tryLock(Duration).
      {
          static void testReaderTryLock()
          {
              assert(!lockUnderTest.reader.tryLock(Duration.min));
              firstAttemptDone.atomicStore(true);
              assert(lockUnderTest.reader.tryLock(Duration.max));
              secondAttemptDone.atomicStore(true);
              lockUnderTest.reader.unlock();
          }

          assert(lockUnderTest.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
          auto worker = new Thread(&testReaderTryLock).start();
          const deadline = MonoTime.currTime + testBudget;
          Thread.yield();

          // The worker was started while the writer lock was held, so its
          // first reader.tryLock, with timeout Duration.min, should fail.
          while (!firstAttemptDone.atomicLoad())
          {
              assert(MonoTime.currTime < deadline, "timed out");
              Thread.yield();
          }

          lockUnderTest.writer.unlock();

          // Soon after the writer lock is released the worker's second
          // reader.tryLock, with timeout Duration.max, should succeed.
          while (!secondAttemptDone.atomicLoad())
          {
              assert(MonoTime.currTime < deadline, "timed out");
              Thread.yield();
          }

          worker.join();
      }

      firstAttemptDone.atomicStore(false); // Reset.
      secondAttemptDone.atomicStore(false); // Reset.

      // Test ReadWriteMutex.Writer.tryLock(Duration).
      {
          static void testWriterTryLock()
          {
              assert(!lockUnderTest.writer.tryLock(Duration.min));
              firstAttemptDone.atomicStore(true);
              assert(lockUnderTest.writer.tryLock(Duration.max));
              secondAttemptDone.atomicStore(true);
              lockUnderTest.writer.unlock();
          }

          assert(lockUnderTest.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
          auto worker = new Thread(&testWriterTryLock).start();
          const deadline = MonoTime.currTime + testBudget;
          Thread.yield();

          // The worker was started while the reader lock was held, so its
          // first writer.tryLock, with timeout Duration.min, should fail.
          while (!firstAttemptDone.atomicLoad())
          {
              assert(MonoTime.currTime < deadline, "timed out");
              Thread.yield();
          }

          lockUnderTest.reader.unlock();

          // Soon after the reader lock is released the worker's second
          // writer.tryLock, with timeout Duration.max, should succeed.
          while (!secondAttemptDone.atomicLoad())
          {
              assert(MonoTime.currTime < deadline, "timed out");
              Thread.yield();
          }

          worker.join();
      }
  }