let (p, c) = comm::stream();
do task::spawn() || {
- let p = comm::PortSet::new();
- c.send(p.chan());
-
let arc_v : Arc<~[int]> = p.recv();
let v = (*arc_v.get()).clone();
assert_eq!(v[3], 4);
};
- let c = p.recv();
c.send(arc_v.clone());
assert_eq!(arc_v.get()[2], 3);
#[allow(missing_doc)];
-use cast::{transmute, transmute_mut};
-use container::Container;
+use cast::transmute;
use either::{Either, Left, Right};
use kinds::Send;
-use option::{Option, Some, None};
-use uint;
-use vec::OwnedVector;
-use util::replace;
+use option::{Option, Some};
use unstable::sync::Exclusive;
use rtcomm = rt::comm;
use rt;
}
}
-/// Treat many ports as one.
-#[unsafe_mut_field(ports)]
-pub struct PortSet<T> {
- ports: ~[pipesy::Port<T>],
-}
-
-impl<T: Send> PortSet<T> {
- pub fn new() -> PortSet<T> {
- PortSet {
- ports: ~[]
- }
- }
-
- pub fn add(&self, port: Port<T>) {
- let Port { inner } = port;
- let port = match inner {
- Left(p) => p,
- Right(_) => fail!("PortSet not implemented")
- };
- unsafe {
- let self_ports = transmute_mut(&self.ports);
- self_ports.push(port)
- }
- }
-
- pub fn chan(&self) -> Chan<T> {
- let (po, ch) = stream();
- self.add(po);
- ch
- }
-}
-
-impl<T:Send> GenericPort<T> for PortSet<T> {
- fn try_recv(&self) -> Option<T> {
- unsafe {
- let self_ports = transmute_mut(&self.ports);
- let mut result = None;
- // we have to swap the ports array so we aren't borrowing
- // aliasable mutable memory.
- let mut ports = replace(self_ports, ~[]);
- while result.is_none() && ports.len() > 0 {
- let i = wait_many(ports);
- match ports[i].try_recv() {
- Some(m) => {
- result = Some(m);
- }
- None => {
- // Remove this port.
- let _ = ports.swap_remove(i);
- }
- }
- }
- *self_ports = ports;
- result
- }
- }
- fn recv(&self) -> T {
- self.try_recv().expect("port_set: endpoints closed")
- }
-}
-
-impl<T: Send> Peekable<T> for PortSet<T> {
- fn peek(&self) -> bool {
- // It'd be nice to use self.port.each, but that version isn't
- // pure.
- for uint::range(0, self.ports.len()) |i| {
- let port: &pipesy::Port<T> = &self.ports[i];
- if port.peek() {
- return true;
- }
- }
- false
- }
-}
-
/// A channel that can be shared between many senders.
pub struct SharedChan<T> {
inner: Either<Exclusive<pipesy::Chan<T>>, rtcomm::SharedChan<T>>
extern mod extra;
-use std::comm::{PortSet, Chan, stream};
+use std::comm::{SharedChan, Chan, stream};
use std::io;
use std::os;
use std::task;
stop
}
-fn server(requests: &PortSet<request>, responses: &Chan<uint>) {
+fn server(requests: &Port<request>, responses: &Chan<uint>) {
let mut count: uint = 0;
let mut done = false;
while !done {
fn run(args: &[~str]) {
let (from_child, to_parent) = stream();
- let (from_parent_, to_child) = stream();
- let from_parent = PortSet::new();
- from_parent.add(from_parent_);
+ let (from_parent, to_child) = stream();
+ let to_child = SharedChan::new(to_child);
let size = uint::from_str(args[1]).get();
let workers = uint::from_str(args[2]).get();
let start = extra::time::precise_time_s();
let mut worker_results = ~[];
for uint::range(0, workers) |_i| {
- let (from_parent_, to_child) = stream();
- from_parent.add(from_parent_);
+ let to_child = to_child.clone();
let mut builder = task::task();
builder.future_result(|r| worker_results.push(r));
do builder.spawn {
use std::uint;
fn fib(n: int) -> int {
- fn pfib(c: &Chan<int>, n: int) {
+ fn pfib(c: &SharedChan<int>, n: int) {
if n == 0 {
c.send(0);
} else if n <= 2 {
c.send(1);
} else {
- let p = PortSet::new();
- let ch = p.chan();
+ let (pp, cc) = stream();
+ let cc = SharedChan::new(cc);
+ let ch = cc.clone();
task::spawn(|| pfib(&ch, n - 1) );
- let ch = p.chan();
+ let ch = cc.clone();
task::spawn(|| pfib(&ch, n - 2) );
- c.send(p.recv() + p.recv());
+ c.send(pp.recv() + pp.recv());
}
}
let (p, ch) = stream();
+ let ch = SharedChan::new(ch);
let _t = task::spawn(|| pfib(&ch, n) );
p.recv()
}
use std::task;
pub fn main() {
- let po = comm::PortSet::new();
+ let (po, ch) = comm::stream();
+ let ch = comm::SharedChan::new(ch);
// Spawn 10 tasks each sending us back one int.
let mut i = 10;
while (i > 0) {
info!(i);
- let (p, ch) = comm::stream();
- po.add(p);
+ let ch = ch.clone();
task::spawn({let i = i; || child(i, &ch)});
i = i - 1;
}
info!("main thread exiting");
}
-fn child(x: int, ch: &comm::Chan<int>) {
+fn child(x: int, ch: &comm::SharedChan<int>) {
info!(x);
ch.send(x);
}
extern mod extra;
-use std::comm::Chan;
+use std::comm::SharedChan;
use std::comm;
use std::task;
pub fn main() { info!("===== WITHOUT THREADS ====="); test00(); }
-fn test00_start(ch: &Chan<int>, message: int, count: int) {
+fn test00_start(ch: &SharedChan<int>, message: int, count: int) {
info!("Starting test00_start");
let mut i: int = 0;
while i < count {
info!("Creating tasks");
- let po = comm::PortSet::new();
+ let (po, ch) = comm::stream();
+ let ch = comm::SharedChan::new(ch);
let mut i: int = 0;
// Create and spawn tasks...
let mut results = ~[];
while i < number_of_tasks {
- let ch = po.chan();
+ let ch = ch.clone();
let mut builder = task::task();
builder.future_result(|r| results.push(r));
builder.spawn({
// option. This file may not be copied, modified, or distributed
// except according to those terms.
-use std::comm::Chan;
+use std::comm::SharedChan;
use std::comm;
pub fn main() { test00(); }
fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
- let p = comm::PortSet::new();
- let c0 = p.chan();
- let c1 = p.chan();
- let c2 = p.chan();
- let c3 = p.chan();
+ let (p, ch) = comm::stream();
+ let ch = SharedChan::new(ch);
+ let c0 = ch.clone();
+ let c1 = ch.clone();
+ let c2 = ch.clone();
+ let c3 = ch.clone();
let number_of_messages: int = 1000;
let mut i: int = 0;
while i < number_of_messages {
pub fn main() { test00(); }
-fn test00_start(c: &comm::Chan<int>, start: int, number_of_messages: int) {
+fn test00_start(c: &comm::SharedChan<int>, start: int, number_of_messages: int) {
let mut i: int = 0;
while i < number_of_messages { c.send(start + i); i += 1; }
}
fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
- let p = comm::PortSet::new();
+ let (p, ch) = comm::stream();
+ let ch = comm::SharedChan::new(ch);
let number_of_messages: int = 10;
- let c = p.chan();
+ let c = ch.clone();
do task::spawn || {
test00_start(&c, number_of_messages * 0, number_of_messages);
}
- let c = p.chan();
+ let c = ch.clone();
do task::spawn || {
test00_start(&c, number_of_messages * 1, number_of_messages);
}
- let c = p.chan();
+ let c = ch.clone();
do task::spawn || {
test00_start(&c, number_of_messages * 2, number_of_messages);
}
- let c = p.chan();
+ let c = ch.clone();
do task::spawn || {
test00_start(&c, number_of_messages * 3, number_of_messages);
}
fn test00() {
let r: int = 0;
let mut sum: int = 0;
- let p = comm::PortSet::new();
+ let (p, ch) = comm::stream();
let number_of_messages: int = 10;
- let ch = p.chan();
let mut result = None;
let mut builder = task::task();