]> git.lizzy.rs Git - rust.git/blob - crates/rust-analyzer/src/main_loop.rs
Merge #3309
[rust.git] / crates / rust-analyzer / src / main_loop.rs
1 //! The main loop of `rust-analyzer` responsible for dispatching LSP
2 //! requests/replies and notifications back to the client.
3
4 mod handlers;
5 mod subscriptions;
6 pub(crate) mod pending_requests;
7
8 use std::{
9     env,
10     error::Error,
11     fmt, panic,
12     path::PathBuf,
13     sync::Arc,
14     time::{Duration, Instant},
15 };
16
17 use crossbeam_channel::{select, unbounded, RecvError, Sender};
18 use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
19 use lsp_types::{ClientCapabilities, NumberOrString};
20 use ra_cargo_watch::{url_from_path_with_drive_lowercasing, CheckOptions, CheckTask};
21 use ra_ide::{Canceled, FeatureFlags, FileId, LibraryData, SourceRootId};
22 use ra_prof::profile;
23 use ra_vfs::{VfsFile, VfsTask, Watch};
24 use relative_path::RelativePathBuf;
25 use rustc_hash::FxHashSet;
26 use serde::{de::DeserializeOwned, Serialize};
27 use threadpool::ThreadPool;
28
29 use crate::{
30     diagnostics::DiagnosticTask,
31     main_loop::{
32         pending_requests::{PendingRequest, PendingRequests},
33         subscriptions::Subscriptions,
34     },
35     req,
36     world::{Options, WorldSnapshot, WorldState},
37     Result, ServerConfig,
38 };
39
/// An error carrying an explicit LSP error code and message.
///
/// `result_to_task` copies `code` and `message` directly into the error
/// response when a request handler fails with this type.
#[derive(Debug)]
pub struct LspError {
    /// Numeric error code placed in the response.
    pub code: i32,
    /// Human-readable description placed in the response.
    pub message: String,
}

impl LspError {
    /// Creates an error from the given code and message.
    pub fn new(code: i32, message: String) -> Self {
        Self { code, message }
    }
}

impl fmt::Display for LspError {
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        let LspError { code, message } = self;
        write!(formatter, "Language Server request failed with {}. ({})", code, message)
    }
}

