]> git.lizzy.rs Git - rust.git/commitdiff
handle all the reads on the "main" watcher thread
authorAleksey Kladov <aleksey.kladov@gmail.com>
Sat, 26 Jan 2019 14:01:58 +0000 (17:01 +0300)
committerAleksey Kladov <aleksey.kladov@gmail.com>
Sat, 26 Jan 2019 14:01:58 +0000 (17:01 +0300)
crates/ra_vfs/src/io.rs

index 279fa5da8d2e9c94a18ff31706d131941bddd339..98b107b3591f754c9cc6bbc20b2d784f593a75f9 100644 (file)
@@ -5,7 +5,7 @@
     sync::{mpsc, Arc},
     time::Duration,
 };
-use crossbeam_channel::{Receiver, Sender};
+use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select};
 use relative_path::RelativePathBuf;
 use thread_worker::WorkerHandle;
 use walkdir::WalkDir;
@@ -61,9 +61,25 @@ pub(crate) struct Worker {
 
 impl Worker {
     pub(crate) fn start(roots: Arc<Roots>) -> Worker {
-        let (worker, worker_handle) =
-            thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
+        // This is a pretty elaborate setup of threads & channels! It is
+        // explained by the following concerns:
+
+        //    * we need to burn a thread translating from notify's mpsc to
+        //      crossbeam_channel.
+        //    * we want to read all files from a single thread, to gurantee that
+        //      we always get fresher versions and never go back in time.
+        //    * we want to tear down everything neatly during shutdown.
+        let (worker, worker_handle) = thread_worker::spawn(
+            "vfs",
+            128,
+            // This are the channels we use to communicate with outside world.
+            // If `input_receiver` is closed we need to tear ourselves down.
+            // `output_sender` should not be closed unless the parent died.
+            move |input_receiver, output_sender| {
+                // These are `std` channels notify will send events to
                 let (notify_sender, notify_receiver) = mpsc::channel();
+                // These are the corresponding crossbeam channels
+                let (watcher_sender, watcher_receiver) = unbounded();
                 let watcher = notify::watcher(notify_sender, WATCHER_DELAY)
                     .map_err(|e| log::error!("failed to spawn notify {}", e))
                     .ok();
@@ -72,18 +88,30 @@ pub(crate) fn start(roots: Arc<Roots>) -> Worker {
                     watcher: Arc::new(Mutex::new(watcher)),
                     sender: output_sender,
                 };
-                let thread = thread::spawn({
-                    let ctx = ctx.clone();
-                    move || {
-                        let _ = notify_receiver
-                            .into_iter()
-                            // forward relevant events only
-                            .try_for_each(|change| ctx.handle_debounced_event(change));
-                    }
-                });
-                let res1 = input_receiver.into_iter().try_for_each(|t| match t {
-                    Task::AddRoot { root, config } => watch_root(&ctx, root, Arc::clone(&config)),
+                let thread = thread::spawn(move || {
+                    let _ = notify_receiver
+                        .into_iter()
+                        // forward relevant events only
+                        .for_each(|event| convert_notify_event(event, &watcher_sender));
                 });
+
+                loop {
+                    select! {
+                        // Received request from the caller. If this channel is
+                        // closed, we should shutdown everything.
+                        recv(input_receiver) -> t => match t {
+                            Err(RecvError) => break,
+                            Ok(Task::AddRoot { root, config }) => watch_root(&ctx, root, Arc::clone(&config)),
+                        },
+                        // Watcher send us changes. If **this** channel is
+                        // closed, the watcher has died, which indicates a bug
+                        // -- escalate!
+                        recv(watcher_receiver) -> event => match event {
+                            Err(RecvError) => panic!("watcher is dead"),
+                            Ok((path, change)) => WatcherCtx::handle_change(&ctx, path, change).unwrap(),
+                        },
+                    }
+                }
                 drop(ctx.watcher.lock().take());
                 drop(ctx);
                 let res2 = thread.join();
@@ -91,9 +119,9 @@ pub(crate) fn start(roots: Arc<Roots>) -> Worker {
                     Ok(()) => log::info!("... Watcher terminated with ok"),
                     Err(_) => log::error!("... Watcher terminated with err"),
                 }
-                res1.unwrap();
                 res2.unwrap();
-            });
+            },
+        );
         Worker {
             worker,
             worker_handle,
@@ -114,7 +142,7 @@ pub(crate) fn shutdown(self) -> thread::Result<()> {
     }
 }
 
-fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) -> Result<()> {
+fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) {
     let mut guard = woker.watcher.lock();
     log::debug!("loading {} ...", config.root.as_path().display());
     let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config)
@@ -127,9 +155,9 @@ fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) -> Res
         .collect();
     woker
         .sender
-        .send(TaskResult::BulkLoadRoot { root, files })?;
+        .send(TaskResult::BulkLoadRoot { root, files })
+        .unwrap();
     log::debug!("... loaded {}", config.root.as_path().display());
-    Ok(())
 }
 
 #[derive(Clone)]
@@ -139,38 +167,37 @@ struct WatcherCtx {
     sender: Sender<TaskResult>,
 }
 
-impl WatcherCtx {
-    fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<()> {
-        match ev {
-            DebouncedEvent::NoticeWrite(_)
-            | DebouncedEvent::NoticeRemove(_)
-            | DebouncedEvent::Chmod(_) => {
-                // ignore
-            }
-            DebouncedEvent::Rescan => {
-                // TODO rescan all roots
-            }
-            DebouncedEvent::Create(path) => {
-                self.handle_change(path, ChangeKind::Create)?;
-            }
-            DebouncedEvent::Write(path) => {
-                self.handle_change(path, ChangeKind::Write)?;
-            }
-            DebouncedEvent::Remove(path) => {
-                self.handle_change(path, ChangeKind::Remove)?;
-            }
-            DebouncedEvent::Rename(src, dst) => {
-                self.handle_change(src, ChangeKind::Remove)?;
-                self.handle_change(dst, ChangeKind::Create)?;
-            }
-            DebouncedEvent::Error(err, path) => {
-                // TODO should we reload the file contents?
-                log::warn!("watcher error \"{}\", {:?}", err, path);
-            }
+fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) {
+    match event {
+        DebouncedEvent::NoticeWrite(_)
+        | DebouncedEvent::NoticeRemove(_)
+        | DebouncedEvent::Chmod(_) => {
+            // ignore
+        }
+        DebouncedEvent::Rescan => {
+            // TODO rescan all roots
+        }
+        DebouncedEvent::Create(path) => {
+            sender.send((path, ChangeKind::Create)).unwrap();
+        }
+        DebouncedEvent::Write(path) => {
+            sender.send((path, ChangeKind::Write)).unwrap();
+        }
+        DebouncedEvent::Remove(path) => {
+            sender.send((path, ChangeKind::Remove)).unwrap();
+        }
+        DebouncedEvent::Rename(src, dst) => {
+            sender.send((src, ChangeKind::Remove)).unwrap();
+            sender.send((dst, ChangeKind::Create)).unwrap();
+        }
+        DebouncedEvent::Error(err, path) => {
+            // TODO should we reload the file contents?
+            log::warn!("watcher error \"{}\", {:?}", err, path);
         }
-        Ok(())
     }
+}
 
+impl WatcherCtx {
     fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<()> {
         let (root, rel_path) = match self.roots.find(&path) {
             None => return Ok(()),