]> git.lizzy.rs Git - rust.git/blob - src/libcore/pipes.rs
core: Remove use of deprecated `drop`
[rust.git] / src / libcore / pipes.rs
1 // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*! Runtime support for message passing with protocol enforcement.
12
13
14 Pipes consist of two endpoints. One endpoint can send messages and
15 the other can receive messages. The set of legal messages and which
16 directions they can flow at any given point are determined by a
17 protocol. Below is an example protocol.
18
19 ~~~
20 proto! pingpong (
21     ping: send {
22         ping -> pong
23     }
24     pong: recv {
25         pong -> ping
26     }
27 )
28 ~~~
29
30 The `proto!` syntax extension will convert this into a module called
31 `pingpong`, which includes a set of types and functions that can be
32 used to write programs that follow the pingpong protocol.
33
34 */
35
36 /* IMPLEMENTATION NOTES
37
38 The initial design for this feature is available at:
39
40 https://github.com/eholk/rust/wiki/Proposal-for-channel-contracts
41
42 Much of the design in that document is still accurate. There are
43 several components for the pipe implementation. First of all is the
44 syntax extension. To see how that works, it is best see comments in
45 libsyntax/ext/pipes.rs.
46
47 This module includes two related pieces of the runtime
48 implementation: support for unbounded and bounded
49 protocols. The main difference between the two is the type of the
50 buffer that is carried along in the endpoint data structures.
51
52
53 The heart of the implementation is the packet type. It contains a
54 header and a payload field. Much of the code in this module deals with
55 the header field. This is where the synchronization information is
56 stored. In the case of a bounded protocol, the header also includes a
57 pointer to the buffer the packet is contained in.
58
59 Packets represent a single message in a protocol. The payload field
60 gets instatiated at the type of the message, which is usually an enum
61 generated by the pipe compiler. Packets are conceptually single use,
62 although in bounded protocols they are reused each time around the
63 loop.
64
65
66 Packets are usually handled through a send_packet_buffered or
67 recv_packet_buffered object. Each packet is referenced by one
68 send_packet and one recv_packet, and these wrappers enforce that only
69 one end can send and only one end can receive. The structs also
70 include a destructor that marks packets are terminated if the sender
71 or receiver destroys the object before sending or receiving a value.
72
73 The *_packet_buffered structs take two type parameters. The first is
74 the message type for the current packet (or state). The second
75 represents the type of the whole buffer. For bounded protocols, the
76 protocol compiler generates a struct with a field for each protocol
77 state. This generated struct is used as the buffer type parameter. For
78 unbounded protocols, the buffer is simply one packet, so there is a
79 shorthand struct called send_packet and recv_packet, where the buffer
80 type is just `packet<T>`. Using the same underlying structure for both
81 bounded and unbounded protocols allows for less code duplication.
82
83 */
84
85 use cast::{forget, transmute, transmute_copy};
86 use either::{Either, Left, Right};
87 use kinds::Owned;
88 use libc;
89 use ops::Drop;
90 use option::{None, Option, Some};
91 use unstable::intrinsics;
92 use ptr;
93 use task;
94 use vec;
95
96 static SPIN_COUNT: uint = 0;
97
98 macro_rules! move_it (
99     { $x:expr } => ( unsafe { let y = *ptr::to_unsafe_ptr(&($x)); y } )
100 )
101
102 #[deriving(Eq)]
103 enum State {
104     Empty,
105     Full,
106     Blocked,
107     Terminated
108 }
109
110 pub struct BufferHeader {
111     // Tracks whether this buffer needs to be freed. We can probably
112     // get away with restricting it to 0 or 1, if we're careful.
113     mut ref_count: int,
114
115     // We may want a drop, and to be careful about stringing this
116     // thing along.
117 }
118
119 pub fn BufferHeader() -> BufferHeader {
120     BufferHeader {
121         ref_count: 0
122     }
123 }
124
125 // This is for protocols to associate extra data to thread around.
126 pub struct Buffer<T> {
127     header: BufferHeader,
128     data: T,
129 }
130
131 pub struct PacketHeader {
132     mut state: State,
133     mut blocked_task: *rust_task,
134
135     // This is a transmute_copy of a ~buffer, that can also be cast
136     // to a buffer_header if need be.
137     mut buffer: *libc::c_void,
138 }
139
140 pub fn PacketHeader() -> PacketHeader {
141     PacketHeader {
142         state: Empty,
143         blocked_task: ptr::null(),
144         buffer: ptr::null()
145     }
146 }
147
148 pub impl PacketHeader {
149     // Returns the old state.
150     unsafe fn mark_blocked(&self, this: *rust_task) -> State {
151         rustrt::rust_task_ref(this);
152         let old_task = swap_task(&mut self.blocked_task, this);
153         assert!(old_task.is_null());
154         swap_state_acq(&mut self.state, Blocked)
155     }
156
157     unsafe fn unblock(&self) {
158         let old_task = swap_task(&mut self.blocked_task, ptr::null());
159         if !old_task.is_null() {
160             rustrt::rust_task_deref(old_task)
161         }
162         match swap_state_acq(&mut self.state, Empty) {
163           Empty | Blocked => (),
164           Terminated => self.state = Terminated,
165           Full => self.state = Full
166         }
167     }
168
169     // unsafe because this can do weird things to the space/time
170     // continuum. It ends making multiple unique pointers to the same
171     // thing. You'll proobably want to forget them when you're done.
172     unsafe fn buf_header(&self) -> ~BufferHeader {
173         assert!(self.buffer.is_not_null());
174         transmute_copy(&self.buffer)
175     }
176
177     fn set_buffer<T:Owned>(&self, b: ~Buffer<T>) {
178         unsafe {
179             self.buffer = transmute_copy(&b);
180         }
181     }
182 }
183
184 pub struct Packet<T> {
185     header: PacketHeader,
186     mut payload: Option<T>,
187 }
188
189 pub trait HasBuffer {
190     fn set_buffer(&self, b: *libc::c_void);
191 }
192
193 impl<T:Owned> HasBuffer for Packet<T> {
194     fn set_buffer(&self, b: *libc::c_void) {
195         self.header.buffer = b;
196     }
197 }
198
199 pub fn mk_packet<T:Owned>() -> Packet<T> {
200     Packet {
201         header: PacketHeader(),
202         payload: None,
203     }
204 }
205 fn unibuffer<T>() -> ~Buffer<Packet<T>> {
206     let b = ~Buffer {
207         header: BufferHeader(),
208         data: Packet {
209             header: PacketHeader(),
210             payload: None,
211         }
212     };
213
214     unsafe {
215         b.data.header.buffer = transmute_copy(&b);
216     }
217     b
218 }
219
220 pub fn packet<T>() -> *Packet<T> {
221     let b = unibuffer();
222     let p = ptr::to_unsafe_ptr(&(b.data));
223     // We'll take over memory management from here.
224     unsafe { forget(b) }
225     p
226 }
227
228 pub fn entangle_buffer<T:Owned,Tstart:Owned>(
229     buffer: ~Buffer<T>,
230     init: &fn(*libc::c_void, x: &T) -> *Packet<Tstart>)
231     -> (SendPacketBuffered<Tstart, T>, RecvPacketBuffered<Tstart, T>)
232 {
233     let p = init(unsafe { transmute_copy(&buffer) }, &buffer.data);
234     unsafe { forget(buffer) }
235     (SendPacketBuffered(p), RecvPacketBuffered(p))
236 }
237
238 pub fn swap_task(dst: &mut *rust_task, src: *rust_task) -> *rust_task {
239     // It might be worth making both acquire and release versions of
240     // this.
241     unsafe {
242         transmute(intrinsics::atomic_xchg(transmute(dst), src as int))
243     }
244 }
245
246 #[allow(non_camel_case_types)]
247 pub type rust_task = libc::c_void;
248
249 pub mod rustrt {
250     use libc;
251     use super::rust_task;
252
253     pub extern {
254         #[rust_stack]
255         unsafe fn rust_get_task() -> *rust_task;
256         #[rust_stack]
257         unsafe fn rust_task_ref(task: *rust_task);
258         unsafe fn rust_task_deref(task: *rust_task);
259
260         #[rust_stack]
261         unsafe fn task_clear_event_reject(task: *rust_task);
262
263         unsafe fn task_wait_event(this: *rust_task,
264                                   killed: &mut *libc::c_void)
265                                -> bool;
266         unsafe fn task_signal_event(target: *rust_task, event: *libc::c_void);
267     }
268 }
269
270 fn wait_event(this: *rust_task) -> *libc::c_void {
271     unsafe {
272         let mut event = ptr::null();
273
274         let killed = rustrt::task_wait_event(this, &mut event);
275         if killed && !task::failing() {
276             fail!(~"killed")
277         }
278         event
279     }
280 }
281
282 fn swap_state_acq(dst: &mut State, src: State) -> State {
283     unsafe {
284         transmute(intrinsics::atomic_xchg_acq(transmute(dst), src as int))
285     }
286 }
287
288 fn swap_state_rel(dst: &mut State, src: State) -> State {
289     unsafe {
290         transmute(intrinsics::atomic_xchg_rel(transmute(dst), src as int))
291     }
292 }
293
294 pub unsafe fn get_buffer<T>(p: *PacketHeader) -> ~Buffer<T> {
295     transmute((*p).buf_header())
296 }
297
298 // This could probably be done with SharedMutableState to avoid move_it!().
299 struct BufferResource<T> {
300     buffer: ~Buffer<T>,
301
302 }
303
304 #[unsafe_destructor]
305 impl<T> ::ops::Drop for BufferResource<T> {
306     fn finalize(&self) {
307         unsafe {
308             let b = move_it!(self.buffer);
309             //let p = ptr::to_unsafe_ptr(*b);
310             //error!("drop %?", p);
311             let old_count = intrinsics::atomic_xsub_rel(&mut b.header.ref_count, 1);
312             //let old_count = atomic_xchng_rel(b.header.ref_count, 0);
313             if old_count == 1 {
314                 // The new count is 0.
315
316                 // go go gadget drop glue
317             }
318             else {
319                 forget(b)
320             }
321         }
322     }
323 }
324
325 fn BufferResource<T>(b: ~Buffer<T>) -> BufferResource<T> {
326     //let p = ptr::to_unsafe_ptr(*b);
327     //error!("take %?", p);
328     unsafe { intrinsics::atomic_xadd_acq(&mut b.header.ref_count, 1) };
329
330     BufferResource {
331         // tjc: ????
332         buffer: b
333     }
334 }
335
336 pub fn send<T,Tbuffer>(p: SendPacketBuffered<T,Tbuffer>, payload: T) -> bool {
337     let header = p.header();
338     let p_ = p.unwrap();
339     let p = unsafe { &*p_ };
340     assert!(ptr::to_unsafe_ptr(&(p.header)) == header);
341     assert!(p.payload.is_none());
342     p.payload = Some(payload);
343     let old_state = swap_state_rel(&mut p.header.state, Full);
344     match old_state {
345         Empty => {
346             // Yay, fastpath.
347
348             // The receiver will eventually clean this up.
349             //unsafe { forget(p); }
350             return true;
351         }
352         Full => fail!(~"duplicate send"),
353         Blocked => {
354             debug!("waking up task for %?", p_);
355             let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
356             if !old_task.is_null() {
357                 unsafe {
358                     rustrt::task_signal_event(
359                         old_task,
360                         ptr::to_unsafe_ptr(&(p.header)) as *libc::c_void);
361                     rustrt::rust_task_deref(old_task);
362                 }
363             }
364
365             // The receiver will eventually clean this up.
366             //unsafe { forget(p); }
367             return true;
368         }
369         Terminated => {
370             // The receiver will never receive this. Rely on drop_glue
371             // to clean everything up.
372             return false;
373         }
374     }
375 }
376
377 /** Receives a message from a pipe.
378
379 Fails if the sender closes the connection.
380
381 */
382 pub fn recv<T:Owned,Tbuffer:Owned>(
383     p: RecvPacketBuffered<T, Tbuffer>) -> T {
384     try_recv(p).expect("connection closed")
385 }
386
387 /** Attempts to receive a message from a pipe.
388
389 Returns `None` if the sender has closed the connection without sending
390 a message, or `Some(T)` if a message was received.
391
392 */
393 pub fn try_recv<T:Owned,Tbuffer:Owned>(p: RecvPacketBuffered<T, Tbuffer>)
394     -> Option<T>
395 {
396     let p_ = p.unwrap();
397     let p = unsafe { &*p_ };
398
399     struct DropState<'self> {
400         p: &'self PacketHeader,
401     }
402
403     #[unsafe_destructor]
404     impl<'self> Drop for DropState<'self> {
405         fn finalize(&self) {
406             unsafe {
407                 if task::failing() {
408                     self.p.state = Terminated;
409                     let old_task = swap_task(&mut self.p.blocked_task,
410                                              ptr::null());
411                     if !old_task.is_null() {
412                         rustrt::rust_task_deref(old_task);
413                     }
414                 }
415             }
416         }
417     }
418
419     let _drop_state = DropState { p: &p.header };
420
421     // optimistic path
422     match p.header.state {
423       Full => {
424         let mut payload = None;
425         payload <-> p.payload;
426         p.header.state = Empty;
427         return Some(payload.unwrap())
428       },
429       Terminated => return None,
430       _ => {}
431     }
432
433     // regular path
434     let this = unsafe { rustrt::rust_get_task() };
435     unsafe {
436         rustrt::task_clear_event_reject(this);
437         rustrt::rust_task_ref(this);
438     };
439     debug!("blocked = %x this = %x", p.header.blocked_task as uint,
440            this as uint);
441     let old_task = swap_task(&mut p.header.blocked_task, this);
442     debug!("blocked = %x this = %x old_task = %x",
443            p.header.blocked_task as uint,
444            this as uint, old_task as uint);
445     assert!(old_task.is_null());
446     let mut first = true;
447     let mut count = SPIN_COUNT;
448     loop {
449         unsafe {
450             rustrt::task_clear_event_reject(this);
451         }
452
453         let old_state = swap_state_acq(&mut p.header.state,
454                                        Blocked);
455         match old_state {
456           Empty => {
457             debug!("no data available on %?, going to sleep.", p_);
458             if count == 0 {
459                 wait_event(this);
460             }
461             else {
462                 count -= 1;
463                 // FIXME (#524): Putting the yield here destroys a lot
464                 // of the benefit of spinning, since we still go into
465                 // the scheduler at every iteration. However, without
466                 // this everything spins too much because we end up
467                 // sometimes blocking the thing we are waiting on.
468                 task::yield();
469             }
470             debug!("woke up, p.state = %?", copy p.header.state);
471           }
472           Blocked => if first {
473             fail!(~"blocking on already blocked packet")
474           },
475           Full => {
476             let mut payload = None;
477             payload <-> p.payload;
478             let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
479             if !old_task.is_null() {
480                 unsafe {
481                     rustrt::rust_task_deref(old_task);
482                 }
483             }
484             p.header.state = Empty;
485             return Some(payload.unwrap())
486           }
487           Terminated => {
488             // This assert detects when we've accidentally unsafely
489             // casted too big of a number to a state.
490             assert!(old_state == Terminated);
491
492             let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
493             if !old_task.is_null() {
494                 unsafe {
495                     rustrt::rust_task_deref(old_task);
496                 }
497             }
498             return None;
499           }
500         }
501         first = false;
502     }
503 }
504
505 /// Returns true if messages are available.
506 pub fn peek<T:Owned,Tb:Owned>(p: &RecvPacketBuffered<T, Tb>) -> bool {
507     match unsafe {(*p.header()).state} {
508       Empty | Terminated => false,
509       Blocked => fail!(~"peeking on blocked packet"),
510       Full => true
511     }
512 }
513
514 fn sender_terminate<T:Owned>(p: *Packet<T>) {
515     let p = unsafe { &*p };
516     match swap_state_rel(&mut p.header.state, Terminated) {
517       Empty => {
518         // The receiver will eventually clean up.
519       }
520       Blocked => {
521         // wake up the target
522         let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
523         if !old_task.is_null() {
524             unsafe {
525                 rustrt::task_signal_event(
526                     old_task,
527                     ptr::to_unsafe_ptr(&(p.header)) as *libc::c_void);
528                 rustrt::rust_task_deref(old_task);
529             }
530         }
531         // The receiver will eventually clean up.
532       }
533       Full => {
534         // This is impossible
535         fail!(~"you dun goofed")
536       }
537       Terminated => {
538         assert!(p.header.blocked_task.is_null());
539         // I have to clean up, use drop_glue
540       }
541     }
542 }
543
544 fn receiver_terminate<T:Owned>(p: *Packet<T>) {
545     let p = unsafe { &*p };
546     match swap_state_rel(&mut p.header.state, Terminated) {
547       Empty => {
548         assert!(p.header.blocked_task.is_null());
549         // the sender will clean up
550       }
551       Blocked => {
552         let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
553         if !old_task.is_null() {
554             unsafe {
555                 rustrt::rust_task_deref(old_task);
556                 assert!(old_task == rustrt::rust_get_task());
557             }
558         }
559       }
560       Terminated | Full => {
561         assert!(p.header.blocked_task.is_null());
562         // I have to clean up, use drop_glue
563       }
564     }
565 }
566
567 /** Returns when one of the packet headers reports data is available.
568
569 This function is primarily intended for building higher level waiting
570 functions, such as `select`, `select2`, etc.
571
572 It takes a vector slice of packet_headers and returns an index into
573 that vector. The index points to an endpoint that has either been
574 closed by the sender or has a message waiting to be received.
575
576 */
577 pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint {
578     let this = unsafe { rustrt::rust_get_task() };
579
580     unsafe {
581         rustrt::task_clear_event_reject(this);
582     }
583
584     let mut data_avail = false;
585     let mut ready_packet = pkts.len();
586     for pkts.eachi |i, p| {
587         unsafe {
588             let p = &*p.header();
589             let old = p.mark_blocked(this);
590             match old {
591               Full | Terminated => {
592                 data_avail = true;
593                 ready_packet = i;
594                 (*p).state = old;
595                 break;
596               }
597               Blocked => fail!(~"blocking on blocked packet"),
598               Empty => ()
599             }
600         }
601     }
602
603     while !data_avail {
604         debug!("sleeping on %? packets", pkts.len());
605         let event = wait_event(this) as *PacketHeader;
606         let pos = vec::position(pkts, |p| p.header() == event);
607
608         match pos {
609           Some(i) => {
610             ready_packet = i;
611             data_avail = true;
612           }
613           None => debug!("ignoring spurious event, %?", event)
614         }
615     }
616
617     debug!("%?", pkts[ready_packet]);
618
619     for pkts.each |p| { unsafe{ (*p.header()).unblock()} }
620
621     debug!("%?, %?", ready_packet, pkts[ready_packet]);
622
623     unsafe {
624         assert!((*pkts[ready_packet].header()).state == Full
625                      || (*pkts[ready_packet].header()).state == Terminated);
626     }
627
628     ready_packet
629 }
630
631 /** The sending end of a pipe. It can be used to send exactly one
632 message.
633
634 */
635 pub type SendPacket<T> = SendPacketBuffered<T, Packet<T>>;
636
637 pub fn SendPacket<T>(p: *Packet<T>) -> SendPacket<T> {
638     SendPacketBuffered(p)
639 }
640
641 pub struct SendPacketBuffered<T, Tbuffer> {
642     mut p: Option<*Packet<T>>,
643     mut buffer: Option<BufferResource<Tbuffer>>,
644 }
645
646 #[unsafe_destructor]
647 impl<T:Owned,Tbuffer:Owned> ::ops::Drop for SendPacketBuffered<T,Tbuffer> {
648     fn finalize(&self) {
649         //if self.p != none {
650         //    debug!("drop send %?", option::get(self.p));
651         //}
652         if self.p != None {
653             let mut p = None;
654             p <-> self.p;
655             sender_terminate(p.unwrap())
656         }
657         //unsafe { error!("send_drop: %?",
658         //                if self.buffer == none {
659         //                    "none"
660         //                } else { "some" }); }
661     }
662 }
663
664 pub fn SendPacketBuffered<T,Tbuffer>(p: *Packet<T>)
665     -> SendPacketBuffered<T, Tbuffer> {
666         //debug!("take send %?", p);
667     SendPacketBuffered {
668         p: Some(p),
669         buffer: unsafe {
670             Some(BufferResource(
671                 get_buffer(ptr::to_unsafe_ptr(&((*p).header)))))
672         }
673     }
674 }
675
676 pub impl<T,Tbuffer> SendPacketBuffered<T,Tbuffer> {
677     fn unwrap(&self) -> *Packet<T> {
678         let mut p = None;
679         p <-> self.p;
680         p.unwrap()
681     }
682
683     fn header(&self) -> *PacketHeader {
684         match self.p {
685           Some(packet) => unsafe {
686             let packet = &*packet;
687             let header = ptr::to_unsafe_ptr(&(packet.header));
688             //forget(packet);
689             header
690           },
691           None => fail!(~"packet already consumed")
692         }
693     }
694
695     fn reuse_buffer(&self) -> BufferResource<Tbuffer> {
696         //error!("send reuse_buffer");
697         let mut tmp = None;
698         tmp <-> self.buffer;
699         tmp.unwrap()
700     }
701 }
702
703 /// Represents the receive end of a pipe. It can receive exactly one
704 /// message.
705 pub type RecvPacket<T> = RecvPacketBuffered<T, Packet<T>>;
706
707 pub fn RecvPacket<T>(p: *Packet<T>) -> RecvPacket<T> {
708     RecvPacketBuffered(p)
709 }
710 pub struct RecvPacketBuffered<T, Tbuffer> {
711     mut p: Option<*Packet<T>>,
712     mut buffer: Option<BufferResource<Tbuffer>>,
713 }
714
715 #[unsafe_destructor]
716 impl<T:Owned,Tbuffer:Owned> ::ops::Drop for RecvPacketBuffered<T,Tbuffer> {
717     fn finalize(&self) {
718         //if self.p != none {
719         //    debug!("drop recv %?", option::get(self.p));
720         //}
721         if self.p != None {
722             let mut p = None;
723             p <-> self.p;
724             receiver_terminate(p.unwrap())
725         }
726         //unsafe { error!("recv_drop: %?",
727         //                if self.buffer == none {
728         //                    "none"
729         //                } else { "some" }); }
730     }
731 }
732
733 pub impl<T:Owned,Tbuffer:Owned> RecvPacketBuffered<T, Tbuffer> {
734     fn unwrap(&self) -> *Packet<T> {
735         let mut p = None;
736         p <-> self.p;
737         p.unwrap()
738     }
739
740     fn reuse_buffer(&self) -> BufferResource<Tbuffer> {
741         //error!("recv reuse_buffer");
742         let mut tmp = None;
743         tmp <-> self.buffer;
744         tmp.unwrap()
745     }
746 }
747
748 impl<T:Owned,Tbuffer:Owned> Selectable for RecvPacketBuffered<T, Tbuffer> {
749     fn header(&self) -> *PacketHeader {
750         match self.p {
751           Some(packet) => unsafe {
752             let packet = &*packet;
753             let header = ptr::to_unsafe_ptr(&(packet.header));
754             //forget(packet);
755             header
756           },
757           None => fail!(~"packet already consumed")
758         }
759     }
760 }
761
762 pub fn RecvPacketBuffered<T,Tbuffer>(p: *Packet<T>)
763     -> RecvPacketBuffered<T,Tbuffer> {
764     //debug!("take recv %?", p);
765     RecvPacketBuffered {
766         p: Some(p),
767         buffer: unsafe {
768             Some(BufferResource(
769                 get_buffer(ptr::to_unsafe_ptr(&((*p).header)))))
770         }
771     }
772 }
773
774 pub fn entangle<T>() -> (SendPacket<T>, RecvPacket<T>) {
775     let p = packet();
776     (SendPacket(p), RecvPacket(p))
777 }
778
779 /** Receives a message from one of two endpoints.
780
781 The return value is `left` if the first endpoint received something,
782 or `right` if the second endpoint receives something. In each case,
783 the result includes the other endpoint as well so it can be used
784 again. Below is an example of using `select2`.
785
786 ~~~
787 match select2(a, b) {
788   left((none, b)) {
789     // endpoint a was closed.
790   }
791   right((a, none)) {
792     // endpoint b was closed.
793   }
794   left((Some(_), b)) {
795     // endpoint a received a message
796   }
797   right(a, Some(_)) {
798     // endpoint b received a message.
799   }
800 }
801 ~~~
802
803 Sometimes messages will be available on both endpoints at once. In
804 this case, `select2` may return either `left` or `right`.
805
806 */
807 pub fn select2<A:Owned,Ab:Owned,B:Owned,Bb:Owned>(
808     a: RecvPacketBuffered<A, Ab>,
809     b: RecvPacketBuffered<B, Bb>)
810     -> Either<(Option<A>, RecvPacketBuffered<B, Bb>),
811               (RecvPacketBuffered<A, Ab>, Option<B>)>
812 {
813     let i = wait_many([a.header(), b.header()]);
814
815     match i {
816       0 => Left((try_recv(a), b)),
817       1 => Right((a, try_recv(b))),
818       _ => fail!(~"select2 return an invalid packet")
819     }
820 }
821
822 pub trait Selectable {
823     fn header(&self) -> *PacketHeader;
824 }
825
826 impl Selectable for *PacketHeader {
827     fn header(&self) -> *PacketHeader { *self }
828 }
829
830 /// Returns the index of an endpoint that is ready to receive.
831 pub fn selecti<T:Selectable>(endpoints: &[T]) -> uint {
832     wait_many(endpoints)
833 }
834
835 /// Returns 0 or 1 depending on which endpoint is ready to receive
836 pub fn select2i<A:Selectable,B:Selectable>(a: &A, b: &B) ->
837         Either<(), ()> {
838     match wait_many([a.header(), b.header()]) {
839       0 => Left(()),
840       1 => Right(()),
841       _ => fail!(~"wait returned unexpected index")
842     }
843 }
844
845 /** Waits on a set of endpoints. Returns a message, its index, and a
846  list of the remaining endpoints.
847
848 */
849 pub fn select<T:Owned,Tb:Owned>(endpoints: ~[RecvPacketBuffered<T, Tb>])
850     -> (uint, Option<T>, ~[RecvPacketBuffered<T, Tb>])
851 {
852     let ready = wait_many(endpoints.map(|p| p.header()));
853     let mut remaining = endpoints;
854     let port = remaining.swap_remove(ready);
855     let result = try_recv(port);
856     (ready, result, remaining)
857 }
858
859 pub mod rt {
860     use option::{None, Option, Some};
861
862     // These are used to hide the option constructors from the
863     // compiler because their names are changing
864     pub fn make_some<T>(val: T) -> Option<T> { Some(val) }
865     pub fn make_none<T>() -> Option<T> { None }
866 }
867
868 #[cfg(test)]
869 mod test {
870     use either::Right;
871     use comm::{Chan, Port, oneshot, recv_one, stream, Select2,
872                GenericChan, Peekable};
873
874     #[test]
875     fn test_select2() {
876         let (p1, c1) = stream();
877         let (p2, c2) = stream();
878
879         c1.send(~"abc");
880
881         match (p1, p2).select() {
882           Right(_) => fail!(),
883           _ => ()
884         }
885
886         c2.send(123);
887     }
888
889     #[test]
890     fn test_oneshot() {
891         let (p, c) = oneshot();
892
893         c.send(());
894
895         recv_one(p)
896     }
897
898     #[test]
899     fn test_peek_terminated() {
900         let (port, chan): (Port<int>, Chan<int>) = stream();
901
902         {
903             // Destroy the channel
904             let _chan = chan;
905         }
906
907         assert!(!port.peek());
908     }
909 }