]> git.lizzy.rs Git - rust.git/commitdiff
move watcher to io module
authorBernardo <berublan@gmail.com>
Mon, 21 Jan 2019 17:59:54 +0000 (18:59 +0100)
committerAleksey Kladov <aleksey.kladov@gmail.com>
Sat, 26 Jan 2019 08:46:27 +0000 (11:46 +0300)
crates/ra_vfs/src/io.rs [deleted file]
crates/ra_vfs/src/io/mod.rs [new file with mode: 0644]
crates/ra_vfs/src/io/watcher.rs [new file with mode: 0644]
crates/ra_vfs/src/lib.rs
crates/ra_vfs/src/watcher.rs [deleted file]

diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
deleted file mode 100644 (file)
index a74222c..0000000
+++ /dev/null
@@ -1,210 +0,0 @@
-use std::{
-    fmt, fs,
-    path::{Path, PathBuf},
-    sync::Arc,
-    thread,
-};
-
-use crossbeam_channel::{Receiver, Sender};
-use parking_lot::Mutex;
-use relative_path::RelativePathBuf;
-use thread_worker::WorkerHandle;
-use walkdir::{DirEntry, WalkDir};
-
-use crate::{
-    watcher::{Watcher, WatcherChange},
-    VfsRoot,
-};
-
-pub(crate) enum Task {
-    AddRoot {
-        root: VfsRoot,
-        path: PathBuf,
-        filter: Box<Fn(&DirEntry) -> bool + Send>,
-    },
-    HandleChange(WatcherChange),
-    LoadChange(WatcherChange),
-    Watch {
-        dir: PathBuf,
-        filter: Box<Fn(&DirEntry) -> bool + Send>,
-    },
-}
-
-#[derive(Debug)]
-pub struct AddRootResult {
-    pub(crate) root: VfsRoot,
-    pub(crate) files: Vec<(RelativePathBuf, String)>,
-}
-
-#[derive(Debug)]
-pub enum WatcherChangeData {
-    Create { path: PathBuf, text: String },
-    Write { path: PathBuf, text: String },
-    Remove { path: PathBuf },
-}
-
-pub enum TaskResult {
-    AddRoot(AddRootResult),
-    HandleChange(WatcherChange),
-    LoadChange(WatcherChangeData),
-    NoOp,
-}
-
-impl fmt::Debug for TaskResult {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        f.write_str("TaskResult { ... }")
-    }
-}
-
-pub(crate) struct Worker {
-    worker: thread_worker::Worker<Task, TaskResult>,
-    worker_handle: WorkerHandle,
-    watcher: Arc<Mutex<Option<Watcher>>>,
-}
-
-impl Worker {
-    pub(crate) fn start() -> Worker {
-        let watcher = Arc::new(Mutex::new(None));
-        let watcher_clone = watcher.clone();
-        let (worker, worker_handle) =
-            thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
-                input_receiver
-                    .into_iter()
-                    .map(|t| handle_task(t, &watcher_clone))
-                    .try_for_each(|it| output_sender.send(it))
-                    .unwrap()
-            });
-        match Watcher::start(worker.inp.clone()) {
-            Ok(w) => {
-                watcher.lock().replace(w);
-            }
-            Err(e) => log::error!("could not start watcher: {}", e),
-        };
-        Worker {
-            worker,
-            worker_handle,
-            watcher,
-        }
-    }
-
-    pub(crate) fn sender(&self) -> &Sender<Task> {
-        &self.worker.inp
-    }
-
-    pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
-        &self.worker.out
-    }
-
-    pub(crate) fn shutdown(self) -> thread::Result<()> {
-        if let Some(watcher) = self.watcher.lock().take() {
-            let _ = watcher.shutdown();
-        }
-        let _ = self.worker.shutdown();
-        self.worker_handle.shutdown()
-    }
-}
-
-fn watch(
-    watcher: &Arc<Mutex<Option<Watcher>>>,
-    dir: &Path,
-    filter_entry: impl Fn(&DirEntry) -> bool,
-    emit_for_existing: bool,
-) {
-    let mut watcher = watcher.lock();
-    let watcher = match *watcher {
-        Some(ref mut w) => w,
-        None => {
-            // watcher dropped or couldn't start
-            return;
-        }
-    };
-    watcher.watch_recursive(dir, filter_entry, emit_for_existing)
-}
-
-fn handle_task(task: Task, watcher: &Arc<Mutex<Option<Watcher>>>) -> TaskResult {
-    match task {
-        Task::AddRoot { root, path, filter } => {
-            watch(watcher, &path, &*filter, false);
-            log::debug!("loading {} ...", path.as_path().display());
-            let files = load_root(path.as_path(), &*filter);
-            log::debug!("... loaded {}", path.as_path().display());
-            TaskResult::AddRoot(AddRootResult { root, files })
-        }
-        Task::HandleChange(change) => {
-            // forward as is because Vfs has to decide if we should load it
-            TaskResult::HandleChange(change)
-        }
-        Task::LoadChange(change) => {
-            log::debug!("loading {:?} ...", change);
-            match load_change(change) {
-                Some(data) => TaskResult::LoadChange(data),
-                None => TaskResult::NoOp,
-            }
-        }
-        Task::Watch { dir, filter } => {
-            watch(watcher, &dir, &*filter, true);
-            TaskResult::NoOp
-        }
-    }
-}
-
-fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> {
-    let mut res = Vec::new();
-    for entry in WalkDir::new(root).into_iter().filter_entry(filter) {
-        let entry = match entry {
-            Ok(entry) => entry,
-            Err(e) => {
-                log::warn!("watcher error: {}", e);
-                continue;
-            }
-        };
-        if !entry.file_type().is_file() {
-            continue;
-        }
-        let path = entry.path();
-        let text = match fs::read_to_string(path) {
-            Ok(text) => text,
-            Err(e) => {
-                log::warn!("watcher error: {}", e);
-                continue;
-            }
-        };
-        let path = RelativePathBuf::from_path(path.strip_prefix(root).unwrap()).unwrap();
-        res.push((path.to_owned(), text))
-    }
-    res
-}
-
-fn load_change(change: WatcherChange) -> Option<WatcherChangeData> {
-    let data = match change {
-        WatcherChange::Create(path) => {
-            if path.is_dir() {
-                return None;
-            }
-            let text = match fs::read_to_string(&path) {
-                Ok(text) => text,
-                Err(e) => {
-                    log::warn!("watcher error \"{}\": {}", path.display(), e);
-                    return None;
-                }
-            };
-            WatcherChangeData::Create { path, text }
-        }
-        WatcherChange::Write(path) => {
-            let text = match fs::read_to_string(&path) {
-                Ok(text) => text,
-                Err(e) => {
-                    log::warn!("watcher error \"{}\": {}", path.display(), e);
-                    return None;
-                }
-            };
-            WatcherChangeData::Write { path, text }
-        }
-        WatcherChange::Remove(path) => WatcherChangeData::Remove { path },
-        WatcherChange::Rescan => {
-            // this should be handled by Vfs::handle_task
-            return None;
-        }
-    };
-    Some(data)
-}
diff --git a/crates/ra_vfs/src/io/mod.rs b/crates/ra_vfs/src/io/mod.rs
new file mode 100644 (file)
index 0000000..6d5af76
--- /dev/null
@@ -0,0 +1,212 @@
+use std::{
+    fmt, fs,
+    path::{Path, PathBuf},
+    sync::Arc,
+    thread,
+};
+
+use crossbeam_channel::{Receiver, Sender};
+use parking_lot::Mutex;
+use relative_path::RelativePathBuf;
+use thread_worker::WorkerHandle;
+use walkdir::{DirEntry, WalkDir};
+
+mod watcher;
+use watcher::Watcher;
+pub use watcher::WatcherChange;
+
+use crate::VfsRoot;
+
+pub(crate) enum Task {
+    AddRoot {
+        root: VfsRoot,
+        path: PathBuf,
+        filter: Box<Fn(&DirEntry) -> bool + Send>,
+    },
+    /// this variant should only be created by the watcher
+    HandleChange(WatcherChange),
+    LoadChange(WatcherChange),
+    Watch {
+        dir: PathBuf,
+        filter: Box<Fn(&DirEntry) -> bool + Send>,
+    },
+}
+
+#[derive(Debug)]
+pub struct AddRootResult {
+    pub(crate) root: VfsRoot,
+    pub(crate) files: Vec<(RelativePathBuf, String)>,
+}
+
+#[derive(Debug)]
+pub enum WatcherChangeData {
+    Create { path: PathBuf, text: String },
+    Write { path: PathBuf, text: String },
+    Remove { path: PathBuf },
+}
+
+pub enum TaskResult {
+    AddRoot(AddRootResult),
+    HandleChange(WatcherChange),
+    LoadChange(WatcherChangeData),
+    NoOp,
+}
+
+impl fmt::Debug for TaskResult {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.write_str("TaskResult { ... }")
+    }
+}
+
+pub(crate) struct Worker {
+    worker: thread_worker::Worker<Task, TaskResult>,
+    worker_handle: WorkerHandle,
+    watcher: Arc<Mutex<Option<Watcher>>>,
+}
+
+impl Worker {
+    pub(crate) fn start() -> Worker {
+        let watcher = Arc::new(Mutex::new(None));
+        let watcher_clone = watcher.clone();
+        let (worker, worker_handle) =
+            thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
+                input_receiver
+                    .into_iter()
+                    .map(|t| handle_task(t, &watcher_clone))
+                    .try_for_each(|it| output_sender.send(it))
+                    .unwrap()
+            });
+        match Watcher::start(worker.inp.clone()) {
+            Ok(w) => {
+                watcher.lock().replace(w);
+            }
+            Err(e) => log::error!("could not start watcher: {}", e),
+        };
+        Worker {
+            worker,
+            worker_handle,
+            watcher,
+        }
+    }
+
+    pub(crate) fn sender(&self) -> &Sender<Task> {
+        &self.worker.inp
+    }
+
+    pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
+        &self.worker.out
+    }
+
+    pub(crate) fn shutdown(self) -> thread::Result<()> {
+        if let Some(watcher) = self.watcher.lock().take() {
+            let _ = watcher.shutdown();
+        }
+        let _ = self.worker.shutdown();
+        self.worker_handle.shutdown()
+    }
+}
+
+fn watch(
+    watcher: &Arc<Mutex<Option<Watcher>>>,
+    dir: &Path,
+    filter_entry: impl Fn(&DirEntry) -> bool,
+    emit_for_existing: bool,
+) {
+    let mut watcher = watcher.lock();
+    let watcher = match *watcher {
+        Some(ref mut w) => w,
+        None => {
+            // watcher dropped or couldn't start
+            return;
+        }
+    };
+    watcher.watch_recursive(dir, filter_entry, emit_for_existing)
+}
+
+fn handle_task(task: Task, watcher: &Arc<Mutex<Option<Watcher>>>) -> TaskResult {
+    match task {
+        Task::AddRoot { root, path, filter } => {
+            watch(watcher, &path, &*filter, false);
+            log::debug!("loading {} ...", path.as_path().display());
+            let files = load_root(path.as_path(), &*filter);
+            log::debug!("... loaded {}", path.as_path().display());
+            TaskResult::AddRoot(AddRootResult { root, files })
+        }
+        Task::HandleChange(change) => {
+            // forward as is because Vfs has to decide if we should load it
+            TaskResult::HandleChange(change)
+        }
+        Task::LoadChange(change) => {
+            log::debug!("loading {:?} ...", change);
+            match load_change(change) {
+                Some(data) => TaskResult::LoadChange(data),
+                None => TaskResult::NoOp,
+            }
+        }
+        Task::Watch { dir, filter } => {
+            watch(watcher, &dir, &*filter, true);
+            TaskResult::NoOp
+        }
+    }
+}
+
+fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> {
+    let mut res = Vec::new();
+    for entry in WalkDir::new(root).into_iter().filter_entry(filter) {
+        let entry = match entry {
+            Ok(entry) => entry,
+            Err(e) => {
+                log::warn!("watcher error: {}", e);
+                continue;
+            }
+        };
+        if !entry.file_type().is_file() {
+            continue;
+        }
+        let path = entry.path();
+        let text = match fs::read_to_string(path) {
+            Ok(text) => text,
+            Err(e) => {
+                log::warn!("watcher error: {}", e);
+                continue;
+            }
+        };
+        let path = RelativePathBuf::from_path(path.strip_prefix(root).unwrap()).unwrap();
+        res.push((path.to_owned(), text))
+    }
+    res
+}
+
+fn load_change(change: WatcherChange) -> Option<WatcherChangeData> {
+    let data = match change {
+        WatcherChange::Create(path) => {
+            if path.is_dir() {
+                return None;
+            }
+            let text = match fs::read_to_string(&path) {
+                Ok(text) => text,
+                Err(e) => {
+                    log::warn!("watcher error \"{}\": {}", path.display(), e);
+                    return None;
+                }
+            };
+            WatcherChangeData::Create { path, text }
+        }
+        WatcherChange::Write(path) => {
+            let text = match fs::read_to_string(&path) {
+                Ok(text) => text,
+                Err(e) => {
+                    log::warn!("watcher error \"{}\": {}", path.display(), e);
+                    return None;
+                }
+            };
+            WatcherChangeData::Write { path, text }
+        }
+        WatcherChange::Remove(path) => WatcherChangeData::Remove { path },
+        WatcherChange::Rescan => {
+            // this should be handled by Vfs::handle_task
+            return None;
+        }
+    };
+    Some(data)
+}
diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs
new file mode 100644 (file)
index 0000000..e332984
--- /dev/null
@@ -0,0 +1,128 @@
+use crate::io;
+use crossbeam_channel::Sender;
+use drop_bomb::DropBomb;
+use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
+use std::{
+    path::{Path, PathBuf},
+    sync::mpsc,
+    thread,
+    time::Duration,
+};
+use walkdir::{DirEntry, WalkDir};
+
+#[derive(Debug)]
+pub enum WatcherChange {
+    Create(PathBuf),
+    Write(PathBuf),
+    Remove(PathBuf),
+    Rescan,
+}
+
+fn handle_change_event(
+    ev: DebouncedEvent,
+    sender: &Sender<io::Task>,
+) -> Result<(), Box<std::error::Error>> {
+    match ev {
+        DebouncedEvent::NoticeWrite(_)
+        | DebouncedEvent::NoticeRemove(_)
+        | DebouncedEvent::Chmod(_) => {
+            // ignore
+        }
+        DebouncedEvent::Rescan => {
+            sender.send(io::Task::HandleChange(WatcherChange::Rescan))?;
+        }
+        DebouncedEvent::Create(path) => {
+            sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?;
+        }
+        DebouncedEvent::Write(path) => {
+            sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?;
+        }
+        DebouncedEvent::Remove(path) => {
+            sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?;
+        }
+        DebouncedEvent::Rename(src, dst) => {
+            sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?;
+            sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?;
+        }
+        DebouncedEvent::Error(err, path) => {
+            // TODO should we reload the file contents?
+            log::warn!("watcher error \"{}\", {:?}", err, path);
+        }
+    }
+    Ok(())
+}
+
+const WATCHER_DELAY: Duration = Duration::from_millis(250);
+
+pub(crate) struct Watcher {
+    watcher: RecommendedWatcher,
+    thread: thread::JoinHandle<()>,
+    bomb: DropBomb,
+    sender: Sender<io::Task>,
+}
+
+impl Watcher {
+    pub(crate) fn start(
+        output_sender: Sender<io::Task>,
+    ) -> Result<Watcher, Box<std::error::Error>> {
+        let (input_sender, input_receiver) = mpsc::channel();
+        let watcher = notify::watcher(input_sender, WATCHER_DELAY)?;
+        let sender = output_sender.clone();
+        let thread = thread::spawn(move || {
+            input_receiver
+                .into_iter()
+                // forward relevant events only
+                .try_for_each(|change| handle_change_event(change, &output_sender))
+                .unwrap()
+        });
+        Ok(Watcher {
+            watcher,
+            thread,
+            sender,
+            bomb: DropBomb::new(format!("Watcher was not shutdown")),
+        })
+    }
+
+    pub fn watch_recursive(
+        &mut self,
+        dir: &Path,
+        filter_entry: impl Fn(&DirEntry) -> bool,
+        emit_for_contents: bool,
+    ) {
+        for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) {
+            match res {
+                Ok(entry) => {
+                    if entry.path().is_dir() {
+                        match self.watcher.watch(dir, RecursiveMode::NonRecursive) {
+                            Ok(()) => log::debug!("watching \"{}\"", dir.display()),
+                            Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
+                        }
+                    }
+                    if emit_for_contents && entry.depth() > 0 {
+                        // emit as create because we haven't seen it yet
+                        if let Err(e) =
+                            self.sender
+                                .send(io::Task::HandleChange(WatcherChange::Create(
+                                    entry.path().to_path_buf(),
+                                )))
+                        {
+                            log::warn!("watcher error: {}", e)
+                        }
+                    }
+                }
+                Err(e) => log::warn!("watcher error: {}", e),
+            }
+        }
+    }
+
+    pub fn shutdown(mut self) -> thread::Result<()> {
+        self.bomb.defuse();
+        drop(self.watcher);
+        let res = self.thread.join();
+        match &res {
+            Ok(()) => log::info!("... Watcher terminated with ok"),
+            Err(_) => log::error!("... Watcher terminated with err"),
+        }
+        res
+    }
+}
index 1961808904e7c90acde7ee8a078c50da2e74e43b..5db0d86460e9fa1bdef22cb032ca0646fd29bc00 100644 (file)
@@ -14,7 +14,6 @@
 //! which are watched for changes. Typically, there will be a root for each
 //! Cargo package.
 mod io;
