]> git.lizzy.rs Git - rust.git/commitdiff
core: Extract comm from pipes. #4742
authorBrian Anderson <banderson@mozilla.com>
Sat, 2 Feb 2013 11:10:12 +0000 (03:10 -0800)
committerBrian Anderson <banderson@mozilla.com>
Fri, 22 Feb 2013 01:36:54 +0000 (17:36 -0800)
89 files changed:
doc/tutorial-tasks.md
src/compiletest/procsrv.rs
src/libcore/comm.rs [new file with mode: 0644]
src/libcore/core.rc
src/libcore/pipes.rs
src/libcore/prelude.rs
src/libcore/private.rs
src/libcore/private/weak_task.rs
src/libcore/run.rs
src/libcore/task/mod.rs
src/libcore/task/spawn.rs
src/librustc/rustc.rc
src/librustdoc/astsrv.rs
src/librustdoc/markdown_writer.rs
src/librustdoc/page_pass.rs
src/libstd/arc.rs
src/libstd/comm.rs
src/libstd/flatpipes.rs
src/libstd/future.rs
src/libstd/net_ip.rs
src/libstd/net_tcp.rs
src/libstd/sync.rs
src/libstd/task_pool.rs
src/libstd/test.rs
src/libstd/timer.rs
src/libstd/uv_global_loop.rs
src/libstd/uv_iotask.rs
src/libstd/uv_ll.rs
src/libstd/workcache.rs
src/libsyntax/ext/pipes/pipec.rs
src/test/auxiliary/cci_capture_clause.rs
src/test/bench/msgsend-pipes-shared.rs
src/test/bench/msgsend-pipes.rs
src/test/bench/msgsend-ring-pipes.rs
src/test/bench/pingpong.rs
src/test/bench/shootout-chameneos-redux.rs
src/test/bench/shootout-k-nucleotide-pipes.rs
src/test/bench/shootout-mandelbrot.rs
src/test/bench/shootout-pfib.rs
src/test/bench/task-perf-jargon-metal-smoke.rs
src/test/bench/task-perf-linked-failure.rs
src/test/bench/task-perf-one-million.rs
src/test/compile-fail/bind-by-move-no-guards.rs
src/test/compile-fail/unsendable-class.rs
src/test/run-fail/linked-failure.rs
src/test/run-fail/linked-failure2.rs
src/test/run-fail/linked-failure3.rs
src/test/run-fail/linked-failure4.rs
src/test/run-fail/task-comm-recv-block.rs
src/test/run-pass/capture_nil.rs
src/test/run-pass/comm.rs
src/test/run-pass/hashmap-memory.rs
src/test/run-pass/issue-3168.rs
src/test/run-pass/issue-3176.rs
src/test/run-pass/issue-3609.rs
src/test/run-pass/ivec-tag.rs
src/test/run-pass/pipe-bank-proto.rs
src/test/run-pass/pipe-detect-term.rs
src/test/run-pass/pipe-peek.rs
src/test/run-pass/pipe-pingpong-bounded.rs
src/test/run-pass/pipe-presentation-examples.rs
src/test/run-pass/pipe-select.rs
src/test/run-pass/pipe-sleep.rs
src/test/run-pass/rt-sched-1.rs
src/test/run-pass/send-iloop.rs
src/test/run-pass/send-resource.rs
src/test/run-pass/send-type-inference.rs
src/test/run-pass/sendable-class.rs
src/test/run-pass/spawn-types.rs
src/test/run-pass/task-comm-0.rs
src/test/run-pass/task-comm-10.rs
src/test/run-pass/task-comm-11.rs
src/test/run-pass/task-comm-13.rs
src/test/run-pass/task-comm-14.rs
src/test/run-pass/task-comm-15.rs
src/test/run-pass/task-comm-16.rs
src/test/run-pass/task-comm-3.rs
src/test/run-pass/task-comm-4.rs
src/test/run-pass/task-comm-5.rs
src/test/run-pass/task-comm-6.rs
src/test/run-pass/task-comm-7.rs
src/test/run-pass/task-comm-9.rs
src/test/run-pass/task-comm-chan-nil.rs
src/test/run-pass/task-killjoin-rsrc.rs
src/test/run-pass/task-spawn-move-and-copy.rs
src/test/run-pass/trivial-message.rs
src/test/run-pass/unique-send-2.rs
src/test/run-pass/unique-send.rs
src/test/run-pass/unwind-resource.rs

index a3d0ecaa4ba6552e21b33ceacc16d70ca4272527..c0f9a37627065954197a3d8b2c3b71a67d0f241a 100644 (file)
@@ -157,7 +157,7 @@ concurrently:
 
 ~~~~
 use task::spawn;
-use pipes::{stream, Port, Chan};
+use comm::{stream, Port, Chan};
 
 let (port, chan): (Port<int>, Chan<int>) = stream();
 
@@ -178,7 +178,7 @@ stream for sending and receiving integers (the left-hand side of the `let`,
 a tuple into its component parts).
 
 ~~~~
-# use pipes::{stream, Chan, Port};
+# use comm::{stream, Chan, Port};
 let (port, chan): (Port<int>, Chan<int>) = stream();
 ~~~~
 
@@ -189,7 +189,7 @@ spawns the child task.
 ~~~~
 # use task::{spawn};
 # use task::spawn;
