async.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. /* Copyright (C) 2018-2022 Free Software Foundation, Inc.
  2. Contributed by Nicolas Koenig
  3. This file is part of the GNU Fortran runtime library (libgfortran).
  4. Libgfortran is free software; you can redistribute it and/or modify
  5. it under the terms of the GNU General Public License as published by
  6. the Free Software Foundation; either version 3, or (at your option)
  7. any later version.
  8. Libgfortran is distributed in the hope that it will be useful,
  9. but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. GNU General Public License for more details.
  12. Under Section 7 of GPL version 3, you are granted additional
  13. permissions described in the GCC Runtime Library Exception, version
  14. 3.1, as published by the Free Software Foundation.
  15. You should have received a copy of the GNU General Public License and
  16. a copy of the GCC Runtime Library Exception along with this program;
  17. see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
  18. <http://www.gnu.org/licenses/>. */
  19. #include "libgfortran.h"
  20. #define _GTHREAD_USE_COND_INIT_FUNC
  21. #include "../../libgcc/gthr.h"
  22. #include "io.h"
  23. #include "fbuf.h"
  24. #include "format.h"
  25. #include "unix.h"
  26. #include <string.h>
  27. #include <assert.h>
  28. #include <sys/types.h>
  29. #include "async.h"
  30. #if ASYNC_IO
  31. DEBUG_LINE (__thread const char *aio_prefix = MPREFIX);
  32. DEBUG_LINE (__gthread_mutex_t debug_queue_lock = __GTHREAD_MUTEX_INIT;)
  33. DEBUG_LINE (aio_lock_debug *aio_debug_head = NULL;)
  34. /* Current unit for asynchronous I/O. Needed for error reporting. */
  35. __thread gfc_unit *thread_unit = NULL;
  36. /* Queue entry for the asynchronous I/O entry. */
  37. typedef struct transfer_queue
  38. {
  39. enum aio_do type;
  40. struct transfer_queue *next;
  41. struct st_parameter_dt *new_pdt;
  42. transfer_args arg;
  43. _Bool has_id;
  44. int read_flag;
  45. } transfer_queue;
  46. struct error {
  47. st_parameter_dt *dtp;
  48. int id;
  49. };
  50. /* Helper function to exchange the old vs. a new PDT. */
  51. static void
  52. update_pdt (st_parameter_dt **old, st_parameter_dt *new) {
  53. st_parameter_dt *temp;
  54. NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit));
  55. temp = *old;
  56. *old = new;
  57. if (temp)
  58. free (temp);
  59. }
  60. /* Destroy an adv_cond structure. */
  61. static void
  62. destroy_adv_cond (struct adv_cond *ac)
  63. {
  64. T_ERROR (__gthread_cond_destroy, &ac->signal);
  65. }
  66. /* Function invoked as start routine for a new asynchronous I/O unit.
  67. Contains the main loop for accepting requests and handling them. */
  68. static void *
  69. async_io (void *arg)
  70. {
  71. DEBUG_LINE (aio_prefix = TPREFIX);
  72. transfer_queue *ctq = NULL, *prev = NULL;
  73. gfc_unit *u = (gfc_unit *) arg;
  74. async_unit *au = u->au;
  75. LOCK (&au->lock);
  76. thread_unit = u;
  77. au->thread = __gthread_self ();
  78. while (true)
  79. {
  80. /* Main loop. At this point, au->lock is always held. */
  81. WAIT_SIGNAL_MUTEX (&au->work, au->tail != NULL, &au->lock);
  82. LOCK (&au->lock);
  83. ctq = au->head;
  84. prev = NULL;
  85. /* Loop over the queue entries until they are finished. */
  86. while (ctq)
  87. {
  88. if (prev)
  89. free (prev);
  90. prev = ctq;
  91. if (!au->error.has_error)
  92. {
  93. UNLOCK (&au->lock);
  94. switch (ctq->type)
  95. {
  96. case AIO_WRITE_DONE:
  97. NOTE ("Finalizing write");
  98. st_write_done_worker (au->pdt, false);
  99. UNLOCK (&au->io_lock);
  100. break;
  101. case AIO_READ_DONE:
  102. NOTE ("Finalizing read");
  103. st_read_done_worker (au->pdt, false);
  104. UNLOCK (&au->io_lock);
  105. break;
  106. case AIO_DATA_TRANSFER_INIT:
  107. NOTE ("Data transfer init");
  108. LOCK (&au->io_lock);
  109. update_pdt (&au->pdt, ctq->new_pdt);
  110. data_transfer_init_worker (au->pdt, ctq->read_flag);
  111. break;
  112. case AIO_TRANSFER_SCALAR:
  113. NOTE ("Starting scalar transfer");
  114. ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt,
  115. ctq->arg.scalar.data,
  116. ctq->arg.scalar.i,
  117. ctq->arg.scalar.s1,
  118. ctq->arg.scalar.s2);
  119. break;
  120. case AIO_TRANSFER_ARRAY:
  121. NOTE ("Starting array transfer");
  122. NOTE ("ctq->arg.array.desc = %p",
  123. (void *) (ctq->arg.array.desc));
  124. transfer_array_inner (au->pdt, ctq->arg.array.desc,
  125. ctq->arg.array.kind,
  126. ctq->arg.array.charlen);
  127. free (ctq->arg.array.desc);
  128. break;
  129. case AIO_CLOSE:
  130. NOTE ("Received AIO_CLOSE");
  131. LOCK (&au->lock);
  132. goto finish_thread;
  133. default:
  134. internal_error (NULL, "Invalid queue type");
  135. break;
  136. }
  137. LOCK (&au->lock);
  138. if (unlikely (au->error.has_error))
  139. au->error.last_good_id = au->id.low - 1;
  140. }
  141. else
  142. {
  143. if (ctq->type == AIO_WRITE_DONE || ctq->type == AIO_READ_DONE)
  144. {
  145. UNLOCK (&au->io_lock);
  146. }
  147. else if (ctq->type == AIO_CLOSE)
  148. {
  149. NOTE ("Received AIO_CLOSE during error condition");
  150. goto finish_thread;
  151. }
  152. }
  153. NOTE ("Next ctq, current id: %d", au->id.low);
  154. if (ctq->has_id && au->id.waiting == au->id.low++)
  155. SIGNAL (&au->id.done);
  156. ctq = ctq->next;
  157. }
  158. au->tail = NULL;
  159. au->head = NULL;
  160. au->empty = 1;
  161. SIGNAL (&au->emptysignal);
  162. }
  163. finish_thread:
  164. au->tail = NULL;
  165. au->head = NULL;
  166. au->empty = 1;
  167. SIGNAL (&au->emptysignal);
  168. free (ctq);
  169. UNLOCK (&au->lock);
  170. return NULL;
  171. }
  172. /* Free an asynchronous unit. */
  173. static void
  174. free_async_unit (async_unit *au)
  175. {
  176. if (au->tail)
  177. internal_error (NULL, "Trying to free nonempty asynchronous unit");
  178. destroy_adv_cond (&au->work);
  179. destroy_adv_cond (&au->emptysignal);
  180. destroy_adv_cond (&au->id.done);
  181. T_ERROR (__gthread_mutex_destroy, &au->lock);
  182. free (au);
  183. }
  184. /* Initialize an adv_cond structure. */
  185. static void
  186. init_adv_cond (struct adv_cond *ac)
  187. {
  188. ac->pending = 0;
  189. __GTHREAD_COND_INIT_FUNCTION (&ac->signal);
  190. }
  191. /* Initialize an asyncronous unit, returning zero on success,
  192. nonzero on failure. It also sets u->au. */
  193. void
  194. init_async_unit (gfc_unit *u)
  195. {
  196. async_unit *au;
  197. if (!__gthread_active_p ())
  198. {
  199. u->au = NULL;
  200. return;
  201. }
  202. au = (async_unit *) xmalloc (sizeof (async_unit));
  203. u->au = au;
  204. init_adv_cond (&au->work);
  205. init_adv_cond (&au->emptysignal);
  206. __GTHREAD_MUTEX_INIT_FUNCTION (&au->lock);
  207. __GTHREAD_MUTEX_INIT_FUNCTION (&au->io_lock);
  208. LOCK (&au->lock);
  209. T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u);
  210. au->pdt = NULL;
  211. au->head = NULL;
  212. au->tail = NULL;
  213. au->empty = true;
  214. au->id.waiting = -1;
  215. au->id.low = 0;
  216. au->id.high = 0;
  217. au->error.fatal_error = 0;
  218. au->error.has_error = 0;
  219. au->error.last_good_id = 0;
  220. init_adv_cond (&au->id.done);
  221. UNLOCK (&au->lock);
  222. }
  223. /* Enqueue a transfer statement. */
  224. void
  225. enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
  226. {
  227. transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
  228. tq->arg = *arg;
  229. tq->type = type;
  230. tq->has_id = 0;
  231. LOCK (&au->lock);
  232. if (!au->tail)
  233. au->head = tq;
  234. else
  235. au->tail->next = tq;
  236. au->tail = tq;
  237. REVOKE_SIGNAL (&(au->emptysignal));
  238. au->empty = false;
  239. SIGNAL (&au->work);
  240. UNLOCK (&au->lock);
  241. }
  242. /* Enqueue an st_write_done or st_read_done which contains an ID. */
  243. int
  244. enqueue_done_id (async_unit *au, enum aio_do type)
  245. {
  246. int ret;
  247. transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
  248. tq->type = type;
  249. tq->has_id = 1;
  250. LOCK (&au->lock);
  251. if (!au->tail)
  252. au->head = tq;
  253. else
  254. au->tail->next = tq;
  255. au->tail = tq;
  256. REVOKE_SIGNAL (&(au->emptysignal));
  257. au->empty = false;
  258. ret = au->id.high++;
  259. NOTE ("Enqueue id: %d", ret);
  260. SIGNAL (&au->work);
  261. UNLOCK (&au->lock);
  262. return ret;
  263. }
  264. /* Enqueue an st_write_done or st_read_done without an ID. */
  265. void
  266. enqueue_done (async_unit *au, enum aio_do type)
  267. {
  268. transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
  269. tq->type = type;
  270. tq->has_id = 0;
  271. LOCK (&au->lock);
  272. if (!au->tail)
  273. au->head = tq;
  274. else
  275. au->tail->next = tq;
  276. au->tail = tq;
  277. REVOKE_SIGNAL (&(au->emptysignal));
  278. au->empty = false;
  279. SIGNAL (&au->work);
  280. UNLOCK (&au->lock);
  281. }
  282. /* Enqueue a CLOSE statement. */
  283. void
  284. enqueue_close (async_unit *au)
  285. {
  286. transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
  287. tq->type = AIO_CLOSE;
  288. LOCK (&au->lock);
  289. if (!au->tail)
  290. au->head = tq;
  291. else
  292. au->tail->next = tq;
  293. au->tail = tq;
  294. REVOKE_SIGNAL (&(au->emptysignal));
  295. au->empty = false;
  296. SIGNAL (&au->work);
  297. UNLOCK (&au->lock);
  298. }
  299. /* The asynchronous unit keeps the currently active PDT around.
  300. This function changes that to the current one. */
  301. void
  302. enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
  303. {
  304. st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt));
  305. transfer_queue *tq = xmalloc (sizeof (transfer_queue));
  306. memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt));
  307. NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc);
  308. NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK);
  309. tq->next = NULL;
  310. tq->type = AIO_DATA_TRANSFER_INIT;
  311. tq->read_flag = read_flag;
  312. tq->has_id = 0;
  313. tq->new_pdt = new;
  314. LOCK (&au->lock);
  315. if (!au->tail)
  316. au->head = tq;
  317. else
  318. au->tail->next = tq;
  319. au->tail = tq;
  320. REVOKE_SIGNAL (&(au->emptysignal));
  321. au->empty = false;
  322. SIGNAL (&au->work);
  323. UNLOCK (&au->lock);
  324. }
  325. /* Collect the errors that may have happened asynchronously. Return true if
  326. an error has been encountered. */
  327. bool
  328. collect_async_errors (st_parameter_common *cmp, async_unit *au)
  329. {
  330. bool has_error = au->error.has_error;
  331. if (has_error)
  332. {
  333. if (generate_error_common (cmp, au->error.family, au->error.message))
  334. {
  335. au->error.has_error = 0;
  336. au->error.cmp = NULL;
  337. }
  338. else
  339. {
  340. /* The program will exit later. */
  341. au->error.fatal_error = true;
  342. }
  343. }
  344. return has_error;
  345. }
  346. /* Perform a wait operation on an asynchronous unit with an ID specified,
  347. which means collecting the errors that may have happened asynchronously.
  348. Return true if an error has been encountered. */
  349. bool
  350. async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
  351. {
  352. bool ret;
  353. if (au == NULL)
  354. return false;
  355. if (cmp == NULL)
  356. cmp = au->error.cmp;
  357. if (au->error.has_error)
  358. {
  359. if (i <= au->error.last_good_id)
  360. return false;
  361. return collect_async_errors (cmp, au);
  362. }
  363. LOCK (&au->lock);
  364. if (i > au->id.high)
  365. {
  366. generate_error_common (cmp, LIBERROR_BAD_WAIT_ID, NULL);
  367. UNLOCK (&au->lock);
  368. return true;
  369. }
  370. NOTE ("Waiting for id %d", i);
  371. if (au->id.waiting < i)
  372. au->id.waiting = i;
  373. SIGNAL (&(au->work));
  374. WAIT_SIGNAL_MUTEX (&(au->id.done),
  375. (au->id.low >= au->id.waiting || au->empty), &au->lock);
  376. LOCK (&au->lock);
  377. ret = collect_async_errors (cmp, au);
  378. UNLOCK (&au->lock);
  379. return ret;
  380. }
  381. /* Perform a wait operation an an asynchronous unit without an ID. */
  382. bool
  383. async_wait (st_parameter_common *cmp, async_unit *au)
  384. {
  385. bool ret;
  386. if (au == NULL)
  387. return false;
  388. if (cmp == NULL)
  389. cmp = au->error.cmp;
  390. LOCK (&(au->lock));
  391. SIGNAL (&(au->work));
  392. if (au->empty)
  393. {
  394. ret = collect_async_errors (cmp, au);
  395. UNLOCK (&au->lock);
  396. return ret;
  397. }
  398. WAIT_SIGNAL_MUTEX (&(au->emptysignal), (au->empty), &au->lock);
  399. ret = collect_async_errors (cmp, au);
  400. return ret;
  401. }
  402. /* Close an asynchronous unit. */
  403. void
  404. async_close (async_unit *au)
  405. {
  406. if (au == NULL)
  407. return;
  408. NOTE ("Closing async unit");
  409. enqueue_close (au);
  410. T_ERROR (__gthread_join, au->thread, NULL);
  411. free_async_unit (au);
  412. }
  413. #else
  414. /* Only set u->au to NULL so no async I/O will happen. */
  415. void
  416. init_async_unit (gfc_unit *u)
  417. {
  418. u->au = NULL;
  419. return;
  420. }
  421. /* Do-nothing function, which will not be called. */
  422. void
  423. enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
  424. {
  425. return;
  426. }
  427. /* Do-nothing function, which will not be called. */
  428. int
  429. enqueue_done_id (async_unit *au, enum aio_do type)
  430. {
  431. return 0;
  432. }
  433. /* Do-nothing function, which will not be called. */
  434. void
  435. enqueue_done (async_unit *au, enum aio_do type)
  436. {
  437. return;
  438. }
  439. /* Do-nothing function, which will not be called. */
  440. void
  441. enqueue_close (async_unit *au)
  442. {
  443. return;
  444. }
  445. /* Do-nothing function, which will not be called. */
  446. void
  447. enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
  448. {
  449. return;
  450. }
  451. /* Do-nothing function, which will not be called. */
  452. bool
  453. collect_async_errors (st_parameter_common *cmp, async_unit *au)
  454. {
  455. return false;
  456. }
  457. /* Do-nothing function, which will not be called. */
  458. bool
  459. async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
  460. {
  461. return false;
  462. }
  463. /* Do-nothing function, which will not be called. */
  464. bool
  465. async_wait (st_parameter_common *cmp, async_unit *au)
  466. {
  467. return false;
  468. }
  469. /* Do-nothing function, which will not be called. */
  470. void
  471. async_close (async_unit *au)
  472. {
  473. return;
  474. }
  475. #endif