123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454 |
- /**
- * The semaphore module provides a general use semaphore for synchronization.
- *
- * Copyright: Copyright Sean Kelly 2005 - 2009.
- * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
- * Authors: Sean Kelly
- * Source: $(DRUNTIMESRC core/sync/_semaphore.d)
- */
- /* Copyright Sean Kelly 2005 - 2009.
- * Distributed under the Boost Software License, Version 1.0.
- * (See accompanying file LICENSE or copy at
- * http://www.boost.org/LICENSE_1_0.txt)
- */
- module core.sync.semaphore;
- public import core.sync.exception;
- public import core.time;
- version (OSX)
- version = Darwin;
- else version (iOS)
- version = Darwin;
- else version (TVOS)
- version = Darwin;
- else version (WatchOS)
- version = Darwin;
- version (Windows)
- {
- import core.sys.windows.basetsd /+: HANDLE+/;
- import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, INFINITE,
- ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
- import core.sys.windows.windef /+: BOOL, DWORD+/;
- import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
- }
- else version (Darwin)
- {
- import core.sync.config;
- import core.stdc.errno;
- import core.sys.posix.time;
- import core.sys.darwin.mach.semaphore;
- }
- else version (Posix)
- {
- import core.sync.config;
- import core.stdc.errno;
- import core.sys.posix.pthread;
- import core.sys.posix.semaphore;
- }
- else
- {
- static assert(false, "Platform not supported");
- }
- ////////////////////////////////////////////////////////////////////////////////
- // Semaphore
- //
- // void wait();
- // void notify();
- // bool tryWait();
- ////////////////////////////////////////////////////////////////////////////////
- /**
- * This class represents a general counting semaphore as concieved by Edsger
- * Dijkstra. As per Mesa type monitors however, "signal" has been replaced
- * with "notify" to indicate that control is not transferred to the waiter when
- * a notification is sent.
- */
- class Semaphore
- {
- ////////////////////////////////////////////////////////////////////////////
- // Initialization
- ////////////////////////////////////////////////////////////////////////////
- /**
- * Initializes a semaphore object with the specified initial count.
- *
- * Params:
- * count = The initial count for the semaphore.
- *
- * Throws:
- * SyncError on error.
- */
- this( uint count = 0 )
- {
- version (Windows)
- {
- m_hndl = CreateSemaphoreA( null, count, int.max, null );
- if ( m_hndl == m_hndl.init )
- throw new SyncError( "Unable to create semaphore" );
- }
- else version (Darwin)
- {
- auto rc = semaphore_create( mach_task_self(), &m_hndl, SYNC_POLICY_FIFO, count );
- if ( rc )
- throw new SyncError( "Unable to create semaphore" );
- }
- else version (Posix)
- {
- int rc = sem_init( &m_hndl, 0, count );
- if ( rc )
- throw new SyncError( "Unable to create semaphore" );
- }
- }
- ~this()
- {
- version (Windows)
- {
- BOOL rc = CloseHandle( m_hndl );
- assert( rc, "Unable to destroy semaphore" );
- }
- else version (Darwin)
- {
- auto rc = semaphore_destroy( mach_task_self(), m_hndl );
- assert( !rc, "Unable to destroy semaphore" );
- }
- else version (Posix)
- {
- int rc = sem_destroy( &m_hndl );
- assert( !rc, "Unable to destroy semaphore" );
- }
- }
- ////////////////////////////////////////////////////////////////////////////
- // General Actions
- ////////////////////////////////////////////////////////////////////////////
- /**
- * Wait until the current count is above zero, then atomically decrement
- * the count by one and return.
- *
- * Throws:
- * SyncError on error.
- */
- void wait()
- {
- version (Windows)
- {
- DWORD rc = WaitForSingleObject( m_hndl, INFINITE );
- if ( rc != WAIT_OBJECT_0 )
- throw new SyncError( "Unable to wait for semaphore" );
- }
- else version (Darwin)
- {
- while ( true )
- {
- auto rc = semaphore_wait( m_hndl );
- if ( !rc )
- return;
- if ( rc == KERN_ABORTED && errno == EINTR )
- continue;
- throw new SyncError( "Unable to wait for semaphore" );
- }
- }
- else version (Posix)
- {
- while ( true )
- {
- if ( !sem_wait( &m_hndl ) )
- return;
- if ( errno != EINTR )
- throw new SyncError( "Unable to wait for semaphore" );
- }
- }
- }
- /**
- * Suspends the calling thread until the current count moves above zero or
- * until the supplied time period has elapsed. If the count moves above
- * zero in this interval, then atomically decrement the count by one and
- * return true. Otherwise, return false.
- *
- * Params:
- * period = The time to wait.
- *
- * In:
- * period must be non-negative.
- *
- * Throws:
- * SyncError on error.
- *
- * Returns:
- * true if notified before the timeout and false if not.
- */
- bool wait( Duration period )
- in
- {
- assert( !period.isNegative );
- }
- do
- {
- version (Windows)
- {
- auto maxWaitMillis = dur!("msecs")( uint.max - 1 );
- while ( period > maxWaitMillis )
- {
- auto rc = WaitForSingleObject( m_hndl, cast(uint)
- maxWaitMillis.total!"msecs" );
- switch ( rc )
- {
- case WAIT_OBJECT_0:
- return true;
- case WAIT_TIMEOUT:
- period -= maxWaitMillis;
- continue;
- default:
- throw new SyncError( "Unable to wait for semaphore" );
- }
- }
- switch ( WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" ) )
- {
- case WAIT_OBJECT_0:
- return true;
- case WAIT_TIMEOUT:
- return false;
- default:
- throw new SyncError( "Unable to wait for semaphore" );
- }
- }
- else version (Darwin)
- {
- mach_timespec_t t = void;
- (cast(byte*) &t)[0 .. t.sizeof] = 0;
- if ( period.total!"seconds" > t.tv_sec.max )
- {
- t.tv_sec = t.tv_sec.max;
- t.tv_nsec = cast(typeof(t.tv_nsec)) period.split!("seconds", "nsecs")().nsecs;
- }
- else
- period.split!("seconds", "nsecs")(t.tv_sec, t.tv_nsec);
- while ( true )
- {
- auto rc = semaphore_timedwait( m_hndl, t );
- if ( !rc )
- return true;
- if ( rc == KERN_OPERATION_TIMED_OUT )
- return false;
- if ( rc != KERN_ABORTED || errno != EINTR )
- throw new SyncError( "Unable to wait for semaphore" );
- }
- }
- else version (Posix)
- {
- import core.sys.posix.time : clock_gettime, CLOCK_REALTIME;
- timespec t = void;
- clock_gettime( CLOCK_REALTIME, &t );
- mvtspec( t, period );
- while ( true )
- {
- if ( !sem_timedwait( &m_hndl, &t ) )
- return true;
- if ( errno == ETIMEDOUT )
- return false;
- if ( errno != EINTR )
- throw new SyncError( "Unable to wait for semaphore" );
- }
- }
- }
- /**
- * Atomically increment the current count by one. This will notify one
- * waiter, if there are any in the queue.
- *
- * Throws:
- * SyncError on error.
- */
- void notify()
- {
- version (Windows)
- {
- if ( !ReleaseSemaphore( m_hndl, 1, null ) )
- throw new SyncError( "Unable to notify semaphore" );
- }
- else version (Darwin)
- {
- auto rc = semaphore_signal( m_hndl );
- if ( rc )
- throw new SyncError( "Unable to notify semaphore" );
- }
- else version (Posix)
- {
- int rc = sem_post( &m_hndl );
- if ( rc )
- throw new SyncError( "Unable to notify semaphore" );
- }
- }
- /**
- * If the current count is equal to zero, return. Otherwise, atomically
- * decrement the count by one and return true.
- *
- * Throws:
- * SyncError on error.
- *
- * Returns:
- * true if the count was above zero and false if not.
- */
- bool tryWait()
- {
- version (Windows)
- {
- switch ( WaitForSingleObject( m_hndl, 0 ) )
- {
- case WAIT_OBJECT_0:
- return true;
- case WAIT_TIMEOUT:
- return false;
- default:
- throw new SyncError( "Unable to wait for semaphore" );
- }
- }
- else version (Darwin)
- {
- return wait( dur!"hnsecs"(0) );
- }
- else version (Posix)
- {
- while ( true )
- {
- if ( !sem_trywait( &m_hndl ) )
- return true;
- if ( errno == EAGAIN )
- return false;
- if ( errno != EINTR )
- throw new SyncError( "Unable to wait for semaphore" );
- }
- }
- }
- protected:
- /// Aliases the operating-system-specific semaphore type.
- version (Windows) alias Handle = HANDLE;
- /// ditto
- else version (Darwin) alias Handle = semaphore_t;
- /// ditto
- else version (Posix) alias Handle = sem_t;
- /// Handle to the system-specific semaphore.
- Handle m_hndl;
- }
- ////////////////////////////////////////////////////////////////////////////////
- // Unit Tests
- ////////////////////////////////////////////////////////////////////////////////
- unittest
- {
- import core.thread, core.atomic;
- void testWait()
- {
- auto semaphore = new Semaphore;
- shared bool stopConsumption = false;
- immutable numToProduce = 20;
- immutable numConsumers = 10;
- shared size_t numConsumed;
- shared size_t numComplete;
- void consumer()
- {
- while (true)
- {
- semaphore.wait();
- if (atomicLoad(stopConsumption))
- break;
- atomicOp!"+="(numConsumed, 1);
- }
- atomicOp!"+="(numComplete, 1);
- }
- void producer()
- {
- assert(!semaphore.tryWait());
- foreach (_; 0 .. numToProduce)
- semaphore.notify();
- // wait until all items are consumed
- while (atomicLoad(numConsumed) != numToProduce)
- Thread.yield();
- // mark consumption as finished
- atomicStore(stopConsumption, true);
- // wake all consumers
- foreach (_; 0 .. numConsumers)
- semaphore.notify();
- // wait until all consumers completed
- while (atomicLoad(numComplete) != numConsumers)
- Thread.yield();
- assert(!semaphore.tryWait());
- semaphore.notify();
- assert(semaphore.tryWait());
- assert(!semaphore.tryWait());
- }
- auto group = new ThreadGroup;
- for ( int i = 0; i < numConsumers; ++i )
- group.create(&consumer);
- group.create(&producer);
- group.joinAll();
- }
- void testWaitTimeout()
- {
- auto sem = new Semaphore;
- shared bool semReady;
- bool alertedOne, alertedTwo;
- void waiter()
- {
- while (!atomicLoad(semReady))
- Thread.yield();
- alertedOne = sem.wait(dur!"msecs"(1));
- alertedTwo = sem.wait(dur!"msecs"(1));
- assert(alertedOne && !alertedTwo);
- }
- auto thread = new Thread(&waiter);
- thread.start();
- sem.notify();
- atomicStore(semReady, true);
- thread.join();
- assert(alertedOne && !alertedTwo);
- }
- testWait();
- testWaitTimeout();
- }
|