Ruby 2.1.10p492 (2016-04-01 revision 54464)
thread.c
Go to the documentation of this file.
1 #include <ruby.h>
2 
3 enum {
5 };
6 
7 enum {
8  QUEUE_QUE = 0,
12 };
13 
14 #define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
15 
16 #define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
17 #define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
18 #define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS)
19 #define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX)
20 #define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
21 
22 static VALUE
24 {
25  VALUE ary = RSTRUCT_GET(obj, idx);
26  if (!RB_TYPE_P(ary, T_ARRAY)) {
27  rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
28  }
29  return ary;
30 }
31 
32 static VALUE
34 {
35  return rb_ary_tmp_new(1);
36 }
37 
38 static void
40 {
41  VALUE thread;
42 
43  while (!NIL_P(thread = rb_ary_shift(list))) {
44  if (RTEST(rb_thread_wakeup_alive(thread))) break;
45  }
46 }
47 
48 static void
50 {
51  VALUE thread;
52  long i;
53 
54  for (i=0; i<RARRAY_LEN(list); i++) {
55  thread = RARRAY_AREF(list, i);
56  rb_thread_wakeup_alive(thread);
57  }
58  rb_ary_clear(list);
59 }
60 
61 /*
62  * Document-class: ConditionVariable
63  *
64  * ConditionVariable objects augment class Mutex. Using condition variables,
65  * it is possible to suspend while in the middle of a critical section until a
66  * resource becomes available.
67  *
68  * Example:
69  *
70  * require 'thread'
71  *
72  * mutex = Mutex.new
73  * resource = ConditionVariable.new
74  *
75  * a = Thread.new {
76  * mutex.synchronize {
77  * # Thread 'a' now needs the resource
78  * resource.wait(mutex)
79  * # 'a' can now have the resource
80  * }
81  * }
82  *
83  * b = Thread.new {
84  * mutex.synchronize {
85  * # Thread 'b' has finished using the resource
86  * resource.signal
87  * }
88  * }
89  */
90 
91 /*
92  * Document-method: ConditionVariable::new
93  *
94  * Creates a new condition variable instance.
95  */
96 
97 static VALUE
99 {
101  return self;
102 }
103 
104 struct sleep_call {
107 };
108 
109 static ID id_sleep;
110 
111 static VALUE
113 {
114  struct sleep_call *p = (struct sleep_call *)args;
115  return rb_funcall2(p->mutex, id_sleep, 1, &p->timeout);
116 }
117 
118 static VALUE
120 {
121  return rb_ary_delete(ary, rb_thread_current());
122 }
123 
124 /*
125  * Document-method: ConditionVariable#wait
126  * call-seq: wait(mutex, timeout=nil)
127  *
128  * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
129  *
130  * If +timeout+ is given, this method returns after +timeout+ seconds have
131  * passed, even if no other thread signals.
132  */
133 
134 static VALUE
136 {
137  VALUE waiters = GET_CONDVAR_WAITERS(self);
139  struct sleep_call args;
140 
141  rb_scan_args(argc, argv, "11", &mutex, &timeout);
142 
143  args.mutex = mutex;
144  args.timeout = timeout;
145  rb_ary_push(waiters, rb_thread_current());
146  rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters);
147 
148  return self;
149 }
150 
151 /*
152  * Document-method: ConditionVariable#signal
153  *
154  * Wakes up the first thread in line waiting for this lock.
155  */
156 
157 static VALUE
159 {
161  return self;
162 }
163 
164 /*
165  * Document-method: ConditionVariable#broadcast
166  *
167  * Wakes up all threads waiting for this lock.
168  */
169 
170 static VALUE
172 {
174  return self;
175 }
176 
177 /*
178  * Document-class: Queue
179  *
180  * This class provides a way to synchronize communication between threads.
181  *
182  * Example:
183  *
184  * require 'thread'
185  * queue = Queue.new
186  *
187  * producer = Thread.new do
188  * 5.times do |i|
189  * sleep rand(i) # simulate expense
190  * queue << i
191  * puts "#{i} produced"
192  * end
193  * end
194  *
195  * consumer = Thread.new do
196  * 5.times do |i|
197  * value = queue.pop
198  * sleep rand(i/2) # simulate expense
199  * puts "consumed #{value}"
200  * end
201  * end
202  *
203  */
204 
205 /*
206  * Document-method: Queue::new
207  *
208  * Creates a new queue instance.
209  */
210 
211 static VALUE
213 {
216  return self;
217 }
218 
219 static VALUE
221 {
222  rb_ary_push(GET_QUEUE_QUE(self), obj);
224  return self;
225 }
226 
227 /*
228  * Document-method: Queue#push
229  * call-seq:
230  * push(object)
231  * enq(object)
232  * <<(object)
233  *
234  * Pushes the given +object+ to the queue.
235  */
236 
237 static VALUE
239 {
240  return queue_do_push(self, obj);
241 }
242 
243 static unsigned long
245 {
246  return RARRAY_LEN(GET_QUEUE_QUE(self));
247 }
248 
249 static unsigned long
251 {
252  return RARRAY_LEN(GET_QUEUE_WAITERS(self));
253 }
254 
258 };
259 
260 static VALUE
262 {
263  rb_ary_delete(p->waiting, p->th);
264  return Qnil;
265 }
266 
267 static VALUE
269 {
271  return Qnil;
272 }
273 
/*
 * Shared pop implementation: returns the element shifted off the front of
 * the queue's backing array (GET_QUEUE_QUE).
 *
 * self         - the queue object.
 * should_block - Qtrue/Qfalse as produced by queue_pop_should_block().
 *
 * While the queue is empty:
 *   - if should_block is false, raises ThreadError ("queue empty");
 *   - otherwise the current thread is appended to the queue's waiters array
 *     before the loop condition is re-evaluated.
 */
