1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081 |
- /**
- * The read/write mutex module provides a primitive for maintaining shared read
- * access and mutually exclusive write access.
- *
- * Copyright: Copyright Sean Kelly 2005 - 2009.
- * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
- * Authors: Sean Kelly
- * Source: $(DRUNTIMESRC core/sync/_rwmutex.d)
- */
- /* Copyright Sean Kelly 2005 - 2009.
- * Distributed under the Boost Software License, Version 1.0.
- * (See accompanying file LICENSE or copy at
- * http://www.boost.org/LICENSE_1_0.txt)
- */
- module core.sync.rwmutex;
- public import core.sync.exception;
- import core.sync.condition;
- import core.sync.mutex;
- import core.memory;
- version (Posix)
- {
- import core.sys.posix.pthread;
- }
- ////////////////////////////////////////////////////////////////////////////////
- // ReadWriteMutex
- //
- // Reader reader();
- // Writer writer();
- ////////////////////////////////////////////////////////////////////////////////
- /**
- * This class represents a mutex that allows any number of readers to enter,
- * but when a writer enters, all other readers and writers are blocked.
- *
- * Please note that this mutex is not recursive and is intended to guard access
- * to data only. Also, no deadlock checking is in place because doing so would
- * require dynamic memory allocation, which would reduce performance by an
- * unacceptable amount. As a result, any attempt to recursively acquire this
- * mutex may well deadlock the caller, particularly if a write lock is acquired
- * while holding a read lock, or vice-versa. In practice, this should not be
- * an issue however, because it is uncommon to call deeply into unknown code
- * while holding a lock that simply protects data.
- */
- class ReadWriteMutex
- {
- /**
- * Defines the policy used by this mutex. Currently, two policies are
- * defined.
- *
- * The first will queue writers until no readers hold the mutex, then
- * pass the writers through one at a time. If a reader acquires the mutex
- * while there are still writers queued, the reader will take precedence.
- *
- * The second will queue readers if there are any writers queued. Writers
- * are passed through one at a time, and once there are no writers present,
- * all queued readers will be alerted.
- *
- * Future policies may offer a more even balance between reader and writer
- * precedence.
- */
- enum Policy
- {
- PREFER_READERS, /// Readers get preference. This may starve writers.
- PREFER_WRITERS /// Writers get preference. This may starve readers.
- }
- ////////////////////////////////////////////////////////////////////////////
- // Initialization
- ////////////////////////////////////////////////////////////////////////////
- /**
- * Initializes a read/write mutex object with the supplied policy.
- *
- * Params:
- * policy = The policy to use.
- *
- * Throws:
- * SyncError on error.
- */
- this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
- {
- m_commonMutex = new Mutex;
- if ( !m_commonMutex )
- throw new SyncError( "Unable to initialize mutex" );
- m_readerQueue = new Condition( m_commonMutex );
- if ( !m_readerQueue )
- throw new SyncError( "Unable to initialize mutex" );
- m_writerQueue = new Condition( m_commonMutex );
- if ( !m_writerQueue )
- throw new SyncError( "Unable to initialize mutex" );
- m_policy = policy;
- m_reader = new Reader;
- m_writer = new Writer;
- }
- /// ditto
- shared this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
- {
- m_commonMutex = new shared Mutex;
- if ( !m_commonMutex )
- throw new SyncError( "Unable to initialize mutex" );
- m_readerQueue = new shared Condition( m_commonMutex );
- if ( !m_readerQueue )
- throw new SyncError( "Unable to initialize mutex" );
- m_writerQueue = new shared Condition( m_commonMutex );
- if ( !m_writerQueue )
- throw new SyncError( "Unable to initialize mutex" );
- m_policy = policy;
- m_reader = new shared Reader;
- m_writer = new shared Writer;
- }
- ////////////////////////////////////////////////////////////////////////////
- // General Properties
- ////////////////////////////////////////////////////////////////////////////
- /**
- * Gets the policy used by this mutex.
- *
- * Returns:
- * The policy used by this mutex.
- */
- @property Policy policy() @safe nothrow
- {
- return m_policy;
- }
- ///ditto
- @property Policy policy() shared @safe nothrow
- {
- return m_policy;
- }
- ////////////////////////////////////////////////////////////////////////////
- // Reader/Writer Handles
- ////////////////////////////////////////////////////////////////////////////
- /**
- * Gets an object representing the reader lock for the associated mutex.
- *
- * Returns:
- * A reader sub-mutex.
- */
- @property Reader reader() @safe nothrow
- {
- return m_reader;
- }
- ///ditto
- @property shared(Reader) reader() shared @safe nothrow
- {
- return m_reader;
- }
- /**
- * Gets an object representing the writer lock for the associated mutex.
- *
- * Returns:
- * A writer sub-mutex.
- */
- @property Writer writer() @safe nothrow
- {
- return m_writer;
- }
- ///ditto
- @property shared(Writer) writer() shared @safe nothrow
- {
- return m_writer;
- }
- ////////////////////////////////////////////////////////////////////////////
- // Reader
- ////////////////////////////////////////////////////////////////////////////
- /**
- * This class can be considered a mutex in its own right, and is used to
- * negotiate a read lock for the enclosing mutex.
- */
- class Reader :
- Object.Monitor
- {
- /**
- * Initializes a read/write mutex reader proxy object.
- */
- this(this Q)() @trusted nothrow
- if (is(Q == Reader) || is(Q == shared Reader))
- {
- m_proxy.link = this;
- this.__monitor = cast(void*) &m_proxy;
- }
- /**
- * Acquires a read lock on the enclosing mutex.
- */
- @trusted void lock()
- {
- synchronized( m_commonMutex )
- {
- ++m_numQueuedReaders;
- scope(exit) --m_numQueuedReaders;
- while ( shouldQueueReader )
- m_readerQueue.wait();
- ++m_numActiveReaders;
- }
- }
- /// ditto
- @trusted void lock() shared
- {
- synchronized( m_commonMutex )
- {
- ++(cast()m_numQueuedReaders);
- scope(exit) --(cast()m_numQueuedReaders);
- while ( shouldQueueReader )
- m_readerQueue.wait();
- ++(cast()m_numActiveReaders);
- }
- }
- /**
- * Releases a read lock on the enclosing mutex.
- */
- @trusted void unlock()
- {
- synchronized( m_commonMutex )
- {
- if ( --m_numActiveReaders < 1 )
- {
- if ( m_numQueuedWriters > 0 )
- m_writerQueue.notify();
- }
- }
- }
- /// ditto
- @trusted void unlock() shared
- {
- synchronized( m_commonMutex )
- {
- if ( --(cast()m_numActiveReaders) < 1 )
- {
- if ( m_numQueuedWriters > 0 )
- m_writerQueue.notify();
- }
- }
- }
- /**
- * Attempts to acquire a read lock on the enclosing mutex. If one can
- * be obtained without blocking, the lock is acquired and true is
- * returned. If not, the lock is not acquired and false is returned.
- *
- * Returns:
- * true if the lock was acquired and false if not.
- */
- @trusted bool tryLock()
- {
- synchronized( m_commonMutex )
- {
- if ( shouldQueueReader )
- return false;
- ++m_numActiveReaders;
- return true;
- }
- }
- /// ditto
- @trusted bool tryLock() shared
- {
- synchronized( m_commonMutex )
- {
- if ( shouldQueueReader )
- return false;
- ++(cast()m_numActiveReaders);
- return true;
- }
- }
- /**
- * Attempts to acquire a read lock on the enclosing mutex. If one can
- * be obtained without blocking, the lock is acquired and true is
- * returned. If not, the function blocks until either the lock can be
- * obtained or the time elapsed exceeds $(D_PARAM timeout), returning
- * true if the lock was acquired and false if the function timed out.
- *
- * Params:
- * timeout = maximum amount of time to wait for the lock
- * Returns:
- * true if the lock was acquired and false if not.
- */
- @trusted bool tryLock(Duration timeout)
- {
- synchronized( m_commonMutex )
- {
- if (!shouldQueueReader)
- {
- ++m_numActiveReaders;
- return true;
- }
- enum zero = Duration.zero();
- if (timeout <= zero)
- return false;
- ++m_numQueuedReaders;
- scope(exit) --m_numQueuedReaders;
- enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration.
- const initialTime = MonoTime.currTime;
- m_readerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall);
- while (shouldQueueReader)
- {
- const timeElapsed = MonoTime.currTime - initialTime;
- if (timeElapsed >= timeout)
- return false;
- auto nextWait = timeout - timeElapsed;
- m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
- }
- ++m_numActiveReaders;
- return true;
- }
- }
- /// ditto
- @trusted bool tryLock(Duration timeout) shared
- {
- const initialTime = MonoTime.currTime;
- synchronized( m_commonMutex )
- {
- ++(cast()m_numQueuedReaders);
- scope(exit) --(cast()m_numQueuedReaders);
- while (shouldQueueReader)
- {
- const timeElapsed = MonoTime.currTime - initialTime;
- if (timeElapsed >= timeout)
- return false;
- auto nextWait = timeout - timeElapsed;
- // Avoid problems calling wait(Duration) with huge arguments.
- enum maxWaitPerCall = dur!"hours"(24 * 365);
- m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
- }
- ++(cast()m_numActiveReaders);
- return true;
- }
- }
- private:
- @property bool shouldQueueReader(this Q)() nothrow @safe @nogc
- if (is(Q == Reader) || is(Q == shared Reader))
- {
- if ( m_numActiveWriters > 0 )
- return true;
- switch ( m_policy )
- {
- case Policy.PREFER_WRITERS:
- return m_numQueuedWriters > 0;
- case Policy.PREFER_READERS:
- default:
- break;
- }
- return false;
- }
- struct MonitorProxy
- {
- Object.Monitor link;
- }
- MonitorProxy m_proxy;
- }
- ////////////////////////////////////////////////////////////////////////////
- // Writer
- ////////////////////////////////////////////////////////////////////////////
- /**
- * This class can be considered a mutex in its own right, and is used to
- * negotiate a write lock for the enclosing mutex.
- */
- class Writer :
- Object.Monitor
- {
- /**
- * Initializes a read/write mutex writer proxy object.
- */
- this(this Q)() @trusted nothrow
- if (is(Q == Writer) || is(Q == shared Writer))
- {
- m_proxy.link = this;
- this.__monitor = cast(void*) &m_proxy;
- }
- /**
- * Acquires a write lock on the enclosing mutex.
- */
- @trusted void lock()
- {
- synchronized( m_commonMutex )
- {
- ++m_numQueuedWriters;
- scope(exit) --m_numQueuedWriters;
- while ( shouldQueueWriter )
- m_writerQueue.wait();
- ++m_numActiveWriters;
- }
- }
- /// ditto
- @trusted void lock() shared
- {
- synchronized( m_commonMutex )
- {
- ++(cast()m_numQueuedWriters);
- scope(exit) --(cast()m_numQueuedWriters);
- while ( shouldQueueWriter )
- m_writerQueue.wait();
- ++(cast()m_numActiveWriters);
- }
- }
- /**
- * Releases a write lock on the enclosing mutex.
- */
- @trusted void unlock()
- {
- synchronized( m_commonMutex )
- {
- if ( --m_numActiveWriters < 1 )
- {
- switch ( m_policy )
- {
- default:
- case Policy.PREFER_READERS:
- if ( m_numQueuedReaders > 0 )
- m_readerQueue.notifyAll();
- else if ( m_numQueuedWriters > 0 )
- m_writerQueue.notify();
- break;
- case Policy.PREFER_WRITERS:
- if ( m_numQueuedWriters > 0 )
- m_writerQueue.notify();
- else if ( m_numQueuedReaders > 0 )
- m_readerQueue.notifyAll();
- }
- }
- }
- }
- /// ditto
- @trusted void unlock() shared
- {
- synchronized( m_commonMutex )
- {
- if ( --(cast()m_numActiveWriters) < 1 )
- {
- switch ( m_policy )
- {
- default:
- case Policy.PREFER_READERS:
- if ( m_numQueuedReaders > 0 )
- m_readerQueue.notifyAll();
- else if ( m_numQueuedWriters > 0 )
- m_writerQueue.notify();
- break;
- case Policy.PREFER_WRITERS:
- if ( m_numQueuedWriters > 0 )
- m_writerQueue.notify();
- else if ( m_numQueuedReaders > 0 )
- m_readerQueue.notifyAll();
- }
- }
- }
- }
- /**
- * Attempts to acquire a write lock on the enclosing mutex. If one can
- * be obtained without blocking, the lock is acquired and true is
- * returned. If not, the lock is not acquired and false is returned.
- *
- * Returns:
- * true if the lock was acquired and false if not.
- */
- @trusted bool tryLock()
- {
- synchronized( m_commonMutex )
- {
- if ( shouldQueueWriter )
- return false;
- ++m_numActiveWriters;
- return true;
- }
- }
- /// ditto
- @trusted bool tryLock() shared
- {
- synchronized( m_commonMutex )
- {
- if ( shouldQueueWriter )
- return false;
- ++(cast()m_numActiveWriters);
- return true;
- }
- }
- /**
- * Attempts to acquire a write lock on the enclosing mutex. If one can
- * be obtained without blocking, the lock is acquired and true is
- * returned. If not, the function blocks until either the lock can be
- * obtained or the time elapsed exceeds $(D_PARAM timeout), returning
- * true if the lock was acquired and false if the function timed out.
- *
- * Params:
- * timeout = maximum amount of time to wait for the lock
- * Returns:
- * true if the lock was acquired and false if not.
- */
- @trusted bool tryLock(Duration timeout)
- {
- synchronized( m_commonMutex )
- {
- if (!shouldQueueWriter)
- {
- ++m_numActiveWriters;
- return true;
- }
- enum zero = Duration.zero();
- if (timeout <= zero)
- return false;
- ++m_numQueuedWriters;
- scope(exit) --m_numQueuedWriters;
- enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration.
- const initialTime = MonoTime.currTime;
- m_writerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall);
- while (shouldQueueWriter)
- {
- const timeElapsed = MonoTime.currTime - initialTime;
- if (timeElapsed >= timeout)
- return false;
- auto nextWait = timeout - timeElapsed;
- m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
- }
- ++m_numActiveWriters;
- return true;
- }
- }
- /// ditto
- @trusted bool tryLock(Duration timeout) shared
- {
- const initialTime = MonoTime.currTime;
- synchronized( m_commonMutex )
- {
- ++(cast()m_numQueuedWriters);
- scope(exit) --(cast()m_numQueuedWriters);
- while (shouldQueueWriter)
- {
- const timeElapsed = MonoTime.currTime - initialTime;
- if (timeElapsed >= timeout)
- return false;
- auto nextWait = timeout - timeElapsed;
- // Avoid problems calling wait(Duration) with huge arguments.
- enum maxWaitPerCall = dur!"hours"(24 * 365);
- m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
- }
- ++(cast()m_numActiveWriters);
- return true;
- }
- }
- private:
- @property bool shouldQueueWriter(this Q)()
- if (is(Q == Writer) || is(Q == shared Writer))
- {
- if ( m_numActiveWriters > 0 ||
- m_numActiveReaders > 0 )
- return true;
- switch ( m_policy )
- {
- case Policy.PREFER_READERS:
- return m_numQueuedReaders > 0;
- case Policy.PREFER_WRITERS:
- default:
- break;
- }
- return false;
- }
- struct MonitorProxy
- {
- Object.Monitor link;
- }
- MonitorProxy m_proxy;
- }
- private:
- Policy m_policy;
- Reader m_reader;
- Writer m_writer;
- Mutex m_commonMutex;
- Condition m_readerQueue;
- Condition m_writerQueue;
- int m_numQueuedReaders;
- int m_numActiveReaders;
- int m_numQueuedWriters;
- int m_numActiveWriters;
- }
- ////////////////////////////////////////////////////////////////////////////////
- // Unit Tests
- ////////////////////////////////////////////////////////////////////////////////
- unittest
- {
- import core.atomic, core.thread, core.sync.semaphore;
- static void runTest(ReadWriteMutex.Policy policy)
- {
- scope mutex = new ReadWriteMutex(policy);
- scope rdSemA = new Semaphore, rdSemB = new Semaphore,
- wrSemA = new Semaphore, wrSemB = new Semaphore;
- shared size_t numReaders, numWriters;
- void readerFn()
- {
- synchronized (mutex.reader)
- {
- atomicOp!"+="(numReaders, 1);
- rdSemA.notify();
- rdSemB.wait();
- atomicOp!"-="(numReaders, 1);
- }
- }
- void writerFn()
- {
- synchronized (mutex.writer)
- {
- atomicOp!"+="(numWriters, 1);
- wrSemA.notify();
- wrSemB.wait();
- atomicOp!"-="(numWriters, 1);
- }
- }
- void waitQueued(size_t queuedReaders, size_t queuedWriters)
- {
- for (;;)
- {
- synchronized (mutex.m_commonMutex)
- {
- if (mutex.m_numQueuedReaders == queuedReaders &&
- mutex.m_numQueuedWriters == queuedWriters)
- break;
- }
- Thread.yield();
- }
- }
- scope group = new ThreadGroup;
- // 2 simultaneous readers
- group.create(&readerFn); group.create(&readerFn);
- rdSemA.wait(); rdSemA.wait();
- assert(numReaders == 2);
- rdSemB.notify(); rdSemB.notify();
- group.joinAll();
- assert(numReaders == 0);
- foreach (t; group) group.remove(t);
- // 1 writer at a time
- group.create(&writerFn); group.create(&writerFn);
- wrSemA.wait();
- assert(!wrSemA.tryWait());
- assert(numWriters == 1);
- wrSemB.notify();
- wrSemA.wait();
- assert(numWriters == 1);
- wrSemB.notify();
- group.joinAll();
- assert(numWriters == 0);
- foreach (t; group) group.remove(t);
- // reader and writer are mutually exclusive
- group.create(&readerFn);
- rdSemA.wait();
- group.create(&writerFn);
- waitQueued(0, 1);
- assert(!wrSemA.tryWait());
- assert(numReaders == 1 && numWriters == 0);
- rdSemB.notify();
- wrSemA.wait();
- assert(numReaders == 0 && numWriters == 1);
- wrSemB.notify();
- group.joinAll();
- assert(numReaders == 0 && numWriters == 0);
- foreach (t; group) group.remove(t);
- // writer and reader are mutually exclusive
- group.create(&writerFn);
- wrSemA.wait();
- group.create(&readerFn);
- waitQueued(1, 0);
- assert(!rdSemA.tryWait());
- assert(numReaders == 0 && numWriters == 1);
- wrSemB.notify();
- rdSemA.wait();
- assert(numReaders == 1 && numWriters == 0);
- rdSemB.notify();
- group.joinAll();
- assert(numReaders == 0 && numWriters == 0);
- foreach (t; group) group.remove(t);
- // policy determines whether queued reader or writers progress first
- group.create(&writerFn);
- wrSemA.wait();
- group.create(&readerFn);
- group.create(&writerFn);
- waitQueued(1, 1);
- assert(numReaders == 0 && numWriters == 1);
- wrSemB.notify();
- if (policy == ReadWriteMutex.Policy.PREFER_READERS)
- {
- rdSemA.wait();
- assert(numReaders == 1 && numWriters == 0);
- rdSemB.notify();
- wrSemA.wait();
- assert(numReaders == 0 && numWriters == 1);
- wrSemB.notify();
- }
- else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
- {
- wrSemA.wait();
- assert(numReaders == 0 && numWriters == 1);
- wrSemB.notify();
- rdSemA.wait();
- assert(numReaders == 1 && numWriters == 0);
- rdSemB.notify();
- }
- group.joinAll();
- assert(numReaders == 0 && numWriters == 0);
- foreach (t; group) group.remove(t);
- }
- runTest(ReadWriteMutex.Policy.PREFER_READERS);
- runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
- }
- unittest
- {
- import core.atomic, core.thread;
- __gshared ReadWriteMutex rwmutex;
- shared static bool threadTriedOnceToGetLock;
- shared static bool threadFinallyGotLock;
- rwmutex = new ReadWriteMutex();
- atomicFence;
- const maxTimeAllowedForTest = dur!"seconds"(20);
- // Test ReadWriteMutex.Reader.tryLock(Duration).
- {
- static void testReaderTryLock()
- {
- assert(!rwmutex.reader.tryLock(Duration.min));
- threadTriedOnceToGetLock.atomicStore(true);
- assert(rwmutex.reader.tryLock(Duration.max));
- threadFinallyGotLock.atomicStore(true);
- rwmutex.reader.unlock;
- }
- assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
- auto otherThread = new Thread(&testReaderTryLock).start;
- const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
- Thread.yield;
- // We started otherThread with the writer lock held so otherThread's
- // first rwlock.reader.tryLock with timeout Duration.min should fail.
- while (!threadTriedOnceToGetLock.atomicLoad)
- {
- assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
- Thread.yield;
- }
- rwmutex.writer.unlock;
- // Soon after we release the writer lock otherThread's second
- // rwlock.reader.tryLock with timeout Duration.max should succeed.
- while (!threadFinallyGotLock.atomicLoad)
- {
- assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
- Thread.yield;
- }
- otherThread.join;
- }
- threadTriedOnceToGetLock.atomicStore(false); // Reset.
- threadFinallyGotLock.atomicStore(false); // Reset.
- // Test ReadWriteMutex.Writer.tryLock(Duration).
- {
- static void testWriterTryLock()
- {
- assert(!rwmutex.writer.tryLock(Duration.min));
- threadTriedOnceToGetLock.atomicStore(true);
- assert(rwmutex.writer.tryLock(Duration.max));
- threadFinallyGotLock.atomicStore(true);
- rwmutex.writer.unlock;
- }
- assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
- auto otherThread = new Thread(&testWriterTryLock).start;
- const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
- Thread.yield;
- // We started otherThread with the reader lock held so otherThread's
- // first rwlock.writer.tryLock with timeout Duration.min should fail.
- while (!threadTriedOnceToGetLock.atomicLoad)
- {
- assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
- Thread.yield;
- }
- rwmutex.reader.unlock;
- // Soon after we release the reader lock otherThread's second
- // rwlock.writer.tryLock with timeout Duration.max should succeed.
- while (!threadFinallyGotLock.atomicLoad)
- {
- assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
- Thread.yield;
- }
- otherThread.join;
- }
- }
- unittest
- {
- import core.atomic, core.thread, core.sync.semaphore;
- static void runTest(ReadWriteMutex.Policy policy)
- {
- shared scope mutex = new shared ReadWriteMutex(policy);
- scope rdSemA = new Semaphore, rdSemB = new Semaphore,
- wrSemA = new Semaphore, wrSemB = new Semaphore;
- shared size_t numReaders, numWriters;
- void readerFn()
- {
- synchronized (mutex.reader)
- {
- atomicOp!"+="(numReaders, 1);
- rdSemA.notify();
- rdSemB.wait();
- atomicOp!"-="(numReaders, 1);
- }
- }
- void writerFn()
- {
- synchronized (mutex.writer)
- {
- atomicOp!"+="(numWriters, 1);
- wrSemA.notify();
- wrSemB.wait();
- atomicOp!"-="(numWriters, 1);
- }
- }
- void waitQueued(size_t queuedReaders, size_t queuedWriters)
- {
- for (;;)
- {
- synchronized (mutex.m_commonMutex)
- {
- if (mutex.m_numQueuedReaders == queuedReaders &&
- mutex.m_numQueuedWriters == queuedWriters)
- break;
- }
- Thread.yield();
- }
- }
- scope group = new ThreadGroup;
- // 2 simultaneous readers
- group.create(&readerFn); group.create(&readerFn);
- rdSemA.wait(); rdSemA.wait();
- assert(numReaders == 2);
- rdSemB.notify(); rdSemB.notify();
- group.joinAll();
- assert(numReaders == 0);
- foreach (t; group) group.remove(t);
- // 1 writer at a time
- group.create(&writerFn); group.create(&writerFn);
- wrSemA.wait();
- assert(!wrSemA.tryWait());
- assert(numWriters == 1);
- wrSemB.notify();
- wrSemA.wait();
- assert(numWriters == 1);
- wrSemB.notify();
- group.joinAll();
- assert(numWriters == 0);
- foreach (t; group) group.remove(t);
- // reader and writer are mutually exclusive
- group.create(&readerFn);
- rdSemA.wait();
- group.create(&writerFn);
- waitQueued(0, 1);
- assert(!wrSemA.tryWait());
- assert(numReaders == 1 && numWriters == 0);
- rdSemB.notify();
- wrSemA.wait();
- assert(numReaders == 0 && numWriters == 1);
- wrSemB.notify();
- group.joinAll();
- assert(numReaders == 0 && numWriters == 0);
- foreach (t; group) group.remove(t);
- // writer and reader are mutually exclusive
- group.create(&writerFn);
- wrSemA.wait();
- group.create(&readerFn);
- waitQueued(1, 0);
- assert(!rdSemA.tryWait());
- assert(numReaders == 0 && numWriters == 1);
- wrSemB.notify();
- rdSemA.wait();
- assert(numReaders == 1 && numWriters == 0);
- rdSemB.notify();
- group.joinAll();
- assert(numReaders == 0 && numWriters == 0);
- foreach (t; group) group.remove(t);
- // policy determines whether queued reader or writers progress first
- group.create(&writerFn);
- wrSemA.wait();
- group.create(&readerFn);
- group.create(&writerFn);
- waitQueued(1, 1);
- assert(numReaders == 0 && numWriters == 1);
- wrSemB.notify();
- if (policy == ReadWriteMutex.Policy.PREFER_READERS)
- {
- rdSemA.wait();
- assert(numReaders == 1 && numWriters == 0);
- rdSemB.notify();
- wrSemA.wait();
- assert(numReaders == 0 && numWriters == 1);
- wrSemB.notify();
- }
- else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
- {
- wrSemA.wait();
- assert(numReaders == 0 && numWriters == 1);
- wrSemB.notify();
- rdSemA.wait();
- assert(numReaders == 1 && numWriters == 0);
- rdSemB.notify();
- }
- group.joinAll();
- assert(numReaders == 0 && numWriters == 0);
- foreach (t; group) group.remove(t);
- }
- runTest(ReadWriteMutex.Policy.PREFER_READERS);
- runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
- }
- unittest
- {
- import core.atomic, core.thread;
- shared static ReadWriteMutex rwmutex;
- shared static bool threadTriedOnceToGetLock;
- shared static bool threadFinallyGotLock;
- rwmutex = new shared ReadWriteMutex();
- atomicFence;
- const maxTimeAllowedForTest = dur!"seconds"(20);
- // Test ReadWriteMutex.Reader.tryLock(Duration).
- {
- static void testReaderTryLock()
- {
- assert(!rwmutex.reader.tryLock(Duration.min));
- threadTriedOnceToGetLock.atomicStore(true);
- assert(rwmutex.reader.tryLock(Duration.max));
- threadFinallyGotLock.atomicStore(true);
- rwmutex.reader.unlock;
- }
- assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
- auto otherThread = new Thread(&testReaderTryLock).start;
- const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
- Thread.yield;
- // We started otherThread with the writer lock held so otherThread's
- // first rwlock.reader.tryLock with timeout Duration.min should fail.
- while (!threadTriedOnceToGetLock.atomicLoad)
- {
- assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
- Thread.yield;
- }
- rwmutex.writer.unlock;
- // Soon after we release the writer lock otherThread's second
- // rwlock.reader.tryLock with timeout Duration.max should succeed.
- while (!threadFinallyGotLock.atomicLoad)
- {
- assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
- Thread.yield;
- }
- otherThread.join;
- }
- threadTriedOnceToGetLock.atomicStore(false); // Reset.
- threadFinallyGotLock.atomicStore(false); // Reset.
- // Test ReadWriteMutex.Writer.tryLock(Duration).
- {
- static void testWriterTryLock()
- {
- assert(!rwmutex.writer.tryLock(Duration.min));
- threadTriedOnceToGetLock.atomicStore(true);
- assert(rwmutex.writer.tryLock(Duration.max));
- threadFinallyGotLock.atomicStore(true);
- rwmutex.writer.unlock;
- }
- assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
- auto otherThread = new Thread(&testWriterTryLock).start;
- const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
- Thread.yield;
- // We started otherThread with the reader lock held so otherThread's
- // first rwlock.writer.tryLock with timeout Duration.min should fail.
- while (!threadTriedOnceToGetLock.atomicLoad)
- {
- assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
- Thread.yield;
- }
- rwmutex.reader.unlock;
- // Soon after we release the reader lock otherThread's second
- // rwlock.writer.tryLock with timeout Duration.max should succeed.
- while (!threadFinallyGotLock.atomicLoad)
- {
- assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
- Thread.yield;
- }
- otherThread.join;
- }
- }
|