impl Error for LspError {}
59
60 pub fn main_loop(
61     ws_roots: Vec<PathBuf>,
62     client_caps: ClientCapabilities,
63     config: ServerConfig,
64     connection: Connection,
65 ) -> Result<()> {
66     log::info!("server_config: {:#?}", config);
67
68     // Windows scheduler implements priority boosts: if thread waits for an
69     // event (like a condvar), and event fires, priority of the thread is
70     // temporary bumped. This optimization backfires in our case: each time the
71     // `main_loop` schedules a task to run on a threadpool, the worker threads
72     // gets a higher priority, and (on a machine with fewer cores) displaces the
73     // main loop! We work-around this by marking the main loop as a
74     // higher-priority thread.
75     //
76     // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities
77     // https://docs.microsoft.com/en-us/windows/win32/procthread/priority-boosts
78     // https://github.com/rust-analyzer/rust-analyzer/issues/2835
79     #[cfg(windows)]
80     unsafe {
81         use winapi::um::processthreadsapi::*;
82         let thread = GetCurrentThread();
83         let thread_priority_above_normal = 1;
84         SetThreadPriority(thread, thread_priority_above_normal);
85     }
86
87     let mut loop_state = LoopState::default();
88     let mut world_state = {
89         let feature_flags = {
90             let mut ff = FeatureFlags::default();
91             for (flag, value) in config.feature_flags {
92                 if ff.set(flag.as_str(), value).is_err() {
93                     log::error!("unknown feature flag: {:?}", flag);
94                     show_message(
95                         req::MessageType::Error,
96                         format!("unknown feature flag: {:?}", flag),
97                         &connection.sender,
98                     );
99                 }
100             }
101             ff
102         };
103         log::info!("feature_flags: {:#?}", feature_flags);
104
105         // FIXME: support dynamic workspace loading.
106         let workspaces = {
107             let mut loaded_workspaces = Vec::new();
108             for ws_root in &ws_roots {
109                 let workspace = ra_project_model::ProjectWorkspace::discover_with_sysroot(
110                     ws_root.as_path(),
111                     config.with_sysroot,
112                     &config.cargo_features,
113                 );
114                 match workspace {
115                     Ok(workspace) => loaded_workspaces.push(workspace),
116                     Err(e) => {
117                         log::error!("loading workspace failed: {:?}", e);
118
119                         if let Some(ra_project_model::CargoTomlNotFoundError { .. }) =
120                             e.downcast_ref()
121                         {
122                             if !feature_flags.get("notifications.cargo-toml-not-found") {
123                                 continue;
124                             }
125                         }
126
127                         show_message(
128                             req::MessageType::Error,
129                             format!("rust-analyzer failed to load workspace: {:?}", e),
130                             &connection.sender,
131                         );
132                     }
133                 }
134             }
135             loaded_workspaces
136         };
137
138         let globs = config
139             .exclude_globs
140             .iter()
141             .map(|glob| crate::vfs_glob::Glob::new(glob))
142             .collect::<std::result::Result<Vec<_>, _>>()?;
143
144         if config.use_client_watching {
145             let registration_options = req::DidChangeWatchedFilesRegistrationOptions {
146                 watchers: workspaces
147                     .iter()
148                     .flat_map(|ws| ws.to_roots())
149                     .filter(|root| root.is_member())
150                     .map(|root| format!("{}/**/*.rs", root.path().display()))
151                     .map(|glob_pattern| req::FileSystemWatcher { glob_pattern, kind: None })
152                     .collect(),
153             };
154             let registration = req::Registration {
155                 id: "file-watcher".to_string(),
156                 method: "workspace/didChangeWatchedFiles".to_string(),
157                 register_options: Some(serde_json::to_value(registration_options).unwrap()),
158             };
159             let params = req::RegistrationParams { registrations: vec![registration] };
160             let request =
161                 request_new::<req::RegisterCapability>(loop_state.next_request_id(), params);
162             connection.sender.send(request.into()).unwrap();
163         }
164
165         let options = {
166             let text_document_caps = client_caps.text_document.as_ref();
167             Options {
168                 publish_decorations: config.publish_decorations,
169                 supports_location_link: text_document_caps
170                     .and_then(|it| it.definition)
171                     .and_then(|it| it.link_support)
172                     .unwrap_or(false),
173                 line_folding_only: text_document_caps
174                     .and_then(|it| it.folding_range.as_ref())
175                     .and_then(|it| it.line_folding_only)
176                     .unwrap_or(false),
177                 max_inlay_hint_length: config.max_inlay_hint_length,
178                 cargo_watch: CheckOptions {
179                     enable: config.cargo_watch_enable,
180                     args: config.cargo_watch_args,
181                     command: config.cargo_watch_command,
182                     all_targets: config.cargo_watch_all_targets,
183                 },
184                 rustfmt_args: config.rustfmt_args,
185             }
186         };
187
188         WorldState::new(
189             ws_roots,
190             workspaces,
191             config.lru_capacity,
192             &globs,
193             Watch(!config.use_client_watching),
194             options,
195             feature_flags,
196         )
197     };
198
199     let pool = ThreadPool::default();
200     let (task_sender, task_receiver) = unbounded::<Task>();
201     let (libdata_sender, libdata_receiver) = unbounded::<LibraryData>();
202
203     log::info!("server initialized, serving requests");
204     {
205         let task_sender = task_sender;
206         let libdata_sender = libdata_sender;
207         loop {
208             log::trace!("selecting");
209             let event = select! {
210                 recv(&connection.receiver) -> msg => match msg {
211                     Ok(msg) => Event::Msg(msg),
212                     Err(RecvError) => return Err("client exited without shutdown".into()),
213                 },
214                 recv(task_receiver) -> task => Event::Task(task.unwrap()),
215                 recv(world_state.task_receiver) -> task => match task {
216                     Ok(task) => Event::Vfs(task),
217                     Err(RecvError) => return Err("vfs died".into()),
218                 },
219                 recv(libdata_receiver) -> data => Event::Lib(data.unwrap()),
220                 recv(world_state.check_watcher.task_recv) -> task => match task {
221                     Ok(task) => Event::CheckWatcher(task),
222                     Err(RecvError) => return Err("check watcher died".into()),
223                 }
224             };
225             if let Event::Msg(Message::Request(req)) = &event {
226                 if connection.handle_shutdown(&req)? {
227                     break;
228                 };
229             }
230             loop_turn(
231                 &pool,
232                 &task_sender,
233                 &libdata_sender,
234                 &connection,
235                 &mut world_state,
236                 &mut loop_state,
237                 event,
238             )?;
239         }
240     }
241     world_state.analysis_host.request_cancellation();
242     log::info!("waiting for tasks to finish...");
243     task_receiver.into_iter().for_each(|task| {
244         on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut world_state)
245     });
246     libdata_receiver.into_iter().for_each(drop);
247     log::info!("...tasks have finished");
248     log::info!("joining threadpool...");
249     drop(pool);
250     log::info!("...threadpool has finished");
251
252     let vfs = Arc::try_unwrap(world_state.vfs).expect("all snapshots should be dead");
253     drop(vfs);
254
255     Ok(())
256 }
257
258 #[derive(Debug)]
259 enum Task {
260     Respond(Response),
261     Notify(Notification),
262     Diagnostic(DiagnosticTask),
263 }
264
265 enum Event {
266     Msg(Message),
267     Task(Task),
268     Vfs(VfsTask),
269     Lib(LibraryData),
270     CheckWatcher(CheckTask),
271 }
272
273 impl fmt::Debug for Event {
274     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
275         let debug_verbose_not = |not: &Notification, f: &mut fmt::Formatter| {
276             f.debug_struct("Notification").field("method", &not.method).finish()
277         };
278
279         match self {
280             Event::Msg(Message::Notification(not)) => {
281                 if notification_is::<req::DidOpenTextDocument>(not)
282                     || notification_is::<req::DidChangeTextDocument>(not)
283                 {
284                     return debug_verbose_not(not, f);
285                 }
286             }
287             Event::Task(Task::Notify(not)) => {
288                 if notification_is::<req::PublishDecorations>(not)
289                     || notification_is::<req::PublishDiagnostics>(not)
290                 {
291                     return debug_verbose_not(not, f);
292                 }
293             }
294             Event::Task(Task::Respond(resp)) => {
295                 return f
296                     .debug_struct("Response")
297                     .field("id", &resp.id)
298                     .field("error", &resp.error)
299                     .finish();
300             }
301             _ => (),
302         }
303         match self {
304             Event::Msg(it) => fmt::Debug::fmt(it, f),
305             Event::Task(it) => fmt::Debug::fmt(it, f),
306             Event::Vfs(it) => fmt::Debug::fmt(it, f),
307             Event::Lib(it) => fmt::Debug::fmt(it, f),
308             Event::CheckWatcher(it) => fmt::Debug::fmt(it, f),
309         }
310     }
311 }
312
313 #[derive(Debug, Default)]
314 struct LoopState {
315     next_request_id: u64,
316     pending_responses: FxHashSet<RequestId>,
317     pending_requests: PendingRequests,
318     subscriptions: Subscriptions,
319     // We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same
320     // time to always have a thread ready to react to input.
321     in_flight_libraries: usize,
322     pending_libraries: Vec<(SourceRootId, Vec<(FileId, RelativePathBuf, Arc<String>)>)>,
323     workspace_loaded: bool,
324 }
325
326 impl LoopState {
327     fn next_request_id(&mut self) -> RequestId {
328         self.next_request_id += 1;
329         let res: RequestId = self.next_request_id.into();
330         let inserted = self.pending_responses.insert(res.clone());
331         assert!(inserted);
332         res
333     }
334 }
335
336 fn loop_turn(
337     pool: &ThreadPool,
338     task_sender: &Sender<Task>,
339     libdata_sender: &Sender<LibraryData>,
340     connection: &Connection,
341     world_state: &mut WorldState,
342     loop_state: &mut LoopState,
343     event: Event,
344 ) -> Result<()> {
345     let loop_start = Instant::now();
346
347     // NOTE: don't count blocking select! call as a loop-turn time
348     let _p = profile("main_loop_inner/loop-turn");
349     log::info!("loop turn = {:?}", event);
350     let queue_count = pool.queued_count();
351     if queue_count > 0 {
352         log::info!("queued count = {}", queue_count);
353     }
354
355     match event {
356         Event::Task(task) => {
357             on_task(task, &connection.sender, &mut loop_state.pending_requests, world_state);
358             world_state.maybe_collect_garbage();
359         }
360         Event::Vfs(task) => {
361             world_state.vfs.write().handle_task(task);
362         }
363         Event::Lib(lib) => {
364             world_state.add_lib(lib);
365             world_state.maybe_collect_garbage();
366             loop_state.in_flight_libraries -= 1;
367         }
368         Event::CheckWatcher(task) => on_check_task(task, world_state, task_sender)?,
369         Event::Msg(msg) => match msg {
370             Message::Request(req) => on_request(
371                 world_state,
372                 &mut loop_state.pending_requests,
373                 pool,
374                 task_sender,
375                 &connection.sender,
376                 loop_start,
377                 req,
378             )?,
379             Message::Notification(not) => {
380                 on_notification(
381                     &connection.sender,
382                     world_state,
383                     &mut loop_state.pending_requests,
384                     &mut loop_state.subscriptions,
385                     not,
386                 )?;
387             }
388             Message::Response(resp) => {
389                 let removed = loop_state.pending_responses.remove(&resp.id);
390                 if !removed {
391                     log::error!("unexpected response: {:?}", resp)
392                 }
393             }
394         },
395     };
396
397     let mut state_changed = false;
398     if let Some(changes) = world_state.process_changes() {
399         state_changed = true;
400         loop_state.pending_libraries.extend(changes);
401     }
402
403     let max_in_flight_libs = pool.max_count().saturating_sub(2).max(1);
404     while loop_state.in_flight_libraries < max_in_flight_libs
405         && !loop_state.pending_libraries.is_empty()
406     {
407         let (root, files) = loop_state.pending_libraries.pop().unwrap();
408         loop_state.in_flight_libraries += 1;
409         let sender = libdata_sender.clone();
410         pool.execute(move || {
411             log::info!("indexing {:?} ... ", root);
412             let data = LibraryData::prepare(root, files);
413             sender.send(data).unwrap();
414         });
415     }
416
417     if !loop_state.workspace_loaded
418         && world_state.roots_to_scan == 0
419         && loop_state.pending_libraries.is_empty()
420         && loop_state.in_flight_libraries == 0
421     {
422         loop_state.workspace_loaded = true;
423         let n_packages: usize = world_state.workspaces.iter().map(|it| it.n_packages()).sum();
424         if world_state.feature_flags().get("notifications.workspace-loaded") {
425             let msg = format!("workspace loaded, {} rust packages", n_packages);
426             show_message(req::MessageType::Info, msg, &connection.sender);
427         }
428         world_state.check_watcher.update();
429     }
430
431     if state_changed {
432         update_file_notifications_on_threadpool(
433             pool,
434             world_state.snapshot(),
435             world_state.options.publish_decorations,
436             task_sender.clone(),
437             loop_state.subscriptions.subscriptions(),
438         )
439     }
440
441     let loop_duration = loop_start.elapsed();
442     if loop_duration > Duration::from_millis(100) {
443         log::error!("overly long loop turn: {:?}", loop_duration);
444         if env::var("RA_PROFILE").is_ok() {
445             show_message(
446                 req::MessageType::Error,
447                 format!("overly long loop turn: {:?}", loop_duration),
448                 &connection.sender,
449             );
450         }
451     }
452
453     Ok(())
454 }
455
456 fn on_task(
457     task: Task,
458     msg_sender: &Sender<Message>,
459     pending_requests: &mut PendingRequests,
460     state: &mut WorldState,
461 ) {
462     match task {
463         Task::Respond(response) => {
464             if let Some(completed) = pending_requests.finish(&response.id) {
465                 log::info!("handled req#{} in {:?}", completed.id, completed.duration);
466                 state.complete_request(completed);
467                 msg_sender.send(response.into()).unwrap();
468             }
469         }
470         Task::Notify(n) => {
471             msg_sender.send(n.into()).unwrap();
472         }
473         Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, state),
474     }
475 }
476
477 fn on_request(
478     world: &mut WorldState,
479     pending_requests: &mut PendingRequests,
480     pool: &ThreadPool,
481     task_sender: &Sender<Task>,
482     msg_sender: &Sender<Message>,
483     request_received: Instant,
484     req: Request,
485 ) -> Result<()> {
486     let mut pool_dispatcher = PoolDispatcher {
487         req: Some(req),
488         pool,
489         world,
490         task_sender,
491         msg_sender,
492         pending_requests,
493         request_received,
494     };
495     pool_dispatcher
496         .on_sync::<req::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))?
497         .on_sync::<req::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))?
498         .on_sync::<req::OnEnter>(|s, p| handlers::handle_on_enter(s.snapshot(), p))?
499         .on_sync::<req::SelectionRangeRequest>(|s, p| {
500             handlers::handle_selection_range(s.snapshot(), p)
501         })?
502         .on_sync::<req::FindMatchingBrace>(|s, p| {
503             handlers::handle_find_matching_brace(s.snapshot(), p)
504         })?
505         .on::<req::AnalyzerStatus>(handlers::handle_analyzer_status)?
506         .on::<req::SyntaxTree>(handlers::handle_syntax_tree)?
507         .on::<req::ExpandMacro>(handlers::handle_expand_macro)?
508         .on::<req::OnTypeFormatting>(handlers::handle_on_type_formatting)?
509         .on::<req::DocumentSymbolRequest>(handlers::handle_document_symbol)?
510         .on::<req::WorkspaceSymbol>(handlers::handle_workspace_symbol)?
511         .on::<req::GotoDefinition>(handlers::handle_goto_definition)?
512         .on::<req::GotoImplementation>(handlers::handle_goto_implementation)?
513         .on::<req::GotoTypeDefinition>(handlers::handle_goto_type_definition)?
514         .on::<req::ParentModule>(handlers::handle_parent_module)?
515         .on::<req::Runnables>(handlers::handle_runnables)?
516         .on::<req::DecorationsRequest>(handlers::handle_decorations)?
517         .on::<req::Completion>(handlers::handle_completion)?
518         .on::<req::CodeActionRequest>(handlers::handle_code_action)?
519         .on::<req::CodeLensRequest>(handlers::handle_code_lens)?
520         .on::<req::CodeLensResolve>(handlers::handle_code_lens_resolve)?
521         .on::<req::FoldingRangeRequest>(handlers::handle_folding_range)?
522         .on::<req::SignatureHelpRequest>(handlers::handle_signature_help)?
523         .on::<req::HoverRequest>(handlers::handle_hover)?
524         .on::<req::PrepareRenameRequest>(handlers::handle_prepare_rename)?
525         .on::<req::Rename>(handlers::handle_rename)?
526         .on::<req::References>(handlers::handle_references)?
527         .on::<req::Formatting>(handlers::handle_formatting)?
528         .on::<req::DocumentHighlightRequest>(handlers::handle_document_highlight)?
529         .on::<req::InlayHints>(handlers::handle_inlay_hints)?
530         .on::<req::CallHierarchyPrepare>(handlers::handle_call_hierarchy_prepare)?
531         .on::<req::CallHierarchyIncomingCalls>(handlers::handle_call_hierarchy_incoming)?
532         .on::<req::CallHierarchyOutgoingCalls>(handlers::handle_call_hierarchy_outgoing)?
533         .on::<req::SemanticTokensRequest>(handlers::handle_semantic_tokens)?
534         .on::<req::SemanticTokensRangeRequest>(handlers::handle_semantic_tokens_range)?
535         .on::<req::Ssr>(handlers::handle_ssr)?
536         .finish();
537     Ok(())
538 }
539
540 fn on_notification(
541     msg_sender: &Sender<Message>,
542     state: &mut WorldState,
543     pending_requests: &mut PendingRequests,
544     subs: &mut Subscriptions,
545     not: Notification,
546 ) -> Result<()> {
547     let not = match notification_cast::<req::Cancel>(not) {
548         Ok(params) => {
549             let id: RequestId = match params.id {
550                 NumberOrString::Number(id) => id.into(),
551                 NumberOrString::String(id) => id.into(),
552             };
553             if pending_requests.cancel(&id) {
554                 let response = Response::new_err(
555                     id,
556                     ErrorCode::RequestCanceled as i32,
557                     "canceled by client".to_string(),
558                 );
559                 msg_sender.send(response.into()).unwrap()
560             }
561             return Ok(());
562         }
563         Err(not) => not,
564     };
565     let not = match notification_cast::<req::DidOpenTextDocument>(not) {
566         Ok(params) => {
567             let uri = params.text_document.uri;
568             let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
569             if let Some(file_id) =
570                 state.vfs.write().add_file_overlay(&path, params.text_document.text)
571             {
572                 subs.add_sub(FileId(file_id.0));
573             }
574             return Ok(());
575         }
576         Err(not) => not,
577     };
578     let not = match notification_cast::<req::DidChangeTextDocument>(not) {
579         Ok(mut params) => {
580             let uri = params.text_document.uri;
581             let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
582             let text =
583                 params.content_changes.pop().ok_or_else(|| "empty changes".to_string())?.text;
584             state.vfs.write().change_file_overlay(path.as_path(), text);
585             return Ok(());
586         }
587         Err(not) => not,
588     };
589     let not = match notification_cast::<req::DidSaveTextDocument>(not) {
590         Ok(_params) => {
591             state.check_watcher.update();
592             return Ok(());
593         }
594         Err(not) => not,
595     };
596     let not = match notification_cast::<req::DidCloseTextDocument>(not) {
597         Ok(params) => {
598             let uri = params.text_document.uri;
599             let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
600             if let Some(file_id) = state.vfs.write().remove_file_overlay(path.as_path()) {
601                 subs.remove_sub(FileId(file_id.0));
602             }
603             let params =
604                 req::PublishDiagnosticsParams { uri, diagnostics: Vec::new(), version: None };
605             let not = notification_new::<req::PublishDiagnostics>(params);
606             msg_sender.send(not.into()).unwrap();
607             return Ok(());
608         }
609         Err(not) => not,
610     };
611     let not = match notification_cast::<req::DidChangeConfiguration>(not) {
612         Ok(_params) => {
613             return Ok(());
614         }
615         Err(not) => not,
616     };
617     let not = match notification_cast::<req::DidChangeWatchedFiles>(not) {
618         Ok(params) => {
619             let mut vfs = state.vfs.write();
620             for change in params.changes {
621                 let uri = change.uri;
622                 let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
623                 vfs.notify_changed(path)
624             }
625             return Ok(());
626         }
627         Err(not) => not,
628     };
629     log::error!("unhandled notification: {:?}", not);
630     Ok(())
631 }
632
633 fn on_check_task(
634     task: CheckTask,
635     world_state: &mut WorldState,
636     task_sender: &Sender<Task>,
637 ) -> Result<()> {
638     match task {
639         CheckTask::ClearDiagnostics => {
640             task_sender.send(Task::Diagnostic(DiagnosticTask::ClearCheck))?;
641         }
642
643         CheckTask::AddDiagnostic { url, diagnostic, fixes } => {
644             let path = url.to_file_path().map_err(|()| format!("invalid uri: {}", url))?;
645             let file_id = match world_state.vfs.read().path2file(&path) {
646                 Some(file) => FileId(file.0),
647                 None => {
648                     log::error!("File with cargo diagnostic not found in VFS: {}", path.display());
649                     return Ok(());
650                 }
651             };
652
653             task_sender
654                 .send(Task::Diagnostic(DiagnosticTask::AddCheck(file_id, diagnostic, fixes)))?;
655         }
656
657         CheckTask::Status(progress) => {
658             let params = req::ProgressParams {
659                 token: req::ProgressToken::String("rustAnalyzer/cargoWatcher".to_string()),
660                 value: req::ProgressParamsValue::WorkDone(progress),
661             };
662             let not = notification_new::<req::Progress>(params);
663             task_sender.send(Task::Notify(not)).unwrap();
664         }
665     };
666
667     Ok(())
668 }
669
670 fn on_diagnostic_task(task: DiagnosticTask, msg_sender: &Sender<Message>, state: &mut WorldState) {
671     let subscriptions = state.diagnostics.handle_task(task);
672
673     for file_id in subscriptions {
674         let path = state.vfs.read().file2path(VfsFile(file_id.0));
675         let uri = match url_from_path_with_drive_lowercasing(&path) {
676             Ok(uri) => uri,
677             Err(err) => {
678                 log::error!("Couldn't convert path to url ({}): {:?}", err, path.to_string_lossy());
679                 continue;
680             }
681         };
682
683         let diagnostics = state.diagnostics.diagnostics_for(file_id).cloned().collect();
684         let params = req::PublishDiagnosticsParams { uri, diagnostics, version: None };
685         let not = notification_new::<req::PublishDiagnostics>(params);
686         msg_sender.send(not.into()).unwrap();
687     }
688 }
689
690 struct PoolDispatcher<'a> {
691     req: Option<Request>,
692     pool: &'a ThreadPool,
693     world: &'a mut WorldState,
694     pending_requests: &'a mut PendingRequests,
695     msg_sender: &'a Sender<Message>,
696     task_sender: &'a Sender<Task>,
697     request_received: Instant,
698 }
699
700 impl<'a> PoolDispatcher<'a> {
701     /// Dispatches the request onto the current thread
702     fn on_sync<R>(
703         &mut self,
704         f: fn(&mut WorldState, R::Params) -> Result<R::Result>,
705     ) -> Result<&mut Self>
706     where
707         R: req::Request + 'static,
708         R::Params: DeserializeOwned + panic::UnwindSafe + 'static,
709         R::Result: Serialize + 'static,
710     {
711         let (id, params) = match self.parse::<R>() {
712             Some(it) => it,
713             None => {
714                 return Ok(self);
715             }
716         };
717         let world = panic::AssertUnwindSafe(&mut *self.world);
718         let task = panic::catch_unwind(move || {
719             let result = f(world.0, params);
720             result_to_task::<R>(id, result)
721         })
722         .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
723         on_task(task, self.msg_sender, self.pending_requests, self.world);
724         Ok(self)
725     }
726
727     /// Dispatches the request onto thread pool
728     fn on<R>(&mut self, f: fn(WorldSnapshot, R::Params) -> Result<R::Result>) -> Result<&mut Self>
729     where
730         R: req::Request + 'static,
731         R::Params: DeserializeOwned + Send + 'static,
732         R::Result: Serialize + 'static,
733     {
734         let (id, params) = match self.parse::<R>() {
735             Some(it) => it,
736             None => {
737                 return Ok(self);
738             }
739         };
740
741         self.pool.execute({
742             let world = self.world.snapshot();
743             let sender = self.task_sender.clone();
744             move || {
745                 let result = f(world, params);
746                 let task = result_to_task::<R>(id, result);
747                 sender.send(task).unwrap();
748             }
749         });
750
751         Ok(self)
752     }
753
754     fn parse<R>(&mut self) -> Option<(RequestId, R::Params)>
755     where
756         R: req::Request + 'static,
757         R::Params: DeserializeOwned + 'static,
758     {
759         let req = self.req.take()?;
760         let (id, params) = match req.extract::<R::Params>(R::METHOD) {
761             Ok(it) => it,
762             Err(req) => {
763                 self.req = Some(req);
764                 return None;
765             }
766         };
767         self.pending_requests.start(PendingRequest {
768             id: id.clone(),
769             method: R::METHOD.to_string(),
770             received: self.request_received,
771         });
772         Some((id, params))
773     }
774
775     fn finish(&mut self) {
776         match self.req.take() {
777             None => (),
778             Some(req) => {
779                 log::error!("unknown request: {:?}", req);
780                 let resp = Response::new_err(
781                     req.id,
782                     ErrorCode::MethodNotFound as i32,
783                     "unknown request".to_string(),
784                 );
785                 self.msg_sender.send(resp.into()).unwrap();
786             }
787         }
788     }
789 }
790
791 fn result_to_task<R>(id: RequestId, result: Result<R::Result>) -> Task
792 where
793     R: req::Request + 'static,
794     R::Params: DeserializeOwned + 'static,
795     R::Result: Serialize + 'static,
796 {
797     let response = match result {
798         Ok(resp) => Response::new_ok(id, &resp),
799         Err(e) => match e.downcast::<LspError>() {
800             Ok(lsp_error) => Response::new_err(id, lsp_error.code, lsp_error.message),
801             Err(e) => {
802                 if is_canceled(&e) {
803                     Response::new_err(
804                         id,
805                         ErrorCode::ContentModified as i32,
806                         "content modified".to_string(),
807                     )
808                 } else {
809                     Response::new_err(id, ErrorCode::InternalError as i32, e.to_string())
810                 }
811             }
812         },
813     };
814     Task::Respond(response)
815 }
816
817 fn update_file_notifications_on_threadpool(
818     pool: &ThreadPool,
819     world: WorldSnapshot,
820     publish_decorations: bool,
821     task_sender: Sender<Task>,
822     subscriptions: Vec<FileId>,
823 ) {
824     log::trace!("updating notifications for {:?}", subscriptions);
825     let publish_diagnostics = world.feature_flags().get("lsp.diagnostics");
826     pool.execute(move || {
827         for file_id in subscriptions {
828             if publish_diagnostics {
829                 match handlers::publish_diagnostics(&world, file_id) {
830                     Err(e) => {
831                         if !is_canceled(&e) {
832                             log::error!("failed to compute diagnostics: {:?}", e);
833                         }
834                     }
835                     Ok(task) => {
836                         task_sender.send(Task::Diagnostic(task)).unwrap();
837                     }
838                 }
839             }
840             if publish_decorations {
841                 match handlers::publish_decorations(&world, file_id) {
842                     Err(e) => {
843                         if !is_canceled(&e) {
844                             log::error!("failed to compute decorations: {:?}", e);
845                         }
846                     }
847                     Ok(params) => {
848                         let not = notification_new::<req::PublishDecorations>(params);
849                         task_sender.send(Task::Notify(not)).unwrap();
850                     }
851                 }
852             }
853         }
854     });
855 }
856
857 pub fn show_message(typ: req::MessageType, message: impl Into<String>, sender: &Sender<Message>) {
858     let message = message.into();
859     let params = req::ShowMessageParams { typ, message };
860     let not = notification_new::<req::ShowMessage>(params);
861     sender.send(not.into()).unwrap();
862 }
863
864 fn is_canceled(e: &Box<dyn std::error::Error + Send + Sync>) -> bool {
865     e.downcast_ref::<Canceled>().is_some()
866 }
867
868 fn notification_is<N: lsp_types::notification::Notification>(notification: &Notification) -> bool {
869     notification.method == N::METHOD
870 }
871
872 fn notification_cast<N>(notification: Notification) -> std::result::Result<N::Params, Notification>
873 where
874     N: lsp_types::notification::Notification,
875     N::Params: DeserializeOwned,
876 {
877     notification.extract(N::METHOD)
878 }
879
880 fn notification_new<N>(params: N::Params) -> Notification
881 where
882     N: lsp_types::notification::Notification,
883     N::Params: Serialize,
884 {
885     Notification::new(N::METHOD.to_string(), params)
886 }
887
888 fn request_new<R>(id: RequestId, params: R::Params) -> Request
889 where
890     R: lsp_types::request::Request,
891     R::Params: Serialize,
892 {
893     Request::new(id, R::METHOD.to_string(), params)
894 }