+++ /dev/null
-use std::{
- fmt, fs,
- path::{Path, PathBuf},
- sync::Arc,
- thread,
-};
-
-use crossbeam_channel::{Receiver, Sender};
-use parking_lot::Mutex;
-use relative_path::RelativePathBuf;
-use thread_worker::WorkerHandle;
-use walkdir::{DirEntry, WalkDir};
-
-use crate::{
- watcher::{Watcher, WatcherChange},
- VfsRoot,
-};
-
-pub(crate) enum Task {
- AddRoot {
- root: VfsRoot,
- path: PathBuf,
- filter: Box<Fn(&DirEntry) -> bool + Send>,
- },
- HandleChange(WatcherChange),
- LoadChange(WatcherChange),
- Watch {
- dir: PathBuf,
- filter: Box<Fn(&DirEntry) -> bool + Send>,
- },
-}
-
-#[derive(Debug)]
-pub struct AddRootResult {
- pub(crate) root: VfsRoot,
- pub(crate) files: Vec<(RelativePathBuf, String)>,
-}
-
-#[derive(Debug)]
-pub enum WatcherChangeData {
- Create { path: PathBuf, text: String },
- Write { path: PathBuf, text: String },
- Remove { path: PathBuf },
-}
-
-pub enum TaskResult {
- AddRoot(AddRootResult),
- HandleChange(WatcherChange),
- LoadChange(WatcherChangeData),
- NoOp,
-}
-
-impl fmt::Debug for TaskResult {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.write_str("TaskResult { ... }")
- }
-}
-
-pub(crate) struct Worker {
- worker: thread_worker::Worker<Task, TaskResult>,
- worker_handle: WorkerHandle,
- watcher: Arc<Mutex<Option<Watcher>>>,
-}
-
-impl Worker {
- pub(crate) fn start() -> Worker {
- let watcher = Arc::new(Mutex::new(None));
- let watcher_clone = watcher.clone();
- let (worker, worker_handle) =
- thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
- input_receiver
- .into_iter()
- .map(|t| handle_task(t, &watcher_clone))
- .try_for_each(|it| output_sender.send(it))
- .unwrap()
- });
- match Watcher::start(worker.inp.clone()) {
- Ok(w) => {
- watcher.lock().replace(w);
- }
- Err(e) => log::error!("could not start watcher: {}", e),
- };
- Worker {
- worker,
- worker_handle,
- watcher,
- }
- }
-
- pub(crate) fn sender(&self) -> &Sender<Task> {
- &self.worker.inp
- }
-
- pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
- &self.worker.out
- }
-
- pub(crate) fn shutdown(self) -> thread::Result<()> {
- if let Some(watcher) = self.watcher.lock().take() {
- let _ = watcher.shutdown();
- }
- let _ = self.worker.shutdown();
- self.worker_handle.shutdown()
- }
-}
-
-fn watch(
- watcher: &Arc<Mutex<Option<Watcher>>>,
- dir: &Path,
- filter_entry: impl Fn(&DirEntry) -> bool,
- emit_for_existing: bool,
-) {
- let mut watcher = watcher.lock();
- let watcher = match *watcher {
- Some(ref mut w) => w,
- None => {
- // watcher dropped or couldn't start
- return;
- }
- };
- watcher.watch_recursive(dir, filter_entry, emit_for_existing)
-}
-
-fn handle_task(task: Task, watcher: &Arc<Mutex<Option<Watcher>>>) -> TaskResult {
- match task {
- Task::AddRoot { root, path, filter } => {
- watch(watcher, &path, &*filter, false);
- log::debug!("loading {} ...", path.as_path().display());
- let files = load_root(path.as_path(), &*filter);
- log::debug!("... loaded {}", path.as_path().display());
- TaskResult::AddRoot(AddRootResult { root, files })
- }
- Task::HandleChange(change) => {
- // forward as is because Vfs has to decide if we should load it
- TaskResult::HandleChange(change)
- }
- Task::LoadChange(change) => {
- log::debug!("loading {:?} ...", change);
- match load_change(change) {
- Some(data) => TaskResult::LoadChange(data),
- None => TaskResult::NoOp,
- }
- }
- Task::Watch { dir, filter } => {
- watch(watcher, &dir, &*filter, true);
- TaskResult::NoOp
- }
- }
-}
-
-fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> {
- let mut res = Vec::new();
- for entry in WalkDir::new(root).into_iter().filter_entry(filter) {
- let entry = match entry {
- Ok(entry) => entry,
- Err(e) => {
- log::warn!("watcher error: {}", e);
- continue;
- }
- };
- if !entry.file_type().is_file() {
- continue;
- }
- let path = entry.path();
- let text = match fs::read_to_string(path) {
- Ok(text) => text,
- Err(e) => {
- log::warn!("watcher error: {}", e);
- continue;
- }
- };
- let path = RelativePathBuf::from_path(path.strip_prefix(root).unwrap()).unwrap();
- res.push((path.to_owned(), text))
- }
- res
-}
-
-fn load_change(change: WatcherChange) -> Option<WatcherChangeData> {
- let data = match change {
- WatcherChange::Create(path) => {
- if path.is_dir() {
- return None;
- }
- let text = match fs::read_to_string(&path) {
- Ok(text) => text,
- Err(e) => {
- log::warn!("watcher error \"{}\": {}", path.display(), e);
- return None;
- }
- };
- WatcherChangeData::Create { path, text }
- }
- WatcherChange::Write(path) => {
- let text = match fs::read_to_string(&path) {
- Ok(text) => text,
- Err(e) => {
- log::warn!("watcher error \"{}\": {}", path.display(), e);
- return None;
- }
- };
- WatcherChangeData::Write { path, text }
- }
- WatcherChange::Remove(path) => WatcherChangeData::Remove { path },
- WatcherChange::Rescan => {
- // this should be handled by Vfs::handle_task
- return None;
- }
- };
- Some(data)
-}
--- /dev/null
+use std::{
+ fmt, fs,
+ path::{Path, PathBuf},
+ sync::Arc,
+ thread,
+};
+
+use crossbeam_channel::{Receiver, Sender};
+use parking_lot::Mutex;
+use relative_path::RelativePathBuf;
+use thread_worker::WorkerHandle;
+use walkdir::{DirEntry, WalkDir};
+
+mod watcher;
+use watcher::Watcher;
+pub use watcher::WatcherChange;
+
+use crate::VfsRoot;
+
+pub(crate) enum Task {
+ AddRoot {
+ root: VfsRoot,
+ path: PathBuf,
+ filter: Box<Fn(&DirEntry) -> bool + Send>,
+ },
+ /// this variant should only be created by the watcher
+ HandleChange(WatcherChange),
+ LoadChange(WatcherChange),
+ Watch {
+ dir: PathBuf,
+ filter: Box<Fn(&DirEntry) -> bool + Send>,
+ },
+}
+
+#[derive(Debug)]
+pub struct AddRootResult {
+ pub(crate) root: VfsRoot,
+ pub(crate) files: Vec<(RelativePathBuf, String)>,
+}
+
+#[derive(Debug)]
+pub enum WatcherChangeData {
+ Create { path: PathBuf, text: String },
+ Write { path: PathBuf, text: String },
+ Remove { path: PathBuf },
+}
+
+pub enum TaskResult {
+ AddRoot(AddRootResult),
+ HandleChange(WatcherChange),
+ LoadChange(WatcherChangeData),
+ NoOp,
+}
+
+impl fmt::Debug for TaskResult {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.write_str("TaskResult { ... }")
+ }
+}
+
+pub(crate) struct Worker {
+ worker: thread_worker::Worker<Task, TaskResult>,
+ worker_handle: WorkerHandle,
+ watcher: Arc<Mutex<Option<Watcher>>>,
+}
+
+impl Worker {
+ pub(crate) fn start() -> Worker {
+ let watcher = Arc::new(Mutex::new(None));
+ let watcher_clone = watcher.clone();
+ let (worker, worker_handle) =
+ thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
+ input_receiver
+ .into_iter()
+ .map(|t| handle_task(t, &watcher_clone))
+ .try_for_each(|it| output_sender.send(it))
+ .unwrap()
+ });
+ match Watcher::start(worker.inp.clone()) {
+ Ok(w) => {
+ watcher.lock().replace(w);
+ }
+ Err(e) => log::error!("could not start watcher: {}", e),
+ };
+ Worker {
+ worker,
+ worker_handle,
+ watcher,
+ }
+ }
+
+ pub(crate) fn sender(&self) -> &Sender<Task> {
+ &self.worker.inp
+ }
+
+ pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
+ &self.worker.out
+ }
+
+ pub(crate) fn shutdown(self) -> thread::Result<()> {
+ if let Some(watcher) = self.watcher.lock().take() {
+ let _ = watcher.shutdown();
+ }
+ let _ = self.worker.shutdown();
+ self.worker_handle.shutdown()
+ }
+}
+
+fn watch(
+ watcher: &Arc<Mutex<Option<Watcher>>>,
+ dir: &Path,
+ filter_entry: impl Fn(&DirEntry) -> bool,
+ emit_for_existing: bool,
+) {
+ let mut watcher = watcher.lock();
+ let watcher = match *watcher {
+ Some(ref mut w) => w,
+ None => {
+ // watcher dropped or couldn't start
+ return;
+ }
+ };
+ watcher.watch_recursive(dir, filter_entry, emit_for_existing)
+}
+
+fn handle_task(task: Task, watcher: &Arc<Mutex<Option<Watcher>>>) -> TaskResult {
+ match task {
+ Task::AddRoot { root, path, filter } => {
+ watch(watcher, &path, &*filter, false);
+ log::debug!("loading {} ...", path.as_path().display());
+ let files = load_root(path.as_path(), &*filter);
+ log::debug!("... loaded {}", path.as_path().display());
+ TaskResult::AddRoot(AddRootResult { root, files })
+ }
+ Task::HandleChange(change) => {
+ // forward as is because Vfs has to decide if we should load it
+ TaskResult::HandleChange(change)
+ }
+ Task::LoadChange(change) => {
+ log::debug!("loading {:?} ...", change);
+ match load_change(change) {
+ Some(data) => TaskResult::LoadChange(data),
+ None => TaskResult::NoOp,
+ }
+ }
+ Task::Watch { dir, filter } => {
+ watch(watcher, &dir, &*filter, true);
+ TaskResult::NoOp
+ }
+ }
+}
+
+fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> {
+ let mut res = Vec::new();
+ for entry in WalkDir::new(root).into_iter().filter_entry(filter) {
+ let entry = match entry {
+ Ok(entry) => entry,
+ Err(e) => {
+ log::warn!("watcher error: {}", e);
+ continue;
+ }
+ };
+ if !entry.file_type().is_file() {
+ continue;
+ }
+ let path = entry.path();
+ let text = match fs::read_to_string(path) {
+ Ok(text) => text,
+ Err(e) => {
+ log::warn!("watcher error: {}", e);
+ continue;
+ }
+ };
+ let path = RelativePathBuf::from_path(path.strip_prefix(root).unwrap()).unwrap();
+ res.push((path.to_owned(), text))
+ }
+ res
+}
+
+fn load_change(change: WatcherChange) -> Option<WatcherChangeData> {
+ let data = match change {
+ WatcherChange::Create(path) => {
+ if path.is_dir() {
+ return None;
+ }
+ let text = match fs::read_to_string(&path) {
+ Ok(text) => text,
+ Err(e) => {
+ log::warn!("watcher error \"{}\": {}", path.display(), e);
+ return None;
+ }
+ };
+ WatcherChangeData::Create { path, text }
+ }
+ WatcherChange::Write(path) => {
+ let text = match fs::read_to_string(&path) {
+ Ok(text) => text,
+ Err(e) => {
+ log::warn!("watcher error \"{}\": {}", path.display(), e);
+ return None;
+ }
+ };
+ WatcherChangeData::Write { path, text }
+ }
+ WatcherChange::Remove(path) => WatcherChangeData::Remove { path },
+ WatcherChange::Rescan => {
+ // this should be handled by Vfs::handle_task
+ return None;
+ }
+ };
+ Some(data)
+}
--- /dev/null
+use crate::io;
+use crossbeam_channel::Sender;
+use drop_bomb::DropBomb;
+use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
+use std::{
+ path::{Path, PathBuf},
+ sync::mpsc,
+ thread,
+ time::Duration,
+};
+use walkdir::{DirEntry, WalkDir};
+
+#[derive(Debug)]
+pub enum WatcherChange {
+ Create(PathBuf),
+ Write(PathBuf),
+ Remove(PathBuf),
+ Rescan,
+}
+
+fn handle_change_event(
+ ev: DebouncedEvent,
+ sender: &Sender<io::Task>,
+) -> Result<(), Box<std::error::Error>> {
+ match ev {
+ DebouncedEvent::NoticeWrite(_)
+ | DebouncedEvent::NoticeRemove(_)
+ | DebouncedEvent::Chmod(_) => {
+ // ignore
+ }
+ DebouncedEvent::Rescan => {
+ sender.send(io::Task::HandleChange(WatcherChange::Rescan))?;
+ }
+ DebouncedEvent::Create(path) => {
+ sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?;
+ }
+ DebouncedEvent::Write(path) => {
+ sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?;
+ }
+ DebouncedEvent::Remove(path) => {
+ sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?;
+ }
+ DebouncedEvent::Rename(src, dst) => {
+ sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?;
+ sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?;
+ }
+ DebouncedEvent::Error(err, path) => {
+ // TODO should we reload the file contents?
+ log::warn!("watcher error \"{}\", {:?}", err, path);
+ }
+ }
+ Ok(())
+}
+
+const WATCHER_DELAY: Duration = Duration::from_millis(250);
+
+pub(crate) struct Watcher {
+ watcher: RecommendedWatcher,
+ thread: thread::JoinHandle<()>,
+ bomb: DropBomb,
+ sender: Sender<io::Task>,
+}
+
+impl Watcher {
+ pub(crate) fn start(
+ output_sender: Sender<io::Task>,
+ ) -> Result<Watcher, Box<std::error::Error>> {
+ let (input_sender, input_receiver) = mpsc::channel();
+ let watcher = notify::watcher(input_sender, WATCHER_DELAY)?;
+ let sender = output_sender.clone();
+ let thread = thread::spawn(move || {
+ input_receiver
+ .into_iter()
+ // forward relevant events only
+ .try_for_each(|change| handle_change_event(change, &output_sender))
+ .unwrap()
+ });
+ Ok(Watcher {
+ watcher,
+ thread,
+ sender,
+ bomb: DropBomb::new(format!("Watcher was not shutdown")),
+ })
+ }
+
+ pub fn watch_recursive(
+ &mut self,
+ dir: &Path,
+ filter_entry: impl Fn(&DirEntry) -> bool,
+ emit_for_contents: bool,
+ ) {
+ for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) {
+ match res {
+ Ok(entry) => {
+ if entry.path().is_dir() {
+ match self.watcher.watch(dir, RecursiveMode::NonRecursive) {
+ Ok(()) => log::debug!("watching \"{}\"", dir.display()),
+ Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
+ }
+ }
+ if emit_for_contents && entry.depth() > 0 {
+ // emit as create because we haven't seen it yet
+ if let Err(e) =
+ self.sender
+ .send(io::Task::HandleChange(WatcherChange::Create(
+ entry.path().to_path_buf(),
+ )))
+ {
+ log::warn!("watcher error: {}", e)
+ }
+ }
+ }
+ Err(e) => log::warn!("watcher error: {}", e),
+ }
+ }
+ }
+
+ pub fn shutdown(mut self) -> thread::Result<()> {
+ self.bomb.defuse();
+ drop(self.watcher);
+ let res = self.thread.join();
+ match &res {
+ Ok(()) => log::info!("... Watcher terminated with ok"),
+ Err(_) => log::error!("... Watcher terminated with err"),
+ }
+ res
+ }
+}
//! which are watched for changes. Typically, there will be a root for each
//! Cargo package.
mod io;
-mod watcher;
use std::{
cmp::Reverse,
use walkdir::DirEntry;
pub use crate::io::TaskResult as VfsTask;
-pub use crate::watcher::WatcherChange;
+use io::{Task, TaskResult, WatcherChange, WatcherChangeData, Worker};
/// `RootFilter` is a predicate that checks if a file can belong to a root. If
/// several filters match a file (nested dirs), the most nested one wins.
files: Arena<VfsFile, VfsFileData>,
root2files: FxHashMap<VfsRoot, FxHashSet<VfsFile>>,
pending_changes: Vec<VfsChange>,
- worker: io::Worker,
+ worker: Worker,
}
impl fmt::Debug for Vfs {
pub fn handle_task(&mut self, task: io::TaskResult) {
match task {
- io::TaskResult::AddRoot(task) => {
+ TaskResult::AddRoot(task) => {
let mut files = Vec::new();
// While we were scanning the root in the backgound, a file might have
// been open in the editor, so we need to account for that.
};
self.pending_changes.push(change);
}
- io::TaskResult::HandleChange(change) => match &change {
- watcher::WatcherChange::Create(path) if path.is_dir() => {
+ TaskResult::HandleChange(change) => match &change {
+ WatcherChange::Create(path) if path.is_dir() => {
if let Some((root, _path, _file)) = self.find_root(&path) {
let root_filter = self.roots[root].clone();
let filter =
move |entry: &DirEntry| root_filter.can_contain(entry.path()).is_some();
self.worker
.sender()
- .send(io::Task::Watch {
+ .send(Task::Watch {
dir: path.to_path_buf(),
filter: Box::new(filter),
})
.unwrap()
}
}
- watcher::WatcherChange::Create(path)
- | watcher::WatcherChange::Remove(path)
- | watcher::WatcherChange::Write(path) => {
+ WatcherChange::Create(path)
+ | WatcherChange::Remove(path)
+ | WatcherChange::Write(path) => {
if self.should_handle_change(&path) {
- self.worker
- .sender()
- .send(io::Task::LoadChange(change))
- .unwrap()
+ self.worker.sender().send(Task::LoadChange(change)).unwrap()
}
}
- watcher::WatcherChange::Rescan => {
+ WatcherChange::Rescan => {
// TODO we should reload all files
}
},
- io::TaskResult::LoadChange(change) => match change {
- io::WatcherChangeData::Create { path, text }
- | io::WatcherChangeData::Write { path, text } => {
+ TaskResult::LoadChange(change) => match change {
+ WatcherChangeData::Create { path, text }
+ | WatcherChangeData::Write { path, text } => {
if let Some((root, path, file)) = self.find_root(&path) {
if let Some(file) = file {
self.do_change_file(file, text, false);
}
}
}
- io::WatcherChangeData::Remove { path } => {
+ WatcherChangeData::Remove { path } => {
if let Some((root, path, file)) = self.find_root(&path) {
if let Some(file) = file {
self.do_remove_file(root, path, file, false);
}
}
},
- io::TaskResult::NoOp => {}
+ TaskResult::NoOp => {}
}
}
+++ /dev/null
-use crate::io;
-use crossbeam_channel::Sender;
-use drop_bomb::DropBomb;
-use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
-use std::{
- path::{Path, PathBuf},
- sync::mpsc,
- thread,
- time::Duration,
-};
-use walkdir::{DirEntry, WalkDir};
-
-pub(crate) struct Watcher {
- watcher: RecommendedWatcher,
- thread: thread::JoinHandle<()>,
- bomb: DropBomb,
- sender: Sender<io::Task>,
-}
-
-#[derive(Debug)]
-pub enum WatcherChange {
- Create(PathBuf),
- Write(PathBuf),
- Remove(PathBuf),
- Rescan,
-}
-
-fn handle_change_event(
- ev: DebouncedEvent,
- sender: &Sender<io::Task>,
-) -> Result<(), Box<std::error::Error>> {
- match ev {
- DebouncedEvent::NoticeWrite(_)
- | DebouncedEvent::NoticeRemove(_)
- | DebouncedEvent::Chmod(_) => {
- // ignore
- }
- DebouncedEvent::Rescan => {
- sender.send(io::Task::HandleChange(WatcherChange::Rescan))?;
- }
- DebouncedEvent::Create(path) => {
- sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?;
- }
- DebouncedEvent::Write(path) => {
- sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?;
- }
- DebouncedEvent::Remove(path) => {
- sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?;
- }
- DebouncedEvent::Rename(src, dst) => {
- sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?;
- sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?;
- }
- DebouncedEvent::Error(err, path) => {
- // TODO should we reload the file contents?
- log::warn!("watcher error \"{}\", {:?}", err, path);
- }
- }
- Ok(())
-}
-
-const WATCHER_DELAY: Duration = Duration::from_millis(250);
-
-impl Watcher {
- pub(crate) fn start(
- output_sender: Sender<io::Task>,
- ) -> Result<Watcher, Box<std::error::Error>> {
- let (input_sender, input_receiver) = mpsc::channel();
- let watcher = notify::watcher(input_sender, WATCHER_DELAY)?;
- let sender = output_sender.clone();
- let thread = thread::spawn(move || {
- input_receiver
- .into_iter()
- // forward relevant events only
- .try_for_each(|change| handle_change_event(change, &output_sender))
- .unwrap()
- });
- Ok(Watcher {
- watcher,
- thread,
- sender,
- bomb: DropBomb::new(format!("Watcher was not shutdown")),
- })
- }
-
- pub fn watch_recursive(
- &mut self,
- dir: &Path,
- filter_entry: impl Fn(&DirEntry) -> bool,
- emit_for_contents: bool,
- ) {
- for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) {
- match res {
- Ok(entry) => {
- if entry.path().is_dir() {
- match self.watcher.watch(dir, RecursiveMode::NonRecursive) {
- Ok(()) => log::debug!("watching \"{}\"", dir.display()),
- Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
- }
- }
- if emit_for_contents && entry.depth() > 0 {
- // emit as create because we haven't seen it yet
- if let Err(e) =
- self.sender
- .send(io::Task::HandleChange(WatcherChange::Create(
- entry.path().to_path_buf(),
- )))
- {
- log::warn!("watcher error: {}", e)
- }
- }
- }
- Err(e) => log::warn!("watcher error: {}", e),
- }
- }
- }
-
- pub fn shutdown(mut self) -> thread::Result<()> {
- self.bomb.defuse();
- drop(self.watcher);
- let res = self.thread.join();
- match &res {
- Ok(()) => log::info!("... Watcher terminated with ok"),
- Err(_) => log::error!("... Watcher terminated with err"),
- }
- res
- }
-}