sync::{mpsc, Arc},
time::Duration,
};
-use crossbeam_channel::{Receiver, Sender};
+use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select};
use relative_path::RelativePathBuf;
use thread_worker::WorkerHandle;
use walkdir::WalkDir;
impl Worker {
pub(crate) fn start(roots: Arc<Roots>) -> Worker {
- let (worker, worker_handle) =
- thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
+ // This is a pretty elaborate setup of threads & channels! It is
+ // explained by the following concerns:
+
+ // * we need to burn a thread translating from notify's mpsc to
+ // crossbeam_channel.
+ // * we want to read all files from a single thread, to gurantee that
+ // we always get fresher versions and never go back in time.
+ // * we want to tear down everything neatly during shutdown.
+ let (worker, worker_handle) = thread_worker::spawn(
+ "vfs",
+ 128,
+ // This are the channels we use to communicate with outside world.
+ // If `input_receiver` is closed we need to tear ourselves down.
+ // `output_sender` should not be closed unless the parent died.
+ move |input_receiver, output_sender| {
+ // These are `std` channels notify will send events to
let (notify_sender, notify_receiver) = mpsc::channel();
+ // These are the corresponding crossbeam channels
+ let (watcher_sender, watcher_receiver) = unbounded();
let watcher = notify::watcher(notify_sender, WATCHER_DELAY)
.map_err(|e| log::error!("failed to spawn notify {}", e))
.ok();
watcher: Arc::new(Mutex::new(watcher)),
sender: output_sender,
};
- let thread = thread::spawn({
- let ctx = ctx.clone();
- move || {
- let _ = notify_receiver
- .into_iter()
- // forward relevant events only
- .try_for_each(|change| ctx.handle_debounced_event(change));
- }
- });
- let res1 = input_receiver.into_iter().try_for_each(|t| match t {
- Task::AddRoot { root, config } => watch_root(&ctx, root, Arc::clone(&config)),
+ let thread = thread::spawn(move || {
+ let _ = notify_receiver
+ .into_iter()
+ // forward relevant events only
+ .for_each(|event| convert_notify_event(event, &watcher_sender));
});
+
+ loop {
+ select! {
+ // Received request from the caller. If this channel is
+ // closed, we should shutdown everything.
+ recv(input_receiver) -> t => match t {
+ Err(RecvError) => break,
+ Ok(Task::AddRoot { root, config }) => watch_root(&ctx, root, Arc::clone(&config)),
+ },
+ // Watcher send us changes. If **this** channel is
+ // closed, the watcher has died, which indicates a bug
+ // -- escalate!
+ recv(watcher_receiver) -> event => match event {
+ Err(RecvError) => panic!("watcher is dead"),
+ Ok((path, change)) => WatcherCtx::handle_change(&ctx, path, change).unwrap(),
+ },
+ }
+ }
drop(ctx.watcher.lock().take());
drop(ctx);
let res2 = thread.join();
Ok(()) => log::info!("... Watcher terminated with ok"),
Err(_) => log::error!("... Watcher terminated with err"),
}
- res1.unwrap();
res2.unwrap();
- });
+ },
+ );
Worker {
worker,
worker_handle,
}
}
-fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) -> Result<()> {
+fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) {
let mut guard = woker.watcher.lock();
log::debug!("loading {} ...", config.root.as_path().display());
let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config)
.collect();
woker
.sender
- .send(TaskResult::BulkLoadRoot { root, files })?;
+ .send(TaskResult::BulkLoadRoot { root, files })
+ .unwrap();
log::debug!("... loaded {}", config.root.as_path().display());
- Ok(())
}
#[derive(Clone)]
sender: Sender<TaskResult>,
}
-impl WatcherCtx {
- fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<()> {
- match ev {
- DebouncedEvent::NoticeWrite(_)
- | DebouncedEvent::NoticeRemove(_)
- | DebouncedEvent::Chmod(_) => {
- // ignore
- }
- DebouncedEvent::Rescan => {
- // TODO rescan all roots
- }
- DebouncedEvent::Create(path) => {
- self.handle_change(path, ChangeKind::Create)?;
- }
- DebouncedEvent::Write(path) => {
- self.handle_change(path, ChangeKind::Write)?;
- }
- DebouncedEvent::Remove(path) => {
- self.handle_change(path, ChangeKind::Remove)?;
- }
- DebouncedEvent::Rename(src, dst) => {
- self.handle_change(src, ChangeKind::Remove)?;
- self.handle_change(dst, ChangeKind::Create)?;
- }
- DebouncedEvent::Error(err, path) => {
- // TODO should we reload the file contents?
- log::warn!("watcher error \"{}\", {:?}", err, path);
- }
+fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) {
+ match event {
+ DebouncedEvent::NoticeWrite(_)
+ | DebouncedEvent::NoticeRemove(_)
+ | DebouncedEvent::Chmod(_) => {
+ // ignore
+ }
+ DebouncedEvent::Rescan => {
+ // TODO rescan all roots
+ }
+ DebouncedEvent::Create(path) => {
+ sender.send((path, ChangeKind::Create)).unwrap();
+ }
+ DebouncedEvent::Write(path) => {
+ sender.send((path, ChangeKind::Write)).unwrap();
+ }
+ DebouncedEvent::Remove(path) => {
+ sender.send((path, ChangeKind::Remove)).unwrap();
+ }
+ DebouncedEvent::Rename(src, dst) => {
+ sender.send((src, ChangeKind::Remove)).unwrap();
+ sender.send((dst, ChangeKind::Create)).unwrap();
+ }
+ DebouncedEvent::Error(err, path) => {
+ // TODO should we reload the file contents?
+ log::warn!("watcher error \"{}\", {:?}", err, path);
}
- Ok(())
}
+}
+impl WatcherCtx {
fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<()> {
let (root, rel_path) = match self.roots.find(&path) {
None => return Ok(()),