~~~~
use task::spawn;
-use pipes::{stream, Port, Chan};
+use comm::{stream, Port, Chan};
let (port, chan): (Port<int>, Chan<int>) = stream();
a tuple into its component parts).
~~~~
-# use pipes::{stream, Chan, Port};
+# use comm::{stream, Chan, Port};
let (port, chan): (Port<int>, Chan<int>) = stream();
~~~~
~~~~
# use task::{spawn};
# use task::spawn;
-# use pipes::{stream, Port, Chan};
+# use comm::{stream, Port, Chan};
# fn some_expensive_computation() -> int { 42 }
# let (port, chan) = stream();
do spawn || {
port:
~~~~
-# use pipes::{stream, Port, Chan};
+# use comm::{stream, Port, Chan};
# fn some_other_expensive_computation() {}
# let (port, chan) = stream::<int>();
# chan.send(0);
~~~ {.xfail-test}
# use task::{spawn};
-# use pipes::{stream, Port, Chan};
+# use comm::{stream, Port, Chan};
# fn some_expensive_computation() -> int { 42 }
let (port, chan) = stream();
~~~
# use task::spawn;
-use pipes::{stream, SharedChan};
+use comm::{stream, SharedChan};
let (port, chan) = stream();
let chan = SharedChan(chan);
~~~
# use task::spawn;
-# use pipes::{stream, Port, Chan};
+# use comm::{stream, Port, Chan};
// Create a vector of ports, one for each child task
let ports = do vec::from_fn(3) |init_val| {
before returning. Hence:
~~~
-# use pipes::{stream, Chan, Port};
+# use comm::{stream, Chan, Port};
# use task::{spawn, try};
# fn sleep_forever() { loop { task::yield() } }
# do task::try {
~~~~
# use std::comm::DuplexStream;
-# use pipes::{Port, Chan};
+# use comm::{Port, Chan};
fn stringifier(channel: &DuplexStream<~str, uint>) {
let mut value: uint;
loop {
~~~~
# use std::comm::DuplexStream;
-# use pipes::{Port, Chan};
+# use comm::{Port, Chan};
# use task::spawn;
# fn stringifier(channel: &DuplexStream<~str, uint>) {
# let mut value: uint;
writeclose(pipe_in.out, input);
- let p = pipes::PortSet();
+ let p = comm::PortSet();
let ch = p.chan();
do task::spawn_sched(task::SingleThreaded) || {
let errput = readclose(pipe_err.in);
--- /dev/null
+// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+// Transitional -- needs snapshot
+#[allow(structural_records)];
+
+use either::{Either, Left, Right};
+use kinds::Owned;
+use option;
+use option::{Option, Some, None, unwrap};
+use private;
+use vec;
+
+use pipes::{recv, try_recv, wait_many, peek, PacketHeader};
+
+// NOTE Making this public exposes some plumbing from pipes. Needs
+// some refactoring
+pub use pipes::Selectable;
+
+/// A trait for things that can send multiple messages.
+pub trait GenericChan<T> {
+ /// Sends a message.
+ fn send(x: T);
+}
+
+/// Things that can send multiple messages and can detect when the receiver
+/// is closed
+pub trait GenericSmartChan<T> {
+ /// Sends a message, or report if the receiver has closed the connection.
+ fn try_send(x: T) -> bool;
+}
+
+/// A trait for things that can receive multiple messages.
+pub trait GenericPort<T> {
+ /// Receives a message, or fails if the connection closes.
+ fn recv() -> T;
+
+ /** Receives a message, or returns `none` if
+ the connection is closed or closes.
+ */
+ fn try_recv() -> Option<T>;
+}
+
+/// Ports that can `peek`
+pub trait Peekable<T> {
+ /// Returns true if a message is available
+ pure fn peek() -> bool;
+}
+
+/// Returns the index of an endpoint that is ready to receive.
+pub fn selecti<T: Selectable>(endpoints: &[T]) -> uint {
+ wait_many(endpoints)
+}
+
+/// Returns 0 or 1 depending on which endpoint is ready to receive
+pub fn select2i<A: Selectable, B: Selectable>(a: &A, b: &B) ->
+ Either<(), ()> {
+ match wait_many([a.header(), b.header()]) {
+ 0 => Left(()),
+ 1 => Right(()),
+ _ => fail!(~"wait returned unexpected index")
+ }
+}
+
+// Streams - Make pipes a little easier in general.
+
+proto! streamp (
+ Open:send<T: Owned> {
+ data(T) -> Open<T>
+ }
+)
+
+#[doc(hidden)]
+struct Chan_<T> {
+ mut endp: Option<streamp::client::Open<T>>
+}
+
+/// An endpoint that can send many messages.
+pub enum Chan<T> {
+ Chan_(Chan_<T>)
+}
+
+struct Port_<T> {
+ mut endp: Option<streamp::server::Open<T>>,
+}
+
+/// An endpoint that can receive many messages.
+pub enum Port<T> {
+ Port_(Port_<T>)
+}
+
+/** Creates a `(chan, port)` pair.
+
+These allow sending or receiving an unlimited number of messages.
+
+*/
+pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) {
+ let (c, s) = streamp::init();
+
+ (Port_(Port_ { endp: Some(s) }), Chan_(Chan_{ endp: Some(c) }))
+}
+
+impl<T: Owned> GenericChan<T> for Chan<T> {
+ fn send(x: T) {
+ let mut endp = None;
+ endp <-> self.endp;
+ self.endp = Some(
+ streamp::client::data(unwrap(endp), x))
+ }
+}
+
+impl<T: Owned> GenericSmartChan<T> for Chan<T> {
+
+ fn try_send(x: T) -> bool {
+ let mut endp = None;
+ endp <-> self.endp;
+ match streamp::client::try_data(unwrap(endp), x) {
+ Some(next) => {
+ self.endp = Some(next);
+ true
+ }
+ None => false
+ }
+ }
+}
+
+impl<T: Owned> GenericPort<T> for Port<T> {
+ fn recv() -> T {
+ let mut endp = None;
+ endp <-> self.endp;
+ let streamp::data(x, endp) = recv(unwrap(endp));
+ self.endp = Some(endp);
+ x
+ }
+
+ fn try_recv() -> Option<T> {
+ let mut endp = None;
+ endp <-> self.endp;
+ match try_recv(unwrap(endp)) {
+ Some(streamp::data(x, endp)) => {
+ self.endp = Some(endp);
+ Some(x)
+ }
+ None => None
+ }
+ }
+}
+
+impl<T: Owned> Peekable<T> for Port<T> {
+ pure fn peek() -> bool {
+ unsafe {
+ let mut endp = None;
+ endp <-> self.endp;
+ let peek = match &endp {
+ &Some(ref endp) => peek(endp),
+ &None => fail!(~"peeking empty stream")
+ };
+ self.endp <-> endp;
+ peek
+ }
+ }
+}
+
+impl<T: Owned> Selectable for Port<T> {
+ pure fn header() -> *PacketHeader {
+ unsafe {
+ match self.endp {
+ Some(ref endp) => endp.header(),
+ None => fail!(~"peeking empty stream")
+ }
+ }
+ }
+}
+
+/// Treat many ports as one.
+pub struct PortSet<T> {
+ mut ports: ~[Port<T>],
+}
+
+pub fn PortSet<T: Owned>() -> PortSet<T>{
+ PortSet {
+ ports: ~[]
+ }
+}
+
+impl<T: Owned> PortSet<T> {
+
+ fn add(port: Port<T>) {
+ self.ports.push(port)
+ }
+
+ fn chan() -> Chan<T> {
+ let (po, ch) = stream();
+ self.add(po);
+ ch
+ }
+}
+
+impl<T: Owned> GenericPort<T> for PortSet<T> {
+
+ fn try_recv() -> Option<T> {
+ let mut result = None;
+ // we have to swap the ports array so we aren't borrowing
+ // aliasable mutable memory.
+ let mut ports = ~[];
+ ports <-> self.ports;
+ while result.is_none() && ports.len() > 0 {
+ let i = wait_many(ports);
+ match ports[i].try_recv() {
+ Some(m) => {
+ result = Some(m);
+ }
+ None => {
+ // Remove this port.
+ let _ = ports.swap_remove(i);
+ }
+ }
+ }
+ ports <-> self.ports;
+ result
+ }
+
+ fn recv() -> T {
+ self.try_recv().expect("port_set: endpoints closed")
+ }
+
+}
+
+impl<T: Owned> Peekable<T> for PortSet<T> {
+ pure fn peek() -> bool {
+ // It'd be nice to use self.port.each, but that version isn't
+ // pure.
+ for vec::each(self.ports) |p| {
+ if p.peek() { return true }
+ }
+ false
+ }
+}
+
+/// A channel that can be shared between many senders.
+pub type SharedChan<T> = private::Exclusive<Chan<T>>;
+
+impl<T: Owned> GenericChan<T> for SharedChan<T> {
+ fn send(x: T) {
+ let mut xx = Some(x);
+ do self.with_imm |chan| {
+ let mut x = None;
+ x <-> xx;
+ chan.send(option::unwrap(x))
+ }
+ }
+}
+
+impl<T: Owned> GenericSmartChan<T> for SharedChan<T> {
+ fn try_send(x: T) -> bool {
+ let mut xx = Some(x);
+ do self.with_imm |chan| {
+ let mut x = None;
+ x <-> xx;
+ chan.try_send(option::unwrap(x))
+ }
+ }
+}
+
+/// Converts a `chan` into a `shared_chan`.
+pub fn SharedChan<T:Owned>(c: Chan<T>) -> SharedChan<T> {
+ private::exclusive(c)
+}
+
+/// Receive a message from one of two endpoints.
+pub trait Select2<T: Owned, U: Owned> {
+ /// Receive a message or return `None` if a connection closes.
+ fn try_select() -> Either<Option<T>, Option<U>>;
+ /// Receive a message or fail if a connection closes.
+ fn select() -> Either<T, U>;
+}
+
+impl<T: Owned, U: Owned,
+ Left: Selectable + GenericPort<T>,
+ Right: Selectable + GenericPort<U>>
+ Select2<T, U> for (Left, Right) {
+
+ fn select() -> Either<T, U> {
+ match self {
+ (ref lp, ref rp) => match select2i(lp, rp) {
+ Left(()) => Left (lp.recv()),
+ Right(()) => Right(rp.recv())
+ }
+ }
+ }
+
+ fn try_select() -> Either<Option<T>, Option<U>> {
+ match self {
+ (ref lp, ref rp) => match select2i(lp, rp) {
+ Left(()) => Left (lp.try_recv()),
+ Right(()) => Right(rp.try_recv())
+ }
+ }
+ }
+}
+
+proto! oneshot (
+ Oneshot:send<T:Owned> {
+ send(T) -> !
+ }
+)
+
+/// The send end of a oneshot pipe.
+pub type ChanOne<T> = oneshot::client::Oneshot<T>;
+/// The receive end of a oneshot pipe.
+pub type PortOne<T> = oneshot::server::Oneshot<T>;
+
+/// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
+pub fn oneshot<T: Owned>() -> (PortOne<T>, ChanOne<T>) {
+ let (chan, port) = oneshot::init();
+ (port, chan)
+}
+
+impl<T: Owned> PortOne<T> {
+ fn recv(self) -> T { recv_one(self) }
+ fn try_recv(self) -> Option<T> { try_recv_one(self) }
+}
+
+impl<T: Owned> ChanOne<T> {
+ fn send(self, data: T) { send_one(self, data) }
+ fn try_send(self, data: T) -> bool { try_send_one(self, data) }
+}
+
+/**
+ * Receive a message from a oneshot pipe, failing if the connection was
+ * closed.
+ */
+pub fn recv_one<T: Owned>(port: PortOne<T>) -> T {
+ let oneshot::send(message) = recv(port);
+ message
+}
+
+/// Receive a message from a oneshot pipe unless the connection was closed.
+pub fn try_recv_one<T: Owned> (port: PortOne<T>) -> Option<T> {
+ let message = try_recv(port);
+
+ if message.is_none() { None }
+ else {
+ let oneshot::send(message) = option::unwrap(message);
+ Some(message)
+ }
+}
+
+/// Send a message on a oneshot pipe, failing if the connection was closed.
+pub fn send_one<T: Owned>(chan: ChanOne<T>, data: T) {
+ oneshot::client::send(chan, data);
+}
+
+/**
+ * Send a message on a oneshot pipe, or return false if the connection was
+ * closed.
+ */
+pub fn try_send_one<T: Owned>(chan: ChanOne<T>, data: T)
+ -> bool {
+ oneshot::client::try_send(chan, data).is_some()
+}
+
+#[cfg(test)]
+pub mod test {
+ use either::{Either, Left, Right};
+ use super::{Chan, Port, oneshot, recv_one, stream};
+
+ #[test]
+ pub fn test_select2() {
+ let (p1, c1) = stream();
+ let (p2, c2) = stream();
+
+ c1.send(~"abc");
+
+ match (p1, p2).select() {
+ Right(_) => fail!(),
+ _ => ()
+ }
+
+ c2.send(123);
+ }
+
+ #[test]
+ pub fn test_oneshot() {
+ let (c, p) = oneshot::init();
+
+ oneshot::client::send(c, ());
+
+ recv_one(p)
+ }
+
+ #[test]
+ fn test_peek_terminated() {
+ let (port, chan): (Port<int>, Chan<int>) = stream();
+
+ {
+ // Destroy the channel
+ let _chan = chan;
+ }
+
+ assert !port.peek();
+ }
+}
#[path = "task/mod.rs"]
pub mod task;
+pub mod comm;
pub mod pipes;
pub use option;
pub use kinds;
pub use sys;
+ pub use pipes;
}
data: T,
}
-struct PacketHeader {
+pub struct PacketHeader {
mut state: State,
mut blocked_task: *rust_task,
mut buffer: *libc::c_void,
}
-fn PacketHeader() -> PacketHeader {
+pub fn PacketHeader() -> PacketHeader {
PacketHeader {
state: Empty,
blocked_task: ptr::null(),
}
}
-impl PacketHeader {
+pub impl PacketHeader {
// Returns the old state.
unsafe fn mark_blocked(this: *rust_task) -> State {
rustrt::rust_task_ref(this);
}
}
-impl<T:Owned,Tb:Owned> Peekable<T> for RecvPacketBuffered<T, Tb> {
- pure fn peek() -> bool {
- peek(&self)
- }
-}
-
#[doc(hidden)]
fn sender_terminate<T:Owned>(p: *Packet<T>) {
let p = unsafe { &*p };
closed by the sender or has a message waiting to be received.
*/
-fn wait_many<T:Selectable>(pkts: &[T]) -> uint {
+pub fn wait_many<T: Selectable>(pkts: &[T]) -> uint {
let this = unsafe { rustrt::rust_get_task() };
unsafe {
}
#[doc(hidden)]
-trait Selectable {
+pub trait Selectable {
pure fn header() -> *PacketHeader;
}
client
}
-// Streams - Make pipes a little easier in general.
-
-proto! streamp (
- Open:send<T:Owned> {
- data(T) -> Open<T>
- }
-)
-
-/// A trait for things that can send multiple messages.
-pub trait GenericChan<T> {
- /// Sends a message.
- fn send(x: T);
-}
-
-/// Things that can send multiple messages and can detect when the receiver
-/// is closed
-pub trait GenericSmartChan<T> {
- /// Sends a message, or report if the receiver has closed the connection.
- fn try_send(x: T) -> bool;
-}
-
-/// A trait for things that can receive multiple messages.
-pub trait GenericPort<T> {
- /// Receives a message, or fails if the connection closes.
- fn recv() -> T;
-
- /** Receives a message, or returns `none` if
- the connection is closed or closes.
- */
- fn try_recv() -> Option<T>;
-}
-
-/// Ports that can `peek`
-pub trait Peekable<T> {
- /// Returns true if a message is available
- pure fn peek() -> bool;
-}
-
-#[doc(hidden)]
-struct Chan_<T> {
- mut endp: Option<streamp::client::Open<T>>
-}
-
-/// An endpoint that can send many messages.
-pub enum Chan<T> {
- Chan_(Chan_<T>)
-}
-
-#[doc(hidden)]
-struct Port_<T> {
- mut endp: Option<streamp::server::Open<T>>,
-}
-
-/// An endpoint that can receive many messages.
-pub enum Port<T> {
- Port_(Port_<T>)
-}
-
-/** Creates a `(chan, port)` pair.
-
-These allow sending or receiving an unlimited number of messages.
-
-*/
-pub fn stream<T:Owned>() -> (Port<T>, Chan<T>) {
- let (c, s) = streamp::init();
-
- (Port_(Port_ { endp: Some(s) }), Chan_(Chan_{ endp: Some(c) }))
-}
-
-impl<T:Owned> GenericChan<T> for Chan<T> {
- fn send(x: T) {
- let mut endp = None;
- endp <-> self.endp;
- self.endp = Some(
- streamp::client::data(unwrap(endp), x))
- }
-}
-
-impl<T:Owned> GenericSmartChan<T> for Chan<T> {
-
- fn try_send(x: T) -> bool {
- let mut endp = None;
- endp <-> self.endp;
- match streamp::client::try_data(unwrap(endp), x) {
- Some(next) => {
- self.endp = Some(next);
- true
- }
- None => false
- }
- }
-}
-
-impl<T:Owned> GenericPort<T> for Port<T> {
- fn recv() -> T {
- let mut endp = None;
- endp <-> self.endp;
- let streamp::data(x, endp) = pipes::recv(unwrap(endp));
- self.endp = Some(endp);
- x
- }
-
- fn try_recv() -> Option<T> {
- let mut endp = None;
- endp <-> self.endp;
- match pipes::try_recv(unwrap(endp)) {
- Some(streamp::data(x, endp)) => {
- self.endp = Some(endp);
- Some(x)
- }
- None => None
- }
- }
-}
-
-impl<T:Owned> Peekable<T> for Port<T> {
- pure fn peek() -> bool {
- unsafe {
- let mut endp = None;
- endp <-> self.endp;
- let peek = match &endp {
- &Some(ref endp) => pipes::peek(endp),
- &None => fail!(~"peeking empty stream")
- };
- self.endp <-> endp;
- peek
- }
- }
-}
-
-impl<T:Owned> Selectable for Port<T> {
- pure fn header() -> *PacketHeader {
- unsafe {
- match self.endp {
- Some(ref endp) => endp.header(),
- None => fail!(~"peeking empty stream")
- }
- }
- }
-}
-
-/// Treat many ports as one.
-pub struct PortSet<T> {
- mut ports: ~[pipes::Port<T>],
-}
-
-pub fn PortSet<T:Owned>() -> PortSet<T>{
- PortSet {
- ports: ~[]
- }
-}
-
-impl<T:Owned> PortSet<T> {
-
- fn add(port: pipes::Port<T>) {
- self.ports.push(port)
- }
-
- fn chan() -> Chan<T> {
- let (po, ch) = stream();
- self.add(po);
- ch
- }
-}
-
-impl<T:Owned> GenericPort<T> for PortSet<T> {
-
- fn try_recv() -> Option<T> {
- let mut result = None;
- // we have to swap the ports array so we aren't borrowing
- // aliasable mutable memory.
- let mut ports = ~[];
- ports <-> self.ports;
- while result.is_none() && ports.len() > 0 {
- let i = wait_many(ports);
- match ports[i].try_recv() {
- Some(m) => {
- result = Some(m);
- }
- None => {
- // Remove this port.
- let _ = ports.swap_remove(i);
- }
- }
- }
- ports <-> self.ports;
- result
- }
-
- fn recv() -> T {
- self.try_recv().expect("port_set: endpoints closed")
- }
-
-}
-
-impl<T:Owned> Peekable<T> for PortSet<T> {
- pure fn peek() -> bool {
- // It'd be nice to use self.port.each, but that version isn't
- // pure.
- for vec::each(self.ports) |p| {
- if p.peek() { return true }
- }
- false
- }
-}
-
-/// A channel that can be shared between many senders.
-pub type SharedChan<T> = private::Exclusive<Chan<T>>;
-
-impl<T:Owned> GenericChan<T> for SharedChan<T> {
- fn send(x: T) {
- let mut xx = Some(x);
- do self.with_imm |chan| {
- let mut x = None;
- x <-> xx;
- chan.send(option::unwrap(x))
- }
- }
-}
-
-impl<T:Owned> GenericSmartChan<T> for SharedChan<T> {
- fn try_send(x: T) -> bool {
- let mut xx = Some(x);
- do self.with_imm |chan| {
- let mut x = None;
- x <-> xx;
- chan.try_send(option::unwrap(x))
- }
- }
-}
-
-/// Converts a `chan` into a `shared_chan`.
-pub fn SharedChan<T:Owned>(c: Chan<T>) -> SharedChan<T> {
- private::exclusive(c)
-}
-
-/// Receive a message from one of two endpoints.
-pub trait Select2<T:Owned,U:Owned> {
- /// Receive a message or return `None` if a connection closes.
- fn try_select() -> Either<Option<T>, Option<U>>;
- /// Receive a message or fail if a connection closes.
- fn select() -> Either<T, U>;
-}
-
-impl<T: Owned,
- U: Owned,
- Left: Selectable + GenericPort<T>,
- Right: Selectable + GenericPort<U>>
- Select2<T,U> for (Left, Right) {
- fn select() -> Either<T, U> {
- match self {
- (ref lp, ref rp) => match select2i(lp, rp) {
- Left(()) => Left (lp.recv()),
- Right(()) => Right(rp.recv())
- }
- }
- }
-
- fn try_select() -> Either<Option<T>, Option<U>> {
- match self {
- (ref lp, ref rp) => match select2i(lp, rp) {
- Left(()) => Left (lp.try_recv()),
- Right(()) => Right(rp.try_recv())
- }
- }
- }
-}
-
-proto! oneshot (
- Oneshot:send<T:Owned> {
- send(T) -> !
- }
-)
-
-/// The send end of a oneshot pipe.
-pub type ChanOne<T> = oneshot::client::Oneshot<T>;
-/// The receive end of a oneshot pipe.
-pub type PortOne<T> = oneshot::server::Oneshot<T>;
-
-/// Initialiase a (send-endpoint, recv-endpoint) oneshot pipe pair.
-pub fn oneshot<T:Owned>() -> (PortOne<T>, ChanOne<T>) {
- let (chan, port) = oneshot::init();
- (port, chan)
-}
-
-impl<T:Owned> PortOne<T> {
- fn recv(self) -> T { recv_one(self) }
- fn try_recv(self) -> Option<T> { try_recv_one(self) }
-}
-
-impl<T:Owned> ChanOne<T> {
- fn send(self, data: T) { send_one(self, data) }
- fn try_send(self, data: T) -> bool { try_send_one(self, data) }
-}
-
-/**
- * Receive a message from a oneshot pipe, failing if the connection was
- * closed.
- */
-pub fn recv_one<T:Owned>(port: PortOne<T>) -> T {
- let oneshot::send(message) = recv(port);
- message
-}
-
-/// Receive a message from a oneshot pipe unless the connection was closed.
-pub fn try_recv_one<T:Owned> (port: PortOne<T>) -> Option<T> {
- let message = try_recv(port);
-
- if message.is_none() { None }
- else {
- let oneshot::send(message) = option::unwrap(message);
- Some(message)
- }
-}
-
-/// Send a message on a oneshot pipe, failing if the connection was closed.
-pub fn send_one<T:Owned>(chan: ChanOne<T>, data: T) {
- oneshot::client::send(chan, data);
-}
-
-/**
- * Send a message on a oneshot pipe, or return false if the connection was
- * closed.
- */
-pub fn try_send_one<T:Owned>(chan: ChanOne<T>, data: T)
- -> bool {
- oneshot::client::try_send(chan, data).is_some()
-}
-
pub mod rt {
use option::{None, Option, Some};
#[cfg(test)]
pub mod test {
use either::{Either, Left, Right};
- use pipes::{Chan, Port, oneshot, recv_one, stream};
- use pipes;
+ use comm::{Chan, Port, oneshot, recv_one, stream, Select2,
+ GenericPort, GenericChan, Peekable};
#[test]
pub fn test_select2() {
- let (p1, c1) = pipes::stream();
- let (p2, c2) = pipes::stream();
+ let (p1, c1) = stream();
+ let (p2, c2) = stream();
c1.send(~"abc");
pub use option;
pub use os;
pub use path;
-pub use pipes;
+pub use comm;
pub use private;
pub use ptr;
pub use rand;
use iter;
use libc;
use option;
-use pipes::{GenericChan, GenericPort};
+use comm::{GenericChan, GenericPort};
use prelude::*;
use ptr;
use result;
a normal large stack.
*/
pub unsafe fn run_in_bare_thread(f: ~fn()) {
- let (port, chan) = pipes::stream();
+ let (port, chan) = comm::stream();
// FIXME #4525: Unfortunate that this creates an extra scheduler but it's
// necessary since rust_raw_thread_join_delete is blocking
do task::spawn_sched(task::SingleThreaded) {
// An unwrapper uses this protocol to communicate with the "other" task that
// drops the last refcount on an arc. Unfortunately this can't be a proper
// pipe protocol because the unwrapper has to access both stages at once.
-type UnwrapProto = ~mut Option<(pipes::ChanOne<()>, pipes::PortOne<bool>)>;
+type UnwrapProto = ~mut Option<(comm::ChanOne<()>, comm::PortOne<bool>)>;
struct ArcData<T> {
mut count: libc::intptr_t,
cast::reinterpret_cast(&data.unwrapper);
let (message, response) = option::swap_unwrap(p);
// Send 'ready' and wait for a response.
- pipes::send_one(message, ());
+ comm::send_one(message, ());
// Unkillable wait. Message guaranteed to come.
- if pipes::recv_one(response) {
+ if comm::recv_one(response) {
// Other task got the data.
cast::forget(data);
} else {
-> T {
struct DeathThroes<T> {
mut ptr: Option<~ArcData<T>>,
- mut response: Option<pipes::ChanOne<bool>>,
+ mut response: Option<comm::ChanOne<bool>>,
drop {
unsafe {
let response = option::swap_unwrap(&mut self.response);
// tried to wake us whether they should hand-off the data to
// us.
if task::failing() {
- pipes::send_one(response, false);
+ comm::send_one(response, false);
// Either this swap_unwrap or the one below (at "Got
// here") ought to run.
cast::forget(option::swap_unwrap(&mut self.ptr));
} else {
assert self.ptr.is_none();
- pipes::send_one(response, true);
+ comm::send_one(response, true);
}
}
}
do task::unkillable {
let ptr: ~ArcData<T> = cast::reinterpret_cast(&rc.data);
- let (p1,c1) = pipes::oneshot(); // ()
- let (p2,c2) = pipes::oneshot(); // bool
+ let (p1,c1) = comm::oneshot(); // ()
+ let (p2,c2) = comm::oneshot(); // bool
let server: UnwrapProto = ~mut Some((c1,p2));
let serverp: int = cast::transmute(server);
// Try to put our server end in the unwrapper slot.
response: Some(c2) };
let mut p1 = Some(p1); // argh
do task::rekillable {
- pipes::recv_one(option::swap_unwrap(&mut p1));
+ comm::recv_one(option::swap_unwrap(&mut p1));
}
// Got here. Back in the 'unkillable' without getting killed.
// Recover ownership of ptr, then take the data out.
use core::option::{None, Some};
use option;
- use pipes;
+ use comm;
use private::{exclusive, unwrap_exclusive};
use result;
use task;
for uint::range(0, num_tasks) |_i| {
let total = total.clone();
- let (port, chan) = pipes::stream();
+ let (port, chan) = comm::stream();
futures.push(port);
do task::spawn || {
use private::at_exit::at_exit;
use private::global::global_data_clone_create;
use private::finally::Finally;
-use pipes::{Port, Chan, SharedChan, GenericChan, GenericPort,
- GenericSmartChan, stream};
+use comm::{Port, Chan, SharedChan, GenericChan,
+ GenericPort, GenericSmartChan, stream};
use task::{Task, task, spawn};
use task::rt::{task_id, get_task_id};
use hashmap::linear::LinearMap;
#[test]
fn test_select_stream_and_oneshot() {
- use pipes::select2i;
+ use comm::select2i;
use either::{Left, Right};
let (port, chan) = stream();
use io::ReaderUtil;
use libc;
use libc::{pid_t, c_void, c_int};
-use pipes::{stream, SharedChan, GenericChan, GenericPort};
+use comm::{stream, SharedChan, GenericChan, GenericPort};
use option::{Some, None};
use os;
use prelude::*;
use libc;
use option;
use result::Result;
-use pipes::{stream, Chan, GenericChan, GenericPort, Port, SharedChan};
+use comm::{stream, Chan, GenericChan, GenericPort, Port, SharedChan};
use pipes;
use prelude::*;
use ptr;
#[ignore(cfg(windows))]
#[should_fail]
fn test_unkillable_nested() {
- let (po, ch) = pipes::stream();
+ let (po, ch) = comm::stream();
// We want to do this after failing
do spawn_unlinked || {
#[test]
fn test_sched_thread_per_core() {
- let (port, chan) = pipes::stream();
+ let (port, chan) = comm::stream();
do spawn_sched(ThreadPerCore) || {
unsafe {
#[test]
fn test_spawn_thread_on_demand() {
- let (port, chan) = pipes::stream();
+ let (port, chan) = comm::stream();
do spawn_sched(ManualThreads(2)) || {
unsafe {
let running_threads = rt::rust_sched_current_nonlazy_threads();
assert(running_threads as int == 1);
- let (port2, chan2) = pipes::stream();
+ let (port2, chan2) = comm::stream();
do spawn_sched(CurrentScheduler) || {
chan2.send(());
use cast;
use container::Map;
use option;
-use pipes::{Chan, GenericChan, GenericPort, Port, stream};
+use comm::{Chan, GenericChan, GenericPort, Port, stream};
use pipes;
use prelude::*;
use private;
#[test]
#[ignore(cfg(windows))]
fn test_spawn_raw_notify_success() {
- let (notify_po, notify_ch) = pipes::stream();
+ let (notify_po, notify_ch) = comm::stream();
let opts = task::TaskOpts {
notify_chan: Some(notify_ch),
#[ignore(cfg(windows))]
fn test_spawn_raw_notify_failure() {
// New bindings for these
- let (notify_po, notify_ch) = pipes::stream();
+ let (notify_po, notify_ch) = comm::stream();
let opts = task::TaskOpts {
linked: false,
bug and need to present an error.
*/
pub fn monitor(+f: fn~(diagnostic::Emitter)) {
- use core::pipes::*;
+ use core::comm::*;
use std::cell::Cell;
let (p, ch) = stream();
let ch = SharedChan(ch);
use util;
use std::cell::Cell;
-use core::pipes::{stream, Chan, SharedChan, Port};
+use core::comm::{stream, Chan, SharedChan, Port};
use core::vec;
use core::ops::Drop;
use rustc::back::link;
use core::io;
use core::libc;
use core::os;
-use core::pipes;
+use core::comm;
use core::result;
use core::run;
use core::str;
use core::task;
-use core::pipes::*;
+use core::comm::*;
use std::future;
use syntax;
os::close(pipe_err.out);
os::close(pipe_in.out);
- let (stdout_po, stdout_ch) = pipes::stream();
+ let (stdout_po, stdout_ch) = comm::stream();
do task::spawn_sched(task::SingleThreaded) || {
stdout_ch.send(readclose(pipe_out.in));
}
- let (stderr_po, stderr_ch) = pipes::stream();
+ let (stderr_po, stderr_ch) = comm::stream();
do task::spawn_sched(task::SingleThreaded) || {
stderr_ch.send(readclose(pipe_err.in));
}
let (markdown_po, markdown_ch) = stream();
let markdown_ch = SharedChan(markdown_ch);
let writer_factory = fn~(page: doc::Page) -> Writer {
- let (writer_po, writer_ch) = pipes::stream();
+ let (writer_po, writer_ch) = comm::stream();
let markdown_ch = markdown_ch.clone();
do task::spawn || {
let (writer, future) = future_writer();
}
fn future_writer() -> (Writer, future::Future<~str>) {
- let (port, chan) = pipes::stream();
+ let (port, chan) = comm::stream();
let writer = fn~(instr: WriteInstr) {
chan.send(copy instr);
};
use core::option;
use core::vec;
-use core::pipes::*;
+use core::comm::*;
use syntax::ast;
pub fn mk_pass(output_style: config::OutputStyle) -> Pass {
let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let arc_v = arc::ARC(v);
- let (p, c) = pipes::stream();
+ let (p, c) = comm::stream();
do task::spawn() || {
- let p = pipes::PortSet();
+ let p = comm::PortSet();
c.send(p.chan());
let arc_v = p.recv();
pub fn test_mutex_arc_condvar() {
let arc = ~MutexARC(false);
let arc2 = ~arc.clone();
- let (p,c) = pipes::oneshot();
+ let (p,c) = comm::oneshot();
let (c,p) = (~mut Some(c), ~mut Some(p));
do task::spawn || {
// wait until parent gets in
- pipes::recv_one(option::swap_unwrap(p));
+ comm::recv_one(option::swap_unwrap(p));
do arc2.access_cond |state, cond| {
*state = true;
cond.signal();
}
}
do arc.access_cond |state, cond| {
- pipes::send_one(option::swap_unwrap(c), ());
+ comm::send_one(option::swap_unwrap(c), ());
assert !*state;
while !*state {
cond.wait();
pub fn test_arc_condvar_poison() {
let arc = ~MutexARC(1);
let arc2 = ~arc.clone();
- let (p, c) = pipes::stream();
+ let (p, c) = comm::stream();
do task::spawn_unlinked || {
let _ = p.recv();
pub fn test_mutex_arc_unwrap_poison() {
let arc = MutexARC(1);
let arc2 = ~(&arc).clone();
- let (p, c) = pipes::stream();
+ let (p, c) = comm::stream();
do task::spawn || {
do arc2.access |one| {
c.send(());
pub fn test_rw_arc() {
let arc = ~RWARC(0);
let arc2 = ~arc.clone();
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
do task::spawn || {
do arc2.write |num| {
// Reader tasks
let mut reader_convos = ~[];
for 10.times {
- let ((rp1,rc1),(rp2,rc2)) = (pipes::stream(),pipes::stream());
+ let ((rp1,rc1),(rp2,rc2)) = (comm::stream(),comm::stream());
reader_convos.push((rc1, rp2));
let arcn = ~arc.clone();
do task::spawn || {
// Writer task
let arc2 = ~arc.clone();
- let ((wp1,wc1),(wp2,wc2)) = (pipes::stream(),pipes::stream());
+ let ((wp1,wc1),(wp2,wc2)) = (comm::stream(),comm::stream());
do task::spawn || {
wp1.recv();
do arc2.write_cond |state, cond| {
*/
-use core::pipes::{GenericChan, GenericSmartChan, GenericPort};
-use core::pipes::{Chan, Port, Selectable, Peekable};
+use core::comm::{GenericChan, GenericSmartChan, GenericPort};
+use core::comm::{Chan, Port, Selectable, Peekable};
use core::pipes;
use core::prelude::*;
pub fn DuplexStream<T:Owned,U:Owned>()
-> (DuplexStream<T, U>, DuplexStream<U, T>)
{
- let (p1, c2) = pipes::stream();
- let (p2, c1) = pipes::stream();
+ let (p1, c2) = comm::stream();
+ let (p2, c1) = comm::stream();
(DuplexStream {
chan: c1,
port: p1
// The basic send/recv interface FlatChan and PortChan will implement
use core::io;
-use core::pipes::GenericChan;
-use core::pipes::GenericPort;
+use core::comm::GenericChan;
+use core::comm::GenericPort;
use core::pipes;
use core::prelude::*;
use core::sys::size_of;
use flatpipes::{FlatPort, FlatChan};
use core::io::{Reader, Writer};
- use core::pipes::{Port, Chan};
- use core::pipes;
+ use core::comm::{Port, Chan};
+ use core::comm;
pub type ReaderPort<T, R> = FlatPort<
T, DeserializingUnflattener<DefaultDecoder, T>,
pub fn pipe_stream<T: Encodable<DefaultEncoder> +
Decodable<DefaultDecoder>>(
) -> (PipePort<T>, PipeChan<T>) {
- let (port, chan) = pipes::stream();
+ let (port, chan) = comm::stream();
return (pipe_port(port), pipe_chan(chan));
}
}
use flatpipes::{FlatPort, FlatChan};
use core::io::{Reader, Writer};
- use core::pipes::{Port, Chan};
- use core::pipes;
+ use core::comm::{Port, Chan};
+ use core::comm;
use core::prelude::*;
pub type ReaderPort<T, R> =
/// Create a pair of `FlatChan` and `FlatPort`, backed by pipes
pub fn pipe_stream<T:Copy + Owned>() -> (PipePort<T>, PipeChan<T>) {
- let (port, chan) = pipes::stream();
+ let (port, chan) = comm::stream();
return (pipe_port(port), pipe_chan(chan));
}
use flatpipes::{ByteChan, BytePort};
use core::io::{Writer, Reader, ReaderUtil};
- use core::pipes::{Port, Chan};
+ use core::comm::{Port, Chan};
use core::pipes;
use core::prelude::*;
}
pub struct PipeBytePort {
- port: pipes::Port<~[u8]>,
+ port: comm::Port<~[u8]>,
mut buf: ~[u8]
}
pub struct PipeByteChan {
- chan: pipes::Chan<~[u8]>
+ chan: comm::Chan<~[u8]>
}
pub impl BytePort for PipeBytePort {
use uv;
// Indicate to the client task that the server is listening
- let (begin_connect_port, begin_connect_chan) = pipes::stream();
+ let (begin_connect_port, begin_connect_chan) = comm::stream();
// The connection is sent from the server task to the receiver task
// to handle the connection
- let (accept_port, accept_chan) = pipes::stream();
+ let (accept_port, accept_chan) = comm::stream();
// The main task will wait until the test is over to proceed
- let (finish_port, finish_chan) = pipes::stream();
+ let (finish_port, finish_chan) = comm::stream();
let addr0 = ip::v4::parse_addr("127.0.0.1");
}) |new_conn, kill_ch| {
// Incoming connection. Send it to the receiver task to accept
- let (res_port, res_chan) = pipes::stream();
+ let (res_port, res_chan) = comm::stream();
accept_chan.send((new_conn, res_chan));
// Wait until the connection is accepted
res_port.recv();
fn pipe_port_loader(bytes: ~[u8]
) -> pod::PipePort<int> {
- let (port, chan) = pipes::stream();
+ let (port, chan) = comm::stream();
if !bytes.is_empty() {
chan.send(bytes);
}
use core::cast;
use core::either::Either;
use core::option;
-use core::pipes::{recv, oneshot, ChanOne, PortOne, send_one, recv_one};
+use core::comm::{oneshot, ChanOne, PortOne, send_one, recv_one};
+use core::pipes::recv;
use core::prelude::*;
use core::task;
use future::*;
- use core::pipes::oneshot;
+ use core::comm::oneshot;
use core::task;
#[test]
use core::libc;
use core::prelude::*;
-use core::pipes::{stream, SharedChan};
+use core::comm::{stream, SharedChan};
use core::ptr;
use core::result;
use core::str;
use core::io;
use core::libc::size_t;
use core::libc;
-use core::pipes::{stream, Chan, Port, SharedChan};
+use core::comm::{stream, Chan, Port, SharedChan};
use core::prelude::*;
use core::ptr;
use core::result::{Result};
use uv;
use core::io;
- use core::pipes::{stream, Chan, Port, SharedChan};
+ use core::comm::{stream, Chan, Port, SharedChan};
use core::prelude::*;
use core::result;
use core::str;
// Each waiting task receives on one of these.
#[doc(hidden)]
-type WaitEnd = pipes::PortOne<()>;
+type WaitEnd = comm::PortOne<()>;
#[doc(hidden)]
-type SignalEnd = pipes::ChanOne<()>;
+type SignalEnd = comm::ChanOne<()>;
// A doubly-ended queue of waiting tasks.
#[doc(hidden)]
-struct Waitqueue { head: pipes::Port<SignalEnd>,
- tail: pipes::Chan<SignalEnd> }
+struct Waitqueue { head: comm::Port<SignalEnd>,
+ tail: comm::Chan<SignalEnd> }
fn new_waitqueue() -> Waitqueue {
- let (block_head, block_tail) = pipes::stream();
+ let (block_head, block_tail) = comm::stream();
Waitqueue { head: block_head, tail: block_tail }
}
if q.head.peek() {
// Pop and send a wakeup signal. If the waiter was killed, its port
// will have closed. Keep trying until we get a live task.
- if pipes::try_send_one(q.head.recv(), ()) {
+ if comm::try_send_one(q.head.recv(), ()) {
true
} else {
signal_waitqueue(q)
fn broadcast_waitqueue(q: &Waitqueue) -> uint {
let mut count = 0;
while q.head.peek() {
- if pipes::try_send_one(q.head.recv(), ()) {
+ if comm::try_send_one(q.head.recv(), ()) {
count += 1;
}
}
state.count -= 1;
if state.count < 0 {
// Create waiter nobe.
- let (WaitEnd, SignalEnd) = pipes::oneshot();
+ let (WaitEnd, SignalEnd) = comm::oneshot();
// Tell outer scope we need to block.
waiter_nobe = Some(WaitEnd);
// Enqueue ourself.
/* for 1000.times { task::yield(); } */
// Need to wait outside the exclusive.
if waiter_nobe.is_some() {
- let _ = pipes::recv_one(option::unwrap(waiter_nobe));
+ let _ = comm::recv_one(option::unwrap(waiter_nobe));
}
}
fn release() {
*/
fn wait_on(condvar_id: uint) {
// Create waiter nobe.
- let (WaitEnd, SignalEnd) = pipes::oneshot();
+ let (WaitEnd, SignalEnd) = comm::oneshot();
let mut WaitEnd = Some(WaitEnd);
let mut SignalEnd = Some(SignalEnd);
let mut reacquire = None;
// Unconditionally "block". (Might not actually block if a
// signaller already sent -- I mean 'unconditionally' in contrast
// with acquire().)
- let _ = pipes::recv_one(option::swap_unwrap(&mut WaitEnd));
+ let _ = comm::recv_one(option::swap_unwrap(&mut WaitEnd));
}
// This is needed for a failing condition variable to reacquire the
#[test]
pub fn test_sem_as_cvar() {
/* Child waits and parent signals */
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
let s = ~semaphore(0);
let s2 = ~s.clone();
do task::spawn || {
let _ = p.recv();
/* Parent waits and child signals */
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
let s = ~semaphore(0);
let s2 = ~s.clone();
do task::spawn || {
// time, and shake hands.
let s = ~semaphore(2);
let s2 = ~s.clone();
- let (p1,c1) = pipes::stream();
- let (p2,c2) = pipes::stream();
+ let (p1,c1) = comm::stream();
+ let (p2,c2) = comm::stream();
do task::spawn || {
do s2.access {
let _ = p2.recv();
do task::spawn_sched(task::ManualThreads(1)) {
let s = ~semaphore(1);
let s2 = ~s.clone();
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
let child_data = ~mut Some((s2, c));
do s.access {
let (s2,c) = option::swap_unwrap(child_data);
pub fn test_mutex_lock() {
// Unsafely achieve shared state, and do the textbook
// "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
let m = ~Mutex();
let m2 = ~m.clone();
let mut sharedstate = ~0;
cond.wait();
}
// Parent wakes up child
- let (port,chan) = pipes::stream();
+ let (port,chan) = comm::stream();
let m3 = ~m.clone();
do task::spawn || {
do m3.lock_cond |cond| {
for num_waiters.times {
let mi = ~m.clone();
- let (port, chan) = pipes::stream();
+ let (port, chan) = comm::stream();
ports.push(port);
do task::spawn || {
do mi.lock_cond |cond| {
let m2 = ~m.clone();
let result: result::Result<(),()> = do task::try || {
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
do task::spawn || { // linked
let _ = p.recv(); // wait for sibling to get in the mutex
task::yield();
pub fn test_mutex_killed_broadcast() {
let m = ~Mutex();
let m2 = ~m.clone();
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
let result: result::Result<(),()> = do task::try || {
let mut sibling_convos = ~[];
for 2.times {
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
let c = ~mut Some(c);
sibling_convos.push(p);
let mi = ~m2.clone();
assert woken == 0;
}
struct SendOnFailure {
- c: pipes::Chan<()>,
+ c: comm::Chan<()>,
}
impl Drop for SendOnFailure {
}
}
- fn SendOnFailure(c: pipes::Chan<()>) -> SendOnFailure {
+ fn SendOnFailure(c: comm::Chan<()>) -> SendOnFailure {
SendOnFailure {
c: c
}
let result = do task::try {
let m = ~mutex_with_condvars(2);
let m2 = ~m.clone();
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
do task::spawn || {
do m2.lock_cond |cond| {
c.send(());
mode2: RWlockMode) {
// Test mutual exclusion between readers and writers. Just like the
// mutex mutual exclusion test, a ways above.
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
let x2 = ~x.clone();
let mut sharedstate = ~0;
let ptr = ptr::addr_of(&(*sharedstate));
make_mode2_go_first: bool) {
// Much like sem_multi_resource.
let x2 = ~x.clone();
- let (p1,c1) = pipes::stream();
- let (p2,c2) = pipes::stream();
+ let (p1,c1) = comm::stream();
+ let (p2,c2) = comm::stream();
do task::spawn || {
if !make_mode2_go_first {
let _ = p2.recv(); // parent sends to us once it locks, or ...
cond.wait();
}
// Parent wakes up child
- let (port,chan) = pipes::stream();
+ let (port,chan) = comm::stream();
let x3 = ~x.clone();
do task::spawn || {
do x3.write_cond |cond| {
for num_waiters.times {
let xi = ~x.clone();
- let (port, chan) = pipes::stream();
+ let (port, chan) = comm::stream();
ports.push(port);
do task::spawn || {
do lock_cond(xi, dg1) |cond| {
/// parallelism.
use core::io;
-use core::pipes::{Chan, Port};
+use core::comm::{Chan, Port};
use core::pipes;
use core::prelude::*;
use core::task::{SchedMode, SingleThreaded};
assert n_tasks >= 1;
let channels = do vec::from_fn(n_tasks) |i| {
- let (port, chan) = pipes::stream::<Msg<T>>();
+ let (port, chan) = comm::stream::<Msg<T>>();
let init_fn = init_fn_factory();
let task_body: ~fn() = || {
use core::io::WriterUtil;
use core::io;
use core::libc::size_t;
-use core::pipes::{stream, Chan, Port, SharedChan};
+use core::comm::{stream, Chan, Port, SharedChan};
use core::option;
use core::prelude::*;
use core::result;
use test::{TestOpts, run_test};
use core::either;
- use core::pipes::{stream, SharedChan};
+ use core::comm::{stream, SharedChan};
use core::option;
use core::vec;
use core::libc;
use core::libc::c_void;
use core::cast::transmute;
-use core::pipes::{stream, Chan, SharedChan, Port, select2i};
+use core::comm::{stream, Chan, SharedChan, Port, select2i};
use core::prelude::*;
use core::ptr;
use core;
use core::either::{Left, Right};
use core::libc;
-use core::pipes::{Port, Chan, SharedChan, select2i};
+use core::comm::{Port, Chan, SharedChan, select2i};
use core::private::global::{global_data_clone_create,
global_data_clone};
use core::private::weak_task::weaken_task;
use core::task;
use core::cast::transmute;
use core::libc::c_void;
- use core::pipes::{stream, SharedChan, Chan};
+ use core::comm::{stream, SharedChan, Chan};
extern fn simple_timer_close_cb(timer_ptr: *ll::uv_timer_t) {
unsafe {
use core::libc::c_void;
use core::libc;
-use core::pipes::{stream, Port, Chan, SharedChan};
+use core::comm::{stream, Port, Chan, SharedChan};
use core::prelude::*;
use core::ptr::addr_of;
use core::task::TaskBuilder;
use core::ptr;
use core::str;
use core::vec;
-use core::pipes::{stream, Chan, SharedChan, Port};
+use core::comm::{stream, Chan, SharedChan, Port};
// libuv struct mappings
pub struct uv_ip4_addr {
use core::either::{Either, Left, Right};
use core::io;
use core::option;
-use core::pipes::{recv, oneshot, PortOne, send_one};
+use core::comm::{oneshot, PortOne, send_one};
+use core::pipes::recv;
use core::prelude::*;
use core::result;
use core::run;
};
body += ~"let b = pipe.reuse_buffer();\n";
- body += fmt!("let %s = ::pipes::SendPacketBuffered(\
+ body += fmt!("let %s = ::core::pipes::SendPacketBuffered(\
::ptr::addr_of(&(b.buffer.data.%s)));\n",
sp, next.name);
- body += fmt!("let %s = ::pipes::RecvPacketBuffered(\
+ body += fmt!("let %s = ::core::pipes::RecvPacketBuffered(\
::ptr::addr_of(&(b.buffer.data.%s)));\n",
rp, next.name);
}
(recv, recv) => "(c, s)"
};
- body += fmt!("let %s = ::pipes::entangle();\n", pat);
+ body += fmt!("let %s = ::core::pipes::entangle();\n", pat);
}
body += fmt!("let message = %s(%s);\n",
self.name(),
~"s"), ~", "));
if !try {
- body += fmt!("::pipes::send(pipe, message);\n");
+ body += fmt!("::core::pipes::send(pipe, message);\n");
// return the new channel
body += ~"c }";
}
else {
- body += fmt!("if ::pipes::send(pipe, message) {\n \
- ::pipes::rt::make_some(c) \
- } else { ::pipes::rt::make_none() } }");
+ body += fmt!("if ::core::pipes::send(pipe, message) {\n \
+ ::core::pipes::rt::make_some(c) \
+ } else { ::core::pipes::rt::make_none() } }");
}
let body = cx.parse_expr(body);
message_args);
if !try {
- body += fmt!("::pipes::send(pipe, message);\n");
+ body += fmt!("::core::pipes::send(pipe, message);\n");
body += ~" }";
} else {
- body += fmt!("if ::pipes::send(pipe, message) \
+ body += fmt!("if ::core::pipes::send(pipe, message) \
{ \
- ::pipes::rt::make_some(()) \
+ ::core::pipes::rt::make_some(()) \
} else { \
- ::pipes::rt::make_none() \
+ ::core::pipes::rt::make_none() \
} }");
}
self.data_name(),
self.span,
cx.ty_path_ast_builder(
- path_global(~[cx.ident_of(~"pipes"),
+ path_global(~[cx.ident_of(~"core"),
+ cx.ident_of(~"pipes"),
cx.ident_of(dir.to_str() + ~"Packet")],
dummy_sp())
.add_ty(cx.ty_path_ast_builder(
self.data_name(),
self.span,
cx.ty_path_ast_builder(
- path_global(~[cx.ident_of(~"pipes"),
+ path_global(~[cx.ident_of(~"core"),
+ cx.ident_of(~"pipes"),
cx.ident_of(dir.to_str()
+ ~"PacketBuffered")],
dummy_sp())
let body = if !self.is_bounded() {
match start_state.dir {
- send => quote_expr!( ::pipes::entangle() ),
+ send => quote_expr!( ::core::pipes::entangle() ),
recv => {
quote_expr!({
- let (s, c) = ::pipes::entangle();
+ let (s, c) = ::core::pipes::entangle();
(c, s)
})
}
};
cx.parse_item(fmt!("pub fn init%s() -> (client::%s, server::%s)\
- { use pipes::HasBuffer; %s }",
+ { use core::pipes::HasBuffer; %s }",
start_state.ty_params.to_source(cx),
start_state.to_ty(cx).to_source(cx),
start_state.to_ty(cx).to_source(cx),
let fty = s.to_ty(ext_cx);
ext_cx.field_imm(ext_cx.ident_of(s.name),
quote_expr!(
- ::pipes::mk_packet::<$fty>()
+ ::core::pipes::mk_packet::<$fty>()
))
}))
}
fn gen_init_bounded(&self, ext_cx: ext_ctxt) -> @ast::expr {
debug!("gen_init_bounded");
let buffer_fields = self.gen_buffer_init(ext_cx);
- let buffer = quote_expr!(~::pipes::Buffer {
- header: ::pipes::BufferHeader(),
+ let buffer = quote_expr!(~::core::pipes::Buffer {
+ header: ::core::pipes::BufferHeader(),
data: $buffer_fields,
});
quote_expr!({
let buffer = $buffer;
- do ::pipes::entangle_buffer(buffer) |buffer, data| {
+ do ::core::pipes::entangle_buffer(buffer) |buffer, data| {
$entangle_body
}
})
}
}
let ty = s.to_ty(cx);
- let fty = quote_ty!( ::pipes::Packet<$ty> );
+ let fty = quote_ty!( ::core::pipes::Packet<$ty> );
@spanned {
node: ast::struct_field_ {
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use core::pipes::*;
+use core::comm::*;
pub fn foo<T:Owned + Copy>(x: T) -> Port<T> {
let (p, c) = stream();
use io::Writer;
use io::WriterUtil;
-use pipes::{Port, Chan, SharedChan};
+use comm::{Port, Chan, SharedChan};
macro_rules! move_out (
{ $x:expr } => { unsafe { let y = *ptr::addr_of(&($x)); y } }
stop
}
-fn server(requests: Port<request>, responses: pipes::Chan<uint>) {
+fn server(requests: Port<request>, responses: comm::Chan<uint>) {
let mut count = 0u;
let mut done = false;
while !done {
}
fn run(args: &[~str]) {
- let (from_child, to_parent) = pipes::stream();
- let (from_parent, to_child) = pipes::stream();
+ let (from_child, to_parent) = comm::stream();
+ let (from_parent, to_child) = comm::stream();
let to_child = SharedChan(to_child);
use io::Writer;
use io::WriterUtil;
-use pipes::{Port, PortSet, Chan};
+use comm::{Port, PortSet, Chan, stream};
macro_rules! move_out (
{ $x:expr } => { unsafe { let y = *ptr::addr_of(&($x)); y } }
stop
}
-fn server(requests: PortSet<request>, responses: pipes::Chan<uint>) {
+fn server(requests: PortSet<request>, responses: Chan<uint>) {
let mut count = 0;
let mut done = false;
while !done {
}
fn run(args: &[~str]) {
- let (from_child, to_parent) = pipes::stream();
- let (from_parent_, to_child) = pipes::stream();
+ let (from_child, to_parent) = stream();
+ let (from_parent_, to_child) = stream();
let from_parent = PortSet();
from_parent.add(from_parent_);
let start = std::time::precise_time_s();
let mut worker_results = ~[];
for uint::range(0, workers) |_i| {
- let (from_parent_, to_child) = pipes::stream();
+ let (from_parent_, to_child) = stream();
from_parent.add(from_parent_);
do task::task().future_result(|+r| {
worker_results.push(r);
use std::time;
use std::future;
-use pipes::recv;
+use core::pipes::recv;
proto! ring (
num:send {
extern mod std;
-use pipes::{spawn_service, recv};
+use core::pipes::{spawn_service, recv};
use std::time::precise_time_s;
proto! pingpong (
)
)
-fn switch<T:Owned,Tb:Owned,U>(+endp: pipes::RecvPacketBuffered<T, Tb>,
+fn switch<T:Owned,Tb:Owned,U>(+endp: core::pipes::RecvPacketBuffered<T, Tb>,
f: fn(+v: Option<T>) -> U) -> U {
- f(pipes::try_recv(endp))
+ f(core::pipes::try_recv(endp))
}
// Here's the benchmark
use std::oldmap::HashMap;
use std::sort;
use std::cell::Cell;
-use core::pipes::*;
+use core::comm::*;
fn print_complements() {
let all = ~[Blue, Red, Yellow];
use std::oldmap::HashMap;
use std::sort;
use io::ReaderUtil;
-use pipes::{stream, Port, Chan};
+use comm::{stream, Port, Chan};
use cmp::Ord;
// given a map, print a sorted version of it
return vec::slice(bb, len - (nn - 1u), len).to_vec();
}
-fn make_sequence_processor(sz: uint, from_parent: pipes::Port<~[u8]>,
- to_parent: pipes::Chan<~str>) {
+fn make_sequence_processor(sz: uint, from_parent: comm::Port<~[u8]>,
+ to_parent: comm::Chan<~str>) {
let freqs: HashMap<~[u8], uint> = oldmap::HashMap();
let mut carry: ~[u8] = ~[];
from_child.push(from_child_);
- let (from_parent, to_child) = pipes::stream();
+ let (from_parent, to_child) = comm::stream();
do task::spawn_with(from_parent) |from_parent| {
make_sequence_processor(sz, from_parent, to_parent_);
fn get_type(&self) -> io::WriterType { io::File }
}
-fn writer(path: ~str, pport: pipes::Port<Line>, size: uint)
+fn writer(path: ~str, pport: comm::Port<Line>, size: uint)
{
let cout: io::Writer = match path {
~"" => {
let size = if vec::len(args) < 2_u { 80_u }
else { uint::from_str(args[1]).get() };
- let (pport, pchan) = pipes::stream();
- let pchan = pipes::SharedChan(pchan);
+ let (pport, pchan) = comm::stream();
+ let pchan = comm::SharedChan(pchan);
for uint::range(0_u, size) |j| {
let cchan = pchan.clone();
do task::spawn { cchan.send(chanmb(j, size, depth)) };
extern mod std;
use std::{time, getopts};
-use io::WriterUtil;
-use int::range;
-use pipes::Port;
-use pipes::Chan;
-use pipes::send;
-use pipes::recv;
+use core::int::range;
+use core::comm::*;
+use core::io::WriterUtil;
use core::result;
use result::{Ok, Err};
} else if n <= 2 {
c.send(1);
} else {
- let p = pipes::PortSet();
+ let p = PortSet();
let ch = p.chan();
task::spawn(|| pfib(ch, n - 1) );
let ch = p.chan();
}
}
- let (p, ch) = pipes::stream();
+ let (p, ch) = stream();
let _t = task::spawn(|| pfib(ch, n) );
p.recv()
}
//
// The filename is a song reference; google it in quotes.
-fn child_generation(gens_left: uint, -c: pipes::Chan<()>) {
+fn child_generation(gens_left: uint, -c: comm::Chan<()>) {
// This used to be O(n^2) in the number of generations that ever existed.
// With this code, only as many generations are alive at a time as tasks
// alive at a time,
copy args
};
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
child_generation(uint::from_str(args[1]).get(), c);
if p.try_recv().is_none() {
fail!(~"it happened when we slumbered");
// Creates in the background 'num_tasks' tasks, all blocked forever.
// Doesn't return until all such tasks are ready, but doesn't block forever itself.
-use core::pipes::*;
+use core::comm::*;
fn grandchild_group(num_tasks: uint) {
let (po, ch) = stream();
// Test for concurrent tasks
-use core::pipes::*;
+use core::comm::*;
fn calc(children: uint, parent_wait_chan: &Chan<Chan<Chan<int>>>) {
// except according to those terms.
fn main() {
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
let x = Some(p);
c.send(false);
match x {
fn main() {
let cat = ~"kitty";
- let (_, ch) = pipes::stream(); //~ ERROR does not fulfill `Owned`
+ let (_, ch) = comm::stream(); //~ ERROR does not fulfill `Owned`
ch.send(foo(42, @(cat))); //~ ERROR does not fulfill `Owned`
}
fn child() { assert (1 == 2); }
fn main() {
- let (p, _c) = pipes::stream::<int>();
+ let (p, _c) = comm::stream::<int>();
task::spawn(|| child() );
let x = p.recv();
}
fn child() { fail!(); }
fn main() {
- let (p, _c) = pipes::stream::<()>();
+ let (p, _c) = comm::stream::<()>();
task::spawn(|| child() );
task::yield();
}
fn grandchild() { fail!(~"grandchild dies"); }
fn child() {
- let (p, _c) = pipes::stream::<int>();
+ let (p, _c) = comm::stream::<int>();
task::spawn(|| grandchild() );
let x = p.recv();
}
fn main() {
- let (p, _c) = pipes::stream::<int>();
+ let (p, _c) = comm::stream::<int>();
task::spawn(|| child() );
let x = p.recv();
}
fn child() { assert (1 == 2); }
fn parent() {
- let (p, _c) = pipes::stream::<int>();
+ let (p, _c) = comm::stream::<int>();
task::spawn(|| child() );
let x = p.recv();
}
// This task is not linked to the failure chain, but since the other
// tasks are going to fail the kernel, this one will fail too
fn sleeper() {
- let (p, _c) = pipes::stream::<int>();
+ let (p, _c) = comm::stream::<int>();
let x = p.recv();
}
fn main() {
task::spawn(|| goodfail() );
- let (po, _c) = pipes::stream();
+ let (po, _c) = comm::stream();
// We shouldn't be able to get past this recv since there's no
// message available
let i: int = po.recv();
// course preferable, as the value itself is
// irrelevant).
-use core::pipes::*;
+use core::comm::*;
fn foo(&&x: ()) -> Port<()> {
let (p, c) = stream::<()>();
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use core::pipes::*;
+use core::comm::*;
pub fn main() {
let (p, ch) = stream();
use std::oldmap;
use std::oldmap::HashMap;
-use core::pipes::*;
+use core::comm::*;
pub fn map(filename: ~str, emit: map_reduce::putter) { emit(filename, ~"1"); }
mod map_reduce {
use std::oldmap;
use std::oldmap::HashMap;
- use core::pipes::*;
+ use core::comm::*;
pub type putter = fn@(~str, ~str);
// xfail-fast
pub fn main() {
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
do task::try || {
- let (p2,c2) = pipes::stream();
+ let (p2,c2) = comm::stream();
do task::spawn || {
p2.recv();
error!("sibling fails");
fail!();
}
- let (p3,c3) = pipes::stream();
+ let (p3,c3) = comm::stream();
c.send(c3);
c2.send(());
error!("child blocks");
// xfail-fast
-use pipes::{Select2, Selectable};
+use comm::{Select2, Selectable};
pub fn main() {
- let (p,c) = pipes::stream();
+ let (p,c) = comm::stream();
do task::try || {
- let (p2,c2) = pipes::stream();
+ let (p2,c2) = comm::stream();
do task::spawn || {
p2.recv();
error!("sibling fails");
fail!();
}
- let (p3,c3) = pipes::stream();
+ let (p3,c3) = comm::stream();
c.send(c3);
c2.send(());
error!("child blocks");
- let (p, c) = pipes::stream();
+ let (p, c) = comm::stream();
(p, p3).select();
c.send(());
};
extern mod std;
-use pipes::Chan;
+use comm::Chan;
type RingBuffer = ~[float];
type SamplesFn = fn~ (samples: &RingBuffer);
-use core::pipes::*;
+use core::comm::*;
fn producer(c: &Chan<~[u8]>) {
c.send(
//
// http://theincredibleholk.wordpress.com/2012/07/06/rusty-pipes/
-use pipes::try_recv;
+use core::pipes;
+use core::pipes::try_recv;
pub type username = ~str;
pub type password = ~str;
use std::timer::sleep;
use std::uv;
-use pipes::{try_recv, recv};
+use core::pipes;
+use core::pipes::{try_recv, recv};
proto! oneshot (
waiting:send {
extern mod std;
use std::timer::sleep;
use std::uv;
+use core::pipes;
proto! oneshot (
waiting:send {
#[legacy_records];
mod pingpong {
+ use core::pipes;
use core::pipes::*;
use core::ptr;
pub enum ping = server::pong;
pub enum pong = client::ping;
pub mod client {
+ use core::pipes;
use core::pipes::*;
use core::ptr;
let s = SendPacketBuffered(ptr::addr_of(&(b.buffer.data.pong)));
let c = RecvPacketBuffered(ptr::addr_of(&(b.buffer.data.pong)));
let message = ::pingpong::ping(s);
- ::pipes::send(pipe, message);
+ send(pipe, message);
c
}
}
::pingpong::packets>;
}
pub mod server {
+ use core::pipes;
use core::pipes::*;
use core::ptr;
let s = SendPacketBuffered(ptr::addr_of(&(b.buffer.data.ping)));
let c = RecvPacketBuffered(ptr::addr_of(&(b.buffer.data.ping)));
let message = ::pingpong::pong(s);
- ::pipes::send(pipe, message);
+ send(pipe, message);
c
}
}
}
mod test {
- use pipes::recv;
+ use core::pipes::recv;
use pingpong::{ping, pong};
pub fn client(-chan: ::pingpong::client::ping) {
], )*
} => {
if $index == $count {
- match pipes::try_recv($port) {
+ match core::pipes::try_recv($port) {
$(Some($message($($($x,)+)* next)) => {
let $next = next;
$e
-> $next:ident $e:expr),+
} )+
} => ({
- let index = pipes::selecti([$(($port).header()),+]);
+ let index = core::comm::selecti([$(($port).header()),+]);
select_if!(index, 0, $( $port => [
$($message$(($($x),+))dont_type_this* -> $next $e),+
], )+)
use std::timer::sleep;
use std::uv;
-use pipes::{recv, select};
+use core::pipes;
+use core::pipes::{recv, select};
proto! oneshot (
waiting:send {
extern mod std;
use std::timer::sleep;
use std::uv;
-use pipes::recv;
+use core::pipes;
+use core::pipes::recv;
proto! oneshot (
waiting:send {
// Tests of the runtime's scheduler interface
-use core::pipes::*;
+use core::comm::*;
type sched_id = int;
type task_id = *libc::c_void;
fn iloop() {
task::spawn(|| die() );
- let (p, c) = core::pipes::stream::<()>();
+ let (p, c) = comm::stream::<()>();
loop {
// Sending and receiving here because these actions yield,
// at which point our child can kill us.
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use core::pipes::*;
+use core::comm::*;
struct test {
f: int,
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use core::pipes::*;
+use core::comm::*;
// tests that ctrl's type gets inferred properly
type command<K, V> = {key: K, val: V};
}
pub fn main() {
- let (_po, ch) = pipes::stream();
+ let (_po, ch) = comm::stream();
ch.send(foo(42, 'c'));
}
Arnold.
*/
-use core::pipes::*;
+use core::comm::*;
type ctx = Chan<int>;
extern mod std;
-use pipes::Chan;
-use pipes::Port;
+use comm::Chan;
+use comm::Port;
pub fn main() { test05(); }
}
fn test05() {
- let (po, ch) = pipes::stream();
+ let (po, ch) = comm::stream();
task::spawn(|| test05_start(ch) );
let mut value = po.recv();
log(error, value);
extern mod std;
-fn start(c: pipes::Chan<pipes::Chan<~str>>) {
- let (p, ch) = pipes::stream();
+fn start(c: comm::Chan<comm::Chan<~str>>) {
+ let (p, ch) = comm::stream();
c.send(ch);
let mut a;
}
pub fn main() {
- let (p, ch) = pipes::stream();
+ let (p, ch) = comm::stream();
let child = task::spawn(|| start(ch) );
let c = p.recv();
extern mod std;
-fn start(c: pipes::Chan<pipes::Chan<int>>) {
- let (p, ch) = pipes::stream();
+fn start(c: comm::Chan<comm::Chan<int>>) {
+ let (p, ch) = comm::stream();
c.send(ch);
}
pub fn main() {
- let (p, ch) = pipes::stream();
+ let (p, ch) = comm::stream();
let child = task::spawn(|| start(ch) );
let c = p.recv();
}
#[legacy_modes];
extern mod std;
-use pipes::send;
-fn start(c: pipes::Chan<int>, start: int, number_of_messages: int) {
+fn start(c: comm::Chan<int>, start: int, number_of_messages: int) {
let mut i: int = 0;
while i < number_of_messages { c.send(start + i); i += 1; }
}
pub fn main() {
debug!("Check that we don't deadlock.");
- let (p, ch) = pipes::stream();
+ let (p, ch) = comm::stream();
task::try(|| start(ch, 0, 10) );
debug!("Joined task");
}
#[legacy_modes];
pub fn main() {
- let po = pipes::PortSet();
+ let po = comm::PortSet();
// Spawn 10 tasks each sending us back one int.
let mut i = 10;
while (i > 0) {
log(debug, i);
- let (p, ch) = pipes::stream();
+ let (p, ch) = comm::stream();
po.add(p);
task::spawn({let i = i; || child(i, ch)});
i = i - 1;
debug!("main thread exiting");
}
-fn child(x: int, ch: pipes::Chan<int>) {
+fn child(x: int, ch: comm::Chan<int>) {
log(debug, x);
ch.send(x);
}
extern mod std;
-fn start(c: pipes::Chan<int>, i0: int) {
+fn start(c: comm::Chan<int>, i0: int) {
let mut i = i0;
while i > 0 {
c.send(0);
// is likely to terminate before the child completes, so from
// the child's point of view the receiver may die. We should
// drop messages on the floor in this case, and not crash!
- let (p, ch) = pipes::stream();
+ let (p, ch) = comm::stream();
task::spawn(|| start(ch, 10));
p.recv();
}
// except according to those terms.
-use pipes::send;
-use pipes::Port;
-use pipes::recv;
-use pipes::Chan;
-
// Tests of ports and channels on various types
fn test_rec() {
struct R {val0: int, val1: u8, val2: char}
- let (po, ch) = pipes::stream();
+ let (po, ch) = comm::stream();
let r0: R = R {val0: 0, val1: 1u8, val2: '2'};
ch.send(r0);
let mut r1: R;
}
fn test_vec() {
- let (po, ch) = pipes::stream();
+ let (po, ch) = comm::stream();
let v0: ~[int] = ~[0, 1, 2];
ch.send(v0);
let v1 = po.recv();
}
fn test_str() {
- let (po, ch) = pipes::stream();
+ let (po, ch) = comm::stream();
let s0 = ~"test";
ch.send(s0);
let s1 = po.recv();
}
fn test_tag() {
- let (po, ch) = pipes::stream();
+ let (po, ch) = comm::stream();
ch.send(tag1);
ch.send(tag2(10));
ch.send(tag3(10, 11u8, 'A'));
}
fn test_chan() {
- let (po, ch) = pipes::stream();
- let (po0, ch0) = pipes::stream();
+ let (po, ch) = comm::stream();
+ let (po0, ch0) = comm::stream();
ch.send(ch0);
let ch1 = po.recv();
// Does the transmitted channel still work?
#[legacy_modes];
extern mod std;
-use pipes::Chan;
-use pipes::send;
-use pipes::recv;
+use core::comm::Chan;
pub fn main() { debug!("===== WITHOUT THREADS ====="); test00(); }
debug!("Creating tasks");
- let po = pipes::PortSet();
+ let po = comm::PortSet();
let mut i: int = 0;
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use pipes::send;
-
pub fn main() { test00(); }
fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
- let (p, c) = pipes::stream();
+ let (p, c) = comm::stream();
c.send(1);
c.send(2);
c.send(3);
fn test00() {
let r: int = 0;
let mut sum: int = 0;
- let (p, c) = pipes::stream();
+ let (p, c) = comm::stream();
let number_of_messages: int = 1000;
let mut i: int = 0;
while i < number_of_messages { c.send(i + 0); i += 1; }
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use pipes::send;
-use pipes::Chan;
-use pipes::recv;
+use core::comm::Chan;
pub fn main() { test00(); }
fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
- let p = pipes::PortSet();
+ let p = comm::PortSet();
let c0 = p.chan();
let c1 = p.chan();
let c2 = p.chan();
pub fn main() { test00(); }
-fn test00_start(c: pipes::Chan<int>, start: int, number_of_messages: int) {
+fn test00_start(c: comm::Chan<int>, start: int, number_of_messages: int) {
let mut i: int = 0;
while i < number_of_messages { c.send(start + i); i += 1; }
}
fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
- let p = pipes::PortSet();
+ let p = comm::PortSet();
let number_of_messages: int = 10;
let c = p.chan();
pub fn main() { test00(); }
-fn test00_start(c: pipes::Chan<int>, number_of_messages: int) {
+fn test00_start(c: comm::Chan<int>, number_of_messages: int) {
let mut i: int = 0;
while i < number_of_messages { c.send(i + 0); i += 1; }
}
fn test00() {
let r: int = 0;
let mut sum: int = 0;
- let p = pipes::PortSet();
+ let p = comm::PortSet();
let number_of_messages: int = 10;
let ch = p.chan();
// any size, but rustc currently can because they do have size. Whether
// or not this is desirable I don't know, but here's a regression test.
pub fn main() {
- let (po, ch) = pipes::stream();
+ let (po, ch) = comm::stream();
ch.send(());
let n: () = po.recv();
assert (n == ());
// A port of task-killjoin to use a class with a dtor to manage
// the join.
-use core::pipes::*;
+use core::comm::*;
struct notify {
ch: Chan<bool>, v: @mut bool,
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use core::pipes::*;
+use core::comm::*;
pub fn main() {
let (p, ch) = stream::<uint>();
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use pipes::{Port, Chan};
-
/*
This is about the simplest program that can successfully send a
message.
*/
pub fn main() {
- let (po, ch) = pipes::stream();
+ let (po, ch) = comm::stream();
ch.send(42);
let r = po.recv();
log(error, r);
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use core::pipes::*;
+use core::comm::*;
fn child(c: &SharedChan<~uint>, i: uint) {
c.send(~i);
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use core::pipes::*;
+use core::comm::*;
pub fn main() {
let (p, c) = stream();
// xfail-win32
extern mod std;
-use core::pipes::*;
+use core::comm::*;
struct complainer {
c: SharedChan<bool>,