]> git.lizzy.rs Git - rust.git/blob - src/libstd/pipes.rs
Change finalize -> drop.
[rust.git] / src / libstd / pipes.rs
1 // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*! Runtime support for message passing with protocol enforcement.
12
13
14 Pipes consist of two endpoints. One endpoint can send messages and
15 the other can receive messages. The set of legal messages and which
16 directions they can flow at any given point are determined by a
17 protocol. Below is an example protocol.
18
19 ~~~ {.rust}
20 proto! pingpong (
21     ping: send {
22         ping -> pong
23     }
24     pong: recv {
25         pong -> ping
26     }
27 )
28 ~~~
29
30 The `proto!` syntax extension will convert this into a module called
31 `pingpong`, which includes a set of types and functions that can be
32 used to write programs that follow the pingpong protocol.
33
34 */
35
36 /* IMPLEMENTATION NOTES
37
38 The initial design for this feature is available at:
39
40 https://github.com/eholk/rust/wiki/Proposal-for-channel-contracts
41
42 Much of the design in that document is still accurate. There are
43 several components for the pipe implementation. First of all is the
44 syntax extension. To see how that works, it is best see comments in
45 libsyntax/ext/pipes.rs.
46
47 This module includes two related pieces of the runtime
48 implementation: support for unbounded and bounded
49 protocols. The main difference between the two is the type of the
50 buffer that is carried along in the endpoint data structures.
51
52
53 The heart of the implementation is the packet type. It contains a
54 header and a payload field. Much of the code in this module deals with
55 the header field. This is where the synchronization information is
56 stored. In the case of a bounded protocol, the header also includes a
57 pointer to the buffer the packet is contained in.
58
59 Packets represent a single message in a protocol. The payload field
60 gets instatiated at the type of the message, which is usually an enum
61 generated by the pipe compiler. Packets are conceptually single use,
62 although in bounded protocols they are reused each time around the
63 loop.
64
65
66 Packets are usually handled through a send_packet_buffered or
67 recv_packet_buffered object. Each packet is referenced by one
68 send_packet and one recv_packet, and these wrappers enforce that only
69 one end can send and only one end can receive. The structs also
70 include a destructor that marks packets are terminated if the sender
71 or receiver destroys the object before sending or receiving a value.
72
73 The *_packet_buffered structs take two type parameters. The first is
74 the message type for the current packet (or state). The second
75 represents the type of the whole buffer. For bounded protocols, the
76 protocol compiler generates a struct with a field for each protocol
77 state. This generated struct is used as the buffer type parameter. For
78 unbounded protocols, the buffer is simply one packet, so there is a
79 shorthand struct called send_packet and recv_packet, where the buffer
80 type is just `packet<T>`. Using the same underlying structure for both
81 bounded and unbounded protocols allows for less code duplication.
82
83 */
84
85 #[allow(missing_doc)];
86
87 use container::Container;
88 use cast::{forget, transmute, transmute_copy, transmute_mut};
89 use either::{Either, Left, Right};
90 use iterator::IteratorUtil;
91 use kinds::Owned;
92 use libc;
93 use ops::Drop;
94 use option::{None, Option, Some};
95 use unstable::finally::Finally;
96 use unstable::intrinsics;
97 use ptr;
98 use ptr::RawPtr;
99 use task;
100 use vec::{OwnedVector, MutableVector};
101 use util::replace;
102
103 static SPIN_COUNT: uint = 0;
104
105 #[deriving(Eq)]
106 enum State {
107     Empty,
108     Full,
109     Blocked,
110     Terminated
111 }
112
113 pub struct BufferHeader {
114     // Tracks whether this buffer needs to be freed. We can probably
115     // get away with restricting it to 0 or 1, if we're careful.
116     ref_count: int,
117
118     // We may want a drop, and to be careful about stringing this
119     // thing along.
120 }
121
122 pub fn BufferHeader() -> BufferHeader {
123     BufferHeader {
124         ref_count: 0
125     }
126 }
127
128 // This is for protocols to associate extra data to thread around.
129 pub struct Buffer<T> {
130     header: BufferHeader,
131     data: T,
132 }
133
134 pub struct PacketHeader {
135     state: State,
136     blocked_task: *rust_task,
137
138     // This is a transmute_copy of a ~buffer, that can also be cast
139     // to a buffer_header if need be.
140     buffer: *libc::c_void,
141 }
142
143 pub fn PacketHeader() -> PacketHeader {
144     PacketHeader {
145         state: Empty,
146         blocked_task: ptr::null(),
147         buffer: ptr::null()
148     }
149 }
150
151 impl PacketHeader {
152     // Returns the old state.
153     pub unsafe fn mark_blocked(&mut self, this: *rust_task) -> State {
154         rustrt::rust_task_ref(this);
155         let old_task = swap_task(&mut self.blocked_task, this);
156         assert!(old_task.is_null());
157         swap_state_acq(&mut self.state, Blocked)
158     }
159
160     pub unsafe fn unblock(&mut self) {
161         let old_task = swap_task(&mut self.blocked_task, ptr::null());
162         if !old_task.is_null() {
163             rustrt::rust_task_deref(old_task)
164         }
165         match swap_state_acq(&mut self.state, Empty) {
166           Empty | Blocked => (),
167           Terminated => self.state = Terminated,
168           Full => self.state = Full
169         }
170     }
171
172     // unsafe because this can do weird things to the space/time
173     // continuum. It ends making multiple unique pointers to the same
174     // thing. You'll probably want to forget them when you're done.
175     pub unsafe fn buf_header(&mut self) -> ~BufferHeader {
176         assert!(self.buffer.is_not_null());
177         transmute_copy(&self.buffer)
178     }
179
180     pub fn set_buffer<T:Owned>(&mut self, b: ~Buffer<T>) {
181         unsafe {
182             self.buffer = transmute_copy(&b);
183         }
184     }
185 }
186
187 pub struct Packet<T> {
188     header: PacketHeader,
189     payload: Option<T>,
190 }
191
192 pub trait HasBuffer {
193     fn set_buffer(&mut self, b: *libc::c_void);
194 }
195
196 impl<T:Owned> HasBuffer for Packet<T> {
197     fn set_buffer(&mut self, b: *libc::c_void) {
198         self.header.buffer = b;
199     }
200 }
201
202 pub fn mk_packet<T:Owned>() -> Packet<T> {
203     Packet {
204         header: PacketHeader(),
205         payload: None,
206     }
207 }
208 fn unibuffer<T>() -> ~Buffer<Packet<T>> {
209     let mut b = ~Buffer {
210         header: BufferHeader(),
211         data: Packet {
212             header: PacketHeader(),
213             payload: None,
214         }
215     };
216
217     unsafe {
218         b.data.header.buffer = transmute_copy(&b);
219     }
220     b
221 }
222
223 pub fn packet<T>() -> *mut Packet<T> {
224     let mut b = unibuffer();
225     let p = ptr::to_mut_unsafe_ptr(&mut b.data);
226     // We'll take over memory management from here.
227     unsafe {
228         forget(b);
229     }
230     p
231 }
232
233 pub fn entangle_buffer<T:Owned,Tstart:Owned>(
234     mut buffer: ~Buffer<T>,
235     init: &fn(*libc::c_void, x: &mut T) -> *mut Packet<Tstart>)
236     -> (RecvPacketBuffered<Tstart, T>, SendPacketBuffered<Tstart, T>) {
237     unsafe {
238         let p = init(transmute_copy(&buffer), &mut buffer.data);
239         forget(buffer);
240         (RecvPacketBuffered(p), SendPacketBuffered(p))
241     }
242 }
243
244 pub fn swap_task(dst: &mut *rust_task, src: *rust_task) -> *rust_task {
245     // It might be worth making both acquire and release versions of
246     // this.
247     unsafe {
248         transmute(intrinsics::atomic_xchg(transmute(dst), src as int))
249     }
250 }
251
252 #[allow(non_camel_case_types)]
253 pub type rust_task = libc::c_void;
254
255 pub mod rustrt {
256     use libc;
257     use super::rust_task;
258
259     pub extern {
260         #[rust_stack]
261         unsafe fn rust_get_task() -> *rust_task;
262         #[rust_stack]
263         unsafe fn rust_task_ref(task: *rust_task);
264         unsafe fn rust_task_deref(task: *rust_task);
265
266         #[rust_stack]
267         unsafe fn task_clear_event_reject(task: *rust_task);
268
269         unsafe fn task_wait_event(this: *rust_task,
270                                   killed: &mut *libc::c_void)
271                                -> bool;
272         unsafe fn task_signal_event(target: *rust_task, event: *libc::c_void);
273     }
274 }
275
276 fn wait_event(this: *rust_task) -> *libc::c_void {
277     unsafe {
278         let mut event = ptr::null();
279
280         let killed = rustrt::task_wait_event(this, &mut event);
281         if killed && !task::failing() {
282             fail!("killed")
283         }
284         event
285     }
286 }
287
288 fn swap_state_acq(dst: &mut State, src: State) -> State {
289     unsafe {
290         transmute(intrinsics::atomic_xchg_acq(transmute(dst), src as int))
291     }
292 }
293
294 fn swap_state_rel(dst: &mut State, src: State) -> State {
295     unsafe {
296         transmute(intrinsics::atomic_xchg_rel(transmute(dst), src as int))
297     }
298 }
299
300 pub unsafe fn get_buffer<T>(p: *mut PacketHeader) -> ~Buffer<T> {
301     transmute((*p).buf_header())
302 }
303
304 // This could probably be done with SharedMutableState to avoid move_it!().
305 struct BufferResource<T> {
306     buffer: ~Buffer<T>,
307
308 }
309
310 #[unsafe_destructor]
311 impl<T> Drop for BufferResource<T> {
312     fn drop(&self) {
313         unsafe {
314             // FIXME(#4330) Need self by value to get mutability.
315             let this: &mut BufferResource<T> = transmute_mut(self);
316
317             let null_buffer: ~Buffer<T> = transmute(ptr::null::<Buffer<T>>());
318             let mut b = replace(&mut this.buffer, null_buffer);
319
320             //let p = ptr::to_unsafe_ptr(*b);
321             //error!("drop %?", p);
322             let old_count = intrinsics::atomic_xsub_rel(
323                 &mut b.header.ref_count,
324                 1);
325             //let old_count = atomic_xchng_rel(b.header.ref_count, 0);
326             if old_count == 1 {
327                 // The new count is 0.
328
329                 // go go gadget drop glue
330             }
331             else {
332                 forget(b)
333             }
334         }
335     }
336 }
337
338 fn BufferResource<T>(mut b: ~Buffer<T>) -> BufferResource<T> {
339     //let p = ptr::to_unsafe_ptr(*b);
340     //error!("take %?", p);
341     unsafe {
342         intrinsics::atomic_xadd_acq(&mut b.header.ref_count, 1);
343     }
344
345     BufferResource {
346         // tjc: ????
347         buffer: b
348     }
349 }
350
351 pub fn send<T,Tbuffer>(mut p: SendPacketBuffered<T,Tbuffer>,
352                        payload: T)
353                        -> bool {
354     let header = p.header();
355     let p_ = p.unwrap();
356     let p = unsafe { &mut *p_ };
357     assert_eq!(ptr::to_unsafe_ptr(&(p.header)), header);
358     assert!(p.payload.is_none());
359     p.payload = Some(payload);
360     let old_state = swap_state_rel(&mut p.header.state, Full);
361     match old_state {
362         Empty => {
363             // Yay, fastpath.
364
365             // The receiver will eventually clean this up.
366             //unsafe { forget(p); }
367             return true;
368         }
369         Full => fail!("duplicate send"),
370         Blocked => {
371             debug!("waking up task for %?", p_);
372             let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
373             if !old_task.is_null() {
374                 unsafe {
375                     rustrt::task_signal_event(
376                         old_task,
377                         ptr::to_unsafe_ptr(&(p.header)) as *libc::c_void);
378                     rustrt::rust_task_deref(old_task);
379                 }
380             }
381
382             // The receiver will eventually clean this up.
383             //unsafe { forget(p); }
384             return true;
385         }
386         Terminated => {
387             // The receiver will never receive this. Rely on drop_glue
388             // to clean everything up.
389             return false;
390         }
391     }
392 }
393
394 /** Receives a message from a pipe.
395
396 Fails if the sender closes the connection.
397
398 */
399 pub fn recv<T:Owned,Tbuffer:Owned>(
400     p: RecvPacketBuffered<T, Tbuffer>) -> T {
401     try_recv(p).expect("connection closed")
402 }
403
404 /** Attempts to receive a message from a pipe.
405
406 Returns `None` if the sender has closed the connection without sending
407 a message, or `Some(T)` if a message was received.
408
409 */
410 pub fn try_recv<T:Owned,Tbuffer:Owned>(mut p: RecvPacketBuffered<T, Tbuffer>)
411                                        -> Option<T> {
412     let p_ = p.unwrap();
413     let p = unsafe { &mut *p_ };
414
415     do (|| {
416         try_recv_(p)
417     }).finally {
418         unsafe {
419             if task::failing() {
420                 p.header.state = Terminated;
421                 let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
422                 if !old_task.is_null() {
423                     rustrt::rust_task_deref(old_task);
424                 }
425             }
426         }
427     }
428 }
429
430 fn try_recv_<T:Owned>(p: &mut Packet<T>) -> Option<T> {
431     // optimistic path
432     match p.header.state {
433       Full => {
434         let payload = replace(&mut p.payload, None);
435         p.header.state = Empty;
436         return Some(payload.unwrap())
437       },
438       Terminated => return None,
439       _ => {}
440     }
441
442     // regular path
443     let this = unsafe { rustrt::rust_get_task() };
444     unsafe {
445         rustrt::task_clear_event_reject(this);
446         rustrt::rust_task_ref(this);
447     };
448     debug!("blocked = %x this = %x", p.header.blocked_task as uint,
449            this as uint);
450     let old_task = swap_task(&mut p.header.blocked_task, this);
451     debug!("blocked = %x this = %x old_task = %x",
452            p.header.blocked_task as uint,
453            this as uint, old_task as uint);
454     assert!(old_task.is_null());
455     let mut first = true;
456     let mut count = SPIN_COUNT;
457     loop {
458         unsafe {
459             rustrt::task_clear_event_reject(this);
460         }
461
462         let old_state = swap_state_acq(&mut p.header.state,
463                                        Blocked);
464         match old_state {
465           Empty => {
466             debug!("no data available on %?, going to sleep.", p);
467             if count == 0 {
468                 wait_event(this);
469             }
470             else {
471                 count -= 1;
472                 // FIXME (#524): Putting the yield here destroys a lot
473                 // of the benefit of spinning, since we still go into
474                 // the scheduler at every iteration. However, without
475                 // this everything spins too much because we end up
476                 // sometimes blocking the thing we are waiting on.
477                 task::yield();
478             }
479             debug!("woke up, p.state = %?", copy p.header.state);
480           }
481           Blocked => if first {
482             fail!("blocking on already blocked packet")
483           },
484           Full => {
485             let payload = replace(&mut p.payload, None);
486             let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
487             if !old_task.is_null() {
488                 unsafe {
489                     rustrt::rust_task_deref(old_task);
490                 }
491             }
492             p.header.state = Empty;
493             return Some(payload.unwrap())
494           }
495           Terminated => {
496             // This assert detects when we've accidentally unsafely
497             // casted too big of a number to a state.
498             assert_eq!(old_state, Terminated);
499
500             let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
501             if !old_task.is_null() {
502                 unsafe {
503                     rustrt::rust_task_deref(old_task);
504                 }
505             }
506             return None;
507           }
508         }
509         first = false;
510     }
511 }
512
513 /// Returns true if messages are available.
514 pub fn peek<T:Owned,Tb:Owned>(p: &mut RecvPacketBuffered<T, Tb>) -> bool {
515     unsafe {
516         match (*p.header()).state {
517             Empty | Terminated => false,
518             Blocked => fail!("peeking on blocked packet"),
519             Full => true
520         }
521     }
522 }
523
524 fn sender_terminate<T:Owned>(p: *mut Packet<T>) {
525     let p = unsafe {
526         &mut *p
527     };
528     match swap_state_rel(&mut p.header.state, Terminated) {
529       Empty => {
530         // The receiver will eventually clean up.
531       }
532       Blocked => {
533         // wake up the target
534         let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
535         if !old_task.is_null() {
536             unsafe {
537                 rustrt::task_signal_event(
538                     old_task,
539                     ptr::to_unsafe_ptr(&(p.header)) as *libc::c_void);
540                 rustrt::rust_task_deref(old_task);
541             }
542         }
543         // The receiver will eventually clean up.
544       }
545       Full => {
546         // This is impossible
547         fail!("you dun goofed")
548       }
549       Terminated => {
550         assert!(p.header.blocked_task.is_null());
551         // I have to clean up, use drop_glue
552       }
553     }
554 }
555
556 fn receiver_terminate<T:Owned>(p: *mut Packet<T>) {
557     let p = unsafe {
558         &mut *p
559     };
560     match swap_state_rel(&mut p.header.state, Terminated) {
561       Empty => {
562         assert!(p.header.blocked_task.is_null());
563         // the sender will clean up
564       }
565       Blocked => {
566         let old_task = swap_task(&mut p.header.blocked_task, ptr::null());
567         if !old_task.is_null() {
568             unsafe {
569                 rustrt::rust_task_deref(old_task);
570                 assert_eq!(old_task, rustrt::rust_get_task());
571             }
572         }
573       }
574       Terminated | Full => {
575         assert!(p.header.blocked_task.is_null());
576         // I have to clean up, use drop_glue
577       }
578     }
579 }
580
581 /** Returns when one of the packet headers reports data is available.
582
583 This function is primarily intended for building higher level waiting
584 functions, such as `select`, `select2`, etc.
585
586 It takes a vector slice of packet_headers and returns an index into
587 that vector. The index points to an endpoint that has either been
588 closed by the sender or has a message waiting to be received.
589
590 */
591 pub fn wait_many<T: Selectable>(pkts: &mut [T]) -> uint {
592     let this = unsafe {
593         rustrt::rust_get_task()
594     };
595
596     unsafe {
597         rustrt::task_clear_event_reject(this);
598     }
599
600     let mut data_avail = false;
601     let mut ready_packet = pkts.len();
602     for pkts.mut_iter().enumerate().advance |(i, p)| {
603         unsafe {
604             let p = &mut *p.header();
605             let old = p.mark_blocked(this);
606             match old {
607                 Full | Terminated => {
608                     data_avail = true;
609                     ready_packet = i;
610                     (*p).state = old;
611                     break;
612                 }
613                 Blocked => fail!("blocking on blocked packet"),
614                 Empty => ()
615             }
616         }
617     }
618
619     while !data_avail {
620         debug!("sleeping on %? packets", pkts.len());
621         let event = wait_event(this) as *PacketHeader;
622
623         let mut pos = None;
624         for pkts.mut_iter().enumerate().advance |(i, p)| {
625             if p.header() == event {
626                 pos = Some(i);
627                 break;
628             }
629         };
630
631         match pos {
632           Some(i) => {
633             ready_packet = i;
634             data_avail = true;
635           }
636           None => debug!("ignoring spurious event, %?", event)
637         }
638     }
639
640     debug!("%?", &mut pkts[ready_packet]);
641
642     for pkts.mut_iter().advance |p| {
643         unsafe {
644             (*p.header()).unblock()
645         }
646     }
647
648     debug!("%?, %?", ready_packet, &mut pkts[ready_packet]);
649
650     unsafe {
651         assert!((*pkts[ready_packet].header()).state == Full
652                      || (*pkts[ready_packet].header()).state == Terminated);
653     }
654
655     ready_packet
656 }
657
658 /** The sending end of a pipe. It can be used to send exactly one
659 message.
660
661 */
662 pub type SendPacket<T> = SendPacketBuffered<T, Packet<T>>;
663
664 pub fn SendPacket<T>(p: *mut Packet<T>) -> SendPacket<T> {
665     SendPacketBuffered(p)
666 }
667
668 pub struct SendPacketBuffered<T, Tbuffer> {
669     p: Option<*mut Packet<T>>,
670     buffer: Option<BufferResource<Tbuffer>>,
671 }
672
673 #[unsafe_destructor]
674 impl<T:Owned,Tbuffer:Owned> Drop for SendPacketBuffered<T,Tbuffer> {
675     fn drop(&self) {
676         unsafe {
677             let this: &mut SendPacketBuffered<T,Tbuffer> = transmute(self);
678             if this.p != None {
679                 let p = replace(&mut this.p, None);
680                 sender_terminate(p.unwrap())
681             }
682         }
683     }
684 }
685
686 pub fn SendPacketBuffered<T,Tbuffer>(p: *mut Packet<T>)
687                                      -> SendPacketBuffered<T,Tbuffer> {
688     SendPacketBuffered {
689         p: Some(p),
690         buffer: unsafe {
691             Some(BufferResource(get_buffer(&mut (*p).header)))
692         }
693     }
694 }
695
696 impl<T,Tbuffer> SendPacketBuffered<T,Tbuffer> {
697     pub fn unwrap(&mut self) -> *mut Packet<T> {
698         replace(&mut self.p, None).unwrap()
699     }
700
701     pub fn header(&mut self) -> *mut PacketHeader {
702         match self.p {
703             Some(packet) => unsafe {
704                 let packet = &mut *packet;
705                 let header = ptr::to_mut_unsafe_ptr(&mut packet.header);
706                 header
707             },
708             None => fail!("packet already consumed")
709         }
710     }
711
712     pub fn reuse_buffer(&mut self) -> BufferResource<Tbuffer> {
713         //error!("send reuse_buffer");
714         replace(&mut self.buffer, None).unwrap()
715     }
716 }
717
718 /// Represents the receive end of a pipe. It can receive exactly one
719 /// message.
720 pub type RecvPacket<T> = RecvPacketBuffered<T, Packet<T>>;
721
722 pub fn RecvPacket<T>(p: *mut Packet<T>) -> RecvPacket<T> {
723     RecvPacketBuffered(p)
724 }
725
726 pub struct RecvPacketBuffered<T, Tbuffer> {
727     p: Option<*mut Packet<T>>,
728     buffer: Option<BufferResource<Tbuffer>>,
729 }
730
731 #[unsafe_destructor]
732 impl<T:Owned,Tbuffer:Owned> Drop for RecvPacketBuffered<T,Tbuffer> {
733     fn drop(&self) {
734         unsafe {
735             let this: &mut RecvPacketBuffered<T,Tbuffer> = transmute(self);
736             if this.p != None {
737                 let p = replace(&mut this.p, None);
738                 receiver_terminate(p.unwrap())
739             }
740         }
741     }
742 }
743
744 impl<T:Owned,Tbuffer:Owned> RecvPacketBuffered<T, Tbuffer> {
745     pub fn unwrap(&mut self) -> *mut Packet<T> {
746         replace(&mut self.p, None).unwrap()
747     }
748
749     pub fn reuse_buffer(&mut self) -> BufferResource<Tbuffer> {
750         replace(&mut self.buffer, None).unwrap()
751     }
752 }
753
754 impl<T:Owned,Tbuffer:Owned> Selectable for RecvPacketBuffered<T, Tbuffer> {
755     fn header(&mut self) -> *mut PacketHeader {
756         match self.p {
757             Some(packet) => unsafe {
758                 let packet = &mut *packet;
759                 let header = ptr::to_mut_unsafe_ptr(&mut packet.header);
760                 header
761             },
762             None => fail!("packet already consumed")
763         }
764     }
765 }
766
767 pub fn RecvPacketBuffered<T,Tbuffer>(p: *mut Packet<T>)
768                                      -> RecvPacketBuffered<T,Tbuffer> {
769     RecvPacketBuffered {
770         p: Some(p),
771         buffer: unsafe {
772             Some(BufferResource(get_buffer(&mut (*p).header)))
773         }
774     }
775 }
776
777 pub fn entangle<T>() -> (RecvPacket<T>, SendPacket<T>) {
778     let p = packet();
779     (RecvPacket(p), SendPacket(p))
780 }
781
782 /** Receives a message from one of two endpoints.
783
784 The return value is `left` if the first endpoint received something,
785 or `right` if the second endpoint receives something. In each case,
786 the result includes the other endpoint as well so it can be used
787 again. Below is an example of using `select2`.
788
789 ~~~ {.rust}
790 match select2(a, b) {
791     left((none, b)) {
792         // endpoint a was closed.
793     }
794     right((a, none)) {
795         // endpoint b was closed.
796     }
797     left((Some(_), b)) {
798         // endpoint a received a message
799     }
800     right(a, Some(_)) {
801         // endpoint b received a message.
802     }
803 }
804 ~~~
805
806 Sometimes messages will be available on both endpoints at once. In
807 this case, `select2` may return either `left` or `right`.
808
809 */
810 pub fn select2<A:Owned,Ab:Owned,B:Owned,Bb:Owned>(
811     mut a: RecvPacketBuffered<A, Ab>,
812     mut b: RecvPacketBuffered<B, Bb>)
813     -> Either<(Option<A>, RecvPacketBuffered<B, Bb>),
814               (RecvPacketBuffered<A, Ab>, Option<B>)> {
815     let mut endpoints = [ a.header(), b.header() ];
816     let i = wait_many(endpoints);
817     match i {
818         0 => Left((try_recv(a), b)),
819         1 => Right((a, try_recv(b))),
820         _ => fail!("select2 return an invalid packet")
821     }
822 }
823
824 pub trait Selectable {
825     fn header(&mut self) -> *mut PacketHeader;
826 }
827
828 impl Selectable for *mut PacketHeader {
829     fn header(&mut self) -> *mut PacketHeader { *self }
830 }
831
832 /// Returns the index of an endpoint that is ready to receive.
833 pub fn selecti<T:Selectable>(endpoints: &mut [T]) -> uint {
834     wait_many(endpoints)
835 }
836
837 /// Returns 0 or 1 depending on which endpoint is ready to receive
838 pub fn select2i<A:Selectable,B:Selectable>(a: &mut A, b: &mut B)
839                                            -> Either<(), ()> {
840     let mut endpoints = [ a.header(), b.header() ];
841     match wait_many(endpoints) {
842         0 => Left(()),
843         1 => Right(()),
844         _ => fail!("wait returned unexpected index")
845     }
846 }
847
848 /// Waits on a set of endpoints. Returns a message, its index, and a
849 /// list of the remaining endpoints.
850 pub fn select<T:Owned,Tb:Owned>(mut endpoints: ~[RecvPacketBuffered<T, Tb>])
851                                 -> (uint,
852                                     Option<T>,
853                                     ~[RecvPacketBuffered<T, Tb>]) {
854     let mut endpoint_headers = ~[];
855     for endpoints.mut_iter().advance |endpoint| {
856         endpoint_headers.push(endpoint.header());
857     }
858
859     let ready = wait_many(endpoint_headers);
860     let mut remaining = endpoints;
861     let port = remaining.swap_remove(ready);
862     let result = try_recv(port);
863     (ready, result, remaining)
864 }
865
866 pub mod rt {
867     use option::{None, Option, Some};
868
869     // These are used to hide the option constructors from the
870     // compiler because their names are changing
871     pub fn make_some<T>(val: T) -> Option<T> { Some(val) }
872     pub fn make_none<T>() -> Option<T> { None }
873 }
874
875 #[cfg(test)]
876 mod test {
877     use either::Right;
878     use comm::{Chan, Port, oneshot, recv_one, stream, Select2,
879                GenericChan, Peekable};
880
881     #[test]
882     fn test_select2() {
883         let (p1, c1) = stream();
884         let (p2, c2) = stream();
885
886         c1.send(~"abc");
887
888         let mut tuple = (p1, p2);
889         match tuple.select() {
890             Right(_) => fail!(),
891             _ => (),
892         }
893
894         c2.send(123);
895     }
896
897     #[test]
898     fn test_oneshot() {
899         let (p, c) = oneshot();
900
901         c.send(());
902
903         recv_one(p)
904     }
905
906     #[test]
907     fn test_peek_terminated() {
908         let (port, chan): (Port<int>, Chan<int>) = stream();
909
910         {
911             // Destroy the channel
912             let _chan = chan;
913         }
914
915         assert!(!port.peek());
916     }
917 }