-mod watcher;
 
 use std::{
     cmp::Reverse,
@@ -32,7 +31,7 @@
 use walkdir::DirEntry;
 
 pub use crate::io::TaskResult as VfsTask;
-pub use crate::watcher::WatcherChange;
+use io::{Task, TaskResult, WatcherChange, WatcherChangeData, Worker};
 
 /// `RootFilter` is a predicate that checks if a file can belong to a root. If
 /// several filters match a file (nested dirs), the most nested one wins.
@@ -100,7 +99,7 @@ pub struct Vfs {
     files: Arena<VfsFile, VfsFileData>,
     root2files: FxHashMap<VfsRoot, FxHashSet<VfsFile>>,
     pending_changes: Vec<VfsChange>,
-    worker: io::Worker,
+    worker: Worker,
 }
 
 impl fmt::Debug for Vfs {
@@ -204,7 +203,7 @@ pub fn task_receiver(&self) -> &Receiver<io::TaskResult> {
 
     pub fn handle_task(&mut self, task: io::TaskResult) {
         match task {
-            io::TaskResult::AddRoot(task) => {
+            TaskResult::AddRoot(task) => {
                 let mut files = Vec::new();
                 // While we were scanning the root in the backgound, a file might have
                 // been open in the editor, so we need to account for that.
@@ -229,38 +228,35 @@ pub fn handle_task(&mut self, task: io::TaskResult) {
                 };
                 self.pending_changes.push(change);
             }
-            io::TaskResult::HandleChange(change) => match &change {
-                watcher::WatcherChange::Create(path) if path.is_dir() => {
+            TaskResult::HandleChange(change) => match &change {
+                WatcherChange::Create(path) if path.is_dir() => {
                     if let Some((root, _path, _file)) = self.find_root(&path) {
                         let root_filter = self.roots[root].clone();
                         let filter =
                             move |entry: &DirEntry| root_filter.can_contain(entry.path()).is_some();
                         self.worker
                             .sender()
-                            .send(io::Task::Watch {
+                            .send(Task::Watch {
                                 dir: path.to_path_buf(),
                                 filter: Box::new(filter),
                             })
                             .unwrap()
                     }
                 }
-                watcher::WatcherChange::Create(path)
-                | watcher::WatcherChange::Remove(path)
-                | watcher::WatcherChange::Write(path) => {
+                WatcherChange::Create(path)
+                | WatcherChange::Remove(path)
+                | WatcherChange::Write(path) => {
                     if self.should_handle_change(&path) {
-                        self.worker
-                            .sender()
-                            .send(io::Task::LoadChange(change))
-                            .unwrap()
+                        self.worker.sender().send(Task::LoadChange(change)).unwrap()
                     }
                 }
-                watcher::WatcherChange::Rescan => {
+                WatcherChange::Rescan => {
                     // TODO we should reload all files
                 }
             },
-            io::TaskResult::LoadChange(change) => match change {
-                io::WatcherChangeData::Create { path, text }
-                | io::WatcherChangeData::Write { path, text } => {
+            TaskResult::LoadChange(change) => match change {
+                WatcherChangeData::Create { path, text }
+                | WatcherChangeData::Write { path, text } => {
                     if let Some((root, path, file)) = self.find_root(&path) {
                         if let Some(file) = file {
                             self.do_change_file(file, text, false);
@@ -269,7 +265,7 @@ pub fn handle_task(&mut self, task: io::TaskResult) {
                         }
                     }
                 }
-                io::WatcherChangeData::Remove { path } => {
+                WatcherChangeData::Remove { path } => {
                     if let Some((root, path, file)) = self.find_root(&path) {
                         if let Some(file) = file {
                             self.do_remove_file(root, path, file, false);
@@ -277,7 +273,7 @@ pub fn handle_task(&mut self, task: io::TaskResult) {
                     }
                 }
             },
-            io::TaskResult::NoOp => {}
+            TaskResult::NoOp => {}
         }
     }
 
diff --git a/crates/ra_vfs/src/watcher.rs b/crates/ra_vfs/src/watcher.rs
deleted file mode 100644 (file)
index 6069358..0000000
+++ /dev/null
@@ -1,128 +0,0 @@
-use crate::io;
-use crossbeam_channel::Sender;
-use drop_bomb::DropBomb;
-use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
-use std::{
-    path::{Path, PathBuf},
-    sync::mpsc,
-    thread,
-    time::Duration,
-};
-use walkdir::{DirEntry, WalkDir};
-
-pub(crate) struct Watcher {
-    watcher: RecommendedWatcher,
-    thread: thread::JoinHandle<()>,
-    bomb: DropBomb,
-    sender: Sender<io::Task>,
-}
-
-#[derive(Debug)]
-pub enum WatcherChange {
-    Create(PathBuf),
-    Write(PathBuf),
-    Remove(PathBuf),
-    Rescan,
-}
-
-fn handle_change_event(
-    ev: DebouncedEvent,
-    sender: &Sender<io::Task>,
-) -> Result<(), Box<std::error::Error>> {
-    match ev {
-        DebouncedEvent::NoticeWrite(_)
-        | DebouncedEvent::NoticeRemove(_)
-        | DebouncedEvent::Chmod(_) => {
-            // ignore
-        }
-        DebouncedEvent::Rescan => {
-            sender.send(io::Task::HandleChange(WatcherChange::Rescan))?;
-        }
-        DebouncedEvent::Create(path) => {
-            sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?;
-        }
-        DebouncedEvent::Write(path) => {
-            sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?;
-        }
-        DebouncedEvent::Remove(path) => {
-            sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?;
-        }
-        DebouncedEvent::Rename(src, dst) => {
-            sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?;
-            sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?;
-        }
-        DebouncedEvent::Error(err, path) => {
-            // TODO should we reload the file contents?
-            log::warn!("watcher error \"{}\", {:?}", err, path);
-        }
-    }
-    Ok(())
-}
-
-const WATCHER_DELAY: Duration = Duration::from_millis(250);
-
-impl Watcher {
-    pub(crate) fn start(
-        output_sender: Sender<io::Task>,
-    ) -> Result<Watcher, Box<std::error::Error>> {
-        let (input_sender, input_receiver) = mpsc::channel();
-        let watcher = notify::watcher(input_sender, WATCHER_DELAY)?;
-        let sender = output_sender.clone();
-        let thread = thread::spawn(move || {
-            input_receiver
-                .into_iter()
-                // forward relevant events only
-                .try_for_each(|change| handle_change_event(change, &output_sender))
-                .unwrap()
-        });
-        Ok(Watcher {
-            watcher,
-            thread,
-            sender,
-            bomb: DropBomb::new(format!("Watcher was not shutdown")),
-        })
-    }
-
-    pub fn watch_recursive(
-        &mut self,
-        dir: &Path,
-        filter_entry: impl Fn(&DirEntry) -> bool,
-        emit_for_contents: bool,
-    ) {
-        for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) {
-            match res {
-                Ok(entry) => {
-                    if entry.path().is_dir() {
-                        match self.watcher.watch(dir, RecursiveMode::NonRecursive) {
-                            Ok(()) => log::debug!("watching \"{}\"", dir.display()),
-                            Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
-                        }
-                    }
-                    if emit_for_contents && entry.depth() > 0 {
-                        // emit as create because we haven't seen it yet
-                        if let Err(e) =
-                            self.sender
-                                .send(io::Task::HandleChange(WatcherChange::Create(
-                                    entry.path().to_path_buf(),
-                                )))
-                        {
-                            log::warn!("watcher error: {}", e)
-                        }
-                    }
-                }
-                Err(e) => log::warn!("watcher error: {}", e),
-            }
-        }
-    }
-
-    pub fn shutdown(mut self) -> thread::Result<()> {
-        self.bomb.defuse();
-        drop(self.watcher);
-        let res = self.thread.join();
-        match &res {
-            Ok(()) => log::info!("... Watcher terminated with ok"),
-            Err(_) => log::error!("... Watcher terminated with err"),
-        }
-        res
-    }
-}