274 static VALUE
275 queue_do_pop(VALUE self, VALUE should_block)
276 {
277  struct waiting_delete args;
278  args.waiting = GET_QUEUE_WAITERS(self);
279  args.th = rb_thread_current();
280 
281  while (queue_length(self) == 0) {
 /* NOTE(review): tests the VALUE by casting to int rather than with RTEST();
  * this presumably relies on Qfalse being the zero VALUE - confirm. */
282  if (!(int)should_block) {
283  rb_raise(rb_eThreadError, "queue empty");
284  }
 /* Register the current thread as a waiter on this queue. */
285  rb_ary_push(args.waiting, args.th);
 /* NOTE(review): listing line 286 is not shown here. Presumably it puts the
  * thread to sleep and removes it from the waiters afterwards (see
  * queue_sleep / queue_delete_from_waiting) - confirm against the original
  * file; as listed, this loop would never yield. */
287  }
288 
289  return rb_ary_shift(GET_QUEUE_QUE(self));
290 }
291 
292 static VALUE
294 {
295  VALUE should_block = Qtrue;
296  switch (argc) {
297  case 0:
298  break;
299  case 1:
300  should_block = RTEST(argv[0]) ? Qfalse : Qtrue;
301  break;
302  default:
303  rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
304  }
305  return should_block;
306 }
307 
308 /*
309  * Document-method: Queue#pop
310  * call-seq:
311  * pop(non_block=false)
312  * deq(non_block=false)
313  * shift(non_block=false)
314  *
315  * Retrieves data from the queue.
316  *
317  * If the queue is empty, the calling thread is suspended until data is pushed
318  * onto the queue. If +non_block+ is true, the thread isn't suspended, and an
319  * exception is raised.
320  */
321 
322 static VALUE
324 {
325  VALUE should_block = queue_pop_should_block(argc, argv);
326  return queue_do_pop(self, should_block);
327 }
328 
329 /*
330  * Document-method: Queue#empty?
331  * call-seq: empty?
332  *
333  * Returns +true+ if the queue is empty.
334  */
335 
336 static VALUE
338 {
339  return queue_length(self) == 0 ? Qtrue : Qfalse;
340 }
341 
342 /*
343  * Document-method: Queue#clear
344  *
345  * Removes all objects from the queue.
346  */
347 
348 static VALUE
350 {
352  return self;
353 }
354 
355 /*
356  * Document-method: Queue#length
357  * call-seq:
358  * length
359  * size
360  *
361  * Returns the length of the queue.
362  */
363 
364 static VALUE
366 {
367  unsigned long len = queue_length(self);
368  return ULONG2NUM(len);
369 }
370 
371 /*
372  * Document-method: Queue#num_waiting
373  *
374  * Returns the number of threads waiting on the queue.
375  */
376 
377 static VALUE
379 {
380  unsigned long len = queue_num_waiting(self);
381  return ULONG2NUM(len);
382 }
383 
384 /*
385  * Document-class: SizedQueue
386  *
387  * This class represents queues with a specified maximum capacity. The push
388  * operation may block if the queue is full.
389  *
390  * See Queue for an example of how a SizedQueue works.
391  */
392 
393 /*
394  * Document-method: SizedQueue::new
395  * call-seq: new(max)
396  *
397  * Creates a fixed-length queue with a maximum size of +max+.
398  */
399 
400 static VALUE
402 {
403  long max;
404 
405  max = NUM2LONG(vmax);
406  if (max <= 0) {
407  rb_raise(rb_eArgError, "queue size must be positive");
408  }
409 
413  RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
414 
415  return self;
416 }
417 
418 /*
419  * Document-method: SizedQueue#max
420  *
421  * Returns the maximum size of the queue.
422  */
423 
424 static VALUE
426 {
427  return GET_SZQUEUE_MAX(self);
428 }
429 
430 /*
431  * Document-method: SizedQueue#max=
432  * call-seq: max=(number)
433  *
434  * Sets the maximum size of the queue to the given +number+.
435  */
436 
437 static VALUE
439 {
440  long max = NUM2LONG(vmax), diff = 0;
441  VALUE t;
442 
443  if (max <= 0) {
444  rb_raise(rb_eArgError, "queue size must be positive");
445  }
446  if ((unsigned long)max > GET_SZQUEUE_ULONGMAX(self)) {
447  diff = max - GET_SZQUEUE_ULONGMAX(self);
448  }
449  RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
450  while (diff-- > 0 && !NIL_P(t = rb_ary_shift(GET_SZQUEUE_WAITERS(self)))) {
452  }
453  return vmax;
454 }
455 
456 /*
457  * Document-method: SizedQueue#push
458  * call-seq:
459  * push(object)
460  * enq(object)
461  * <<(object)
462  *
463  * Pushes +object+ to the queue.
464  *
465  * If there is no space left in the queue, waits until space becomes available.
466  */
467 
468 static VALUE
470 {
471  struct waiting_delete args;
472  args.waiting = GET_SZQUEUE_WAITERS(self);
473  args.th = rb_thread_current();
474 
475  while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
476  rb_ary_push(args.waiting, args.th);
478  }
479  return queue_do_push(self, obj);
480 }
481 
/*
 * Pop implementation for SizedQueue: delegates the actual pop (including the
 * blocking / "queue empty" behaviour) to queue_do_pop() and returns its
 * result unchanged.
 *
 * self         - the sized queue object.
 * should_block - passed through to queue_do_pop().
 */