-# use pipes::{stream, Port, Chan};
+# use comm::{stream, Port, Chan};
 # fn some_expensive_computation() -> int { 42 }
 # let (port, chan) = stream();
 do spawn || {
@@ -209,7 +209,7 @@ computation, then waits for the child's result to arrive on the
 port:
 
 ~~~~
-# use pipes::{stream, Port, Chan};
+# use comm::{stream, Port, Chan};
 # fn some_other_expensive_computation() {}
 # let (port, chan) = stream::<int>();
 # chan.send(0);
@@ -225,7 +225,7 @@ following program is ill-typed:
 
 ~~~ {.xfail-test}
 # use task::{spawn};
-# use pipes::{stream, Port, Chan};
+# use comm::{stream, Port, Chan};
 # fn some_expensive_computation() -> int { 42 }
 let (port, chan) = stream();
 
@@ -245,7 +245,7 @@ Instead we can use a `SharedChan`, a type that allows a single
 
 ~~~
 # use task::spawn;
-use pipes::{stream, SharedChan};
+use comm::{stream, SharedChan};
 
 let (port, chan) = stream();
 let chan = SharedChan(chan);
@@ -278,7 +278,7 @@ might look like the example below.
 
 ~~~
 # use task::spawn;
-# use pipes::{stream, Port, Chan};
+# use comm::{stream, Port, Chan};
 
 // Create a vector of ports, one for each child task
 let ports = do vec::from_fn(3) |init_val| {
@@ -393,7 +393,7 @@ internally, with additional logic to wait for the child task to finish
 before returning. Hence:
 
 ~~~
-# use pipes::{stream, Chan, Port};
+# use comm::{stream, Chan, Port};
 # use task::{spawn, try};
 # fn sleep_forever() { loop { task::yield() } }
 # do task::try {
@@ -468,7 +468,7 @@ Here is the function that implements the child task:
 
 ~~~~
 # use std::comm::DuplexStream;
-# use pipes::{Port, Chan};
+# use comm::{Port, Chan};
 fn stringifier(channel: &DuplexStream<~str, uint>) {
     let mut value: uint;
     loop {
@@ -491,7 +491,7 @@ Here is the code for the parent task:
 
 ~~~~
 # use std::comm::DuplexStream;
-# use pipes::{Port, Chan};
+# use comm::{Port, Chan};
 # use task::spawn;
 # fn stringifier(channel: &DuplexStream<~str, uint>) {
 #     let mut value: uint;
index 432258b26a638797a38f9f15154e57cf411487ef..6c8bd7ea44269151e1d49fc42a5f4f0c1ebf6dab 100644 (file)
@@ -76,7 +76,7 @@ pub fn run(lib_path: ~str,
 
 
     writeclose(pipe_in.out, input);
-    let p = pipes::PortSet();
+    let p = comm::PortSet();
     let ch = p.chan();
     do task::spawn_sched(task::SingleThreaded) || {
         let errput = readclose(pipe_err.in);
diff --git a/src/libcore/comm.rs b/src/libcore/comm.rs
new file mode 100644 (file)
index 0000000..7939644
--- /dev/null
@@ -0,0 +1,410 @@
+// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+// Transitional -- needs snapshot
+#[allow(structural_records)];
+
+use either::{Either, Left, Right};
+use kinds::Owned;
+use option;
+use option::{Option, Some, None, unwrap};
+use private;
+use vec;
+
+use pipes::{recv, try_recv, wait_many, peek, PacketHeader};
+
+// NOTE Making this public exposes some plumbing from pipes. Needs
+// some refactoring
+pub use pipes::Selectable;
+
+/// A trait for things that can send multiple messages.
+pub trait GenericChan<T> {
+    /// Sends a message.
+    fn send(x: T);
+}
+
+/// Things that can send multiple messages and can detect when the receiver
+/// is closed
+pub trait GenericSmartChan<T> {
+    /// Sends a message, or report if the receiver has closed the connection.
+    fn try_send(x: T) -> bool;
+}
+
+/// A trait for things that can receive multiple messages.
+pub trait GenericPort<T> {
+    /// Receives a message, or fails if the connection closes.
+    fn recv() -> T;
+
+    /** Receives a message, or returns `none` if
+    the connection is closed or closes.
+    */
+    fn try_recv() -> Option<T>;
+}
+
+/// Ports that can `peek`
+pub trait Peekable<T> {
+    /// Returns true if a message is available
+    pure fn peek() -> bool;
+}
+
+/// Returns the index of an endpoint that is ready to receive.
+pub fn selecti<T: Selectable>(endpoints: &[T]) -> uint {
+    wait_many(endpoints)
+}
+
+/// Returns 0 or 1 depending on which endpoint is ready to receive
+pub fn select2i<A: Selectable, B: Selectable>(a: &A, b: &B) ->
+        Either<(), ()> {
+    match wait_many([a.header(), b.header()]) {
+      0 => Left(()),
+      1 => Right(()),
+      _ => fail!(~"wait returned unexpected index")
+    }
+}
+
+// Streams - Make pipes a little easier in general.
+
+proto! streamp (
+    Open:send<T: Owned> {
+        data(T) -> Open<T>
+    }
+)
+
+#[doc(hidden)]
+struct Chan_<T> {
+    mut endp: Option<streamp::client::Open<T>>
+}
+
+/// An endpoint that can send many messages.
+pub enum Chan<T> {
+    Chan_(Chan_<T>)
+}
+
+struct Port_<T> {
+    mut endp: Option<streamp::server::Open<T>>,
+}
+
+/// An endpoint that can receive many messages.
+pub enum Port<T> {
+    Port_(Port_<T>)
+}
+
+/** Creates a `(chan, port)` pair.
+
+These allow sending or receiving an unlimited number of messages.
+
+*/
+pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) {
+    let (c, s) = streamp::init();
+
+    (Port_(Port_ { endp: Some(s) }), Chan_(Chan_{ endp: Some(c) }))
+}
+
+impl<T: Owned> GenericChan<T> for Chan<T> {
+    fn send(x: T) {
+        let mut endp = None;
+        endp <-> self.endp;
+        self.endp = Some(
+            streamp::client::data(unwrap(endp), x))
+    }
+}
+
+impl<T: Owned> GenericSmartChan<T> for Chan<T> {
+
+    fn try_send(x: T) -> bool {
+        let mut endp = None;
+        endp <-> self.endp;
+        match streamp::client::try_data(unwrap(endp), x) {
+            Some(next) => {
+                self.endp = Some(next);
+                true
+            }
+            None => false
+        }
+    }
+}
+
+impl<T: Owned> GenericPort<T> for Port<T> {
+    fn recv() -> T {
+        let mut endp = None;
+        endp <-> self.endp;
+        let streamp::data(x, endp) = recv(unwrap(endp));
+        self.endp = Some(endp);
+        x
+    }
+
+    fn try_recv() -> Option<T> {
+        let mut endp = None;
+        endp <-> self.endp;
+        match try_recv(unwrap(endp)) {
+          Some(streamp::data(x, endp)) => {
+            self.endp = Some(endp);
+            Some(x)
+          }
+          None => None
+        }
+    }
+}
+
+impl<T: Owned> Peekable<T> for Port<T> {
+    pure fn peek() -> bool {
+        unsafe {
+            let mut endp = None;
+            endp <-> self.endp;
+            let peek = match &endp {
+              &Some(ref endp) => peek(endp),
+              &None => fail!(~"peeking empty stream")
+            };
+            self.endp <-> endp;
+            peek
+        }
+    }
+}
+
+impl<T: Owned> Selectable for Port<T> {
+    pure fn header() -> *PacketHeader {
+        unsafe {
+            match self.endp {
+              Some(ref endp) => endp.header(),
+              None => fail!(~"peeking empty stream")
+            }
+        }
+    }
+}
+
+/// Treat many ports as one.
+pub struct PortSet<T> {
+    mut ports: ~[Port<T>],
+}
+
+pub fn PortSet<T: Owned>() -> PortSet<T>{
+    PortSet {
+        ports: ~[]
+    }
+}
+
+impl<T: Owned> PortSet<T> {
+
+    fn add(port: Port<T>) {
+        self.ports.push(port)
+    }
+
+    fn chan() -> Chan<T> {
+        let (po, ch) = stream();
+        self.add(po);
+        ch
+    }
+}
+
+impl<T: Owned> GenericPort<T> for PortSet<T> {
+
+    fn try_recv() -> Option<T> {
+        let mut result = None;
+        // we have to swap the ports array so we aren't borrowing
+        // aliasable mutable memory.
+        let mut ports = ~[];
+        ports <-> self.ports;
+        while result.is_none() && ports.len() > 0 {
+            let i = wait_many(ports);
+            match ports[i].try_recv() {
+                Some(m) => {
+                  result = Some(m);
+                }
+                None => {
+                    // Remove this port.
+                    let _ = ports.swap_remove(i);
+                }
+            }
+        }
+        ports <-> self.ports;
+        result
+    }
+
+    fn recv() -> T {
+        self.try_recv().expect("port_set: endpoints closed")
+    }
+
+}
+
+impl<T: Owned> Peekable<T> for PortSet<T> {
+    pure fn peek() -> bool {
+        // It'd be nice to use self.port.each, but that version isn't
+        // pure.
+        for vec::each(self.ports) |p| {
+            if p.peek() { return true }
+        }
+        false
+    }
+}
+
+/// A channel that can be shared between many senders.
+pub type SharedChan<T> = private::Exclusive<Chan<T>>;
+
+impl<T: Owned> GenericChan<T> for SharedChan<T> {
+    fn send(x: T) {
+        let mut xx = Some(x);
+        do self.with_imm |chan| {
+            let mut x = None;
+            x <-> xx;
+            chan.send(option::unwrap(x))
+        }
+    }
+}
+
+impl<T: Owned> GenericSmartChan<T> for SharedChan<T> {
+    fn try_send(x: T) -> bool {
+        let mut xx = Some(x);
+        do self.with_imm |chan| {
+            let mut x = None;
+            x <-> xx;
+            chan.try_send(option::unwrap(x))
+        }
+    }
+}
+
+/// Converts a `chan` into a `shared_chan`.
+pub fn SharedChan<T:Owned>(c: Chan<T>) -> SharedChan<T> {
+    private::exclusive(c)
+}
+
+/// Receive a message from one of two endpoints.
+pub trait Select2<T: Owned, U: Owned> {
+    /// Receive a message or return `None` if a connection closes.
+    fn try_select() -> Either<Option<T>, Option<U>>;
+    /// Receive a message or fail if a connection closes.
+    fn select() -> Either<T, U>;
+}
+
+impl<T: Owned, U: Owned,
+     Left: Selectable + GenericPort<T>,
+     Right: Selectable + GenericPort<U>>
+    Select2<T, U> for (Left, Right) {
+
+    fn select() -> Either<T, U> {
+        match self {
+          (ref lp, ref rp) => match select2i(lp, rp) {
+            Left(()) => Left (lp.recv()),
+            Right(()) => Right(rp.recv())
+          }
+        }
+    }
+
+    fn try_select() -> Either<Option<T>, Option<U>> {
+        match self {
+          (ref lp, ref rp) => match select2i(lp, rp) {
+            Left(()) => Left (lp.try_recv()),
+            Right(()) => Right(rp.try_recv())
+          }
+        }
+    }
+}
+
+proto! oneshot (
+    Oneshot:send<T:Owned> {
+        send(T) -> !
+    }
+)
+
+/// The send end of a oneshot pipe.
+pub type ChanOne<T> = oneshot::client::Oneshot<T>;
+/// The receive end of a oneshot pipe.
+pub type PortOne<T> = oneshot::server::Oneshot<T>;
+
+/// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
+pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
+    let (chan, port) = oneshot::init();
+    (port, chan)
+}
+
+impl<T: Owned> PortOne<T> {
+    fn recv(self) -> T { recv_one(self) }
+    fn try_recv(self) -> Option<T> { try_recv_one(self) }
+}
+
+impl<T: Owned> ChanOne<T> {
+    fn send(self, data: T) { send_one(self, data) }
+    fn try_send(self, data: T) -> bool { try_send_one(self, data) }
+}
+
+/**
+ * Receive a message from a oneshot pipe, failing if the connection was
+ * closed.
+ */
+pub fn recv_one<T: Owned>(port: PortOne<T>) -> T {
+    let oneshot::send(message) = recv(port);
+    message
+}
+
+/// Receive a message from a oneshot pipe unless the connection was closed.
+pub fn try_recv_one<T: Owned> (port: PortOne<T>) -> Option<T> {
+    let message = try_recv(port);
+
+    if message.is_none() { None }
+    else {
+        let oneshot::send(message) = option::unwrap(message);
+        Some(message)
+    }
+}
+
+/// Send a message on a oneshot pipe, failing if the connection was closed.
+pub fn send_one<T: Owned>(chan: ChanOne<T>, data: T) {
+    oneshot::client::send(chan, data);
+}
+
+/**
+ * Send a message on a oneshot pipe, or return false if the connection was
+ * closed.
+ */
+pub fn try_send_one<T: Owned>(chan: ChanOne<T>, data: T)
+        -> bool {
+    oneshot::client::try_send(chan, data).is_some()
+}
+
+#[cfg(test)]
+pub mod test {
+    use either::{Either, Left, Right};
+    use super::{Chan, Port, oneshot, recv_one, stream};
+
+    #[test]
+    pub fn test_select2() {
+        let (p1, c1) = stream();
+        let (p2, c2) = stream();
+
+        c1.send(~"abc");
+
+        match (p1, p2).select() {
+          Right(_) => fail!(),
+          _ => ()
+        }
+
+        c2.send(123);
+    }
+
+    #[test]
+    pub fn test_oneshot() {
+        let (c, p) = oneshot::init();
+
+        oneshot::client::send(c, ());
+
+        recv_one(p)
+    }
+
+    #[test]
+    fn test_peek_terminated() {
+        let (port, chan): (Port<int>, Chan<int>) = stream();
+
+        {
+            // Destroy the channel
+            let _chan = chan;
+        }
+
+        assert !port.peek();
+    }
+}
index eab66bc0e3776c433eb79f733d10061b72d5d927..01669557389ae575dc893296c9dce9b33cfc0af5 100644 (file)
@@ -148,6 +148,7 @@ pub mod hashmap;
 
 #[path = "task/mod.rs"]
 pub mod task;
+pub mod comm;
 pub mod pipes;
 
 
@@ -255,6 +256,7 @@ pub mod core {
     pub use option;
     pub use kinds;
     pub use sys;
+    pub use pipes;
 }
 
 
index 9d4cadff08a2288d365a97e8e258cd890fbad311..94c0a567f4c0954ec36a89e604b306df0c83490b 100644 (file)
@@ -142,7 +142,7 @@ pub struct Buffer<T> {
     data: T,
 }
 
-struct PacketHeader {
+pub struct PacketHeader {
     mut state: State,
     mut blocked_task: *rust_task,
 
@@ -151,7 +151,7 @@ struct PacketHeader {
     mut buffer: *libc::c_void,
 }
 
-fn PacketHeader() -> PacketHeader {
+pub fn PacketHeader() -> PacketHeader {
     PacketHeader {
         state: Empty,
         blocked_task: ptr::null(),
@@ -159,7 +159,7 @@ fn PacketHeader() -> PacketHeader {
     }
 }
 
-impl PacketHeader {
+pub impl PacketHeader {
     // Returns the old state.
     unsafe fn mark_blocked(this: *rust_task) -> State {
         rustrt::rust_task_ref(this);
@@ -551,12 +551,6 @@ struct DropState {
     }
 }
 
-impl<T:Owned,Tb:Owned> Peekable<T> for RecvPacketBuffered<T, Tb> {
-    pure fn peek() -> bool {
-        peek(&self)
-    }
-}
-
 #[doc(hidden)]
 fn sender_terminate<T:Owned>(p: *Packet<T>) {
     let p = unsafe { &*p };
@@ -622,7 +616,7 @@ fn receiver_terminate<T:Owned>(p: *Packet<T>) {
 closed by the sender or has a message waiting to be received.
 
 */
-fn wait_many<T:Selectable>(pkts: &[T]) -> uint {
+pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint {
     let this = unsafe { rustrt::rust_get_task() };
 
     unsafe {
@@ -720,7 +714,7 @@ pub fn select2<A:Owned,Ab:Owned,B:Owned,Bb:Owned>(
 }
 
 #[doc(hidden)]
-trait Selectable {
+pub trait Selectable {
     pure fn header() -> *PacketHeader;
 }
 
@@ -957,335 +951,6 @@ pub fn spawn_service_recv<T:Owned,Tb:Owned>(
     client
 }
 
-// Streams - Make pipes a little easier in general.
-
-proto! streamp (
-    Open:send<T:Owned> {
-        data(T) -> Open<T>
-    }
-)
-
-/// A trait for things that can send multiple messages.
-pub trait GenericChan<T> {
-    /// Sends a message.
-    fn send(x: T);
-}
-
-/// Things that can send multiple messages and can detect when the receiver
-/// is closed
-pub trait GenericSmartChan<T> {
-    /// Sends a message, or report if the receiver has closed the connection.
-    fn try_send(x: T) -> bool;
-}
-
-/// A trait for things that can receive multiple messages.
-pub trait GenericPort<T> {
-    /// Receives a message, or fails if the connection closes.
-    fn recv() -> T;
-
-    /** Receives a message, or returns `none` if
-    the connection is closed or closes.
-    */
-    fn try_recv() -> Option<T>;
-}
-
-/// Ports that can `peek`
-pub trait Peekable<T> {
-    /// Returns true if a message is available
-    pure fn peek() -> bool;
-}
-
-#[doc(hidden)]
-struct Chan_<T> {
-    mut endp: Option<streamp::client::Open<T>>
-}
-
-/// An endpoint that can send many messages.
-pub enum Chan<T> {
-    Chan_(Chan_<T>)
-}
-
-#[doc(hidden)]
-struct Port_<T> {
-    mut endp: Option<streamp::server::Open<T>>,
-}
-
-/// An endpoint that can receive many messages.
-pub enum Port<T> {
-    Port_(Port_<T>)
-}
-
-/** Creates a `(chan, port)` pair.
-
-These allow sending or receiving an unlimited number of messages.
-
-*/
-pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) {
-    let (c, s) = streamp::init();
-
-    (Port_(Port_ { endp: Some(s) }), Chan_(Chan_{ endp: Some(c) }))
-}
-
-impl<T:Owned> GenericChan<T> for Chan<T> {
-    fn send(x: T) {
-        let mut endp = None;
-        endp <-> self.endp;
-        self.endp = Some(
-            streamp::client::data(unwrap(endp), x))
-    }
-}
-
-impl<T:Owned> GenericSmartChan<T> for Chan<T> {
-
-    fn try_send(x: T) -> bool {
-        let mut endp = None;
-        endp <-> self.endp;
-        match streamp::client::try_data(unwrap(endp), x) {
-            Some(next) => {
-                self.endp = Some(next);
-                true
-            }
-            None => false
-        }
-    }
-}
-
-impl<T:Owned> GenericPort<T> for Port<T> {
-    fn recv() -> T {
-        let mut endp = None;
-        endp <-> self.endp;
-        let streamp::data(x, endp) = pipes::recv(unwrap(endp));
-        self.endp = Some(endp);
-        x
-    }
-
-    fn try_recv() -> Option<T> {
-        let mut endp = None;
-        endp <-> self.endp;
-        match pipes::try_recv(unwrap(endp)) {
-          Some(streamp::data(x, endp)) => {
-            self.endp = Some(endp);
-            Some(x)
-          }
-          None => None
-        }
-    }
-}
-
-impl<T:Owned> Peekable<T> for Port<T> {
-    pure fn peek() -> bool {
-        unsafe {
-            let mut endp = None;
-            endp <-> self.endp;
-            let peek = match &endp {
-              &Some(ref endp) => pipes::peek(endp),
-              &None => fail!(~"peeking empty stream")
-            };
-            self.endp <-> endp;
-            peek
-        }
-    }
-}
-
-impl<T:Owned> Selectable for Port<T> {
-    pure fn header() -> *PacketHeader {
-        unsafe {
-            match self.endp {
-              Some(ref endp) => endp.header(),
-              None => fail!(~"peeking empty stream")
-            }
-        }
-    }
-}
-
-/// Treat many ports as one.
-pub struct PortSet<T> {
-    mut ports: ~[pipes::Port<T>],
-}
-
-pub fn PortSet<T:Owned>() -> PortSet<T>{
-    PortSet {
-        ports: ~[]
-    }
-}
-
-impl<T:Owned> PortSet<T> {
-
-    fn add(port: pipes::Port<T>) {
-        self.ports.push(port)
-    }
-
-    fn chan() -> Chan<T> {
-        let (po, ch) = stream();
-        self.add(po);
-        ch
-    }
-}
-
-impl<T:Owned> GenericPort<T> for PortSet<T> {
-
-    fn try_recv() -> Option<T> {
-        let mut result = None;
-        // we have to swap the ports array so we aren't borrowing
-        // aliasable mutable memory.
-        let mut ports = ~[];
-        ports <-> self.ports;
-        while result.is_none() && ports.len() > 0 {
-            let i = wait_many(ports);
-            match ports[i].try_recv() {
-                Some(m) => {
-                  result = Some(m);
-                }
-                None => {
-                    // Remove this port.
-                    let _ = ports.swap_remove(i);
-                }
-            }
-        }
-        ports <-> self.ports;
-        result
-    }
-
-    fn recv() -> T {
-        self.try_recv().expect("port_set: endpoints closed")
-    }
-
-}
-
-impl<T:Owned> Peekable<T> for PortSet<T> {
-    pure fn peek() -> bool {
-        // It'd be nice to use self.port.each, but that version isn't
-        // pure.
-        for vec::each(self.ports) |p| {
-            if p.peek() { return true }
-        }
-        false
-    }
-}
-
-/// A channel that can be shared between many senders.
-pub type SharedChan<T> = private::Exclusive<Chan<T>>;
-
-impl<T:Owned> GenericChan<T> for SharedChan<T> {
-    fn send(x: T) {
-        let mut xx = Some(x);
-        do self.with_imm |chan| {
-            let mut x = None;
-            x <-> xx;
-            chan.send(option::unwrap(x))
-        }
-    }
-}
-
-impl<T:Owned> GenericSmartChan<T> for SharedChan<T> {
-    fn try_send(x: T) -> bool {
-        let mut xx = Some(x);
-        do self.with_imm |chan| {
-            let mut x = None;
-            x <-> xx;
-            chan.try_send(option::unwrap(x))
-        }
-    }
-}
-
-/// Converts a `chan` into a `shared_chan`.
-pub fn SharedChan<T:Owned>(c: Chan<T>) -> SharedChan<T> {
-    private::exclusive(c)
-}
-
-/// Receive a message from one of two endpoints.
-pub trait Select2<T:Owned,U:Owned> {
-    /// Receive a message or return `None` if a connection closes.
-    fn try_select() -> Either<Option<T>, Option<U>>;
-    /// Receive a message or fail if a connection closes.
-    fn select() -> Either<T, U>;
-}
-
-impl<T: Owned,
-     U: Owned,
-     Left: Selectable + GenericPort<T>,
-     Right: Selectable + GenericPort<U>>
-     Select2<T,U> for (Left, Right) {
-    fn select() -> Either<T, U> {
-        match self {
-          (ref lp, ref rp) => match select2i(lp, rp) {
-            Left(()) => Left (lp.recv()),
-            Right(()) => Right(rp.recv())
-          }
-        }
-    }
-
-    fn try_select() -> Either<Option<T>, Option<U>> {
-        match self {
-          (ref lp, ref rp) => match select2i(lp, rp) {
-            Left(()) => Left (lp.try_recv()),
-            Right(()) => Right(rp.try_recv())
-          }
-        }
-    }
-}
-
-proto! oneshot (
-    Oneshot:send<T:Owned> {
-        send(T) -> !
-    }
-)
-
-/// The send end of a oneshot pipe.
-pub type ChanOne<T> = oneshot::client::Oneshot<T>;
-/// The receive end of a oneshot pipe.
-pub type PortOne<T> = oneshot::server::Oneshot<T>;
-
-/// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
-pub fn oneshot<T:Owned>() -> (PortOne<T>, ChanOne<T>) {
-    let (chan, port) = oneshot::init();
-    (port, chan)
-}
-
-impl<T:Owned> PortOne<T> {
-    fn recv(self) -> T { recv_one(self) }
-    fn try_recv(self) -> Option<T> { try_recv_one(self) }
-}
-
-impl<T:Owned> ChanOne<T> {
-    fn send(self, data: T) { send_one(self, data) }
-    fn try_send(self, data: T) -> bool { try_send_one(self, data) }
-}
-
-/**
- * Receive a message from a oneshot pipe, failing if the connection was
- * closed.
- */
-pub fn recv_one<T:Owned>(port: PortOne<T>) -> T {
-    let oneshot::send(message) = recv(port);
-    message
-}
-
-/// Receive a message from a oneshot pipe unless the connection was closed.
-pub fn try_recv_one<T:Owned> (port: PortOne<T>) -> Option<T> {
-    let message = try_recv(port);
-
-    if message.is_none() { None }
-    else {
-        let oneshot::send(message) = option::unwrap(message);
-        Some(message)
-    }
-}
-
-/// Send a message on a oneshot pipe, failing if the connection was closed.
-pub fn send_one<T:Owned>(chan: ChanOne<T>, data: T) {
-    oneshot::client::send(chan, data);
-}
-
-/**
- * Send a message on a oneshot pipe, or return false if the connection was
- * closed.
- */
-pub fn try_send_one<T:Owned>(chan: ChanOne<T>, data: T)
-        -> bool {
-    oneshot::client::try_send(chan, data).is_some()
-}
-
 pub mod rt {
     use option::{None, Option, Some};
 
@@ -1298,13 +963,13 @@ pub fn make_none<T>() -> Option<T> { None }
 #[cfg(test)]
 pub mod test {
     use either::{Either, Left, Right};
-    use pipes::{Chan, Port, oneshot, recv_one, stream};
-    use pipes;
+    use comm::{Chan, Port, oneshot, recv_one, stream, Select2,
+               GenericPort, GenericChan, Peekable};
 
     #[test]
     pub fn test_select2() {
-        let (p1, c1) = pipes::stream();
-        let (p2, c2) = pipes::stream();
+        let (p1, c1) = stream();
+        let (p2, c2) = stream();
 
         c1.send(~"abc");
 
index 1b2bfef5ecd453d44b27d8ea0edce0e109e35953..d0a16f7875b5cfb1deb6b25f0bc07eb06c245b79 100644 (file)
@@ -68,7 +68,7 @@
 pub use option;
 pub use os;
 pub use path;
-pub use pipes;
+pub use comm;
 pub use private;
 pub use ptr;
 pub use rand;
index a2656a9f73b945051051a7ab754a6f9c50a068ca..2738e5564fc18e84ef577d6409b8c12dbc78a3b5 100644 (file)
@@ -14,7 +14,7 @@
 use iter;
 use libc;
 use option;
-use pipes::{GenericChan, GenericPort};
+use comm::{GenericChan, GenericPort};
 use prelude::*;
 use ptr;
 use result;
@@ -59,7 +59,7 @@
 a normal large stack.
 */
 pub unsafe fn run_in_bare_thread(f: ~fn()) {
-    let (port, chan) = pipes::stream();
+    let (port, chan) = comm::stream();
     // FIXME #4525: Unfortunate that this creates an extra scheduler but it's
     // necessary since rust_raw_thread_join_delete is blocking
     do task::spawn_sched(task::SingleThreaded) {
@@ -110,7 +110,7 @@ fn compare_and_swap(address: &mut int, oldval: int, newval: int) -> bool {
 // An unwrapper uses this protocol to communicate with the "other" task that
 // drops the last refcount on an arc. Unfortunately this can't be a proper
 // pipe protocol because the unwrapper has to access both stages at once.
-type UnwrapProto = ~mut Option<(pipes::ChanOne<()>,  pipes::PortOne<bool>)>;
+type UnwrapProto = ~mut Option<(comm::ChanOne<()>,  comm::PortOne<bool>)>;
 
 struct ArcData<T> {
     mut count:     libc::intptr_t,
@@ -143,9 +143,9 @@ struct ArcDestruct<T> {
                             cast::reinterpret_cast(&data.unwrapper);
                         let (message, response) = option::swap_unwrap(p);
                         // Send 'ready' and wait for a response.
-                        pipes::send_one(message, ());
+                        comm::send_one(message, ());
                         // Unkillable wait. Message guaranteed to come.
-                        if pipes::recv_one(response) {
+                        if comm::recv_one(response) {
                             // Other task got the data.
                             cast::forget(data);
                         } else {
@@ -172,7 +172,7 @@ pub unsafe fn unwrap_shared_mutable_state<T:Owned>(rc: SharedMutableState<T>)
         -> T {
     struct DeathThroes<T> {
         mut ptr:      Option<~ArcData<T>>,
-        mut response: Option<pipes::ChanOne<bool>>,
+        mut response: Option<comm::ChanOne<bool>>,
         drop {
             unsafe {
                 let response = option::swap_unwrap(&mut self.response);
@@ -180,13 +180,13 @@ struct DeathThroes<T> {
                 // tried to wake us whether they should hand-off the data to
                 // us.
                 if task::failing() {
-                    pipes::send_one(response, false);
+                    comm::send_one(response, false);
                     // Either this swap_unwrap or the one below (at "Got
                     // here") ought to run.
                     cast::forget(option::swap_unwrap(&mut self.ptr));
                 } else {
                     assert self.ptr.is_none();
-                    pipes::send_one(response, true);
+                    comm::send_one(response, true);
                 }
             }
         }
@@ -194,8 +194,8 @@ struct DeathThroes<T> {
 
     do task::unkillable {
         let ptr: ~ArcData<T> = cast::reinterpret_cast(&rc.data);
-        let (p1,c1) = pipes::oneshot(); // ()
-        let (p2,c2) = pipes::oneshot(); // bool
+        let (p1,c1) = comm::oneshot(); // ()
+        let (p2,c2) = comm::oneshot(); // bool
         let server: UnwrapProto = ~mut Some((c1,p2));
         let serverp: int = cast::transmute(server);
         // Try to put our server end in the unwrapper slot.
@@ -218,7 +218,7 @@ struct DeathThroes<T> {
                                   response: Some(c2) };
                 let mut p1 = Some(p1); // argh
                 do task::rekillable {
-                    pipes::recv_one(option::swap_unwrap(&mut p1));
+                    comm::recv_one(option::swap_unwrap(&mut p1));
                 }
                 // Got here. Back in the 'unkillable' without getting killed.
                 // Recover ownership of ptr, then take the data out.
@@ -410,7 +410,7 @@ pub mod tests {
     use core::option::{None, Some};
 
     use option;
-    use pipes;
+    use comm;
     use private::{exclusive, unwrap_exclusive};
     use result;
     use task;
@@ -427,7 +427,7 @@ pub fn exclusive_arc() {
 
         for uint::range(0, num_tasks) |_i| {
             let total = total.clone();
-            let (port, chan) = pipes::stream();
+            let (port, chan) = comm::stream();
             futures.push(port);
 
             do task::spawn || {
index f285f811f15d077c948943c487bdc511211b6756..f3df8ce72f146cd0b81f773355dce6720b8a77db 100644 (file)
@@ -22,8 +22,8 @@
 use private::at_exit::at_exit;
 use private::global::global_data_clone_create;
 use private::finally::Finally;
-use pipes::{Port, Chan, SharedChan, GenericChan, GenericPort,
-            GenericSmartChan, stream};
+use comm::{Port, Chan, SharedChan, GenericChan,
+           GenericPort, GenericSmartChan, stream};
 use task::{Task, task, spawn};
 use task::rt::{task_id, get_task_id};
 use hashmap::linear::LinearMap;
@@ -186,7 +186,7 @@ fn test_wait_for_signal_many() {
 
 #[test]
 fn test_select_stream_and_oneshot() {
-    use pipes::select2i;
+    use comm::select2i;
     use either::{Left, Right};
 
     let (port, chan) = stream();
index 5103025d1205193b1b58a50030163c53a1900ba3..4e2337b83313348ef63b8237286502c529b7884a 100644 (file)
@@ -14,7 +14,7 @@
 use io::ReaderUtil;
 use libc;
 use libc::{pid_t, c_void, c_int};
-use pipes::{stream, SharedChan, GenericChan, GenericPort};
+use comm::{stream, SharedChan, GenericChan, GenericPort};
 use option::{Some, None};
 use os;
 use prelude::*;
index 54dfa7459a1e7b94222388576b39784ffa1f1193..336e686193b91b3ff31792acbe61b97bd1e3a68c 100644 (file)
@@ -40,7 +40,7 @@
 use libc;
 use option;
 use result::Result;
-use pipes::{stream, Chan, GenericChan, GenericPort, Port, SharedChan};
+use comm::{stream, Chan, GenericChan, GenericPort, Port, SharedChan};
 use pipes;
 use prelude::*;
 use ptr;
@@ -1109,7 +1109,7 @@ fn test_unkillable() {
 #[ignore(cfg(windows))]
 #[should_fail]
 fn test_unkillable_nested() {
-    let (po, ch) = pipes::stream();
+    let (po, ch) = comm::stream();
 
     // We want to do this after failing
     do spawn_unlinked || {
@@ -1175,7 +1175,7 @@ fn child_no(x: uint) -> fn~() {
 
 #[test]
 fn test_sched_thread_per_core() {
-    let (port, chan) = pipes::stream();
+    let (port, chan) = comm::stream();
 
     do spawn_sched(ThreadPerCore) || {
         unsafe {
@@ -1191,7 +1191,7 @@ fn test_sched_thread_per_core() {
 
 #[test]
 fn test_spawn_thread_on_demand() {
-    let (port, chan) = pipes::stream();
+    let (port, chan) = comm::stream();
 
     do spawn_sched(ManualThreads(2)) || {
         unsafe {
@@ -1200,7 +1200,7 @@ fn test_spawn_thread_on_demand() {
             let running_threads = rt::rust_sched_current_nonlazy_threads();
             assert(running_threads as int == 1);
 
-            let (port2, chan2) = pipes::stream();
+            let (port2, chan2) = comm::stream();
 
             do spawn_sched(CurrentScheduler) || {
                 chan2.send(());
index d72cacc2c4be7b71c4190c9ff788c3c565484c3f..e77af820079b6acd503e3b0e0bc8fc784add7efa 100644 (file)
@@ -75,7 +75,7 @@
 use cast;
 use container::Map;
 use option;
-use pipes::{Chan, GenericChan, GenericPort, Port, stream};
+use comm::{Chan, GenericChan, GenericPort, Port, stream};
 use pipes;
 use prelude::*;
 use private;
@@ -702,7 +702,7 @@ fn test_spawn_raw_unsupervise() {
 #[test]
 #[ignore(cfg(windows))]
 fn test_spawn_raw_notify_success() {
-    let (notify_po, notify_ch) = pipes::stream();
+    let (notify_po, notify_ch) = comm::stream();
 
     let opts = task::TaskOpts {
         notify_chan: Some(notify_ch),
@@ -717,7 +717,7 @@ fn test_spawn_raw_notify_success() {
 #[ignore(cfg(windows))]
 fn test_spawn_raw_notify_failure() {
     // New bindings for these
-    let (notify_po, notify_ch) = pipes::stream();
+    let (notify_po, notify_ch) = comm::stream();
 
     let opts = task::TaskOpts {
         linked: false,
index 93bc0dc0a2cda9c643254f5f138cb0d64d6a10ce..01758a1845d7c5664532d78750a272497607ed0a 100644 (file)
@@ -314,7 +314,7 @@ fails without recording a fatal error then we've encountered a compiler
 bug and need to present an error.
 */
 pub fn monitor(+f: fn~(diagnostic::Emitter)) {
-    use core::pipes::*;
+    use core::comm::*;
     use std::cell::Cell;
     let (p, ch) = stream();
     let ch = SharedChan(ch);
index f34a7ffbbdbcf9a3c10abfb8627947f0f572eb8f..fff2e189eb85c565aa39e3c5c49cebf27cf70215 100644 (file)
@@ -23,7 +23,7 @@
 use util;
 use std::cell::Cell;
 
-use core::pipes::{stream, Chan, SharedChan, Port};
+use core::comm::{stream, Chan, SharedChan, Port};
 use core::vec;
 use core::ops::Drop;
 use rustc::back::link;
index a6cc517079693efb49f04e8e621b848ff36ed083..45a8aa9fd2920c112e3c9fee0b5e0fd806d656fe 100644 (file)
 use core::io;
 use core::libc;
 use core::os;
-use core::pipes;
+use core::comm;
 use core::result;
 use core::run;
 use core::str;
 use core::task;
-use core::pipes::*;
+use core::comm::*;
 use std::future;
 use syntax;
 
@@ -128,12 +128,12 @@ fn pandoc_writer(
         os::close(pipe_err.out);
         os::close(pipe_in.out);
 
-        let (stdout_po, stdout_ch) = pipes::stream();
+        let (stdout_po, stdout_ch) = comm::stream();
         do task::spawn_sched(task::SingleThreaded) || {
             stdout_ch.send(readclose(pipe_out.in));
         }
 
-        let (stderr_po, stderr_ch) = pipes::stream();
+        let (stderr_po, stderr_ch) = comm::stream();
         do task::spawn_sched(task::SingleThreaded) || {
             stderr_ch.send(readclose(pipe_err.in));
         }
@@ -296,7 +296,7 @@ pub fn future_writer_factory(
     let (markdown_po, markdown_ch) = stream();
     let markdown_ch = SharedChan(markdown_ch);
     let writer_factory = fn~(page: doc::Page) -> Writer {
-        let (writer_po, writer_ch) = pipes::stream();
+        let (writer_po, writer_ch) = comm::stream();
         let markdown_ch = markdown_ch.clone();
         do task::spawn || {
             let (writer, future) = future_writer();
@@ -311,7 +311,7 @@ pub fn future_writer_factory(
 }
 
 fn future_writer() -> (Writer, future::Future<~str>) {
-    let (port, chan) = pipes::stream();
+    let (port, chan) = comm::stream();
     let writer = fn~(instr: WriteInstr) {
         chan.send(copy instr);
     };
index 4971806c7ed8102059c015d3c8ac5b0279478d5d..2a2c388864723d43826f31da005ddf4653b0f912 100644 (file)
@@ -30,7 +30,7 @@
 
 use core::option;
 use core::vec;
-use core::pipes::*;
+use core::comm::*;
 use syntax::ast;
 
 pub fn mk_pass(output_style: config::OutputStyle) -> Pass {
index 50f40559807ab36b71f215097cb58ce11753db9b..61b5ffd845f892fe2d32db9f7fdd8fbf44fcb5a6 100644 (file)
@@ -507,10 +507,10 @@ pub fn manually_share_arc() {
         let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
         let arc_v = arc::ARC(v);
 
-        let (p, c) = pipes::stream();
+        let (p, c) = comm::stream();
 
         do task::spawn() || {
-            let p = pipes::PortSet();
+            let p = comm::PortSet();
             c.send(p.chan());
 
             let arc_v = p.recv();
@@ -531,18 +531,18 @@ pub fn manually_share_arc() {
     pub fn test_mutex_arc_condvar() {
         let arc = ~MutexARC(false);
         let arc2 = ~arc.clone();
-        let (p,c) = pipes::oneshot();
+        let (p,c) = comm::oneshot();
         let (c,p) = (~mut Some(c), ~mut Some(p));
         do task::spawn || {
             // wait until parent gets in
-            pipes::recv_one(option::swap_unwrap(p));
+            comm::recv_one(option::swap_unwrap(p));
             do arc2.access_cond |state, cond| {
                 *state = true;
                 cond.signal();
             }
         }
         do arc.access_cond |state, cond| {
-            pipes::send_one(option::swap_unwrap(c), ());
+            comm::send_one(option::swap_unwrap(c), ());
             assert !*state;
             while !*state {
                 cond.wait();
@@ -553,7 +553,7 @@ pub fn test_mutex_arc_condvar() {
     pub fn test_arc_condvar_poison() {
         let arc = ~MutexARC(1);
         let arc2 = ~arc.clone();
-        let (p, c) = pipes::stream();
+        let (p, c) = comm::stream();
 
         do task::spawn_unlinked || {
             let _ = p.recv();
@@ -587,7 +587,7 @@ pub fn test_mutex_arc_poison() {
     pub fn test_mutex_arc_unwrap_poison() {
         let arc = MutexARC(1);
         let arc2 = ~(&arc).clone();
-        let (p, c) = pipes::stream();
+        let (p, c) = comm::stream();
         do task::spawn || {
             do arc2.access |one| {
                 c.send(());
@@ -685,7 +685,7 @@ pub fn test_rw_arc_no_poison_dr() {
     pub fn test_rw_arc() {
         let arc = ~RWARC(0);
         let arc2 = ~arc.clone();
-        let (p,c) = pipes::stream();
+        let (p,c) = comm::stream();
 
         do task::spawn || {
             do arc2.write |num| {
@@ -731,7 +731,7 @@ pub fn test_rw_downgrade() {
         // Reader tasks
         let mut reader_convos = ~[];
         for 10.times {
-            let ((rp1,rc1),(rp2,rc2)) = (pipes::stream(),pipes::stream());
+            let ((rp1,rc1),(rp2,rc2)) = (comm::stream(),comm::stream());
             reader_convos.push((rc1, rp2));
             let arcn = ~arc.clone();
             do task::spawn || {
@@ -745,7 +745,7 @@ pub fn test_rw_downgrade() {
 
         // Writer task
         let arc2 = ~arc.clone();
-        let ((wp1,wc1),(wp2,wc2)) = (pipes::stream(),pipes::stream());
+        let ((wp1,wc1),(wp2,wc2)) = (comm::stream(),comm::stream());
         do task::spawn || {
             wp1.recv();
             do arc2.write_cond |state, cond| {
index 9478a39279673cc21a120d62f2d110d39f90d16d..02875739ebaf933dff268146a845d7c5501b3f54 100644 (file)
@@ -14,8 +14,8 @@
 
 */
 
-use core::pipes::{GenericChan, GenericSmartChan, GenericPort};
-use core::pipes::{Chan, Port, Selectable, Peekable};
+use core::comm::{GenericChan, GenericSmartChan, GenericPort};
+use core::comm::{Chan, Port, Selectable, Peekable};
 use core::pipes;
 use core::prelude::*;
 
@@ -63,8 +63,8 @@ impl<T:Owned,U:Owned> Selectable for DuplexStream<T, U> {
 pub fn DuplexStream<T:Owned,U:Owned>()
     -> (DuplexStream<T, U>, DuplexStream<U, T>)
 {
-    let (p1, c2) = pipes::stream();
-    let (p2, c1) = pipes::stream();
+    let (p1, c2) = comm::stream();
+    let (p2, c1) = comm::stream();
     (DuplexStream {
         chan: c1,
         port: p1
index 80f93323a8ea6938a6b0cf1f6221904b498b38c0..13c0bbe1a674c732822ef503442ca845ba929d99 100644 (file)
@@ -49,8 +49,8 @@
 
 // The basic send/recv interface FlatChan and PortChan will implement
 use core::io;
-use core::pipes::GenericChan;
-use core::pipes::GenericPort;
+use core::comm::GenericChan;
+use core::comm::GenericPort;
 use core::pipes;
 use core::prelude::*;
 use core::sys::size_of;
@@ -95,8 +95,8 @@ pub mod serial {
     use flatpipes::{FlatPort, FlatChan};
 
     use core::io::{Reader, Writer};
-    use core::pipes::{Port, Chan};
-    use core::pipes;
+    use core::comm::{Port, Chan};
+    use core::comm;
 
     pub type ReaderPort<T, R> = FlatPort<
         T, DeserializingUnflattener<DefaultDecoder, T>,
@@ -154,7 +154,7 @@ pub fn pipe_chan<T:Encodable<DefaultEncoder>>(
     pub fn pipe_stream<T: Encodable<DefaultEncoder> +
                           Decodable<DefaultDecoder>>(
                           ) -> (PipePort<T>, PipeChan<T>) {
-        let (port, chan) = pipes::stream();
+        let (port, chan) = comm::stream();
         return (pipe_port(port), pipe_chan(chan));
     }
 }
@@ -177,8 +177,8 @@ pub mod pod {
     use flatpipes::{FlatPort, FlatChan};
 
     use core::io::{Reader, Writer};
-    use core::pipes::{Port, Chan};
-    use core::pipes;
+    use core::comm::{Port, Chan};
+    use core::comm;
     use core::prelude::*;
 
     pub type ReaderPort<T, R> =
@@ -222,7 +222,7 @@ pub fn pipe_chan<T:Copy + Owned>(chan: Chan<~[u8]>) -> PipeChan<T> {
 
     /// Create a pair of `FlatChan` and `FlatPort`, backed by pipes
     pub fn pipe_stream<T:Copy + Owned>() -> (PipePort<T>, PipeChan<T>) {
-        let (port, chan) = pipes::stream();
+        let (port, chan) = comm::stream();
         return (pipe_port(port), pipe_chan(chan));
     }
 
@@ -507,7 +507,7 @@ pub mod bytepipes {
     use flatpipes::{ByteChan, BytePort};
 
     use core::io::{Writer, Reader, ReaderUtil};
-    use core::pipes::{Port, Chan};
+    use core::comm::{Port, Chan};
     use core::pipes;
     use core::prelude::*;
 
@@ -564,12 +564,12 @@ pub impl<W:Writer> WriterByteChan<W> {
     }
 
     pub struct PipeBytePort {
-        port: pipes::Port<~[u8]>,
+        port: comm::Port<~[u8]>,
         mut buf: ~[u8]
     }
 
     pub struct PipeByteChan {
-        chan: pipes::Chan<~[u8]>
+        chan: comm::Chan<~[u8]>
     }
 
     pub impl BytePort for PipeBytePort {
@@ -777,12 +777,12 @@ fn test_some_tcp_stream<U:Unflattener<int>,F:Flattener<int>>(
         use uv;
 
         // Indicate to the client task that the server is listening
-        let (begin_connect_port, begin_connect_chan) = pipes::stream();
+        let (begin_connect_port, begin_connect_chan) = comm::stream();
         // The connection is sent from the server task to the receiver task
         // to handle the connection
-        let (accept_port, accept_chan) = pipes::stream();
+        let (accept_port, accept_chan) = comm::stream();
         // The main task will wait until the test is over to proceed
-        let (finish_port, finish_chan) = pipes::stream();
+        let (finish_port, finish_chan) = comm::stream();
 
         let addr0 = ip::v4::parse_addr("127.0.0.1");
 
@@ -803,7 +803,7 @@ fn test_some_tcp_stream<U:Unflattener<int>,F:Flattener<int>>(
                 }) |new_conn, kill_ch| {
 
                 // Incoming connection. Send it to the receiver task to accept
-                let (res_port, res_chan) = pipes::stream();
+                let (res_port, res_chan) = comm::stream();
                 accept_chan.send((new_conn, res_chan));
                 // Wait until the connection is accepted
                 res_port.recv();
@@ -894,7 +894,7 @@ fn reader_port_loader(bytes: ~[u8]
 
         fn pipe_port_loader(bytes: ~[u8]
                            ) -> pod::PipePort<int> {
-            let (port, chan) = pipes::stream();
+            let (port, chan) = comm::stream();
             if !bytes.is_empty() {
                 chan.send(bytes);
             }
index ff81393a914db527d90d1c8c191f92cd059e27c3..b6b001727a45a8af1723c846bac9558507b8eb95 100644 (file)
@@ -25,7 +25,8 @@
 use core::cast;
 use core::either::Either;
 use core::option;
-use core::pipes::{recv, oneshot, ChanOne, PortOne, send_one, recv_one};
+use core::comm::{oneshot, ChanOne, PortOne, send_one, recv_one};
+use core::pipes::recv;
 use core::prelude::*;
 use core::task;
 
@@ -150,7 +151,7 @@ pub mod test {
 
     use future::*;
 
-    use core::pipes::oneshot;
+    use core::comm::oneshot;
     use core::task;
 
     #[test]
index 4a185f68e17f9ee3af8c1c60a885abb5978a2e12..bc17cb0bfe9b6f2b3ee5dcb2dced4c512770bb75 100644 (file)
@@ -12,7 +12,7 @@
 
 use core::libc;
 use core::prelude::*;
-use core::pipes::{stream, SharedChan};
+use core::comm::{stream, SharedChan};
 use core::ptr;
 use core::result;
 use core::str;
index 563bc1c203ae41832b0b47c3e89455f867b4c907..8835cdfb105eb57cc5e91e7fcb217bb865d7315e 100644 (file)
@@ -24,7 +24,7 @@
 use core::io;
 use core::libc::size_t;
 use core::libc;
-use core::pipes::{stream, Chan, Port, SharedChan};
+use core::comm::{stream, Chan, Port, SharedChan};
 use core::prelude::*;
 use core::ptr;
 use core::result::{Result};
@@ -1441,7 +1441,7 @@ pub mod test {
     use uv;
 
     use core::io;
-    use core::pipes::{stream, Chan, Port, SharedChan};
+    use core::comm::{stream, Chan, Port, SharedChan};
     use core::prelude::*;
     use core::result;
     use core::str;
index 66d17392417888cff9f1dcae8226a13384c924b5..016847a5bfd7d9b303db6186033c44547e6ef740 100644 (file)
 
 // Each waiting task receives on one of these.
 #[doc(hidden)]
-type WaitEnd = pipes::PortOne<()>;
+type WaitEnd = comm::PortOne<()>;
 #[doc(hidden)]
-type SignalEnd = pipes::ChanOne<()>;
+type SignalEnd = comm::ChanOne<()>;
 // A doubly-ended queue of waiting tasks.
 #[doc(hidden)]
-struct Waitqueue { head: pipes::Port<SignalEnd>,
-                   tail: pipes::Chan<SignalEnd> }
+struct Waitqueue { head: comm::Port<SignalEnd>,
+                   tail: comm::Chan<SignalEnd> }
 
 fn new_waitqueue() -> Waitqueue {
-    let (block_head, block_tail) = pipes::stream();
+    let (block_head, block_tail) = comm::stream();
     Waitqueue { head: block_head, tail: block_tail }
 }
 
@@ -50,7 +50,7 @@ fn signal_waitqueue(q: &Waitqueue) -> bool {
     if q.head.peek() {
         // Pop and send a wakeup signal. If the waiter was killed, its port
         // will have closed. Keep trying until we get a live task.
-        if pipes::try_send_one(q.head.recv(), ()) {
+        if comm::try_send_one(q.head.recv(), ()) {
             true
         } else {
             signal_waitqueue(q)
@@ -64,7 +64,7 @@ fn signal_waitqueue(q: &Waitqueue) -> bool {
 fn broadcast_waitqueue(q: &Waitqueue) -> uint {
     let mut count = 0;
     while q.head.peek() {
-        if pipes::try_send_one(q.head.recv(), ()) {
+        if comm::try_send_one(q.head.recv(), ()) {
             count += 1;
         }
     }
@@ -107,7 +107,7 @@ fn acquire() {
                 state.count -= 1;
                 if state.count < 0 {
                     // Create waiter nobe.
-                    let (WaitEnd, SignalEnd) = pipes::oneshot();
+                    let (WaitEnd, SignalEnd) = comm::oneshot();
                     // Tell outer scope we need to block.
                     waiter_nobe = Some(WaitEnd);
                     // Enqueue ourself.
@@ -119,7 +119,7 @@ fn acquire() {
         /* for 1000.times { task::yield(); } */
         // Need to wait outside the exclusive.
         if waiter_nobe.is_some() {
-            let _ = pipes::recv_one(option::unwrap(waiter_nobe));
+            let _ = comm::recv_one(option::unwrap(waiter_nobe));
         }
     }
     fn release() {
@@ -214,7 +214,7 @@ fn wait() { self.wait_on(0) }
      */
     fn wait_on(condvar_id: uint) {
         // Create waiter nobe.
-        let (WaitEnd, SignalEnd) = pipes::oneshot();
+        let (WaitEnd, SignalEnd) = comm::oneshot();
         let mut WaitEnd   = Some(WaitEnd);
         let mut SignalEnd = Some(SignalEnd);
         let mut reacquire = None;
@@ -250,7 +250,7 @@ fn wait_on(condvar_id: uint) {
             // Unconditionally "block". (Might not actually block if a
             // signaller already sent -- I mean 'unconditionally' in contrast
             // with acquire().)
-            let _ = pipes::recv_one(option::swap_unwrap(&mut WaitEnd));
+            let _ = comm::recv_one(option::swap_unwrap(&mut WaitEnd));
         }
 
         // This is needed for a failing condition variable to reacquire the
@@ -749,7 +749,7 @@ pub fn test_sem_as_mutex() {
     #[test]
     pub fn test_sem_as_cvar() {
         /* Child waits and parent signals */
-        let (p,c) = pipes::stream();
+        let (p,c) = comm::stream();
         let s = ~semaphore(0);
         let s2 = ~s.clone();
         do task::spawn || {
@@ -761,7 +761,7 @@ pub fn test_sem_as_cvar() {
         let _ = p.recv();
 
         /* Parent waits and child signals */
-        let (p,c) = pipes::stream();
+        let (p,c) = comm::stream();
         let s = ~semaphore(0);
         let s2 = ~s.clone();
         do task::spawn || {
@@ -778,8 +778,8 @@ pub fn test_sem_multi_resource() {
         // time, and shake hands.
         let s = ~semaphore(2);
         let s2 = ~s.clone();
-        let (p1,c1) = pipes::stream();
-        let (p2,c2) = pipes::stream();
+        let (p1,c1) = comm::stream();
+        let (p2,c2) = comm::stream();
         do task::spawn || {
             do s2.access {
                 let _ = p2.recv();
@@ -798,7 +798,7 @@ pub fn test_sem_runtime_friendly_blocking() {
         do task::spawn_sched(task::ManualThreads(1)) {
             let s = ~semaphore(1);
             let s2 = ~s.clone();
-            let (p,c) = pipes::stream();
+            let (p,c) = comm::stream();
             let child_data = ~mut Some((s2, c));
             do s.access {
                 let (s2,c) = option::swap_unwrap(child_data);
@@ -820,7 +820,7 @@ pub fn test_sem_runtime_friendly_blocking() {
     pub fn test_mutex_lock() {
         // Unsafely achieve shared state, and do the textbook
         // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
-        let (p,c) = pipes::stream();
+        let (p,c) = comm::stream();
         let m = ~Mutex();
         let m2 = ~m.clone();
         let mut sharedstate = ~0;
@@ -863,7 +863,7 @@ pub fn test_mutex_cond_wait() {
             cond.wait();
         }
         // Parent wakes up child
-        let (port,chan) = pipes::stream();
+        let (port,chan) = comm::stream();
         let m3 = ~m.clone();
         do task::spawn || {
             do m3.lock_cond |cond| {
@@ -886,7 +886,7 @@ pub fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
 
         for num_waiters.times {
             let mi = ~m.clone();
-            let (port, chan) = pipes::stream();
+            let (port, chan) = comm::stream();
             ports.push(port);
             do task::spawn || {
                 do mi.lock_cond |cond| {
@@ -948,7 +948,7 @@ pub fn test_mutex_killed_cond() {
         let m2 = ~m.clone();
 
         let result: result::Result<(),()> = do task::try || {
-            let (p,c) = pipes::stream();
+            let (p,c) = comm::stream();
             do task::spawn || { // linked
                 let _ = p.recv(); // wait for sibling to get in the mutex
                 task::yield();
@@ -970,12 +970,12 @@ pub fn test_mutex_killed_cond() {
     pub fn test_mutex_killed_broadcast() {
         let m = ~Mutex();
         let m2 = ~m.clone();
-        let (p,c) = pipes::stream();
+        let (p,c) = comm::stream();
 
         let result: result::Result<(),()> = do task::try || {
             let mut sibling_convos = ~[];
             for 2.times {
-                let (p,c) = pipes::stream();
+                let (p,c) = comm::stream();
                 let c = ~mut Some(c);
                 sibling_convos.push(p);
                 let mi = ~m2.clone();
@@ -1004,7 +1004,7 @@ pub fn test_mutex_killed_broadcast() {
             assert woken == 0;
         }
         struct SendOnFailure {
-            c: pipes::Chan<()>,
+            c: comm::Chan<()>,
         }
 
         impl Drop for SendOnFailure {
@@ -1013,7 +1013,7 @@ fn finalize(&self) {
             }
         }
 
-        fn SendOnFailure(c: pipes::Chan<()>) -> SendOnFailure {
+        fn SendOnFailure(c: comm::Chan<()>) -> SendOnFailure {
             SendOnFailure {
                 c: c
             }
@@ -1038,7 +1038,7 @@ pub fn test_mutex_different_conds() {
         let result = do task::try {
             let m = ~mutex_with_condvars(2);
             let m2 = ~m.clone();
-            let (p,c) = pipes::stream();
+            let (p,c) = comm::stream();
             do task::spawn || {
                 do m2.lock_cond |cond| {
                     c.send(());
@@ -1099,7 +1099,7 @@ pub fn test_rwlock_exclusion(x: ~RWlock,
                                  mode2: RWlockMode) {
         // Test mutual exclusion between readers and writers. Just like the
         // mutex mutual exclusion test, a ways above.
-        let (p,c) = pipes::stream();
+        let (p,c) = comm::stream();
         let x2 = ~x.clone();
         let mut sharedstate = ~0;
         let ptr = ptr::addr_of(&(*sharedstate));
@@ -1146,8 +1146,8 @@ pub fn test_rwlock_handshake(x: ~RWlock,
                                  make_mode2_go_first: bool) {
         // Much like sem_multi_resource.
         let x2 = ~x.clone();
-        let (p1,c1) = pipes::stream();
-        let (p2,c2) = pipes::stream();
+        let (p1,c1) = comm::stream();
+        let (p2,c2) = comm::stream();
         do task::spawn || {
             if !make_mode2_go_first {
                 let _ = p2.recv(); // parent sends to us once it locks, or ...
@@ -1212,7 +1212,7 @@ pub fn test_rwlock_cond_wait() {
             cond.wait();
         }
         // Parent wakes up child
-        let (port,chan) = pipes::stream();
+        let (port,chan) = comm::stream();
         let x3 = ~x.clone();
         do task::spawn || {
             do x3.write_cond |cond| {
@@ -1249,7 +1249,7 @@ fn lock_cond(x: &RWlock, downgrade: bool, blk: fn(c: &Condvar)) {
 
         for num_waiters.times {
             let xi = ~x.clone();
-            let (port, chan) = pipes::stream();
+            let (port, chan) = comm::stream();
             ports.push(port);
             do task::spawn || {
                 do lock_cond(xi, dg1) |cond| {
index 6f479fbb9f7f583e06fa2f0c65ddf1ca0179f935..6b8ea8a6ef42c13e8382855eec923f745127ff2f 100644 (file)
@@ -12,7 +12,7 @@
 /// parallelism.
 
 use core::io;
-use core::pipes::{Chan, Port};
+use core::comm::{Chan, Port};
 use core::pipes;
 use core::prelude::*;
 use core::task::{SchedMode, SingleThreaded};
@@ -47,7 +47,7 @@ pub impl<T> TaskPool<T> {
         assert n_tasks >= 1;
 
         let channels = do vec::from_fn(n_tasks) |i| {
-            let (port, chan) = pipes::stream::<Msg<T>>();
+            let (port, chan) = comm::stream::<Msg<T>>();
             let init_fn = init_fn_factory();
 
             let task_body: ~fn() = || {
index cd03de911839014d6911076b976cd46b4ee2c32f..e14e9665216f66c2e86e414a660eea6e8511877b 100644 (file)
@@ -27,7 +27,7 @@
 use core::io::WriterUtil;
 use core::io;
 use core::libc::size_t;
-use core::pipes::{stream, Chan, Port, SharedChan};
+use core::comm::{stream, Chan, Port, SharedChan};
 use core::option;
 use core::prelude::*;
 use core::result;
@@ -794,7 +794,7 @@ mod tests {
     use test::{TestOpts, run_test};
 
     use core::either;
-    use core::pipes::{stream, SharedChan};
+    use core::comm::{stream, SharedChan};
     use core::option;
     use core::vec;
 
index 6768ff232487740a731c695b34cc7dcc21d7d33c..b711825aecf79d61632894df231184b20edab513 100644 (file)
@@ -18,7 +18,7 @@
 use core::libc;
 use core::libc::c_void;
 use core::cast::transmute;
-use core::pipes::{stream, Chan, SharedChan, Port, select2i};
+use core::comm::{stream, Chan, SharedChan, Port, select2i};
 use core::prelude::*;
 use core::ptr;
 use core;
index 872d53e93ebbe70ef03c96c4c6eee447efa9d92d..401cecf8811417a19c0f1bcfe4a012b2cf846813 100644 (file)
@@ -17,7 +17,7 @@
 
 use core::either::{Left, Right};
 use core::libc;
-use core::pipes::{Port, Chan, SharedChan, select2i};
+use core::comm::{Port, Chan, SharedChan, select2i};
 use core::private::global::{global_data_clone_create,
                             global_data_clone};
 use core::private::weak_task::weaken_task;
@@ -133,7 +133,7 @@ mod test {
     use core::task;
     use core::cast::transmute;
     use core::libc::c_void;
-    use core::pipes::{stream, SharedChan, Chan};
+    use core::comm::{stream, SharedChan, Chan};
 
     extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) {
         unsafe {
index be4240237adf426bd201920b211271f6ce9ec8d0..52956f152fe2bb5e073d63f67bc01e2f88d56eb8 100644 (file)
@@ -19,7 +19,7 @@
 
 use core::libc::c_void;
 use core::libc;
-use core::pipes::{stream, Port, Chan, SharedChan};
+use core::comm::{stream, Port, Chan, SharedChan};
 use core::prelude::*;
 use core::ptr::addr_of;
 use core::task::TaskBuilder;
index 96b285b8c0a85c710a843330e196be8f8ae90ed8..dd54620c83d28cde3d0ace024138aed2bc6e7e33 100644 (file)
@@ -39,7 +39,7 @@
 use core::ptr;
 use core::str;
 use core::vec;
-use core::pipes::{stream, Chan, SharedChan, Port};
+use core::comm::{stream, Chan, SharedChan, Port};
 
 // libuv struct mappings
 pub struct uv_ip4_addr {
index a06dee723c86d1dfb2df1d5b1782e8f9979b3748..8ce68a41f81bf09d446af8a15f8a75bc2b555647 100644 (file)
@@ -19,7 +19,8 @@
 use core::either::{Either, Left, Right};
 use core::io;
 use core::option;
-use core::pipes::{recv, oneshot, PortOne, send_one};
+use core::comm::{oneshot, PortOne, send_one};
+use core::pipes::recv;
 use core::prelude::*;
 use core::result;
 use core::run;
index 48bd8b03297424ee8e8ff787f26de8ad1d05d8b4..e8e4c939907404938230ea18f13ace388dfbe746 100644 (file)
@@ -78,10 +78,10 @@ fn gen_send(&self, cx: ext_ctxt, try: bool) -> @ast::item {
                 };
 
                 body += ~"let b = pipe.reuse_buffer();\n";
-                body += fmt!("let %s = ::pipes::SendPacketBuffered(\
+                body += fmt!("let %s = ::core::pipes::SendPacketBuffered(\
                               ::ptr::addr_of(&(b.buffer.data.%s)));\n",
                              sp, next.name);
-                body += fmt!("let %s = ::pipes::RecvPacketBuffered(\
+                body += fmt!("let %s = ::core::pipes::RecvPacketBuffered(\
                               ::ptr::addr_of(&(b.buffer.data.%s)));\n",
                              rp, next.name);
             }
@@ -93,7 +93,7 @@ fn gen_send(&self, cx: ext_ctxt, try: bool) -> @ast::item {
                   (recv, recv) => "(c, s)"
                 };
 
-                body += fmt!("let %s = ::pipes::entangle();\n", pat);
+                body += fmt!("let %s = ::core::pipes::entangle();\n", pat);
             }
             body += fmt!("let message = %s(%s);\n",
                          self.name(),
@@ -102,14 +102,14 @@ fn gen_send(&self, cx: ext_ctxt, try: bool) -> @ast::item {
                              ~"s"), ~", "));
 
             if !try {
-                body += fmt!("::pipes::send(pipe, message);\n");
+                body += fmt!("::core::pipes::send(pipe, message);\n");
                 // return the new channel
                 body += ~"c }";
             }
             else {
-                body += fmt!("if ::pipes::send(pipe, message) {\n \
-                                  ::pipes::rt::make_some(c) \
-                              } else { ::pipes::rt::make_none() } }");
+                body += fmt!("if ::core::pipes::send(pipe, message) {\n \
+                                  ::core::pipes::rt::make_some(c) \
+                              } else { ::core::pipes::rt::make_none() } }");
             }
 
             let body = cx.parse_expr(body);
@@ -162,14 +162,14 @@ fn gen_send(&self, cx: ext_ctxt, try: bool) -> @ast::item {
                              message_args);
 
                 if !try {
-                    body += fmt!("::pipes::send(pipe, message);\n");
+                    body += fmt!("::core::pipes::send(pipe, message);\n");
                     body += ~" }";
                 } else {
-                    body += fmt!("if ::pipes::send(pipe, message) \
+                    body += fmt!("if ::core::pipes::send(pipe, message) \
                                         { \
-                                      ::pipes::rt::make_some(()) \
+                                      ::core::pipes::rt::make_some(()) \
                                   } else { \
-                                    ::pipes::rt::make_none() \
+                                    ::core::pipes::rt::make_none() \
                                   } }");
                 }
 
@@ -272,7 +272,8 @@ fn to_endpoint_decls(&self, cx: ext_ctxt,
                     self.data_name(),
                     self.span,
                     cx.ty_path_ast_builder(
-                        path_global(~[cx.ident_of(~"pipes"),
+                        path_global(~[cx.ident_of(~"core"),
+                                      cx.ident_of(~"pipes"),
                                       cx.ident_of(dir.to_str() + ~"Packet")],
                              dummy_sp())
                         .add_ty(cx.ty_path_ast_builder(
@@ -288,7 +289,8 @@ fn to_endpoint_decls(&self, cx: ext_ctxt,
                     self.data_name(),
                     self.span,
                     cx.ty_path_ast_builder(
-                        path_global(~[cx.ident_of(~"pipes"),
+                        path_global(~[cx.ident_of(~"core"),
+                                      cx.ident_of(~"pipes"),
                                       cx.ident_of(dir.to_str()
                                                   + ~"PacketBuffered")],
                              dummy_sp())
@@ -313,10 +315,10 @@ fn gen_init(&self, cx: ext_ctxt) -> @ast::item {
 
         let body = if !self.is_bounded() {
             match start_state.dir {
-              send => quote_expr!( ::pipes::entangle() ),
+              send => quote_expr!( ::core::pipes::entangle() ),
               recv => {
                 quote_expr!({
-                    let (s, c) = ::pipes::entangle();
+                    let (s, c) = ::core::pipes::entangle();
                     (c, s)
                 })
               }
@@ -336,7 +338,7 @@ fn gen_init(&self, cx: ext_ctxt) -> @ast::item {
         };
 
         cx.parse_item(fmt!("pub fn init%s() -> (client::%s, server::%s)\
-                            { use pipes::HasBuffer; %s }",
+                            { use core::pipes::HasBuffer; %s }",
                            start_state.ty_params.to_source(cx),
                            start_state.to_ty(cx).to_source(cx),
                            start_state.to_ty(cx).to_source(cx),
@@ -350,7 +352,7 @@ fn gen_buffer_init(&self, ext_cx: ext_ctxt) -> @ast::expr {
             let fty = s.to_ty(ext_cx);
             ext_cx.field_imm(ext_cx.ident_of(s.name),
                              quote_expr!(
-                                 ::pipes::mk_packet::<$fty>()
+                                 ::core::pipes::mk_packet::<$fty>()
                              ))
         }))
     }
@@ -358,8 +360,8 @@ fn gen_buffer_init(&self, ext_cx: ext_ctxt) -> @ast::expr {
     fn gen_init_bounded(&self, ext_cx: ext_ctxt) -> @ast::expr {
         debug!("gen_init_bounded");
         let buffer_fields = self.gen_buffer_init(ext_cx);
-        let buffer = quote_expr!(~::pipes::Buffer {
-            header: ::pipes::BufferHeader(),
+        let buffer = quote_expr!(~::core::pipes::Buffer {
+            header: ::core::pipes::BufferHeader(),
             data: $buffer_fields,
         });
 
@@ -375,7 +377,7 @@ fn gen_init_bounded(&self, ext_cx: ext_ctxt) -> @ast::expr {
 
         quote_expr!({
             let buffer = $buffer;
-            do ::pipes::entangle_buffer(buffer) |buffer, data| {
+            do ::core::pipes::entangle_buffer(buffer) |buffer, data| {
                 $entangle_body
             }
         })
@@ -408,7 +410,7 @@ fn gen_buffer_type(&self, cx: ext_ctxt) -> @ast::item {
                 }
             }
             let ty = s.to_ty(cx);
-            let fty = quote_ty!( ::pipes::Packet<$ty> );
+            let fty = quote_ty!( ::core::pipes::Packet<$ty> );
 
             @spanned {
                 node: ast::struct_field_ {
index 8038c5fc41ac4c800b8aecd02ccf3be72c8cff71..f2749ed1d0c059607a6a34953a171590111f5c07 100644 (file)
@@ -8,7 +8,7 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use core::pipes::*;
+use core::comm::*;
 
 pub fn foo<T:Owned + Copy>(x: T) -> Port<T> {
     let (p, c) = stream();
index bfbc7ecd20ac692e1efa907f4f13d160ba23185a..4bbd22786a563f12082a23886c636459d8c0b1b0 100644 (file)
@@ -24,7 +24,7 @@
 use io::Writer;
 use io::WriterUtil;
 
-use pipes::{Port, Chan, SharedChan};
+use comm::{Port, Chan, SharedChan};
 
 macro_rules! move_out (
     { $x:expr } => { unsafe { let y = *ptr::addr_of(&($x)); y } }
@@ -36,7 +36,7 @@ enum request {
     stop
 }
 
-fn server(requests: Port<request>, responses: pipes::Chan<uint>) {
+fn server(requests: Port<request>, responses: comm::Chan<uint>) {
     let mut count = 0u;
     let mut done = false;
     while !done {
@@ -55,8 +55,8 @@ fn server(requests: Port<request>, responses: pipes::Chan<uint>) {
 }
 
 fn run(args: &[~str]) {
-    let (from_child, to_parent) = pipes::stream();
-    let (from_parent, to_child) = pipes::stream();
+    let (from_child, to_parent) = comm::stream();
+    let (from_parent, to_child) = comm::stream();
 
     let to_child = SharedChan(to_child);
 
index 57d9bb49df25ab03bb48a5066f10c2e02a4e9af9..a969368ebaca3a88f0ac41d7a3c7f58bb6513842 100644 (file)
@@ -20,7 +20,7 @@
 use io::Writer;
 use io::WriterUtil;
 
-use pipes::{Port, PortSet, Chan};
+use comm::{Port, PortSet, Chan, stream};
 
 macro_rules! move_out (
     { $x:expr } => { unsafe { let y = *ptr::addr_of(&($x)); y } }
@@ -32,7 +32,7 @@ enum request {
     stop
 }
 
-fn server(requests: PortSet<request>, responses: pipes::Chan<uint>) {
+fn server(requests: PortSet<request>, responses: Chan<uint>) {
     let mut count = 0;
     let mut done = false;
     while !done {
@@ -51,8 +51,8 @@ fn server(requests: PortSet<request>, responses: pipes::Chan<uint>) {
 }
 
 fn run(args: &[~str]) {
-    let (from_child, to_parent) = pipes::stream();
-    let (from_parent_, to_child) = pipes::stream();
+    let (from_child, to_parent) = stream();
+    let (from_parent_, to_child) = stream();
     let from_parent = PortSet();
     from_parent.add(from_parent_);
 
@@ -62,7 +62,7 @@ fn run(args: &[~str]) {
     let start = std::time::precise_time_s();
     let mut worker_results = ~[];
     for uint::range(0, workers) |_i| {
-        let (from_parent_, to_child) = pipes::stream();
+        let (from_parent_, to_child) = stream();
         from_parent.add(from_parent_);
         do task::task().future_result(|+r| {
             worker_results.push(r);
index 71ca0f957657506ad3aae844712715234febfe0a..0f7c41f5997a8372ae5a70f0d38496be6d81cc46 100644 (file)
@@ -20,7 +20,7 @@
 use std::time;
 use std::future;
 
-use pipes::recv;
+use core::pipes::recv;
 
 proto! ring (
     num:send {
index 16f44b88793c80f1de5b08411d6a1fe4b98cf4af..a444dfd26b7d6b87cbc0bf032fa6e648f0eed55d 100644 (file)
@@ -16,7 +16,7 @@
  
 extern mod std;
 
-use pipes::{spawn_service, recv};
+use core::pipes::{spawn_service, recv};
 use std::time::precise_time_s;
 
 proto! pingpong (
@@ -72,9 +72,9 @@ macro_rules! follow (
     )
 )
 
-fn switch<T:Owned,Tb:Owned,U>(+endp: pipes::RecvPacketBuffered<T, Tb>,
+fn switch<T:Owned,Tb:Owned,U>(+endp: core::pipes::RecvPacketBuffered<T, Tb>,
                       f: fn(+v: Option<T>) -> U) -> U {
-    f(pipes::try_recv(endp))
+    f(core::pipes::try_recv(endp))
 }
 
 // Here's the benchmark
index b42ec246ccb975f935c29fc624ac695eaf1e5272..42a1e4b504660f24053f79a32c879878795b503f 100644 (file)
@@ -15,7 +15,7 @@
 use std::oldmap::HashMap;
 use std::sort;
 use std::cell::Cell;
-use core::pipes::*;
+use core::comm::*;
 
 fn print_complements() {
     let all = ~[Blue, Red, Yellow];
index a887a13bf3802b39106aaa34086900d00a6c1eb6..3fe5f7057057d33814b6be9a7d0c78ff4775c0d8 100644 (file)
@@ -18,7 +18,7 @@
 use std::oldmap::HashMap;
 use std::sort;
 use io::ReaderUtil;
-use pipes::{stream, Port, Chan};
+use comm::{stream, Port, Chan};
 use cmp::Ord;
 
 // given a map, print a sorted version of it
@@ -97,8 +97,8 @@ fn windows_with_carry(bb: &[u8], nn: uint,
    return vec::slice(bb, len - (nn - 1u), len).to_vec();
 }
 
-fn make_sequence_processor(sz: uint, from_parent: pipes::Port<~[u8]>,
-                           to_parent: pipes::Chan<~str>) {
+fn make_sequence_processor(sz: uint, from_parent: comm::Port<~[u8]>,
+                           to_parent: comm::Chan<~str>) {
 
    let freqs: HashMap<~[u8], uint> = oldmap::HashMap();
    let mut carry: ~[u8] = ~[];
@@ -159,7 +159,7 @@ fn main() {
 
         from_child.push(from_child_);
 
-        let (from_parent, to_child) = pipes::stream();
+        let (from_parent, to_child) = comm::stream();
 
         do task::spawn_with(from_parent) |from_parent| {
             make_sequence_processor(sz, from_parent, to_parent_);
index 840cec44c6440d7874d62040f8b9be19871d9ff2..5e472712fda4334134fda8ec27f52bacfd09ac24 100644 (file)
@@ -108,7 +108,7 @@ fn flush(&self) -> int {0}
     fn get_type(&self) -> io::WriterType { io::File }
 }
 
-fn writer(path: ~str, pport: pipes::Port<Line>, size: uint)
+fn writer(path: ~str, pport: comm::Port<Line>, size: uint)
 {
     let cout: io::Writer = match path {
         ~"" => {
@@ -172,8 +172,8 @@ fn main() {
     let size = if vec::len(args) < 2_u { 80_u }
     else { uint::from_str(args[1]).get() };
 
-    let (pport, pchan) = pipes::stream();
-    let pchan = pipes::SharedChan(pchan);
+    let (pport, pchan) = comm::stream();
+    let pchan = comm::SharedChan(pchan);
     for uint::range(0_u, size) |j| {
         let cchan = pchan.clone();
         do task::spawn { cchan.send(chanmb(j, size, depth)) };
index 2c9da65cc13accd2f57d41e79d7052d50362b5e1..a8383c4647ec1d50ef81d48f6c81a70906f62563 100644 (file)
 extern mod std;
 
 use std::{time, getopts};
-use io::WriterUtil;
-use int::range;
-use pipes::Port;
-use pipes::Chan;
-use pipes::send;
-use pipes::recv;
+use core::int::range;
+use core::comm::*;
+use core::io::WriterUtil;
 
 use core::result;
 use result::{Ok, Err};
@@ -41,7 +38,7 @@ fn pfib(c: Chan<int>, n: int) {
         } else if n <= 2 {
             c.send(1);
         } else {
-            let p = pipes::PortSet();
+            let p = PortSet();
             let ch = p.chan();
             task::spawn(|| pfib(ch, n - 1) );
             let ch = p.chan();
@@ -50,7 +47,7 @@ fn pfib(c: Chan<int>, n: int) {
         }
     }
 
-    let (p, ch) = pipes::stream();
+    let (p, ch) = stream();
     let _t = task::spawn(|| pfib(ch, n) );
     p.recv()
 }
index f2441755a7b1dab0f1f78ab72bdf21e9f64cd7b7..528dfd3ec734159260fdf036e5ba25f20b6a17fe 100644 (file)
@@ -15,7 +15,7 @@
 //
 // The filename is a song reference; google it in quotes.
 
-fn child_generation(gens_left: uint, -c: pipes::Chan<()>) {
+fn child_generation(gens_left: uint, -c: comm::Chan<()>) {
     // This used to be O(n^2) in the number of generations that ever existed.
     // With this code, only as many generations are alive at a time as tasks
     // alive at a time,
@@ -43,7 +43,7 @@ fn main() {
         copy args
     };
 
-    let (p,c) = pipes::stream();
+    let (p,c) = comm::stream();
     child_generation(uint::from_str(args[1]).get(), c);
     if p.try_recv().is_none() {
         fail!(~"it happened when we slumbered");
index 3b6ececaef90a5d9953cc37e1e931eade0a07858..8bb4c9bc592349cb822106c7c766ed6d5906d87e 100644 (file)
@@ -20,7 +20,7 @@
 // Creates in the background 'num_tasks' tasks, all blocked forever.
 // Doesn't return until all such tasks are ready, but doesn't block forever itself.
 
-use core::pipes::*;
+use core::comm::*;
 
 fn grandchild_group(num_tasks: uint) {
     let (po, ch) = stream();
index c5092ecaecc0022a4c8424e73d015f0a0a1521bb..8e1cbb9e17bdd90a1026c63279b541ae5f18a3dc 100644 (file)
@@ -10,7 +10,7 @@
 
 // Test for concurrent tasks
 
-use core::pipes::*;
+use core::comm::*;
 
 fn calc(children: uint, parent_wait_chan: &Chan<Chan<Chan<int>>>) {
 
index 40a444df12d368b162f370d26edb7259af85d5fb..d428feb2a24de7ff799baab87da11ffddf65096f 100644 (file)
@@ -9,7 +9,7 @@
 // except according to those terms.
 
 fn main() {
-    let (p,c) = pipes::stream();
+    let (p,c) = comm::stream();
     let x = Some(p);
     c.send(false);
     match x {
index 8e9ce5f97fc1da0ecc59c3c1026118c051f6f0d1..3eebc4647c28be86d9e0474a0be42d136a3ab2e1 100644 (file)
@@ -25,6 +25,6 @@ fn foo(i:int, j: @~str) -> foo {
 
 fn main() {
   let cat = ~"kitty";
-    let (_, ch) = pipes::stream(); //~ ERROR does not fulfill `Owned`
+    let (_, ch) = comm::stream(); //~ ERROR does not fulfill `Owned`
   ch.send(foo(42, @(cat))); //~ ERROR does not fulfill `Owned`
 }
index d592fb80f7682786cb2bf7eccc02e305b500eef8..e8bb075ac00feabe8a9bb76bc28a5bf04b9149e0 100644 (file)
@@ -16,7 +16,7 @@
 fn child() { assert (1 == 2); }
 
 fn main() {
-    let (p, _c) = pipes::stream::<int>();
+    let (p, _c) = comm::stream::<int>();
     task::spawn(|| child() );
     let x = p.recv();
 }
index 1402020c357ec71088102264d8f750404da63664..9f09c16ed6a571054688a9799dad9cef13f1cf78 100644 (file)
@@ -15,7 +15,7 @@
 fn child() { fail!(); }
 
 fn main() {
-    let (p, _c) = pipes::stream::<()>();
+    let (p, _c) = comm::stream::<()>();
     task::spawn(|| child() );
     task::yield();
 }
index cb03a71aabcdd89c688dda978efd00576cd93d3c..c2c97662b6c965b73347ba0cf04159f6327934c4 100644 (file)
 fn grandchild() { fail!(~"grandchild dies"); }
 
 fn child() {
-    let (p, _c) = pipes::stream::<int>();
+    let (p, _c) = comm::stream::<int>();
     task::spawn(|| grandchild() );
     let x = p.recv();
 }
 
 fn main() {
-    let (p, _c) = pipes::stream::<int>();
+    let (p, _c) = comm::stream::<int>();
     task::spawn(|| child() );
     let x = p.recv();
 }
index 18d6b3c369bc110f65d738bdae259087b1a87905..97e4edc81bccd921d73ad312d4e8521022d9d38b 100644 (file)
@@ -14,7 +14,7 @@
 fn child() { assert (1 == 2); }
 
 fn parent() {
-    let (p, _c) = pipes::stream::<int>();
+    let (p, _c) = comm::stream::<int>();
     task::spawn(|| child() );
     let x = p.recv();
 }
@@ -22,7 +22,7 @@ fn parent() {
 // This task is not linked to the failure chain, but since the other
 // tasks are going to fail the kernel, this one will fail too
 fn sleeper() {
-    let (p, _c) = pipes::stream::<int>();
+    let (p, _c) = comm::stream::<int>();
     let x = p.recv();
 }
 
index bd866b9f9e7c77eab48efba7e94a9143b83a23b5..a0896ea7babbd2a302ccafca956ed35901871a95 100644 (file)
@@ -17,7 +17,7 @@ fn goodfail() {
 
 fn main() {
     task::spawn(|| goodfail() );
-    let (po, _c) = pipes::stream();
+    let (po, _c) = comm::stream();
     // We shouldn't be able to get past this recv since there's no
     // message available
     let i: int = po.recv();
index b77e91c8d212c65ea828faf17ff63b3b366fefcf..99d8fab4bba501ac24dd9bdf8284c3b574addbf6 100644 (file)
@@ -24,7 +24,7 @@
 // course preferable, as the value itself is
 // irrelevant).
 
-use core::pipes::*;
+use core::comm::*;
 
 fn foo(&&x: ()) -> Port<()> {
     let (p, c) = stream::<()>();
index 1af0bb003f2bbcf83f6fe51b30f867a1cd51f09b..da467ae7ba596310d8edcda9ca458cdf911713cc 100644 (file)
@@ -9,7 +9,7 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use core::pipes::*;
+use core::comm::*;
 
 pub fn main() {
     let (p, ch) = stream();
index 6a12704d4b4b52d62017cc27ec50c5ac2a528ce9..b90633bab01ef9b6ab7881c603655307cb1d7e47 100644 (file)
 
 use std::oldmap;
 use std::oldmap::HashMap;
-use core::pipes::*;
+use core::comm::*;
 
 pub fn map(filename: ~str, emit: map_reduce::putter) { emit(filename, ~"1"); }
 
 mod map_reduce {
     use std::oldmap;
     use std::oldmap::HashMap;
-    use core::pipes::*;
+    use core::comm::*;
 
     pub type putter = fn@(~str, ~str);
 
index d40bf6e32cf87a7aa2f682c0f3b09c8707c8529c..e0ec62ff255aeb4fe874204ca5d7b7d17ce40c23 100644 (file)
 // xfail-fast
 
 pub fn main() {
-    let (p,c) = pipes::stream();
+    let (p,c) = comm::stream();
     do task::try || {
-        let (p2,c2) = pipes::stream();
+        let (p2,c2) = comm::stream();
         do task::spawn || {
             p2.recv();
             error!("sibling fails");
             fail!();
         }   
-        let (p3,c3) = pipes::stream();
+        let (p3,c3) = comm::stream();
         c.send(c3);
         c2.send(());
         error!("child blocks");
index fac73d07b6642188bbffd4637b49cfa526754958..e441fa22b3150d74d420fde6cc7e761d57d252ce 100644 (file)
 
 // xfail-fast
 
-use pipes::{Select2, Selectable};
+use comm::{Select2, Selectable};
 
 pub fn main() {
-    let (p,c) = pipes::stream();
+    let (p,c) = comm::stream();
     do task::try || {
-        let (p2,c2) = pipes::stream();
+        let (p2,c2) = comm::stream();
         do task::spawn || {
             p2.recv();
             error!("sibling fails");
             fail!();
         }   
-        let (p3,c3) = pipes::stream();
+        let (p3,c3) = comm::stream();
         c.send(c3);
         c2.send(());
         error!("child blocks");
-        let (p, c) = pipes::stream();
+        let (p, c) = comm::stream();
         (p, p3).select();
         c.send(());
     };  
index 6eb540c4737267fc0203bdebf24f9b3c363f7216..83703dacc36e3abf8d0f4abf4f0ed4c4c53b9ba7 100644 (file)
@@ -1,6 +1,6 @@
 extern mod std;
 
-use pipes::Chan;
+use comm::Chan;
 
 type RingBuffer = ~[float];
 type SamplesFn = fn~ (samples: &RingBuffer);
index 9971b098bb0f580a0a0f7728e427b56002b6c9c8..017d90cbcd736a112a841aff8c99fc5d6a0f4cb0 100644 (file)
@@ -1,4 +1,4 @@
-use core::pipes::*;
+use core::comm::*;
 
 fn producer(c: &Chan<~[u8]>) {
     c.send(
index 0016d792c0d0f14f50e499b839b88c14da713e48..b74c70d3ea71680301101c208304a550661f3c0d 100644 (file)
@@ -15,7 +15,8 @@
 //
 // http://theincredibleholk.wordpress.com/2012/07/06/rusty-pipes/
 
-use pipes::try_recv;
+use core::pipes;
+use core::pipes::try_recv;
 
 pub type username = ~str;
 pub type password = ~str;
index 2d765423988419035d70d81ea44f581c9dcfdc0c..c09fbd19fdcd2365c5c0f6bf3c50adde18526ec8 100644 (file)
@@ -20,7 +20,8 @@
 use std::timer::sleep;
 use std::uv;
 
-use pipes::{try_recv, recv};
+use core::pipes;
+use core::pipes::{try_recv, recv};
 
 proto! oneshot (
     waiting:send {
index baa5ba5bf00a82d7b62414511caac0b24d8938ad..dc03eab22a8282fca95bddb18385733c75cc2d3c 100644 (file)
@@ -15,6 +15,7 @@
 extern mod std;
 use std::timer::sleep;
 use std::uv;
+use core::pipes;
 
 proto! oneshot (
     waiting:send {
index b13b262e8645e85fff0d33254ce6dd0f3f6e585d..4b2ac40dc4a70cbad4bf650ece470268259a138e 100644 (file)
@@ -20,6 +20,7 @@
 #[legacy_records];
 
 mod pingpong {
+    use core::pipes;
     use core::pipes::*;
     use core::ptr;
 
@@ -45,6 +46,7 @@ pub fn init() -> (client::ping, server::ping) {
     pub enum ping = server::pong;
     pub enum pong = client::ping;
     pub mod client {
+        use core::pipes;
         use core::pipes::*;
         use core::ptr;
 
@@ -54,7 +56,7 @@ pub fn ping(+pipe: ping) -> pong {
                 let s = SendPacketBuffered(ptr::addr_of(&(b.buffer.data.pong)));
                 let c = RecvPacketBuffered(ptr::addr_of(&(b.buffer.data.pong)));
                 let message = ::pingpong::ping(s);
-                ::pipes::send(pipe, message);
+                send(pipe, message);
                 c
             }
         }
@@ -64,6 +66,7 @@ pub fn ping(+pipe: ping) -> pong {
                                                   ::pingpong::packets>;
     }
     pub mod server {
+        use core::pipes;
         use core::pipes::*;
         use core::ptr;
 
@@ -75,7 +78,7 @@ pub fn pong(+pipe: pong) -> ping {
                 let s = SendPacketBuffered(ptr::addr_of(&(b.buffer.data.ping)));
                 let c = RecvPacketBuffered(ptr::addr_of(&(b.buffer.data.ping)));
                 let message = ::pingpong::pong(s);
-                ::pipes::send(pipe, message);
+                send(pipe, message);
                 c
             }
         }
@@ -85,7 +88,7 @@ pub fn pong(+pipe: pong) -> ping {
 }
 
 mod test {
-    use pipes::recv;
+    use core::pipes::recv;
     use pingpong::{ping, pong};
 
     pub fn client(-chan: ::pingpong::client::ping) {
index 7ac6337d1caa130d4ea60c127a340e164a9106d8..423e4782333ed8cf8e666c70fca5e6ffb1aab953 100644 (file)
@@ -34,7 +34,7 @@ macro_rules! select_if (
         ], )*
     } => {
         if $index == $count {
-            match pipes::try_recv($port) {
+            match core::pipes::try_recv($port) {
               $(Some($message($($($x,)+)* next)) => {
                 let $next = next;
                 $e
@@ -68,7 +68,7 @@ macro_rules! select (
               -> $next:ident $e:expr),+
         } )+
     } => ({
-        let index = pipes::selecti([$(($port).header()),+]);
+        let index = core::comm::selecti([$(($port).header()),+]);
         select_if!(index, 0, $( $port => [
             $($message$(($($x),+))dont_type_this* -> $next $e),+
         ], )+)
index 0bf739139cfe18716c1d3b288696a6008ca2ad8f..0e170cbe9958697d8f2c1448359a24a0f4975799 100644 (file)
@@ -19,7 +19,8 @@
 use std::timer::sleep;
 use std::uv;
 
-use pipes::{recv, select};
+use core::pipes;
+use core::pipes::{recv, select};
 
 proto! oneshot (
     waiting:send {
index 521c400489e95620eac83d53edfee3fc37ef5b06..51980eeea94ba935dcf43b4cf6bf5794bf0577db 100644 (file)
@@ -15,7 +15,8 @@
 extern mod std;
 use std::timer::sleep;
 use std::uv;
-use pipes::recv;
+use core::pipes;
+use core::pipes::recv;
 
 proto! oneshot (
     waiting:send {
index cefed420546eab56909af6b1e297421e1a6eeca7..ca37a6663fd69711dfa25b31b70d32a0b38be532 100644 (file)
@@ -10,7 +10,7 @@
 
 // Tests of the runtime's scheduler interface
 
-use core::pipes::*;
+use core::comm::*;
 
 type sched_id = int;
 type task_id = *libc::c_void;
index f1b9c85a0ff1a3c08cceaf9df33e9ceb6c25ba63..18f4fd27858ba8bc9752926bc1a9adbde4b2ffd3 100644 (file)
@@ -17,7 +17,7 @@ fn die() {
 
 fn iloop() {
     task::spawn(|| die() );
-    let (p, c) = core::pipes::stream::<()>();
+    let (p, c) = comm::stream::<()>();
     loop {
         // Sending and receiving here because these actions yield,
         // at which point our child can kill us.
index ac910232c16bfea67a8910d1bd9f9ba7a843974f..6bda62be621d6f2bc342e25e891d9f6d501a75d1 100644 (file)
@@ -8,7 +8,7 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use core::pipes::*;
+use core::comm::*;
 
 struct test {
   f: int,
index 77f0614036941b646be7bb0383db3fa0e27b8e7a..0f924df8dc006e53a85a61fc5ea6d5c999bd7dee 100644 (file)
@@ -8,7 +8,7 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use core::pipes::*;
+use core::comm::*;
 
 // tests that ctrl's type gets inferred properly
 type command<K, V> = {key: K, val: V};
index 19169c168c2cf2724c7083cac47f0720ae39fb8d..8ef0173dbd32758141af9871ffdbb4dd3e0b873f 100644 (file)
@@ -23,6 +23,6 @@ fn foo(i:int, j: char) -> foo {
 }
 
 pub fn main() {
-    let (_po, ch) = pipes::stream();
+    let (_po, ch) = comm::stream();
     ch.send(foo(42, 'c'));
 }
index 6090f2eb71cd7e57174d730c15c93b8021d76ec0..4111b50549055e394b11e77c4443dd0f67381305 100644 (file)
@@ -14,7 +14,7 @@
   Arnold.
  */
 
-use core::pipes::*;
+use core::comm::*;
 
 type ctx = Chan<int>;
 
index aa8a2a9146cbc6466fcfcace5a25e4b60ecec766..f260e571b42ac66edb7c14a2fd216777eff3b96f 100644 (file)
@@ -13,8 +13,8 @@
 
 extern mod std;
 
-use pipes::Chan;
-use pipes::Port;
+use comm::Chan;
+use comm::Port;
 
 pub fn main() { test05(); }
 
@@ -28,7 +28,7 @@ fn test05_start(ch : Chan<int>) {
 }
 
 fn test05() {
-    let (po, ch) = pipes::stream();
+    let (po, ch) = comm::stream();
     task::spawn(|| test05_start(ch) );
     let mut value = po.recv();
     log(error, value);
index 289a728efc39a535a1ca0d1f99032905177fb340..379b9acf6f554edfb7b4e72b82202747ca85ad03 100644 (file)
@@ -13,8 +13,8 @@
 
 extern mod std;
 
-fn start(c: pipes::Chan<pipes::Chan<~str>>) {
-    let (p, ch) = pipes::stream();
+fn start(c: comm::Chan<comm::Chan<~str>>) {
+    let (p, ch) = comm::stream();
     c.send(ch);
 
     let mut a;
@@ -28,7 +28,7 @@ fn start(c: pipes::Chan<pipes::Chan<~str>>) {
 }
 
 pub fn main() {
-    let (p, ch) = pipes::stream();
+    let (p, ch) = comm::stream();
     let child = task::spawn(|| start(ch) );
 
     let c = p.recv();
index 996566abcd89fd49b257a0eb673fb54ef604148e..3e3eefd26ba7c2b9213a6126a9e19a639f9906f9 100644 (file)
 
 extern mod std;
 
-fn start(c: pipes::Chan<pipes::Chan<int>>) {
-    let (p, ch) = pipes::stream();
+fn start(c: comm::Chan<comm::Chan<int>>) {
+    let (p, ch) = comm::stream();
     c.send(ch);
 }
 
 pub fn main() {
-    let (p, ch) = pipes::stream();
+    let (p, ch) = comm::stream();
     let child = task::spawn(|| start(ch) );
     let c = p.recv();
 }
index 4ee23ec54d6cb9c1a71409eb9dceb01f674ea502..a246f1f4af2b187a9d6bb60e6b9dbe630cbd2aec 100644 (file)
 #[legacy_modes];
 
 extern mod std;
-use pipes::send;
 
-fn start(c: pipes::Chan<int>, start: int, number_of_messages: int) {
+fn start(c: comm::Chan<int>, start: int, number_of_messages: int) {
     let mut i: int = 0;
     while i < number_of_messages { c.send(start + i); i += 1; }
 }
 
 pub fn main() {
     debug!("Check that we don't deadlock.");
-    let (p, ch) = pipes::stream();
+    let (p, ch) = comm::stream();
     task::try(|| start(ch, 0, 10) );
     debug!("Joined task");
 }
index f32fbdd04e62ead75f8ffe8da6bf9e67645b0636..c5179652fdc89c51ddf3c4ae9bfe192dc891af1e 100644 (file)
 #[legacy_modes];
 
 pub fn main() {
-    let po = pipes::PortSet();
+    let po = comm::PortSet();
 
     // Spawn 10 tasks each sending us back one int.
     let mut i = 10;
     while (i > 0) {
         log(debug, i);
-        let (p, ch) = pipes::stream();
+        let (p, ch) = comm::stream();
         po.add(p);
         task::spawn({let i = i; || child(i, ch)});
         i = i - 1;
@@ -37,7 +37,7 @@ pub fn main() {
     debug!("main thread exiting");
 }
 
-fn child(x: int, ch: pipes::Chan<int>) {
+fn child(x: int, ch: comm::Chan<int>) {
     log(debug, x);
     ch.send(x);
 }
index 957066005acda898cb5d558e621486412fa4ffb5..525cafef169ef729b1706b6ce09f8b95c843a4d3 100644 (file)
@@ -14,7 +14,7 @@
 
 extern mod std;
 
-fn start(c: pipes::Chan<int>, i0: int) {
+fn start(c: comm::Chan<int>, i0: int) {
     let mut i = i0;
     while i > 0 {
         c.send(0);
@@ -27,7 +27,7 @@ pub fn main() {
     // is likely to terminate before the child completes, so from
     // the child's point of view the receiver may die. We should
     // drop messages on the floor in this case, and not crash!
-    let (p, ch) = pipes::stream();
+    let (p, ch) = comm::stream();
     task::spawn(|| start(ch, 10));
     p.recv();
 }
index 648a54d190fa408b0b294f95632510943ce15558..e2ac5623db3d965a8a0fa9112fc4f165ef100777 100644 (file)
 // except according to those terms.
 
 
-use pipes::send;
-use pipes::Port;
-use pipes::recv;
-use pipes::Chan;
-
 // Tests of ports and channels on various types
 fn test_rec() {
     struct R {val0: int, val1: u8, val2: char}
 
-    let (po, ch) = pipes::stream();
+    let (po, ch) = comm::stream();
     let r0: R = R {val0: 0, val1: 1u8, val2: '2'};
     ch.send(r0);
     let mut r1: R;
@@ -30,7 +25,7 @@ struct R {val0: int, val1: u8, val2: char}
 }
 
 fn test_vec() {
-    let (po, ch) = pipes::stream();
+    let (po, ch) = comm::stream();
     let v0: ~[int] = ~[0, 1, 2];
     ch.send(v0);
     let v1 = po.recv();
@@ -40,7 +35,7 @@ fn test_vec() {
 }
 
 fn test_str() {
-    let (po, ch) = pipes::stream();
+    let (po, ch) = comm::stream();
     let s0 = ~"test";
     ch.send(s0);
     let s1 = po.recv();
@@ -84,7 +79,7 @@ impl cmp::Eq for t {
 }
 
 fn test_tag() {
-    let (po, ch) = pipes::stream();
+    let (po, ch) = comm::stream();
     ch.send(tag1);
     ch.send(tag2(10));
     ch.send(tag3(10, 11u8, 'A'));
@@ -98,8 +93,8 @@ fn test_tag() {
 }
 
 fn test_chan() {
-    let (po, ch) = pipes::stream();
-    let (po0, ch0) = pipes::stream();
+    let (po, ch) = comm::stream();
+    let (po0, ch0) = comm::stream();
     ch.send(ch0);
     let ch1 = po.recv();
     // Does the transmitted channel still work?
index 372a0ea434b58b326a1fb2a0eb07b39df6f28d23..9bbe20c2e1332311d65257e847b4e35fa36558dd 100644 (file)
@@ -12,9 +12,7 @@
 #[legacy_modes];
 
 extern mod std;
-use pipes::Chan;
-use pipes::send;
-use pipes::recv;
+use core::comm::Chan;
 
 pub fn main() { debug!("===== WITHOUT THREADS ====="); test00(); }
 
@@ -35,7 +33,7 @@ fn test00() {
 
     debug!("Creating tasks");
 
-    let po = pipes::PortSet();
+    let po = comm::PortSet();
 
     let mut i: int = 0;
 
index d7997d932482589c7d059daeef9cc0a51cecf2a5..dc4dc27229c56bf563c5a5f4c97c7118f6bca7d1 100644 (file)
@@ -8,14 +8,12 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use pipes::send;
-
 pub fn main() { test00(); }
 
 fn test00() {
     let mut r: int = 0;
     let mut sum: int = 0;
-    let (p, c) = pipes::stream();
+    let (p, c) = comm::stream();
     c.send(1);
     c.send(2);
     c.send(3);
index f8f19d804c8ed4c6fb30fc578a0c76239ff27051..0256c1cbb8754b124487a8d3ef21d65fb57a08f3 100644 (file)
@@ -15,7 +15,7 @@
 fn test00() {
     let r: int = 0;
     let mut sum: int = 0;
-    let (p, c) = pipes::stream();
+    let (p, c) = comm::stream();
     let number_of_messages: int = 1000;
     let mut i: int = 0;
     while i < number_of_messages { c.send(i + 0); i += 1; }
index 5d19075a71ea040d015bda1aca4e288982463b9d..c18090ea45f540a8e720042691fea18bfdf22096 100644 (file)
@@ -8,16 +8,14 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use pipes::send;
-use pipes::Chan;
-use pipes::recv;
+use core::comm::Chan;
 
 pub fn main() { test00(); }
 
 fn test00() {
     let mut r: int = 0;
     let mut sum: int = 0;
-    let p = pipes::PortSet();
+    let p = comm::PortSet();
     let c0 = p.chan();
     let c1 = p.chan();
     let c2 = p.chan();
index 481df2d1d529b4a1c94dfe1b38c6f5ef15e0bf01..21eb93e8d09b9a40f02d066dbef5822271f3e738 100644 (file)
@@ -15,7 +15,7 @@
 
 pub fn main() { test00(); }
 
-fn test00_start(c: pipes::Chan<int>, start: int, number_of_messages: int) {
+fn test00_start(c: comm::Chan<int>, start: int, number_of_messages: int) {
     let mut i: int = 0;
     while i < number_of_messages { c.send(start + i); i += 1; }
 }
@@ -23,7 +23,7 @@ fn test00_start(c: pipes::Chan<int>, start: int, number_of_messages: int) {
 fn test00() {
     let mut r: int = 0;
     let mut sum: int = 0;
-    let p = pipes::PortSet();
+    let p = comm::PortSet();
     let number_of_messages: int = 10;
 
     let c = p.chan();
index d3ed48f7575239fc67ee9cb63b648ba9a34318a4..75fcd12c312d0712773ad9c051b84d74919d2958 100644 (file)
@@ -15,7 +15,7 @@
 
 pub fn main() { test00(); }
 
-fn test00_start(c: pipes::Chan<int>, number_of_messages: int) {
+fn test00_start(c: comm::Chan<int>, number_of_messages: int) {
     let mut i: int = 0;
     while i < number_of_messages { c.send(i + 0); i += 1; }
 }
@@ -23,7 +23,7 @@ fn test00_start(c: pipes::Chan<int>, number_of_messages: int) {
 fn test00() {
     let r: int = 0;
     let mut sum: int = 0;
-    let p = pipes::PortSet();
+    let p = comm::PortSet();
     let number_of_messages: int = 10;
     let ch = p.chan();
 
index cb62e2f87ee9aba502b892377a38317439ec419d..db2ad2de61b9577d9e0c370719a2006103f15618 100644 (file)
@@ -16,7 +16,7 @@
 // any size, but rustc currently can because they do have size. Whether
 // or not this is desirable I don't know, but here's a regression test.
 pub fn main() {
-    let (po, ch) = pipes::stream();
+    let (po, ch) = comm::stream();
     ch.send(());
     let n: () = po.recv();
     assert (n == ());
index ca60dfd3de009f6c26a2278ef43fd4476478e3ac..b90c39ab34e503654a919591c864883165bc20ed 100644 (file)
@@ -13,7 +13,7 @@
 // A port of task-killjoin to use a class with a dtor to manage
 // the join.
 
-use core::pipes::*;
+use core::comm::*;
 
 struct notify {
     ch: Chan<bool>, v: @mut bool,
index d9b06627c80b77fe3affa182c81a223f77279c03..805f8e8b1e24cae0aa3d85975ed3dcbd9c457efe 100644 (file)
@@ -8,7 +8,7 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use core::pipes::*;
+use core::comm::*;
 
 pub fn main() {
     let (p, ch) = stream::<uint>();
index 21524d3fc5414275139bb01ad76d1f096d51ad93..7800ebd7310ca84da6f6d73b96f82424436ff6c8 100644 (file)
@@ -8,14 +8,12 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use pipes::{Port, Chan};
-
 /*
   This is about the simplest program that can successfully send a
   message.
  */
 pub fn main() {
-    let (po, ch) = pipes::stream();
+    let (po, ch) = comm::stream();
     ch.send(42);
     let r = po.recv();
     log(error, r);
index a5398e7407b376391cc261ae74c86fbef49e8a25..7be6907a0c730ef630fec9338f239c350888ae4c 100644 (file)
@@ -8,7 +8,7 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use core::pipes::*;
+use core::comm::*;
 
 fn child(c: &SharedChan<~uint>, i: uint) {
     c.send(~i);
index 57b345c2d25eabd28de629b67f2aabf4f6c55b87..75fc71441f8f3116ed7bca23ed5d78cbc9d386df 100644 (file)
@@ -8,7 +8,7 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use core::pipes::*;
+use core::comm::*;
 
 pub fn main() {
     let (p, c) = stream();
index 93f1c7b5b45090a0f9a09d835bc88309883f5c9d..2693a8d39423e6a4d32bdece4d6a34c58c3d4899 100644 (file)
@@ -11,7 +11,7 @@
 // xfail-win32
 extern mod std;
 
-use core::pipes::*;
+use core::comm::*;
 
 struct complainer {
   c: SharedChan<bool>,