]> git.lizzy.rs Git - rust.git/commitdiff
Enhance timers to create ports
authorAlex Crichton <alex@alexcrichton.com>
Sat, 26 Oct 2013 04:55:10 +0000 (21:55 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Sat, 26 Oct 2013 05:12:55 +0000 (22:12 -0700)
In addition to being able to sleep the current task, timers should be able to
create ports which get notified after a period of time.

Closes #10014

src/libstd/rt/io/timer.rs
src/libstd/rt/rtio.rs
src/libstd/rt/uv/uvio.rs

index fab0062ee0054ce2498ca30e8c790717a09116ed..500cd91b3db96ce11c52f6330ed7c68cbbb09002 100644 (file)
@@ -8,6 +8,37 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
+/*!
+
+Synchronous Timers
+
+This module exposes the functionality to create timers, block the current task,
+and create ports which will receive notifications after a period of time.
+
+# Example
+
+```rust
+
+use std::rt::io::Timer;
+
+let mut timer = Timer::new().unwrap();
+timer.sleep(10); // block the task for awhile
+
+let timeout = timer.oneshot(10);
+// do some work
+timeout.recv(); // wait for the timeout to expire
+
+let periodic = timer.periodic(10);
+loop {
+    periodic.recv();
+    // this loop is only executed once every 10ms
+}
+
+```
+
+*/
+
+use comm::{Port, PortOne};
 use option::{Option, Some, None};
 use result::{Ok, Err};
 use rt::io::io_error;
@@ -25,9 +56,9 @@ pub fn sleep(msecs: u64) {
 }
 
 impl Timer {
-
     /// Creates a new timer which can be used to put the current task to sleep
-    /// for a number of milliseconds.
+    /// for a number of milliseconds, or to possibly create channels which will
+    /// get notified after an amount of time has passed.
     pub fn new() -> Option<Timer> {
         do with_local_io |io| {
             match io.timer_init() {
@@ -42,20 +73,116 @@ pub fn new() -> Option<Timer> {
         }
     }
 
+    /// Blocks the current task for `msecs` milliseconds.
+    ///
+    /// Note that this function will cause any other ports for this timer to be
+    /// invalidated (the other end will be closed).
     pub fn sleep(&mut self, msecs: u64) {
         self.obj.sleep(msecs);
     }
+
+    /// Creates a oneshot port which will have a notification sent when `msecs`
+    /// milliseconds has elapsed. This does *not* block the current task, but
+    /// instead returns immediately.
+    ///
+    /// Note that this invalidates any previous port which has been created by
+    /// this timer, and that the returned port will be invalidated once the
+    /// timer is destroyed (when it falls out of scope).
+    pub fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
+        self.obj.oneshot(msecs)
+    }
+
+    /// Creates a port which will have a continuous stream of notifications
+    /// being sent every `msecs` milliseconds. This does *not* block the
+    /// current task, but instead returns immediately. The first notification
+    /// will not be received immediately, but rather after `msec` milliseconds
+    /// have passed.
+    ///
+    /// Note that this invalidates any previous port which has been created by
+    /// this timer, and that the returned port will be invalidated once the
+    /// timer is destroyed (when it falls out of scope).
+    pub fn periodic(&mut self, msecs: u64) -> Port<()> {
+        self.obj.period(msecs)
+    }
 }
 
 #[cfg(test)]
 mod test {
     use super::*;
     use rt::test::*;
+    use cell::Cell;
+    use task;
+
     #[test]
     fn test_io_timer_sleep_simple() {
         do run_in_mt_newsched_task {
-            let timer = Timer::new();
-            do timer.map |mut t| { t.sleep(1) };
+            let mut timer = Timer::new().unwrap();
+            timer.sleep(1);
+        }
+    }
+
+    #[test]
+    fn test_io_timer_sleep_oneshot() {
+        do run_in_mt_newsched_task {
+            let mut timer = Timer::new().unwrap();
+            timer.oneshot(1).recv();
+        }
+    }
+
+    #[test]
+    fn test_io_timer_sleep_oneshot_forget() {
+        do run_in_mt_newsched_task {
+            let mut timer = Timer::new().unwrap();
+            timer.oneshot(100000000000);
+        }
+    }
+
+    #[test]
+    fn oneshot_twice() {
+        do run_in_mt_newsched_task {
+            let mut timer = Timer::new().unwrap();
+            let port1 = timer.oneshot(100000000000);
+            let port = timer.oneshot(1);
+            port.recv();
+            let port1 = Cell::new(port1);
+            let ret = do task::try {
+                port1.take().recv();
+            };
+            assert!(ret.is_err());
+        }
+    }
+
+    #[test]
+    fn test_io_timer_oneshot_then_sleep() {
+        do run_in_mt_newsched_task {
+            let mut timer = Timer::new().unwrap();
+            let port = timer.oneshot(100000000000);
+            timer.sleep(1); // this should invalidate the port
+
+            let port = Cell::new(port);
+            let ret = do task::try {
+                port.take().recv();
+            };
+            assert!(ret.is_err());
+        }
+    }
+
+    #[test]
+    fn test_io_timer_sleep_periodic() {
+        do run_in_mt_newsched_task {
+            let mut timer = Timer::new().unwrap();
+            let port = timer.periodic(1);
+            port.recv();
+            port.recv();
+            port.recv();
+        }
+    }
+
+    #[test]
+    fn test_io_timer_sleep_periodic_forget() {
+        do run_in_mt_newsched_task {
+            let mut timer = Timer::new().unwrap();
+            timer.periodic(100000000000);
         }
     }
 
index 29f728a5e0cb8051553cdebff094f79675c0ce37..366388063d4ef02c75268b4cc7851e2b1f41ca2b 100644 (file)
@@ -11,7 +11,7 @@
 use libc;
 use option::*;
 use result::*;
-use comm::SharedChan;
+use comm::{SharedChan, PortOne, Port};
 use libc::c_int;
 use c_str::CString;
 
@@ -162,6 +162,8 @@ pub trait RtioUdpSocket : RtioSocket {
 
 pub trait RtioTimer {
     fn sleep(&mut self, msecs: u64);
+    fn oneshot(&mut self, msecs: u64) -> PortOne<()>;
+    fn period(&mut self, msecs: u64) -> Port<()>;
 }
 
 pub trait RtioFileStream {
index 6709f0bff446a44569226bd43a4ccd7d4963d7a1..2f8dacc4339ffd9e40e0266d2a6c480a0442c03e 100644 (file)
@@ -13,7 +13,7 @@
 use cast;
 use cell::Cell;
 use clone::Clone;
-use comm::{SendDeferred, SharedChan};
+use comm::{SendDeferred, SharedChan, Port, PortOne, GenericChan};
 use libc::{c_int, c_uint, c_void, pid_t};
 use ops::Drop;
 use option::*;
@@ -1468,6 +1468,41 @@ fn sleep(&mut self, msecs: u64) {
             self_.watcher.stop();
         }
     }
+
+    fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
+        use comm::oneshot;
+
+        let (port, chan) = oneshot();
+        let chan = Cell::new(chan);
+        do self.home_for_io |self_| {
+            let chan = Cell::new(chan.take());
+            do self_.watcher.start(msecs, 0) |_, status| {
+                assert!(status.is_none());
+                assert!(!chan.is_empty());
+                chan.take().send_deferred(());
+            }
+        }
+
+        return port;
+    }
+
+    fn period(&mut self, msecs: u64) -> Port<()> {
+        use comm::stream;
+
+        let (port, chan) = stream();
+        let chan = Cell::new(chan);
+        do self.home_for_io |self_| {
+            let chan = Cell::new(chan.take());
+            do self_.watcher.start(msecs, msecs) |_, status| {
+                assert!(status.is_none());
+                do chan.with_ref |chan| {
+                    chan.send_deferred(());
+                }
+            }
+        }
+
+        return port;
+    }
 }
 
 pub struct UvFileStream {