]> git.lizzy.rs Git - rust.git/blob - src/libcore/pipes.rs
bc6d89e92b6cbf13d03b42841ea1f10d6c18d71d
[rust.git] / src / libcore / pipes.rs
1 /*! Runtime support for message passing with protocol enforcement.
2
3
4 Pipes consist of two endpoints. One endpoint can send messages and
5 the other can receive messages. The set of legal messages and which
6 directions they can flow at any given point are determined by a
7 protocol. Below is an example protocol.
8
9 ~~~
10 proto! pingpong {
11     ping: send {
12         ping -> pong
13     }
14     pong: recv {
15         pong -> ping
16     }
17 }
18 ~~~
19
20 The `proto!` syntax extension will convert this into a module called
21 `pingpong`, which includes a set of types and functions that can be
22 used to write programs that follow the pingpong protocol.
23
24 */
25
26 /* IMPLEMENTATION NOTES
27
28 The initial design for this feature is available at:
29
30 https://github.com/eholk/rust/wiki/Proposal-for-channel-contracts
31
32 Much of the design in that document is still accurate. There are
33 several components for the pipe implementation. First of all is the
34 syntax extension. To see how that works, it is best see comments in
35 libsyntax/ext/pipes.rs.
36
37 This module includes two related pieces of the runtime
38 implementation: support for unbounded and bounded
39 protocols. The main difference between the two is the type of the
40 buffer that is carried along in the endpoint data structures.
41
42
43 The heart of the implementation is the packet type. It contains a
44 header and a payload field. Much of the code in this module deals with
45 the header field. This is where the synchronization information is
46 stored. In the case of a bounded protocol, the header also includes a
47 pointer to the buffer the packet is contained in.
48
49 Packets represent a single message in a protocol. The payload field
50 gets instatiated at the type of the message, which is usually an enum
51 generated by the pipe compiler. Packets are conceptually single use,
52 although in bounded protocols they are reused each time around the
53 loop.
54
55
56 Packets are usually handled through a send_packet_buffered or
57 recv_packet_buffered object. Each packet is referenced by one
58 send_packet and one recv_packet, and these wrappers enforce that only
59 one end can send and only one end can receive. The structs also
60 include a destructor that marks packets are terminated if the sender
61 or receiver destroys the object before sending or receiving a value.
62
63 The *_packet_buffered structs take two type parameters. The first is
64 the message type for the current packet (or state). The second
65 represents the type of the whole buffer. For bounded protocols, the
66 protocol compiler generates a struct with a field for each protocol
67 state. This generated struct is used as the buffer type parameter. For
68 unbounded protocols, the buffer is simply one packet, so there is a
69 shorthand struct called send_packet and recv_packet, where the buffer
70 type is just `packet<T>`. Using the same underlying structure for both
71 bounded and unbounded protocols allows for less code duplication.
72
73 */
74
75 import unsafe::{forget, reinterpret_cast, transmute};
76 import either::{either, left, right};
77 import option::unwrap;
78
79 // Things used by code generated by the pipe compiler.
80 export entangle, get_buffer, drop_buffer;
81 export send_packet_buffered, recv_packet_buffered;
82 export packet, mk_packet, entangle_buffer, has_buffer, buffer_header;
83
84 // export these so we can find them in the buffer_resource
85 // destructor. This is probably a symptom of #3005.
86 export atomic_add_acq, atomic_sub_rel;
87
88 // User-level things
89 export send_packet, recv_packet, send, recv, try_recv, peek;
90 export select, select2, selecti, select2i, selectable;
91 export spawn_service, spawn_service_recv;
92 export stream, port, chan, shared_chan, port_set, channel;
93 export oneshot, chan_one, port_one;
94 export recv_one, try_recv_one, send_one, try_send_one;
95
96 #[doc(hidden)]
97 const SPIN_COUNT: uint = 0;
98
99 macro_rules! move_it {
100     { $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
101 }
102
103 #[doc(hidden)]
104 enum state {
105     empty,
106     full,
107     blocked,
108     terminated
109 }
110
111 struct buffer_header {
112     // Tracks whether this buffer needs to be freed. We can probably
113     // get away with restricting it to 0 or 1, if we're careful.
114     let mut ref_count: int;
115
116     new() { self.ref_count = 0; }
117
118     // We may want a drop, and to be careful about stringing this
119     // thing along.
120 }
121
122 // This is for protocols to associate extra data to thread around.
123 #[doc(hidden)]
124 type buffer<T: send> = {
125     header: buffer_header,
126     data: T,
127 };
128
129 struct packet_header {
130     let mut state: state;
131     let mut blocked_task: *rust_task;
132
133     // This is a reinterpret_cast of a ~buffer, that can also be cast
134     // to a buffer_header if need be.
135     let mut buffer: *libc::c_void;
136
137     new() {
138         self.state = empty;
139         self.blocked_task = ptr::null();
140         self.buffer = ptr::null();
141     }
142
143     // Returns the old state.
144     unsafe fn mark_blocked(this: *rust_task) -> state {
145         rustrt::rust_task_ref(this);
146         let old_task = swap_task(self.blocked_task, this);
147         assert old_task.is_null();
148         swap_state_acq(self.state, blocked)
149     }
150
151     unsafe fn unblock() {
152         let old_task = swap_task(self.blocked_task, ptr::null());
153         if !old_task.is_null() { rustrt::rust_task_deref(old_task) }
154         match swap_state_acq(self.state, empty) {
155           empty | blocked => (),
156           terminated => self.state = terminated,
157           full => self.state = full
158         }
159     }
160
161     // unsafe because this can do weird things to the space/time
162     // continuum. It ends making multiple unique pointers to the same
163     // thing. You'll proobably want to forget them when you're done.
164     unsafe fn buf_header() -> ~buffer_header {
165         assert self.buffer.is_not_null();
166         reinterpret_cast(self.buffer)
167     }
168
169     fn set_buffer<T: send>(b: ~buffer<T>) unsafe {
170         self.buffer = reinterpret_cast(b);
171     }
172 }
173
174 #[doc(hidden)]
175 type packet<T: send> = {
176     header: packet_header,
177     mut payload: option<T>,
178 };
179
180 #[doc(hidden)]
181 trait has_buffer {
182     fn set_buffer(b: *libc::c_void);
183 }
184
185 impl<T: send> packet<T>: has_buffer {
186     fn set_buffer(b: *libc::c_void) {
187         self.header.buffer = b;
188     }
189 }
190
191 #[doc(hidden)]
192 fn mk_packet<T: send>() -> packet<T> {
193     {
194         header: packet_header(),
195         mut payload: none
196     }
197 }
198
199 #[doc(hidden)]
200 fn unibuffer<T: send>() -> ~buffer<packet<T>> {
201     let b = ~{
202         header: buffer_header(),
203         data: {
204             header: packet_header(),
205             mut payload: none,
206         }
207     };
208
209     unsafe {
210         b.data.header.buffer = reinterpret_cast(b);
211     }
212
213     b
214 }
215
216 #[doc(hidden)]
217 fn packet<T: send>() -> *packet<T> {
218     let b = unibuffer();
219     let p = ptr::addr_of(b.data);
220     // We'll take over memory management from here.
221     unsafe { forget(b) }
222     p
223 }
224
225 #[doc(hidden)]
226 fn entangle_buffer<T: send, Tstart: send>(
227     -buffer: ~buffer<T>,
228     init: fn(*libc::c_void, x: &T) -> *packet<Tstart>)
229     -> (send_packet_buffered<Tstart, T>, recv_packet_buffered<Tstart, T>)
230 {
231     let p = init(unsafe { reinterpret_cast(buffer) }, &buffer.data);
232     unsafe { forget(buffer) }
233     (send_packet_buffered(p), recv_packet_buffered(p))
234 }
235
236 #[abi = "rust-intrinsic"]
237 #[doc(hidden)]
238 extern mod rusti {
239     fn atomic_xchng(&dst: int, src: int) -> int;
240     fn atomic_xchng_acq(&dst: int, src: int) -> int;
241     fn atomic_xchng_rel(&dst: int, src: int) -> int;
242
243     fn atomic_add_acq(&dst: int, src: int) -> int;
244     fn atomic_sub_rel(&dst: int, src: int) -> int;
245 }
246
247 // If I call the rusti versions directly from a polymorphic function,
248 // I get link errors. This is a bug that needs investigated more.
249 #[doc(hidden)]
250 fn atomic_xchng_rel(&dst: int, src: int) -> int {
251     rusti::atomic_xchng_rel(dst, src)
252 }
253
254 #[doc(hidden)]
255 fn atomic_add_acq(&dst: int, src: int) -> int {
256     rusti::atomic_add_acq(dst, src)
257 }
258
259 #[doc(hidden)]
260 fn atomic_sub_rel(&dst: int, src: int) -> int {
261     rusti::atomic_sub_rel(dst, src)
262 }
263
264 #[doc(hidden)]
265 fn swap_task(&dst: *rust_task, src: *rust_task) -> *rust_task {
266     // It might be worth making both acquire and release versions of
267     // this.
268     unsafe {
269         reinterpret_cast(rusti::atomic_xchng(
270             *(ptr::mut_addr_of(dst) as *mut int),
271             src as int))
272     }
273 }
274
275 #[doc(hidden)]
276 type rust_task = libc::c_void;
277
278 #[doc(hidden)]
279 extern mod rustrt {
280     #[rust_stack]
281     fn rust_get_task() -> *rust_task;
282     #[rust_stack]
283     fn rust_task_ref(task: *rust_task);
284     fn rust_task_deref(task: *rust_task);
285
286     #[rust_stack]
287     fn task_clear_event_reject(task: *rust_task);
288
289     fn task_wait_event(this: *rust_task, killed: &mut *libc::c_void) -> bool;
290     pure fn task_signal_event(target: *rust_task, event: *libc::c_void);
291 }
292
293 #[doc(hidden)]
294 fn wait_event(this: *rust_task) -> *libc::c_void {
295     let mut event = ptr::null();
296
297     let killed = rustrt::task_wait_event(this, &mut event);
298     if killed && !task::failing() {
299         fail ~"killed"
300     }
301     event
302 }
303
304 #[doc(hidden)]
305 fn swap_state_acq(&dst: state, src: state) -> state {
306     unsafe {
307         reinterpret_cast(rusti::atomic_xchng_acq(
308             *(ptr::mut_addr_of(dst) as *mut int),
309             src as int))
310     }
311 }
312
313 #[doc(hidden)]
314 fn swap_state_rel(&dst: state, src: state) -> state {
315     unsafe {
316         reinterpret_cast(rusti::atomic_xchng_rel(
317             *(ptr::mut_addr_of(dst) as *mut int),
318             src as int))
319     }
320 }
321
322 #[doc(hidden)]
323 unsafe fn get_buffer<T: send>(p: *packet_header) -> ~buffer<T> {
324     transmute((*p).buf_header())
325 }
326
327 struct buffer_resource<T: send> {
328     let buffer: ~buffer<T>;
329     new(+b: ~buffer<T>) {
330         //let p = ptr::addr_of(*b);
331         //error!{"take %?", p};
332         atomic_add_acq(b.header.ref_count, 1);
333         self.buffer = b;
334     }
335
336     drop unsafe {
337         let b = move_it!{self.buffer};
338         //let p = ptr::addr_of(*b);
339         //error!{"drop %?", p};
340         let old_count = atomic_sub_rel(b.header.ref_count, 1);
341         //let old_count = atomic_xchng_rel(b.header.ref_count, 0);
342         if old_count == 1 {
343             // The new count is 0.
344
345             // go go gadget drop glue
346         }
347         else {
348             forget(b)
349         }
350     }
351 }
352
353 #[doc(hidden)]
354 fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
355                                 -payload: T) -> bool {
356     let header = p.header();
357     let p_ = p.unwrap();
358     let p = unsafe { &*p_ };
359     assert ptr::addr_of(p.header) == header;
360     assert p.payload == none;
361     p.payload <- some(payload);
362     let old_state = swap_state_rel(p.header.state, full);
363     match old_state {
364         empty => {
365             // Yay, fastpath.
366
367             // The receiver will eventually clean this up.
368             //unsafe { forget(p); }
369             return true;
370         }
371         full => fail ~"duplicate send",
372         blocked => {
373             debug!{"waking up task for %?", p_};
374             let old_task = swap_task(p.header.blocked_task, ptr::null());
375             if !old_task.is_null() {
376                 rustrt::task_signal_event(
377                     old_task, ptr::addr_of(p.header) as *libc::c_void);
378                 rustrt::rust_task_deref(old_task);
379             }
380
381             // The receiver will eventually clean this up.
382             //unsafe { forget(p); }
383             return true;
384         }
385         terminated => {
386             // The receiver will never receive this. Rely on drop_glue
387             // to clean everything up.
388             return false;
389         }
390     }
391 }
392
393 /** Receives a message from a pipe.
394
395 Fails if the sender closes the connection.
396
397 */
398 fn recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>) -> T {
399     option::unwrap_expect(try_recv(p), "connection closed")
400 }
401
402 /** Attempts to receive a message from a pipe.
403
404 Returns `none` if the sender has closed the connection without sending
405 a message, or `some(T)` if a message was received.
406
407 */
408 fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
409     -> option<T>
410 {
411     let p_ = p.unwrap();
412     let p = unsafe { &*p_ };
413
414     struct drop_state {
415         p: &packet_header;
416
417         drop {
418             if task::failing() {
419                 self.p.state = terminated;
420                 let old_task = swap_task(self.p.blocked_task, ptr::null());
421                 if !old_task.is_null() {
422                     rustrt::rust_task_deref(old_task);
423                 }
424             }
425         }
426     };
427
428     let _drop_state = drop_state { p: &p.header };
429
430     // optimistic path
431     match p.header.state {
432       full => {
433         let mut payload = none;
434         payload <-> p.payload;
435         p.header.state = empty;
436         return some(option::unwrap(payload))
437       },
438       terminated => return none,
439       _ => {}
440     }
441
442     // regular path
443     let this = rustrt::rust_get_task();
444     rustrt::task_clear_event_reject(this);
445     rustrt::rust_task_ref(this);
446     let old_task = swap_task(p.header.blocked_task, this);
447     assert old_task.is_null();
448     let mut first = true;
449     let mut count = SPIN_COUNT;
450     loop {
451         rustrt::task_clear_event_reject(this);
452         let old_state = swap_state_acq(p.header.state,
453                                        blocked);
454         match old_state {
455           empty => {
456             debug!{"no data available on %?, going to sleep.", p_};
457             if count == 0 {
458                 wait_event(this);
459             }
460             else {
461                 count -= 1;
462                 // FIXME (#524): Putting the yield here destroys a lot
463                 // of the benefit of spinning, since we still go into
464                 // the scheduler at every iteration. However, without
465                 // this everything spins too much because we end up
466                 // sometimes blocking the thing we are waiting on.
467                 task::yield();
468             }
469             debug!{"woke up, p.state = %?", copy p.header.state};
470           }
471           blocked => if first {
472             fail ~"blocking on already blocked packet"
473           },
474           full => {
475             let mut payload = none;
476             payload <-> p.payload;
477             let old_task = swap_task(p.header.blocked_task, ptr::null());
478             if !old_task.is_null() {
479                 rustrt::rust_task_deref(old_task);
480             }
481             p.header.state = empty;
482             return some(option::unwrap(payload))
483           }
484           terminated => {
485             // This assert detects when we've accidentally unsafely
486             // casted too big of a number to a state.
487             assert old_state == terminated;
488
489             let old_task = swap_task(p.header.blocked_task, ptr::null());
490             if !old_task.is_null() {
491                 rustrt::rust_task_deref(old_task);
492             }
493             return none;
494           }
495         }
496         first = false;
497     }
498 }
499
500 /// Returns true if messages are available.
501 pure fn peek<T: send, Tb: send>(p: recv_packet_buffered<T, Tb>) -> bool {
502     match unsafe {(*p.header()).state} {
503       empty => false,
504       blocked => fail ~"peeking on blocked packet",
505       full | terminated => true
506     }
507 }
508
509 impl<T: send, Tb: send> recv_packet_buffered<T, Tb> {
510     pure fn peek() -> bool {
511         peek(self)
512     }
513 }
514
515 #[doc(hidden)]
516 fn sender_terminate<T: send>(p: *packet<T>) {
517     let p = unsafe { &*p };
518     match swap_state_rel(p.header.state, terminated) {
519       empty => {
520         // The receiver will eventually clean up.
521       }
522       blocked => {
523         // wake up the target
524         let old_task = swap_task(p.header.blocked_task, ptr::null());
525         if !old_task.is_null() {
526             rustrt::task_signal_event(
527                 old_task,
528                 ptr::addr_of(p.header) as *libc::c_void);
529             rustrt::rust_task_deref(old_task);
530         }
531         // The receiver will eventually clean up.
532       }
533       full => {
534         // This is impossible
535         fail ~"you dun goofed"
536       }
537       terminated => {
538         assert p.header.blocked_task.is_null();
539         // I have to clean up, use drop_glue
540       }
541     }
542 }
543
544 #[doc(hidden)]
545 fn receiver_terminate<T: send>(p: *packet<T>) {
546     let p = unsafe { &*p };
547     match swap_state_rel(p.header.state, terminated) {
548       empty => {
549         assert p.header.blocked_task.is_null();
550         // the sender will clean up
551       }
552       blocked => {
553         let old_task = swap_task(p.header.blocked_task, ptr::null());
554         if !old_task.is_null() {
555             rustrt::rust_task_deref(old_task);
556             assert old_task == rustrt::rust_get_task();
557         }
558       }
559       terminated | full => {
560         assert p.header.blocked_task.is_null();
561         // I have to clean up, use drop_glue
562       }
563     }
564 }
565
566 /** Returns when one of the packet headers reports data is available.
567
568 This function is primarily intended for building higher level waiting
569 functions, such as `select`, `select2`, etc.
570
571 It takes a vector slice of packet_headers and returns an index into
572 that vector. The index points to an endpoint that has either been
573 closed by the sender or has a message waiting to be received.
574
575 */
576 fn wait_many(pkts: &[*packet_header]) -> uint {
577     let this = rustrt::rust_get_task();
578
579     rustrt::task_clear_event_reject(this);
580     let mut data_avail = false;
581     let mut ready_packet = pkts.len();
582     for pkts.eachi |i, p| unsafe {
583         let p = unsafe { &*p };
584         let old = p.mark_blocked(this);
585         match old {
586           full | terminated => {
587             data_avail = true;
588             ready_packet = i;
589             (*p).state = old;
590             break;
591           }
592           blocked => fail ~"blocking on blocked packet",
593           empty => ()
594         }
595     }
596
597     while !data_avail {
598         debug!{"sleeping on %? packets", pkts.len()};
599         let event = wait_event(this) as *packet_header;
600         let pos = vec::position(pkts, |p| p == event);
601
602         match pos {
603           some(i) => {
604             ready_packet = i;
605             data_avail = true;
606           }
607           none => debug!{"ignoring spurious event, %?", event}
608         }
609     }
610
611     debug!{"%?", pkts[ready_packet]};
612
613     for pkts.each |p| { unsafe{ (*p).unblock()} }
614
615     debug!("%?, %?", ready_packet, pkts[ready_packet]);
616
617     unsafe {
618         assert (*pkts[ready_packet]).state == full
619             || (*pkts[ready_packet]).state == terminated;
620     }
621
622     ready_packet
623 }
624
625 /** Receives a message from one of two endpoints.
626
627 The return value is `left` if the first endpoint received something,
628 or `right` if the second endpoint receives something. In each case,
629 the result includes the other endpoint as well so it can be used
630 again. Below is an example of using `select2`.
631
632 ~~~
633 match select2(a, b) {
634   left((none, b)) {
635     // endpoint a was closed.
636   }
637   right((a, none)) {
638     // endpoint b was closed.
639   }
640   left((some(_), b)) {
641     // endpoint a received a message
642   }
643   right(a, some(_)) {
644     // endpoint b received a message.
645   }
646 }
647 ~~~
648
649 Sometimes messages will be available on both endpoints at once. In
650 this case, `select2` may return either `left` or `right`.
651
652 */
653 fn select2<A: send, Ab: send, B: send, Bb: send>(
654     +a: recv_packet_buffered<A, Ab>,
655     +b: recv_packet_buffered<B, Bb>)
656     -> either<(option<A>, recv_packet_buffered<B, Bb>),
657               (recv_packet_buffered<A, Ab>, option<B>)>
658 {
659     let i = wait_many([a.header(), b.header()]/_);
660
661     unsafe {
662         match i {
663           0 => left((try_recv(a), b)),
664           1 => right((a, try_recv(b))),
665           _ => fail ~"select2 return an invalid packet"
666         }
667     }
668 }
669
670 #[doc(hidden)]
671 trait selectable {
672     pure fn header() -> *packet_header;
673 }
674
675 impl *packet_header: selectable {
676     pure fn header() -> *packet_header { self }
677 }
678
679 /// Returns the index of an endpoint that is ready to receive.
680 fn selecti<T: selectable>(endpoints: &[T]) -> uint {
681     wait_many(endpoints.map(|p| p.header()))
682 }
683
684 /// Returns 0 or 1 depending on which endpoint is ready to receive
685 fn select2i<A: selectable, B: selectable>(a: A, b: B) -> either<(), ()> {
686     match wait_many([a.header(), b.header()]/_) {
687       0 => left(()),
688       1 => right(()),
689       _ => fail ~"wait returned unexpected index"
690     }
691 }
692
693 /** Waits on a set of endpoints. Returns a message, its index, and a
694  list of the remaining endpoints.
695
696 */
697 fn select<T: send, Tb: send>(+endpoints: ~[recv_packet_buffered<T, Tb>])
698     -> (uint, option<T>, ~[recv_packet_buffered<T, Tb>])
699 {
700     let ready = wait_many(endpoints.map(|p| p.header()));
701     let mut remaining = ~[];
702     let mut result = none;
703     do vec::consume(endpoints) |i, p| {
704         if i == ready {
705             result = try_recv(p);
706         }
707         else {
708             vec::push(remaining, p);
709         }
710     }
711
712     (ready, result, remaining)
713 }
714
715 /** The sending end of a pipe. It can be used to send exactly one
716 message.
717
718 */
719 type send_packet<T: send> = send_packet_buffered<T, packet<T>>;
720
721 #[doc(hidden)]
722 fn send_packet<T: send>(p: *packet<T>) -> send_packet<T> {
723     send_packet_buffered(p)
724 }
725
726 struct send_packet_buffered<T: send, Tbuffer: send> {
727     let mut p: option<*packet<T>>;
728     let mut buffer: option<buffer_resource<Tbuffer>>;
729     new(p: *packet<T>) {
730         //debug!{"take send %?", p};
731         self.p = some(p);
732         unsafe {
733             self.buffer = some(
734                 buffer_resource(
735                     get_buffer(ptr::addr_of((*p).header))));
736         };
737     }
738     drop {
739         //if self.p != none {
740         //    debug!{"drop send %?", option::get(self.p)};
741         //}
742         if self.p != none {
743             let mut p = none;
744             p <-> self.p;
745             sender_terminate(option::unwrap(p))
746         }
747         //unsafe { error!{"send_drop: %?",
748         //                if self.buffer == none {
749         //                    "none"
750         //                } else { "some" }}; }
751     }
752     fn unwrap() -> *packet<T> {
753         let mut p = none;
754         p <-> self.p;
755         option::unwrap(p)
756     }
757
758     pure fn header() -> *packet_header {
759         match self.p {
760           some(packet) => unsafe {
761             let packet = &*packet;
762             let header = ptr::addr_of(packet.header);
763             //forget(packet);
764             header
765           },
766           none => fail ~"packet already consumed"
767         }
768     }
769
770     fn reuse_buffer() -> buffer_resource<Tbuffer> {
771         //error!{"send reuse_buffer"};
772         let mut tmp = none;
773         tmp <-> self.buffer;
774         option::unwrap(tmp)
775     }
776 }
777
778 /// Represents the receive end of a pipe. It can receive exactly one
779 /// message.
780 type recv_packet<T: send> = recv_packet_buffered<T, packet<T>>;
781
782 #[doc(hidden)]
783 fn recv_packet<T: send>(p: *packet<T>) -> recv_packet<T> {
784     recv_packet_buffered(p)
785 }
786
787 struct recv_packet_buffered<T: send, Tbuffer: send> : selectable {
788     let mut p: option<*packet<T>>;
789     let mut buffer: option<buffer_resource<Tbuffer>>;
790     new(p: *packet<T>) {
791         //debug!{"take recv %?", p};
792         self.p = some(p);
793         unsafe {
794             self.buffer = some(
795                 buffer_resource(
796                     get_buffer(ptr::addr_of((*p).header))));
797         };
798     }
799     drop {
800         //if self.p != none {
801         //    debug!{"drop recv %?", option::get(self.p)};
802         //}
803         if self.p != none {
804             let mut p = none;
805             p <-> self.p;
806             receiver_terminate(option::unwrap(p))
807         }
808         //unsafe { error!{"recv_drop: %?",
809         //                if self.buffer == none {
810         //                    "none"
811         //                } else { "some" }}; }
812     }
813     fn unwrap() -> *packet<T> {
814         let mut p = none;
815         p <-> self.p;
816         option::unwrap(p)
817     }
818
819     pure fn header() -> *packet_header {
820         match self.p {
821           some(packet) => unsafe {
822             let packet = &*packet;
823             let header = ptr::addr_of(packet.header);
824             //forget(packet);
825             header
826           },
827           none => fail ~"packet already consumed"
828         }
829     }
830
831     fn reuse_buffer() -> buffer_resource<Tbuffer> {
832         //error!{"recv reuse_buffer"};
833         let mut tmp = none;
834         tmp <-> self.buffer;
835         option::unwrap(tmp)
836     }
837 }
838
839 #[doc(hidden)]
840 fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) {
841     let p = packet();
842     (send_packet(p), recv_packet(p))
843 }
844
845 /** Spawn a task to provide a service.
846
847 It takes an initialization function that produces a send and receive
848 endpoint. The send endpoint is returned to the caller and the receive
849 endpoint is passed to the new task.
850
851 */
852 fn spawn_service<T: send, Tb: send>(
853     init: extern fn() -> (send_packet_buffered<T, Tb>,
854                           recv_packet_buffered<T, Tb>),
855     +service: fn~(+recv_packet_buffered<T, Tb>))
856     -> send_packet_buffered<T, Tb>
857 {
858     let (client, server) = init();
859
860     // This is some nasty gymnastics required to safely move the pipe
861     // into a new task.
862     let server = ~mut some(server);
863     do task::spawn |move service| {
864         let mut server_ = none;
865         server_ <-> *server;
866         service(option::unwrap(server_))
867     }
868
869     client
870 }
871
872 /** Like `spawn_service_recv`, but for protocols that start in the
873 receive state.
874
875 */
876 fn spawn_service_recv<T: send, Tb: send>(
877     init: extern fn() -> (recv_packet_buffered<T, Tb>,
878                           send_packet_buffered<T, Tb>),
879     +service: fn~(+send_packet_buffered<T, Tb>))
880     -> recv_packet_buffered<T, Tb>
881 {
882     let (client, server) = init();
883
884     // This is some nasty gymnastics required to safely move the pipe
885     // into a new task.
886     let server = ~mut some(server);
887     do task::spawn |move service| {
888         let mut server_ = none;
889         server_ <-> *server;
890         service(option::unwrap(server_))
891     }
892
893     client
894 }
895
896 // Streams - Make pipes a little easier in general.
897
898 proto! streamp {
899     open:send<T: send> {
900         data(T) -> open<T>
901     }
902 }
903
904 /// A trait for things that can send multiple messages.
905 trait channel<T: send> {
906     // It'd be nice to call this send, but it'd conflict with the
907     // built in send kind.
908
909     /// Sends a message.
910     fn send(+x: T);
911
912     /// Sends a message, or report if the receiver has closed the connection.
913     fn try_send(+x: T) -> bool;
914 }
915
916 /// A trait for things that can receive multiple messages.
917 trait recv<T: send> {
918     /// Receives a message, or fails if the connection closes.
919     fn recv() -> T;
920
921     /** Receives a message if one is available, or returns `none` if
922     the connection is closed.
923
924     */
925     fn try_recv() -> option<T>;
926
927     /** Returns true if a message is available or the connection is
928     closed.
929
930     */
931     pure fn peek() -> bool;
932 }
933
934 #[doc(hidden)]
935 type chan_<T:send> = { mut endp: option<streamp::client::open<T>> };
936
937 /// An endpoint that can send many messages.
938 enum chan<T:send> {
939     chan_(chan_<T>)
940 }
941
942 #[doc(hidden)]
943 type port_<T:send> = { mut endp: option<streamp::server::open<T>> };
944
945 /// An endpoint that can receive many messages.
946 enum port<T:send> {
947     port_(port_<T>)
948 }
949
950 /** Creates a `(chan, port)` pair.
951
952 These allow sending or receiving an unlimited number of messages.
953
954 */
955 fn stream<T:send>() -> (chan<T>, port<T>) {
956     let (c, s) = streamp::init();
957
958     (chan_({ mut endp: some(c) }), port_({ mut endp: some(s) }))
959 }
960
961 impl<T: send> chan<T>: channel<T> {
962     fn send(+x: T) {
963         let mut endp = none;
964         endp <-> self.endp;
965         self.endp = some(
966             streamp::client::data(unwrap(endp), x))
967     }
968
969     fn try_send(+x: T) -> bool {
970         let mut endp = none;
971         endp <-> self.endp;
972         match move streamp::client::try_data(unwrap(endp), x) {
973             some(next) => {
974                 self.endp = some(move_it!(next));
975                 true
976             }
977             none => false
978         }
979     }
980 }
981
982 impl<T: send> port<T>: recv<T> {
983     fn recv() -> T {
984         let mut endp = none;
985         endp <-> self.endp;
986         let streamp::data(x, endp) = pipes::recv(unwrap(endp));
987         self.endp = some(endp);
988         x
989     }
990
991     fn try_recv() -> option<T> {
992         let mut endp = none;
993         endp <-> self.endp;
994         match move pipes::try_recv(unwrap(endp)) {
995           some(streamp::data(x, endp)) => {
996             self.endp = some(move_it!{endp});
997             some(move_it!{x})
998           }
999           none => none
1000         }
1001     }
1002
1003     pure fn peek() -> bool unchecked {
1004         let mut endp = none;
1005         endp <-> self.endp;
1006         let peek = match endp {
1007           some(endp) => pipes::peek(endp),
1008           none => fail ~"peeking empty stream"
1009         };
1010         self.endp <-> endp;
1011         peek
1012     }
1013 }
1014
1015 // Treat a whole bunch of ports as one.
1016 struct port_set<T: send> : recv<T> {
1017     let mut ports: ~[pipes::port<T>];
1018
1019     new() { self.ports = ~[]; }
1020
1021     fn add(+port: pipes::port<T>) {
1022         vec::push(self.ports, port)
1023     }
1024
1025     fn chan() -> chan<T> {
1026         let (ch, po) = stream();
1027         self.add(po);
1028         ch
1029     }
1030
1031     fn try_recv() -> option<T> {
1032         let mut result = none;
1033         // we have to swap the ports array so we aren't borrowing
1034         // aliasable mutable memory.
1035         let mut ports = ~[];
1036         ports <-> self.ports;
1037         while result == none && ports.len() > 0 {
1038             let i = wait_many(ports.map(|p| p.header()));
1039             match move ports[i].try_recv() {
1040                 some(copy m) => {
1041                     result = some(move m);
1042                 }
1043                 none => {
1044                     // Remove this port.
1045                     let mut ports_ = ~[];
1046                     ports <-> ports_;
1047                     vec::consume(ports_,
1048                                  |j, x| if i != j {
1049                                      vec::push(ports, x)
1050                                  });
1051                 }
1052             }
1053         }
1054         ports <-> self.ports;
1055         result
1056     }
1057
1058     fn recv() -> T {
1059         match move self.try_recv() {
1060             some(copy x) => move x,
1061             none => fail ~"port_set: endpoints closed"
1062         }
1063     }
1064
1065     pure fn peek() -> bool {
1066         // It'd be nice to use self.port.each, but that version isn't
1067         // pure.
1068         for vec::each(self.ports) |p| {
1069             if p.peek() { return true }
1070         }
1071         false
1072     }
1073 }
1074
1075 impl<T: send> port<T>: selectable {
1076     pure fn header() -> *packet_header unchecked {
1077         match self.endp {
1078           some(endp) => endp.header(),
1079           none => fail ~"peeking empty stream"
1080         }
1081     }
1082 }
1083
1084 /// A channel that can be shared between many senders.
1085 type shared_chan<T: send> = unsafe::Exclusive<chan<T>>;
1086
1087 impl<T: send> shared_chan<T>: channel<T> {
1088     fn send(+x: T) {
1089         let mut xx = some(x);
1090         do self.with |chan| {
1091             let mut x = none;
1092             x <-> xx;
1093             chan.send(option::unwrap(x))
1094         }
1095     }
1096
1097     fn try_send(+x: T) -> bool {
1098         let mut xx = some(x);
1099         do self.with |chan| {
1100             let mut x = none;
1101             x <-> xx;
1102             chan.try_send(option::unwrap(x))
1103         }
1104     }
1105 }
1106
1107 /// Converts a `chan` into a `shared_chan`.
1108 fn shared_chan<T:send>(+c: chan<T>) -> shared_chan<T> {
1109     unsafe::exclusive(c)
1110 }
1111
1112 /// Receive a message from one of two endpoints.
1113 trait select2<T: send, U: send> {
1114     /// Receive a message or return `none` if a connection closes.
1115     fn try_select() -> either<option<T>, option<U>>;
1116     /// Receive a message or fail if a connection closes.
1117     fn select() -> either<T, U>;
1118 }
1119
1120 impl<T: send, U: send, Left: selectable recv<T>, Right: selectable recv<U>>
1121     (Left, Right): select2<T, U> {
1122
1123     fn select() -> either<T, U> {
1124         match self {
1125           (lp, rp) => match select2i(lp, rp) {
1126             left(()) => left (lp.recv()),
1127             right(()) => right(rp.recv())
1128           }
1129         }
1130     }
1131
1132     fn try_select() -> either<option<T>, option<U>> {
1133         match self {
1134           (lp, rp) => match select2i(lp, rp) {
1135             left(()) => left (lp.try_recv()),
1136             right(()) => right(rp.try_recv())
1137           }
1138         }
1139     }
1140 }
1141
1142 proto! oneshot {
1143     oneshot:send<T:send> {
1144         send(T) -> !
1145     }
1146 }
1147
1148 /// The send end of a oneshot pipe.
1149 type chan_one<T: send> = oneshot::client::oneshot<T>;
1150 /// The receive end of a oneshot pipe.
1151 type port_one<T: send> = oneshot::server::oneshot<T>;
1152
1153 /// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
1154 fn oneshot<T: send>() -> (chan_one<T>, port_one<T>) {
1155     oneshot::init()
1156 }
1157
1158 /**
1159  * Receive a message from a oneshot pipe, failing if the connection was
1160  * closed.
1161  */
1162 fn recv_one<T: send>(+port: port_one<T>) -> T {
1163     let oneshot::send(message) = recv(port);
1164     message
1165 }
1166
1167 /// Receive a message from a oneshot pipe unless the connection was closed.
1168 fn try_recv_one<T: send> (+port: port_one<T>) -> option<T> {
1169     let message = try_recv(port);
1170
1171     if message == none { none }
1172     else {
1173         let oneshot::send(message) = option::unwrap(message);
1174         some(message)
1175     }
1176 }
1177
1178 /// Send a message on a oneshot pipe, failing if the connection was closed.
1179 fn send_one<T: send>(+chan: chan_one<T>, +data: T) {
1180     oneshot::client::send(chan, data);
1181 }
1182
1183 /**
1184  * Send a message on a oneshot pipe, or return false if the connection was
1185  * closed.
1186  */
1187 fn try_send_one<T: send>(+chan: chan_one<T>, +data: T)
1188         -> bool {
1189     oneshot::client::try_send(chan, data).is_some()
1190 }
1191
1192 #[cfg(test)]
1193 mod test {
1194     #[test]
1195     fn test_select2() {
1196         let (c1, p1) = pipes::stream();
1197         let (c2, p2) = pipes::stream();
1198
1199         c1.send(~"abc");
1200
1201         match (p1, p2).select() {
1202           right(_) => fail,
1203           _ => ()
1204         }
1205
1206         c2.send(123);
1207     }
1208
1209     #[test]
1210     fn test_oneshot() {
1211         let (c, p) = oneshot::init();
1212
1213         oneshot::client::send(c, ());
1214
1215         recv_one(p)
1216     }
1217 }