123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- /**
- * The event module provides a primitive for lightweight signaling of other threads
- * (emulating Windows events on Posix)
- *
- * Copyright: Copyright (c) 2019 D Language Foundation
- * License: Distributed under the
- * $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
- * (See accompanying file LICENSE)
- * Authors: Rainer Schuetze
- * Source: $(DRUNTIMESRC core/sync/event.d)
- */
- module core.sync.event;
- version (Windows)
- {
- import core.sys.windows.basetsd /+: HANDLE +/;
- import core.sys.windows.winerror /+: WAIT_TIMEOUT +/;
- import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent,
- WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/;
- }
- else version (Posix)
- {
- import core.sys.posix.pthread;
- import core.sys.posix.sys.types;
- import core.sys.posix.time;
- }
- else
- {
- static assert(false, "Platform not supported");
- }
- import core.time;
- import core.internal.abort : abort;
- /**
- * represents an event. Clients of an event are suspended while waiting
- * for the event to be "signaled".
- *
- * Implemented using `pthread_mutex` and `pthread_condition` on Posix and
- * `CreateEvent` and `SetEvent` on Windows.
- ---
- import core.sync.event, core.thread, std.file;
- struct ProcessFile
- {
- ThreadGroup group;
- Event event;
- void[] buffer;
- void doProcess()
- {
- event.wait();
- // process buffer
- }
- void process(string filename)
- {
- event.initialize(true, false);
- group = new ThreadGroup;
- for (int i = 0; i < 10; ++i)
- group.create(&doProcess);
- buffer = std.file.read(filename);
- event.set();
- group.joinAll();
- event.terminate();
- }
- }
- ---
- */
- struct Event
- {
- nothrow @nogc:
- /**
- * Creates an event object.
- *
- * Params:
- * manualReset = the state of the event is not reset automatically after resuming waiting clients
- * initialState = initial state of the signal
- */
- this(bool manualReset, bool initialState)
- {
- initialize(manualReset, initialState);
- }
- /**
- * Initializes an event object. Does nothing if the event is already initialized.
- *
- * Params:
- * manualReset = the state of the event is not reset automatically after resuming waiting clients
- * initialState = initial state of the signal
- */
- void initialize(bool manualReset, bool initialState)
- {
- version (Windows)
- {
- if (m_event)
- return;
- m_event = CreateEvent(null, manualReset, initialState, null);
- m_event || abort("Error: CreateEvent failed.");
- }
- else version (Posix)
- {
- if (m_initalized)
- return;
- pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 ||
- abort("Error: pthread_mutex_init failed.");
- static if ( is( typeof( pthread_condattr_setclock ) ) )
- {
- pthread_condattr_t attr = void;
- pthread_condattr_init(&attr) == 0 ||
- abort("Error: pthread_condattr_init failed.");
- pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0 ||
- abort("Error: pthread_condattr_setclock failed.");
- pthread_cond_init(&m_cond, &attr) == 0 ||
- abort("Error: pthread_cond_init failed.");
- pthread_condattr_destroy(&attr) == 0 ||
- abort("Error: pthread_condattr_destroy failed.");
- }
- else
- {
- pthread_cond_init(&m_cond, null) == 0 ||
- abort("Error: pthread_cond_init failed.");
- }
- m_state = initialState;
- m_manualReset = manualReset;
- m_initalized = true;
- }
- }
- // copying not allowed, can produce resource leaks
- @disable this(this);
- @disable void opAssign(Event);
- ~this()
- {
- terminate();
- }
- /**
- * deinitialize event. Does nothing if the event is not initialized. There must not be
- * threads currently waiting for the event to be signaled.
- */
- void terminate()
- {
- version (Windows)
- {
- if (m_event)
- CloseHandle(m_event);
- m_event = null;
- }
- else version (Posix)
- {
- if (m_initalized)
- {
- pthread_mutex_destroy(&m_mutex) == 0 ||
- abort("Error: pthread_mutex_destroy failed.");
- pthread_cond_destroy(&m_cond) == 0 ||
- abort("Error: pthread_cond_destroy failed.");
- m_initalized = false;
- }
- }
- }
- /// Set the event to "signaled", so that waiting clients are resumed
- void set()
- {
- version (Windows)
- {
- if (m_event)
- SetEvent(m_event);
- }
- else version (Posix)
- {
- if (m_initalized)
- {
- pthread_mutex_lock(&m_mutex);
- m_state = true;
- pthread_cond_broadcast(&m_cond);
- pthread_mutex_unlock(&m_mutex);
- }
- }
- }
- /// Reset the event manually
- void reset()
- {
- version (Windows)
- {
- if (m_event)
- ResetEvent(m_event);
- }
- else version (Posix)
- {
- if (m_initalized)
- {
- pthread_mutex_lock(&m_mutex);
- m_state = false;
- pthread_mutex_unlock(&m_mutex);
- }
- }
- }
- /**
- * Wait for the event to be signaled without timeout.
- *
- * Returns:
- * `true` if the event is in signaled state, `false` if the event is uninitialized or another error occured
- */
- bool wait()
- {
- version (Windows)
- {
- return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0;
- }
- else version (Posix)
- {
- return wait(Duration.max);
- }
- }
- /**
- * Wait for the event to be signaled with timeout.
- *
- * Params:
- * tmout = the maximum time to wait
- * Returns:
- * `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or
- * the event is uninitialized or another error occured
- */
- bool wait(Duration tmout)
- {
- version (Windows)
- {
- if (!m_event)
- return false;
- auto maxWaitMillis = dur!("msecs")(uint.max - 1);
- while (tmout > maxWaitMillis)
- {
- auto res = WaitForSingleObject(m_event, uint.max - 1);
- if (res != WAIT_TIMEOUT)
- return res == WAIT_OBJECT_0;
- tmout -= maxWaitMillis;
- }
- auto ms = cast(uint)(tmout.total!"msecs");
- return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0;
- }
- else version (Posix)
- {
- if (!m_initalized)
- return false;
- pthread_mutex_lock(&m_mutex);
- int result = 0;
- if (!m_state)
- {
- if (tmout == Duration.max)
- {
- result = pthread_cond_wait(&m_cond, &m_mutex);
- }
- else
- {
- import core.sync.config;
- timespec t = void;
- mktspec(t, tmout);
- result = pthread_cond_timedwait(&m_cond, &m_mutex, &t);
- }
- }
- if (result == 0 && !m_manualReset)
- m_state = false;
- pthread_mutex_unlock(&m_mutex);
- return result == 0;
- }
- }
- private:
- version (Windows)
- {
- HANDLE m_event;
- }
- else version (Posix)
- {
- pthread_mutex_t m_mutex;
- pthread_cond_t m_cond;
- bool m_initalized;
- bool m_state;
- bool m_manualReset;
- }
- }
- // Test single-thread (non-shared) use.
- @nogc nothrow unittest
- {
- // auto-reset, initial state false
- Event ev1 = Event(false, false);
- assert(!ev1.wait(1.dur!"msecs"));
- ev1.set();
- assert(ev1.wait());
- assert(!ev1.wait(1.dur!"msecs"));
- // manual-reset, initial state true
- Event ev2 = Event(true, true);
- assert(ev2.wait());
- assert(ev2.wait());
- ev2.reset();
- assert(!ev2.wait(1.dur!"msecs"));
- }
- unittest
- {
- import core.thread, core.atomic;
- scope event = new Event(true, false);
- int numThreads = 10;
- shared int numRunning = 0;
- void testFn()
- {
- event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner
- numRunning.atomicOp!"+="(1);
- }
- auto group = new ThreadGroup;
- for (int i = 0; i < numThreads; ++i)
- group.create(&testFn);
- auto start = MonoTime.currTime;
- assert(numRunning == 0);
- event.set();
- group.joinAll();
- assert(numRunning == numThreads);
- assert(MonoTime.currTime - start < 5.dur!"seconds");
- }
|