482 static VALUE
483 szqueue_do_pop(VALUE self, VALUE should_block)
484 {
485  VALUE retval = queue_do_pop(self, should_block);
486 
 /* After the pop, check whether the queue length is below the configured
  * maximum (SZQUEUE_MAX read as an unsigned long). */
487  if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) {
 /* NOTE(review): listing line 488 (the body of this branch) is not shown
  * here. Presumably it wakes a thread blocked in push - confirm against
  * the original file. */
489  }
490 
491  return retval;
492 }
493 
494 /*
495  * Document-method: SizedQueue#pop
496  * call-seq:
497  * pop(non_block=false)
498  * deq(non_block=false)
499  * shift(non_block=false)
500  *
501  * Retrieves data from the queue.
502  *
503  * If the queue is empty, the calling thread is suspended until data is pushed
504  * onto the queue. If +non_block+ is true, the thread isn't suspended, and an
505  * exception is raised.
506  */
507 
508 static VALUE
510 {
511  VALUE should_block = queue_pop_should_block(argc, argv);
512  return szqueue_do_pop(self, should_block);
513 }
514 
515 /*
516  * Document-method: SizedQueue#clear
517  *
518  * Removes all objects from the queue.
519  */
520 
521 static VALUE
523 {
526  return self;
527 }
528 
529 /*
530  * Document-method: SizedQueue#num_waiting
531  *
532  * Returns the number of threads waiting on the queue.
533  */
534 
535 static VALUE
537 {
538  long len = queue_num_waiting(self);
539  len += RARRAY_LEN(GET_SZQUEUE_WAITERS(self));
540  return ULONG2NUM(len);
541 }
542 
543 #ifndef UNDER_THREAD
544 #define UNDER_THREAD 1
545 #endif
546 
547 static VALUE
549 {
550  rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
551  UNREACHABLE;
552 }
553 
554 void
556 {
557 #if UNDER_THREAD
558 #define ALIAS_GLOBAL_CONST(name) do { \
559  ID id = rb_intern_const(#name); \
560  if (!rb_const_defined_at(rb_cObject, id)) { \
561  rb_const_set(rb_cObject, id, rb_c##name); \
562  } \
563  } while (0)
564 #define OUTER rb_cThread
565 #else
566 #define ALIAS_GLOBAL_CONST(name) do { /* nothing */ } while (0)
567 #define OUTER 0
568 #endif
569 
570  VALUE rb_cConditionVariable = rb_struct_define_without_accessor_under(
571  OUTER,
572  "ConditionVariable", rb_cObject, rb_struct_alloc_noinit,
573  "waiters", NULL);
575  OUTER,
577  "que", "waiters", NULL);
579  OUTER,
580  "SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
581  "que", "waiters", "queue_waiters", "size", NULL);
582 
583 #if 0
584  rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */
585  rb_cQueue = rb_define_class("Queue", rb_cObject); /* teach rdoc Queue */
586  rb_cSizedQueue = rb_define_class("SizedQueue", rb_cObject); /* teach rdoc SizedQueue */
587 #endif
588 
589  id_sleep = rb_intern("sleep");
590 
591  rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
592  rb_define_method(rb_cConditionVariable, "marshal_dump", undumpable, 0);
593  rb_undef_method(rb_cConditionVariable, "initialize_copy");
594  rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
595  rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
596  rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
597 
598  rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
599  rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
600  rb_undef_method(rb_cQueue, "initialize_copy");
601  rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
602  rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
603  rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
604  rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
605  rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
606  rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
607 
608  /* Alias for #push. */
609  rb_define_alias(rb_cQueue, "enq", "push");
610  /* Alias for #push. */
611  rb_define_alias(rb_cQueue, "<<", "push");
612  /* Alias for #pop. */
613  rb_define_alias(rb_cQueue, "deq", "pop");
614  /* Alias for #pop. */
615  rb_define_alias(rb_cQueue, "shift", "pop");
616  /* Alias for #length. */
617  rb_define_alias(rb_cQueue, "size", "length");
618 
619  rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
620  rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
621  rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
622  rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, 1);
623  rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
624  rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
625  rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
626 
627  /* Alias for #push. */
628  rb_define_alias(rb_cSizedQueue, "enq", "push");
629  /* Alias for #push. */
630  rb_define_alias(rb_cSizedQueue, "<<", "push");
631  /* Alias for #pop. */
632  rb_define_alias(rb_cSizedQueue, "deq", "pop");
633  /* Alias for #pop. */
634  rb_define_alias(rb_cSizedQueue, "shift", "pop");
635 
636  rb_provide("thread.rb");
637  ALIAS_GLOBAL_CONST(ConditionVariable);
638  ALIAS_GLOBAL_CONST(Queue);
639  ALIAS_GLOBAL_CONST(SizedQueue);
640 }
VALUE rb_struct_define_without_accessor_under(VALUE outer, const char *class_name, VALUE super, rb_alloc_func_t alloc,...)
Definition: struct.c:270
#define RB_TYPE_P(obj, type)
static VALUE queue_do_pop(VALUE self, VALUE should_block)
Definition: thread.c:275
#define OUTER
static VALUE rb_condvar_broadcast(VALUE self)
Definition: thread.c:171
rb_funcall2(argv[0], id_yield, argc-1, argv+1)
static int max(int a, int b)
Definition: strftime.c:141
static VALUE rb_queue_push(VALUE self, VALUE obj)
Definition: thread.c:238
#define GET_CONDVAR_WAITERS(cv)
Definition: thread.c:14
static VALUE rb_queue_empty_p(VALUE self)
Definition: thread.c:337
static VALUE do_sleep(VALUE args)
Definition: thread.c:112
static ID id_sleep
Definition: thread.c:109
VALUE rb_ary_shift(VALUE ary)
Definition: array.c:995
VALUE rb_eTypeError
Definition: error.c:548
param thread
Definition: tcltklib.c:4127
#define UNREACHABLE
Definition: ruby.h:42
VALUE rb_ary_push(VALUE ary, VALUE item)
Definition: array.c:900
static VALUE rb_szqueue_num_waiting(VALUE self)
Definition: thread.c:536
VALUE rb_ary_tmp_new(long capa)
Definition: array.c:538
NIL_P(eventloop_thread)
Definition: tcltklib.c:4056
#define T_ARRAY
VALUE waiting
Definition: thread.c:256
void rb_raise(VALUE exc, const char *fmt,...)
Definition: error.c:1857
static VALUE rb_queue_initialize(VALUE self)
Definition: thread.c:212
return Qtrue
Definition: tcltklib.c:9618
VALUE rb_obj_class(VALUE)
Definition: object.c:226
VALUE rb_ary_clear(VALUE ary)
Definition: array.c:3392
void rb_thread_sleep_deadly(void)
Definition: thread.c:1080
static VALUE rb_szqueue_push(VALUE self, VALUE obj)
Definition: thread.c:469
static VALUE rb_condvar_initialize(VALUE self)
Definition: thread.c:98
static void wakeup_all_threads(VALUE list)
Definition: thread.c:49
void rb_undef_method(VALUE klass, const char *name)
Definition: class.c:1497
static VALUE queue_pop_should_block(int argc, VALUE *argv)
Definition: thread.c:293
i
Definition: enum.c:446
VALUE ary
Definition: enum.c:674
void rb_provide(const char *)
Definition: load.c:572
return Qfalse
Definition: tcltklib.c:6790
#define RARRAY_LEN(a)
#define Qnil
Definition: enum.c:67
#define GET_QUEUE_QUE(q)
Definition: thread.c:16
#define RARRAY_AREF(a, i)
unsigned long ID
Definition: ripper.y:89
VALUE rb_define_class(const char *name, VALUE super)
Defines a top-level class.
Definition: class.c:611
static VALUE VALUE obj
Definition: tcltklib.c:3150
static void wakeup_first_thread(VALUE list)
Definition: thread.c:39
#define GET_SZQUEUE_WAITERS(q)
Definition: thread.c:18
static VALUE rb_szqueue_max_set(VALUE self, VALUE vmax)
Definition: thread.c:438
static unsigned long queue_num_waiting(VALUE self)
Definition: thread.c:250
int len
Definition: enumerator.c:1332
#define RSTRUCT_SET(st, idx, v)
VALUE arg
Definition: enum.c:2427
static VALUE queue_do_push(VALUE self, VALUE obj)
Definition: thread.c:220
#define GET_QUEUE_WAITERS(q)
Definition: thread.c:17
VALUE * argv
Definition: tcltklib.c:1969
void rb_define_alias(VALUE klass, const char *name1, const char *name2)
Defines an alias of a method.
Definition: class.c:1688
#define RTEST(v)
static VALUE rb_szqueue_max_get(VALUE self)
Definition: thread.c:425
VALUE rb_thread_current(void)
Definition: thread.c:2405
VALUE rb_ary_delete(VALUE ary, VALUE item)
Definition: array.c:2909
int rb_scan_args(int argc, const VALUE *argv, const char *fmt,...)
Definition: class.c:1719
static VALUE szqueue_do_pop(VALUE self, VALUE should_block)
Definition: thread.c:483
VALUE retval
Definition: tcltklib.c:7823
int argc
Definition: tcltklib.c:1968
static VALUE undumpable(VALUE obj)
Definition: thread.c:548
VALUE rb_thread_wakeup_alive(VALUE)
Definition: thread.c:2285
VALUE rb_ensure(VALUE(*b_proc)(ANYARGS), VALUE data1, VALUE(*e_proc)(ANYARGS), VALUE data2)
Definition: eval.c:839
static VALUE rb_queue_pop(int argc, VALUE *argv, VALUE self)
Definition: thread.c:323
#define GET_SZQUEUE_ULONGMAX(q)
Definition: thread.c:20
VALUE idx
Definition: enumerator.c:499
void Init_thread(void)
Definition: thread.c:555
static VALUE ary_buf_new(void)
Definition: thread.c:33
#define NUM2LONG(x)
#define GET_SZQUEUE_MAX(q)
Definition: thread.c:19
int t
Definition: ripper.c:14879
static VALUE rb_szqueue_initialize(VALUE self, VALUE vmax)
Definition: thread.c:401
static VALUE rb_szqueue_clear(VALUE self)
Definition: thread.c:522
args[0]
Definition: enum.c:585
static VALUE rb_queue_length(VALUE self)
Definition: thread.c:365
static VALUE delete_current_thread(VALUE ary)
Definition: thread.c:119
RUBY_EXTERN VALUE rb_cObject
Definition: ripper.y:1561
static VALUE queue_sleep(VALUE arg)
Definition: thread.c:268
static Bigint * diff(Bigint *a, Bigint *b)
Definition: util.c:1470
struct rb_encoding_entry * list
Definition: encoding.c:47
VALUE timeout
Definition: thread.c:106
static VALUE rb_condvar_signal(VALUE self)
Definition: thread.c:158
static VALUE get_array(VALUE obj, int idx)
Definition: thread.c:23
static VALUE rb_condvar_wait(int argc, VALUE *argv, VALUE self)
Definition: thread.c:135
register C_block * p
Definition: crypt.c:309
VALUE mutex
Definition: thread.c:105
#define PRIsVALUE
unsigned long VALUE
Definition: ripper.y:88
#define ALIAS_GLOBAL_CONST(name)
VALUE th
Definition: thread.c:257
#define rb_intern(str)
#define NULL
Definition: _sdbm.c:102
static VALUE rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
Definition: thread.c:509
static VALUE rb_queue_clear(VALUE self)
Definition: thread.c:349
static VALUE rb_queue_num_waiting(VALUE self)
Definition: thread.c:378
void rb_define_method(VALUE klass, const char *name, VALUE(*func)(ANYARGS), int argc)
Definition: class.c:1479
#define ULONG2NUM(x)
VALUE rb_eThreadError
Definition: eval.c:730
VALUE rb_eArgError
Definition: error.c:549
static unsigned long queue_length(VALUE self)
Definition: thread.c:244
VALUE rb_struct_alloc_noinit(VALUE)
Definition: struct.c:235
static VALUE queue_delete_from_waiting(struct waiting_delete *p)
Definition: thread.c:261
#define RSTRUCT_GET(st, idx)