Line data Source code
1 : /* Copyright (C) 2013 The PARI group.
2 :
3 : This file is part of the PARI/GP package.
4 :
5 : PARI/GP is free software; you can redistribute it and/or modify it under the
6 : terms of the GNU General Public License as published by the Free Software
7 : Foundation; either version 2 of the License, or (at your option) any later
8 : version. It is distributed in the hope that it will be useful, but WITHOUT
9 : ANY WARRANTY WHATSOEVER.
10 :
11 : Check the License for details. You should have received a copy of it, along
12 : with the package; see the file 'COPYING'. If not, write to the Free Software
13 : Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */
14 : #include <pthread.h>
15 : #include "pari.h"
16 : #include "paripriv.h"
17 : #include "mt.h"
18 : #if defined(_WIN32)
19 : # include "../systems/mingw/mingw.h"
20 : #endif
21 :
22 : #define DEBUGLEVEL DEBUGLEVEL_mt
23 :
24 : struct mt_queue
25 : {
26 : long no;
27 : pari_sp avma;
28 : struct pari_mainstack *mainstack;
29 : GEN input, output;
30 : GEN worker;
31 : long workid;
32 : pthread_cond_t cond;
33 : pthread_mutex_t mut;
34 : pthread_cond_t *pcond;
35 : pthread_mutex_t *pmut;
36 : };
37 :
38 : struct mt_pstate
39 : {
40 : pthread_t *th;
41 : struct pari_thread *pth;
42 : struct mt_queue *mq;
43 : long n, nbint, last;
44 : long pending;
45 : pthread_cond_t pcond;
46 : pthread_mutex_t pmut;
47 : };
48 :
49 : static THREAD long mt_thread_no = -1;
50 : static struct mt_pstate *pari_mt;
51 :
52 : #define LOCK(x) pthread_mutex_lock(x); do
53 : #define UNLOCK(x) while(0); pthread_mutex_unlock(x)
54 :
55 : void
56 204920137 : mt_sigint_block(void)
57 : {
58 204920137 : if (mt_thread_no>=0)
59 28328044 : pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,NULL);
60 206477732 : }
61 :
62 : void
63 205454379 : mt_sigint_unblock(void)
64 : {
65 205454379 : if (mt_thread_no>=0)
66 28849145 : pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL);
67 206472816 : }
68 :
69 : void
70 1847 : mt_err_recover(long er)
71 : {
72 1847 : if (mt_thread_no>=0)
73 : {
74 11 : struct mt_pstate *mt = pari_mt;
75 11 : struct mt_queue *mq = mt->mq+mt_thread_no;
76 11 : GEN err = pari_err_last();
77 11 : err = err_get_num(err)==e_STACK ? err_e_STACK: bin_copy(copy_bin(err));
78 11 : pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,NULL);
79 11 : LOCK(mq->pmut)
80 : {
81 11 : mq->output = err;
82 11 : pthread_cond_signal(mq->pcond);
83 11 : } UNLOCK(mq->pmut);
84 11 : pthread_exit((void*)1);
85 : }
86 1836 : else mtsingle_err_recover(er);
87 1836 : }
88 :
89 : void
90 0 : mt_break_recover(void)
91 : {
92 0 : if (mt_thread_no<0) mtsingle_err_recover(0);
93 0 : }
94 :
95 : void
96 0 : mt_sigint(void)
97 : {
98 0 : if (pari_mt) pthread_cond_broadcast(&pari_mt->pcond);
99 0 : }
100 :
101 : int
102 220403 : mt_is_parallel(void)
103 : {
104 220403 : return !!pari_mt;
105 : }
106 :
107 : int
108 30907610 : mt_is_thread(void)
109 : {
110 30907610 : return mt_thread_no>=0 ? 1: mtsingle_is_thread();
111 : }
112 :
113 : long
114 383189 : mt_nbthreads(void)
115 : {
116 383189 : return pari_mt ? 1: pari_mt_nbthreads;
117 : }
118 :
119 : void
120 333133 : mt_thread_init(void) { mt_thread_no = 0; }
121 :
122 : void
123 13 : mt_export_add(const char *str, GEN val)
124 : {
125 13 : if (pari_mt)
126 0 : pari_err(e_MISC,"export() not allowed during parallel sections");
127 13 : export_add(str, val);
128 13 : }
129 :
130 : void
131 8 : mt_export_del(const char *str)
132 : {
133 8 : if (pari_mt)
134 0 : pari_err(e_MISC,"unexport() not allowed during parallel sections");
135 8 : export_del(str);
136 8 : }
137 :
138 1 : void mt_broadcast(GEN code) {(void) code;}
139 :
140 266 : void pari_mt_init(void)
141 : {
142 266 : pari_mt = NULL;
143 : #ifdef _SC_NPROCESSORS_CONF
144 266 : if (!pari_mt_nbthreads) pari_mt_nbthreads = sysconf(_SC_NPROCESSORS_CONF);
145 : #elif defined(_WIN32)
146 : if (!pari_mt_nbthreads) pari_mt_nbthreads = win32_nbthreads();
147 : #else
148 : pari_mt_nbthreads = 1;
149 : #endif
150 266 : }
151 :
152 266 : void pari_mt_close(void) { }
153 :
154 : static void
155 334041 : mt_queue_cleanup(void *arg)
156 : {
157 : (void) arg;
158 334041 : pari_thread_close();
159 332921 : }
160 :
161 : static void
162 333653 : mt_queue_unlock(void *arg)
163 333653 : { pthread_mutex_unlock((pthread_mutex_t*) arg); }
164 :
165 : static void*
166 335501 : mt_queue_run(void *arg)
167 : {
168 335501 : GEN args = pari_thread_start((struct pari_thread*) arg);
169 333093 : pari_sp av = avma;
170 333093 : struct mt_queue *mq = (struct mt_queue *) args;
171 333093 : mt_thread_no = mq->no;
172 333093 : pthread_cleanup_push(mt_queue_cleanup,NULL);
173 333832 : LOCK(mq->pmut)
174 : {
175 335506 : mq->mainstack = pari_mainstack;
176 335506 : mq->avma = av;
177 335506 : pthread_cond_signal(mq->pcond);
178 335506 : } UNLOCK(mq->pmut);
179 : for(;;)
180 445957 : {
181 : GEN work, done;
182 781429 : LOCK(&mq->mut)
183 : {
184 781414 : pthread_cleanup_push(mt_queue_unlock, &mq->mut);
185 1203907 : while(!mq->input)
186 757951 : pthread_cond_wait(&mq->cond, &mq->mut);
187 445956 : pthread_cleanup_pop(0);
188 445931 : } UNLOCK(&mq->mut);
189 445949 : pari_mainstack = mq->mainstack;
190 445949 : set_avma(mq->avma);
191 445864 : work = mq->input;
192 445864 : pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL);
193 445985 : done = closure_callgenvec(mq->worker,work);
194 444699 : pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,NULL);
195 445831 : LOCK(mq->pmut)
196 : {
197 445979 : mq->mainstack = pari_mainstack;
198 445979 : mq->avma = av;
199 445979 : mq->input = NULL;
200 445979 : mq->output = done;
201 445979 : pthread_cond_signal(mq->pcond);
202 445979 : } UNLOCK(mq->pmut);
203 : }
204 : pthread_cleanup_pop(1);
205 : #ifdef __GNUC__
206 : return NULL; /* LCOV_EXCL_LINE */
207 : #endif
208 : }
209 :
210 : static long
211 618618 : mt_queue_check(struct mt_pstate *mt)
212 : {
213 : long i;
214 4555226 : for(i=0; i<mt->n; i++)
215 : {
216 4382589 : struct mt_queue *mq = mt->mq+i;
217 4382589 : if (mq->output) return i;
218 : }
219 172637 : return -1;
220 : }
221 :
222 : static GEN
223 718747 : mtpthread_queue_get(struct mt_state *junk, long *workid, long *pending)
224 : {
225 718747 : struct mt_pstate *mt = pari_mt;
226 : struct mt_queue *mq;
227 718747 : GEN done = NULL;
228 : long last;
229 : (void) junk;
230 718747 : if (mt->nbint<mt->n)
231 : {
232 272764 : mt->last = mt->nbint;
233 272764 : *pending = mt->pending;
234 272764 : return NULL;
235 : }
236 445983 : BLOCK_SIGINT_START
237 445983 : LOCK(&mt->pmut)
238 : {
239 618618 : while ((last = mt_queue_check(mt)) < 0)
240 : {
241 172637 : pthread_cond_wait(&mt->pcond, &mt->pmut);
242 172637 : if (PARI_SIGINT_pending)
243 : {
244 2 : int sig = PARI_SIGINT_pending;
245 2 : PARI_SIGINT_pending = 0;
246 2 : pthread_mutex_unlock(&mt->pmut);
247 2 : PARI_SIGINT_block = 0;
248 2 : raise(sig);
249 0 : PARI_SIGINT_block = 1;
250 0 : pthread_mutex_lock(&mt->pmut);
251 : }
252 : }
253 445981 : } UNLOCK(&mt->pmut);
254 445981 : BLOCK_SIGINT_END
255 445981 : mq = mt->mq+last;
256 445981 : done = gcopy(mq->output);
257 445981 : mq->output = NULL;
258 445981 : if (workid) *workid = mq->workid;
259 445981 : if (typ(done) == t_ERROR)
260 : {
261 5 : if (err_get_num(done)==e_STACK)
262 0 : pari_err(e_STACKTHREAD);
263 : else
264 5 : pari_err(0,done);
265 : }
266 445976 : mt->last = last;
267 445976 : mt->pending--;
268 445976 : *pending = mt->pending;
269 445976 : return done;
270 : }
271 :
272 : static void
273 718747 : mtpthread_queue_submit(struct mt_state *junk, long workid, GEN work)
274 : {
275 718747 : struct mt_pstate *mt = pari_mt;
276 718747 : struct mt_queue *mq = mt->mq+mt->last;
277 : (void) junk;
278 718747 : if (!work) { mt->nbint=mt->n; return; }
279 446037 : BLOCK_SIGINT_START
280 446037 : if (mt->nbint<mt->n)
281 : {
282 335069 : mt->nbint++;
283 335069 : LOCK(mq->pmut)
284 : {
285 411619 : while(!mq->avma)
286 76550 : pthread_cond_wait(mq->pcond, mq->pmut);
287 335069 : } UNLOCK(mq->pmut);
288 : }
289 446037 : LOCK(&mq->mut)
290 : {
291 446037 : mq->output = NULL;
292 446037 : mq->workid = workid;
293 446037 : BLOCK_SIGINT_START
294 : {
295 446037 : pari_sp av = avma;
296 446037 : struct pari_mainstack *st = pari_mainstack;
297 446037 : pari_mainstack = mq->mainstack;
298 446037 : set_avma(mq->avma);
299 446037 : mq->input = gcopy(work);
300 446037 : mq->avma = avma;
301 446037 : mq->mainstack = pari_mainstack;
302 446037 : pari_mainstack = st;
303 446037 : set_avma(av);
304 : }
305 446037 : BLOCK_SIGINT_END
306 446037 : pthread_cond_signal(&mq->cond);
307 446037 : } UNLOCK(&mq->mut);
308 446037 : mt->pending++;
309 446037 : BLOCK_SIGINT_END
310 : }
311 :
312 : void
313 62650 : mt_queue_reset(void)
314 : {
315 62650 : struct mt_pstate *mt = pari_mt;
316 : long i;
317 62650 : BLOCK_SIGINT_START
318 398156 : for (i=0; i<mt->n; i++)
319 335506 : pthread_cancel(mt->th[i]);
320 398156 : for (i=0; i<mt->n; i++)
321 335506 : pthread_join(mt->th[i],NULL);
322 62650 : pari_mt = NULL;
323 62650 : BLOCK_SIGINT_END
324 62650 : if (DEBUGLEVEL) pari_warn(warner,"stopping %ld threads", mt->n);
325 62650 : BLOCK_SIGINT_START
326 398156 : for (i=0;i<mt->n;i++)
327 : {
328 335506 : struct mt_queue *mq = mt->mq+i;
329 335506 : pthread_cond_destroy(&mq->cond);
330 335506 : pthread_mutex_destroy(&mq->mut);
331 335506 : pari_thread_free(&mt->pth[i]);
332 : }
333 62650 : pthread_cond_destroy(&mt->pcond);
334 62650 : pthread_mutex_destroy(&mt->pmut);
335 62650 : BLOCK_SIGINT_END
336 62650 : pari_free(mt->mq);
337 62650 : pari_free(mt->pth);
338 62650 : pari_free(mt->th);
339 62650 : pari_free(mt);
340 62650 : }
341 :
342 : static long
343 62650 : closure_has_clone(GEN fun)
344 : {
345 62650 : if (isclone(fun)) return 1;
346 62644 : if (lg(fun) >= 8)
347 : {
348 62138 : GEN f = closure_get_frame(fun);
349 62138 : long i, l = lg(f);
350 229976 : for (i = 1; i < l; i++)
351 169352 : if (isclone(gel(f,i))) return 1;
352 : }
353 61130 : return 0;
354 : }
355 :
356 : void
357 141620 : mt_queue_start_lim(struct pari_mt *pt, GEN worker, long lim)
358 : {
359 141620 : if (lim==0) lim = pari_mt_nbthreads;
360 141601 : else lim = minss(pari_mt_nbthreads, lim);
361 141620 : if (mt_thread_no >= 0)
362 42370 : mtsequential_queue_start(pt, worker);
363 99250 : else if (pari_mt || lim <= 1)
364 36600 : mtsingle_queue_start(pt, worker);
365 : else
366 : {
367 : struct mt_pstate *mt =
368 62650 : (struct mt_pstate*) pari_malloc(sizeof(struct mt_pstate));
369 62650 : long mtparisize = GP_DATA->threadsize? GP_DATA->threadsize: pari_mainstack->rsize;
370 62650 : long mtparisizemax = GP_DATA->threadsizemax;
371 : long i;
372 62650 : if (closure_has_clone(worker))
373 1520 : worker = gcopy(worker); /* to avoid clone_lock race */
374 62650 : mt->mq = (struct mt_queue *) pari_malloc(sizeof(*mt->mq)*lim);
375 62650 : mt->th = (pthread_t *) pari_malloc(sizeof(*mt->th)*lim);
376 62650 : mt->pth = (struct pari_thread *) pari_malloc(sizeof(*mt->pth)*lim);
377 62650 : mt->pending = 0;
378 62650 : mt->n = lim;
379 62650 : mt->nbint = 0;
380 62650 : mt->last = 0;
381 62650 : pthread_cond_init(&mt->pcond,NULL);
382 62650 : pthread_mutex_init(&mt->pmut,NULL);
383 398156 : for (i=0;i<lim;i++)
384 : {
385 335506 : struct mt_queue *mq = mt->mq+i;
386 335506 : mq->no = i;
387 335506 : mq->avma = 0;
388 335506 : mq->mainstack = NULL;
389 335506 : mq->worker = worker;
390 335506 : mq->input = NULL;
391 335506 : mq->output = NULL;
392 335506 : mq->pcond = &mt->pcond;
393 335506 : mq->pmut = &mt->pmut;
394 335506 : pthread_cond_init(&mq->cond,NULL);
395 335506 : pthread_mutex_init(&mq->mut,NULL);
396 335506 : if (mtparisizemax)
397 0 : pari_thread_valloc(&mt->pth[i],mtparisize,mtparisizemax,(GEN)mq);
398 : else
399 335506 : pari_thread_alloc(&mt->pth[i],mtparisize,(GEN)mq);
400 : }
401 62650 : if (DEBUGLEVEL) pari_warn(warner,"starting %ld threads", lim);
402 62650 : BLOCK_SIGINT_START
403 398156 : for (i=0;i<lim;i++)
404 335506 : pthread_create(&mt->th[i],NULL, &mt_queue_run, (void*)&mt->pth[i]);
405 62650 : pari_mt = mt;
406 62650 : BLOCK_SIGINT_END
407 62650 : pt->get=&mtpthread_queue_get;
408 62650 : pt->submit=&mtpthread_queue_submit;
409 62650 : pt->end=&mt_queue_reset;
410 : }
411 141620 : }
|