]> git.lizzy.rs Git - rust.git/blob - crates/rust-analyzer/src/main_loop.rs
95e676e0f45cb0ad3838e54996b0d53ee5d3d09c
[rust.git] / crates / rust-analyzer / src / main_loop.rs
1 //! The main loop of `rust-analyzer` responsible for dispatching LSP
2 //! requests/replies and notifications back to the client.
3
4 mod handlers;
5 mod subscriptions;
6 pub(crate) mod pending_requests;
7
8 use std::{
9     env,
10     error::Error,
11     fmt, panic,
12     path::PathBuf,
13     sync::Arc,
14     time::{Duration, Instant},
15 };
16
17 use crossbeam_channel::{never, select, unbounded, RecvError, Sender};
18 use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
19 use lsp_types::{
20     NumberOrString, WorkDoneProgress, WorkDoneProgressBegin, WorkDoneProgressCreateParams,
21     WorkDoneProgressEnd, WorkDoneProgressReport,
22 };
23 use ra_flycheck::{url_from_path_with_drive_lowercasing, CheckTask};
24 use ra_ide::{Canceled, FileId, LibraryData, SourceRootId};
25 use ra_prof::profile;
26 use ra_vfs::{VfsFile, VfsTask, Watch};
27 use relative_path::RelativePathBuf;
28 use rustc_hash::FxHashSet;
29 use serde::{de::DeserializeOwned, Serialize};
30 use threadpool::ThreadPool;
31
32 use crate::{
33     config::{Config, FilesWatcher},
34     diagnostics::DiagnosticTask,
35     main_loop::{
36         pending_requests::{PendingRequest, PendingRequests},
37         subscriptions::Subscriptions,
38     },
39     req,
40     world::{WorldSnapshot, WorldState},
41     Result,
42 };
43
44 #[derive(Debug)]
45 pub struct LspError {
46     pub code: i32,
47     pub message: String,
48 }
49
50 impl LspError {
51     pub const UNKNOWN_FILE: i32 = -32900;
52
53     pub fn new(code: i32, message: String) -> LspError {
54         LspError { code, message }
55     }
56 }
57
58 impl fmt::Display for LspError {
59     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
60         write!(f, "Language Server request failed with {}. ({})", self.code, self.message)
61     }
62 }
63
64 impl Error for LspError {}
65
66 pub fn main_loop(ws_roots: Vec<PathBuf>, config: Config, connection: Connection) -> Result<()> {
67     log::info!("initial config: {:#?}", config);
68
69     // Windows scheduler implements priority boosts: if thread waits for an
70     // event (like a condvar), and event fires, priority of the thread is
71     // temporary bumped. This optimization backfires in our case: each time the
72     // `main_loop` schedules a task to run on a threadpool, the worker threads
73     // gets a higher priority, and (on a machine with fewer cores) displaces the
74     // main loop! We work-around this by marking the main loop as a
75     // higher-priority thread.
76     //
77     // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities
78     // https://docs.microsoft.com/en-us/windows/win32/procthread/priority-boosts
79     // https://github.com/rust-analyzer/rust-analyzer/issues/2835
80     #[cfg(windows)]
81     unsafe {
82         use winapi::um::processthreadsapi::*;
83         let thread = GetCurrentThread();
84         let thread_priority_above_normal = 1;
85         SetThreadPriority(thread, thread_priority_above_normal);
86     }
87
88     let mut loop_state = LoopState::default();
89     let mut world_state = {
90         // FIXME: support dynamic workspace loading.
91         let workspaces = {
92             let mut loaded_workspaces = Vec::new();
93             for ws_root in &ws_roots {
94                 let workspace = ra_project_model::ProjectWorkspace::discover_with_sysroot(
95                     ws_root.as_path(),
96                     config.with_sysroot,
97                     &config.cargo,
98                 );
99                 match workspace {
100                     Ok(workspace) => loaded_workspaces.push(workspace),
101                     Err(e) => {
102                         log::error!("loading workspace failed: {:?}", e);
103
104                         if let Some(ra_project_model::CargoTomlNotFoundError { .. }) =
105                             e.downcast_ref()
106                         {
107                             if !config.notifications.cargo_toml_not_found {
108                                 continue;
109                             }
110                         }
111
112                         show_message(
113                             req::MessageType::Error,
114                             format!("rust-analyzer failed to load workspace: {:?}", e),
115                             &connection.sender,
116                         );
117                     }
118                 }
119             }
120             loaded_workspaces
121         };
122
123         let globs = config
124             .files
125             .exclude
126             .iter()
127             .map(|glob| crate::vfs_glob::Glob::new(glob))
128             .collect::<std::result::Result<Vec<_>, _>>()?;
129
130         if let FilesWatcher::Client = config.files.watcher {
131             let registration_options = req::DidChangeWatchedFilesRegistrationOptions {
132                 watchers: workspaces
133                     .iter()
134                     .flat_map(|ws| ws.to_roots())
135                     .filter(|root| root.is_member())
136                     .map(|root| format!("{}/**/*.rs", root.path().display()))
137                     .map(|glob_pattern| req::FileSystemWatcher { glob_pattern, kind: None })
138                     .collect(),
139             };
140             let registration = req::Registration {
141                 id: "file-watcher".to_string(),
142                 method: "workspace/didChangeWatchedFiles".to_string(),
143                 register_options: Some(serde_json::to_value(registration_options).unwrap()),
144             };
145             let params = req::RegistrationParams { registrations: vec![registration] };
146             let request =
147                 request_new::<req::RegisterCapability>(loop_state.next_request_id(), params);
148             connection.sender.send(request.into()).unwrap();
149         }
150
151         WorldState::new(
152             ws_roots,
153             workspaces,
154             config.lru_capacity,
155             &globs,
156             Watch(matches!(config.files.watcher, FilesWatcher::Notify)),
157             config,
158         )
159     };
160
161     loop_state.roots_total = world_state.vfs.read().n_roots();
162     loop_state.roots_scanned = 0;
163
164     let pool = ThreadPool::default();
165     let (task_sender, task_receiver) = unbounded::<Task>();
166     let (libdata_sender, libdata_receiver) = unbounded::<LibraryData>();
167
168     log::info!("server initialized, serving requests");
169     {
170         let task_sender = task_sender;
171         let libdata_sender = libdata_sender;
172         loop {
173             log::trace!("selecting");
174             let event = select! {
175                 recv(&connection.receiver) -> msg => match msg {
176                     Ok(msg) => Event::Msg(msg),
177                     Err(RecvError) => return Err("client exited without shutdown".into()),
178                 },
179                 recv(task_receiver) -> task => Event::Task(task.unwrap()),
180                 recv(world_state.task_receiver) -> task => match task {
181                     Ok(task) => Event::Vfs(task),
182                     Err(RecvError) => return Err("vfs died".into()),
183                 },
184                 recv(libdata_receiver) -> data => Event::Lib(data.unwrap()),
185                 recv(world_state.flycheck.as_ref().map_or(&never(), |it| &it.task_recv)) -> task => match task {
186                     Ok(task) => Event::CheckWatcher(task),
187                     Err(RecvError) => return Err("check watcher died".into()),
188                 }
189             };
190             if let Event::Msg(Message::Request(req)) = &event {
191                 if connection.handle_shutdown(&req)? {
192                     break;
193                 };
194             }
195             loop_turn(
196                 &pool,
197                 &task_sender,
198                 &libdata_sender,
199                 &connection,
200                 &mut world_state,
201                 &mut loop_state,
202                 event,
203             )?;
204         }
205     }
206     world_state.analysis_host.request_cancellation();
207     log::info!("waiting for tasks to finish...");
208     task_receiver.into_iter().for_each(|task| {
209         on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut world_state)
210     });
211     libdata_receiver.into_iter().for_each(drop);
212     log::info!("...tasks have finished");
213     log::info!("joining threadpool...");
214     drop(pool);
215     log::info!("...threadpool has finished");
216
217     let vfs = Arc::try_unwrap(world_state.vfs).expect("all snapshots should be dead");
218     drop(vfs);
219
220     Ok(())
221 }
222
223 #[derive(Debug)]
224 enum Task {
225     Respond(Response),
226     Notify(Notification),
227     Diagnostic(DiagnosticTask),
228 }
229
230 enum Event {
231     Msg(Message),
232     Task(Task),
233     Vfs(VfsTask),
234     Lib(LibraryData),
235     CheckWatcher(CheckTask),
236 }
237
238 impl fmt::Debug for Event {
239     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
240         let debug_verbose_not = |not: &Notification, f: &mut fmt::Formatter| {
241             f.debug_struct("Notification").field("method", &not.method).finish()
242         };
243
244         match self {
245             Event::Msg(Message::Notification(not)) => {
246                 if notification_is::<req::DidOpenTextDocument>(not)
247                     || notification_is::<req::DidChangeTextDocument>(not)
248                 {
249                     return debug_verbose_not(not, f);
250                 }
251             }
252             Event::Task(Task::Notify(not)) => {
253                 if notification_is::<req::PublishDiagnostics>(not) {
254                     return debug_verbose_not(not, f);
255                 }
256             }
257             Event::Task(Task::Respond(resp)) => {
258                 return f
259                     .debug_struct("Response")
260                     .field("id", &resp.id)
261                     .field("error", &resp.error)
262                     .finish();
263             }
264             _ => (),
265         }
266         match self {
267             Event::Msg(it) => fmt::Debug::fmt(it, f),
268             Event::Task(it) => fmt::Debug::fmt(it, f),
269             Event::Vfs(it) => fmt::Debug::fmt(it, f),
270             Event::Lib(it) => fmt::Debug::fmt(it, f),
271             Event::CheckWatcher(it) => fmt::Debug::fmt(it, f),
272         }
273     }
274 }
275
276 #[derive(Debug, Default)]
277 struct LoopState {
278     next_request_id: u64,
279     pending_responses: FxHashSet<RequestId>,
280     pending_requests: PendingRequests,
281     subscriptions: Subscriptions,
282     // We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same
283     // time to always have a thread ready to react to input.
284     in_flight_libraries: usize,
285     pending_libraries: Vec<(SourceRootId, Vec<(FileId, RelativePathBuf, Arc<String>)>)>,
286     workspace_loaded: bool,
287     roots_progress_reported: Option<usize>,
288     roots_scanned: usize,
289     roots_total: usize,
290     configuration_request_id: Option<RequestId>,
291 }
292
293 impl LoopState {
294     fn next_request_id(&mut self) -> RequestId {
295         self.next_request_id += 1;
296         let res: RequestId = self.next_request_id.into();
297         let inserted = self.pending_responses.insert(res.clone());
298         assert!(inserted);
299         res
300     }
301 }
302
303 fn loop_turn(
304     pool: &ThreadPool,
305     task_sender: &Sender<Task>,
306     libdata_sender: &Sender<LibraryData>,
307     connection: &Connection,
308     world_state: &mut WorldState,
309     loop_state: &mut LoopState,
310     event: Event,
311 ) -> Result<()> {
312     let loop_start = Instant::now();
313
314     // NOTE: don't count blocking select! call as a loop-turn time
315     let _p = profile("main_loop_inner/loop-turn");
316     log::info!("loop turn = {:?}", event);
317     let queue_count = pool.queued_count();
318     if queue_count > 0 {
319         log::info!("queued count = {}", queue_count);
320     }
321
322     match event {
323         Event::Task(task) => {
324             on_task(task, &connection.sender, &mut loop_state.pending_requests, world_state);
325             world_state.maybe_collect_garbage();
326         }
327         Event::Vfs(task) => {
328             world_state.vfs.write().handle_task(task);
329         }
330         Event::Lib(lib) => {
331             world_state.add_lib(lib);
332             world_state.maybe_collect_garbage();
333             loop_state.in_flight_libraries -= 1;
334             loop_state.roots_scanned += 1;
335         }
336         Event::CheckWatcher(task) => on_check_task(task, world_state, task_sender)?,
337         Event::Msg(msg) => match msg {
338             Message::Request(req) => on_request(
339                 world_state,
340                 &mut loop_state.pending_requests,
341                 pool,
342                 task_sender,
343                 &connection.sender,
344                 loop_start,
345                 req,
346             )?,
347             Message::Notification(not) => {
348                 on_notification(&connection.sender, world_state, loop_state, not)?;
349             }
350             Message::Response(resp) => {
351                 let removed = loop_state.pending_responses.remove(&resp.id);
352                 if !removed {
353                     log::error!("unexpected response: {:?}", resp)
354                 }
355
356                 if Some(&resp.id) == loop_state.configuration_request_id.as_ref() {
357                     loop_state.configuration_request_id = None;
358                     log::debug!("config update response: '{:?}", resp);
359                     let Response { error, result, .. } = resp;
360
361                     match (error, result) {
362                         (Some(err), _) => {
363                             log::error!("failed to fetch the server settings: {:?}", err)
364                         }
365                         (None, Some(configs)) => {
366                             if let Some(new_config) = configs.get(0) {
367                                 let mut config = world_state.config.clone();
368                                 config.update(&new_config);
369                                 world_state.update_configuration(config);
370                             }
371                         }
372                         (None, None) => {
373                             log::error!("received empty server settings response from the client")
374                         }
375                     }
376                 }
377             }
378         },
379     };
380
381     let mut state_changed = false;
382     if let Some(changes) = world_state.process_changes(&mut loop_state.roots_scanned) {
383         state_changed = true;
384         loop_state.pending_libraries.extend(changes);
385     }
386
387     let max_in_flight_libs = pool.max_count().saturating_sub(2).max(1);
388     while loop_state.in_flight_libraries < max_in_flight_libs
389         && !loop_state.pending_libraries.is_empty()
390     {
391         let (root, files) = loop_state.pending_libraries.pop().unwrap();
392         loop_state.in_flight_libraries += 1;
393         let sender = libdata_sender.clone();
394         pool.execute(move || {
395             log::info!("indexing {:?} ... ", root);
396             let data = LibraryData::prepare(root, files);
397             sender.send(data).unwrap();
398         });
399     }
400
401     let show_progress =
402         !loop_state.workspace_loaded && world_state.config.notifications.workspace_loaded;
403
404     if !loop_state.workspace_loaded
405         && loop_state.roots_scanned == loop_state.roots_total
406         && loop_state.pending_libraries.is_empty()
407         && loop_state.in_flight_libraries == 0
408     {
409         loop_state.workspace_loaded = true;
410         if let Some(flycheck) = &world_state.flycheck {
411             flycheck.update();
412         }
413         pool.execute({
414             let subs = loop_state.subscriptions.subscriptions();
415             let snap = world_state.snapshot();
416             move || snap.analysis().prime_caches(subs).unwrap_or_else(|_: Canceled| ())
417         });
418     }
419
420     if show_progress {
421         send_startup_progress(&connection.sender, loop_state);
422     }
423
424     if state_changed {
425         update_file_notifications_on_threadpool(
426             pool,
427             world_state.snapshot(),
428             task_sender.clone(),
429             loop_state.subscriptions.subscriptions(),
430         )
431     }
432
433     let loop_duration = loop_start.elapsed();
434     if loop_duration > Duration::from_millis(100) {
435         log::error!("overly long loop turn: {:?}", loop_duration);
436         if env::var("RA_PROFILE").is_ok() {
437             show_message(
438                 req::MessageType::Error,
439                 format!("overly long loop turn: {:?}", loop_duration),
440                 &connection.sender,
441             );
442         }
443     }
444
445     Ok(())
446 }
447
448 fn on_task(
449     task: Task,
450     msg_sender: &Sender<Message>,
451     pending_requests: &mut PendingRequests,
452     state: &mut WorldState,
453 ) {
454     match task {
455         Task::Respond(response) => {
456             if let Some(completed) = pending_requests.finish(&response.id) {
457                 log::info!("handled req#{} in {:?}", completed.id, completed.duration);
458                 state.complete_request(completed);
459                 msg_sender.send(response.into()).unwrap();
460             }
461         }
462         Task::Notify(n) => {
463             msg_sender.send(n.into()).unwrap();
464         }
465         Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, state),
466     }
467 }
468
469 fn on_request(
470     world: &mut WorldState,
471     pending_requests: &mut PendingRequests,
472     pool: &ThreadPool,
473     task_sender: &Sender<Task>,
474     msg_sender: &Sender<Message>,
475     request_received: Instant,
476     req: Request,
477 ) -> Result<()> {
478     let mut pool_dispatcher = PoolDispatcher {
479         req: Some(req),
480         pool,
481         world,
482         task_sender,
483         msg_sender,
484         pending_requests,
485         request_received,
486     };
487     pool_dispatcher
488         .on_sync::<req::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))?
489         .on_sync::<req::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))?
490         .on_sync::<req::OnEnter>(|s, p| handlers::handle_on_enter(s.snapshot(), p))?
491         .on_sync::<req::SelectionRangeRequest>(|s, p| {
492             handlers::handle_selection_range(s.snapshot(), p)
493         })?
494         .on_sync::<req::FindMatchingBrace>(|s, p| {
495             handlers::handle_find_matching_brace(s.snapshot(), p)
496         })?
497         .on::<req::AnalyzerStatus>(handlers::handle_analyzer_status)?
498         .on::<req::SyntaxTree>(handlers::handle_syntax_tree)?
499         .on::<req::ExpandMacro>(handlers::handle_expand_macro)?
500         .on::<req::OnTypeFormatting>(handlers::handle_on_type_formatting)?
501         .on::<req::DocumentSymbolRequest>(handlers::handle_document_symbol)?
502         .on::<req::WorkspaceSymbol>(handlers::handle_workspace_symbol)?
503         .on::<req::GotoDefinition>(handlers::handle_goto_definition)?
504         .on::<req::GotoImplementation>(handlers::handle_goto_implementation)?
505         .on::<req::GotoTypeDefinition>(handlers::handle_goto_type_definition)?
506         .on::<req::ParentModule>(handlers::handle_parent_module)?
507         .on::<req::Runnables>(handlers::handle_runnables)?
508         .on::<req::Completion>(handlers::handle_completion)?
509         .on::<req::CodeActionRequest>(handlers::handle_code_action)?
510         .on::<req::CodeLensRequest>(handlers::handle_code_lens)?
511         .on::<req::CodeLensResolve>(handlers::handle_code_lens_resolve)?
512         .on::<req::FoldingRangeRequest>(handlers::handle_folding_range)?
513         .on::<req::SignatureHelpRequest>(handlers::handle_signature_help)?
514         .on::<req::HoverRequest>(handlers::handle_hover)?
515         .on::<req::PrepareRenameRequest>(handlers::handle_prepare_rename)?
516         .on::<req::Rename>(handlers::handle_rename)?
517         .on::<req::References>(handlers::handle_references)?
518         .on::<req::Formatting>(handlers::handle_formatting)?
519         .on::<req::DocumentHighlightRequest>(handlers::handle_document_highlight)?
520         .on::<req::InlayHints>(handlers::handle_inlay_hints)?
521         .on::<req::CallHierarchyPrepare>(handlers::handle_call_hierarchy_prepare)?
522         .on::<req::CallHierarchyIncomingCalls>(handlers::handle_call_hierarchy_incoming)?
523         .on::<req::CallHierarchyOutgoingCalls>(handlers::handle_call_hierarchy_outgoing)?
524         .on::<req::SemanticTokensRequest>(handlers::handle_semantic_tokens)?
525         .on::<req::SemanticTokensRangeRequest>(handlers::handle_semantic_tokens_range)?
526         .on::<req::Ssr>(handlers::handle_ssr)?
527         .finish();
528     Ok(())
529 }
530
531 fn on_notification(
532     msg_sender: &Sender<Message>,
533     state: &mut WorldState,
534     loop_state: &mut LoopState,
535     not: Notification,
536 ) -> Result<()> {
537     let not = match notification_cast::<req::Cancel>(not) {
538         Ok(params) => {
539             let id: RequestId = match params.id {
540                 NumberOrString::Number(id) => id.into(),
541                 NumberOrString::String(id) => id.into(),
542             };
543             if loop_state.pending_requests.cancel(&id) {
544                 let response = Response::new_err(
545                     id,
546                     ErrorCode::RequestCanceled as i32,
547                     "canceled by client".to_string(),
548                 );
549                 msg_sender.send(response.into()).unwrap()
550             }
551             return Ok(());
552         }
553         Err(not) => not,
554     };
555     let not = match notification_cast::<req::DidOpenTextDocument>(not) {
556         Ok(params) => {
557             let uri = params.text_document.uri;
558             let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
559             if let Some(file_id) =
560                 state.vfs.write().add_file_overlay(&path, params.text_document.text)
561             {
562                 loop_state.subscriptions.add_sub(FileId(file_id.0));
563             }
564             return Ok(());
565         }
566         Err(not) => not,
567     };
568     let not = match notification_cast::<req::DidChangeTextDocument>(not) {
569         Ok(mut params) => {
570             let uri = params.text_document.uri;
571             let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
572             let text =
573                 params.content_changes.pop().ok_or_else(|| "empty changes".to_string())?.text;
574             state.vfs.write().change_file_overlay(path.as_path(), text);
575             return Ok(());
576         }
577         Err(not) => not,
578     };
579     let not = match notification_cast::<req::DidSaveTextDocument>(not) {
580         Ok(_params) => {
581             if let Some(flycheck) = &state.flycheck {
582                 flycheck.update();
583             }
584             return Ok(());
585         }
586         Err(not) => not,
587     };
588     let not = match notification_cast::<req::DidCloseTextDocument>(not) {
589         Ok(params) => {
590             let uri = params.text_document.uri;
591             let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
592             if let Some(file_id) = state.vfs.write().remove_file_overlay(path.as_path()) {
593                 loop_state.subscriptions.remove_sub(FileId(file_id.0));
594             }
595             let params =
596                 req::PublishDiagnosticsParams { uri, diagnostics: Vec::new(), version: None };
597             let not = notification_new::<req::PublishDiagnostics>(params);
598             msg_sender.send(not.into()).unwrap();
599             return Ok(());
600         }
601         Err(not) => not,
602     };
603     let not = match notification_cast::<req::DidChangeConfiguration>(not) {
604         Ok(_) => {
605             // As stated in https://github.com/microsoft/language-server-protocol/issues/676,
606             // this notification's parameters should be ignored and the actual config queried separately.
607             let request_id = loop_state.next_request_id();
608             let request = request_new::<req::WorkspaceConfiguration>(
609                 request_id.clone(),
610                 req::ConfigurationParams {
611                     items: vec![req::ConfigurationItem {
612                         scope_uri: None,
613                         section: Some("rust-analyzer".to_string()),
614                     }],
615                 },
616             );
617             msg_sender.send(request.into())?;
618             loop_state.configuration_request_id = Some(request_id);
619
620             return Ok(());
621         }
622         Err(not) => not,
623     };
624     let not = match notification_cast::<req::DidChangeWatchedFiles>(not) {
625         Ok(params) => {
626             let mut vfs = state.vfs.write();
627             for change in params.changes {
628                 let uri = change.uri;
629                 let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
630                 vfs.notify_changed(path)
631             }
632             return Ok(());
633         }
634         Err(not) => not,
635     };
636     if not.method.starts_with("$/") {
637         return Ok(());
638     }
639     log::error!("unhandled notification: {:?}", not);
640     Ok(())
641 }
642
643 fn on_check_task(
644     task: CheckTask,
645     world_state: &mut WorldState,
646     task_sender: &Sender<Task>,
647 ) -> Result<()> {
648     match task {
649         CheckTask::ClearDiagnostics => {
650             task_sender.send(Task::Diagnostic(DiagnosticTask::ClearCheck))?;
651         }
652
653         CheckTask::AddDiagnostic { url, diagnostic, fixes } => {
654             let path = url.to_file_path().map_err(|()| format!("invalid uri: {}", url))?;
655             let file_id = match world_state.vfs.read().path2file(&path) {
656                 Some(file) => FileId(file.0),
657                 None => {
658                     log::error!("File with cargo diagnostic not found in VFS: {}", path.display());
659                     return Ok(());
660                 }
661             };
662
663             task_sender
664                 .send(Task::Diagnostic(DiagnosticTask::AddCheck(file_id, diagnostic, fixes)))?;
665         }
666
667         CheckTask::Status(progress) => {
668             let params = req::ProgressParams {
669                 token: req::ProgressToken::String("rustAnalyzer/cargoWatcher".to_string()),
670                 value: req::ProgressParamsValue::WorkDone(progress),
671             };
672             let not = notification_new::<req::Progress>(params);
673             task_sender.send(Task::Notify(not)).unwrap();
674         }
675     };
676
677     Ok(())
678 }
679
680 fn on_diagnostic_task(task: DiagnosticTask, msg_sender: &Sender<Message>, state: &mut WorldState) {
681     let subscriptions = state.diagnostics.handle_task(task);
682
683     for file_id in subscriptions {
684         let path = state.vfs.read().file2path(VfsFile(file_id.0));
685         let uri = match url_from_path_with_drive_lowercasing(&path) {
686             Ok(uri) => uri,
687             Err(err) => {
688                 log::error!("Couldn't convert path to url ({}): {:?}", err, path.to_string_lossy());
689                 continue;
690             }
691         };
692
693         let diagnostics = state.diagnostics.diagnostics_for(file_id).cloned().collect();
694         let params = req::PublishDiagnosticsParams { uri, diagnostics, version: None };
695         let not = notification_new::<req::PublishDiagnostics>(params);
696         msg_sender.send(not.into()).unwrap();
697     }
698 }
699
700 fn send_startup_progress(sender: &Sender<Message>, loop_state: &mut LoopState) {
701     let total: usize = loop_state.roots_total;
702     let prev = loop_state.roots_progress_reported;
703     let progress = loop_state.roots_scanned;
704     loop_state.roots_progress_reported = Some(progress);
705
706     match (prev, loop_state.workspace_loaded) {
707         (None, false) => {
708             let work_done_progress_create = request_new::<req::WorkDoneProgressCreate>(
709                 loop_state.next_request_id(),
710                 WorkDoneProgressCreateParams {
711                     token: req::ProgressToken::String("rustAnalyzer/startup".into()),
712                 },
713             );
714             sender.send(work_done_progress_create.into()).unwrap();
715             send_startup_progress_notif(
716                 sender,
717                 WorkDoneProgress::Begin(WorkDoneProgressBegin {
718                     title: "rust-analyzer".into(),
719                     cancellable: None,
720                     message: Some(format!("{}/{} packages", progress, total)),
721                     percentage: Some(100.0 * progress as f64 / total as f64),
722                 }),
723             );
724         }
725         (Some(prev), false) if progress != prev => send_startup_progress_notif(
726             sender,
727             WorkDoneProgress::Report(WorkDoneProgressReport {
728                 cancellable: None,
729                 message: Some(format!("{}/{} packages", progress, total)),
730                 percentage: Some(100.0 * progress as f64 / total as f64),
731             }),
732         ),
733         (_, true) => send_startup_progress_notif(
734             sender,
735             WorkDoneProgress::End(WorkDoneProgressEnd {
736                 message: Some(format!("rust-analyzer loaded, {} packages", progress)),
737             }),
738         ),
739         _ => {}
740     }
741
742     fn send_startup_progress_notif(sender: &Sender<Message>, work_done_progress: WorkDoneProgress) {
743         let notif = notification_new::<req::Progress>(req::ProgressParams {
744             token: req::ProgressToken::String("rustAnalyzer/startup".into()),
745             value: req::ProgressParamsValue::WorkDone(work_done_progress),
746         });
747         sender.send(notif.into()).unwrap();
748     }
749 }
750
751 struct PoolDispatcher<'a> {
752     req: Option<Request>,
753     pool: &'a ThreadPool,
754     world: &'a mut WorldState,
755     pending_requests: &'a mut PendingRequests,
756     msg_sender: &'a Sender<Message>,
757     task_sender: &'a Sender<Task>,
758     request_received: Instant,
759 }
760
761 impl<'a> PoolDispatcher<'a> {
762     /// Dispatches the request onto the current thread
763     fn on_sync<R>(
764         &mut self,
765         f: fn(&mut WorldState, R::Params) -> Result<R::Result>,
766     ) -> Result<&mut Self>
767     where
768         R: req::Request + 'static,
769         R::Params: DeserializeOwned + panic::UnwindSafe + 'static,
770         R::Result: Serialize + 'static,
771     {
772         let (id, params) = match self.parse::<R>() {
773             Some(it) => it,
774             None => {
775                 return Ok(self);
776             }
777         };
778         let world = panic::AssertUnwindSafe(&mut *self.world);
779         let task = panic::catch_unwind(move || {
780             let result = f(world.0, params);
781             result_to_task::<R>(id, result)
782         })
783         .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
784         on_task(task, self.msg_sender, self.pending_requests, self.world);
785         Ok(self)
786     }
787
788     /// Dispatches the request onto thread pool
789     fn on<R>(&mut self, f: fn(WorldSnapshot, R::Params) -> Result<R::Result>) -> Result<&mut Self>
790     where
791         R: req::Request + 'static,
792         R::Params: DeserializeOwned + Send + 'static,
793         R::Result: Serialize + 'static,
794     {
795         let (id, params) = match self.parse::<R>() {
796             Some(it) => it,
797             None => {
798                 return Ok(self);
799             }
800         };
801
802         self.pool.execute({
803             let world = self.world.snapshot();
804             let sender = self.task_sender.clone();
805             move || {
806                 let result = f(world, params);
807                 let task = result_to_task::<R>(id, result);
808                 sender.send(task).unwrap();
809             }
810         });
811
812         Ok(self)
813     }
814
815     fn parse<R>(&mut self) -> Option<(RequestId, R::Params)>
816     where
817         R: req::Request + 'static,
818         R::Params: DeserializeOwned + 'static,
819     {
820         let req = self.req.take()?;
821         let (id, params) = match req.extract::<R::Params>(R::METHOD) {
822             Ok(it) => it,
823             Err(req) => {
824                 self.req = Some(req);
825                 return None;
826             }
827         };
828         self.pending_requests.start(PendingRequest {
829             id: id.clone(),
830             method: R::METHOD.to_string(),
831             received: self.request_received,
832         });
833         Some((id, params))
834     }
835
836     fn finish(&mut self) {
837         match self.req.take() {
838             None => (),
839             Some(req) => {
840                 log::error!("unknown request: {:?}", req);
841                 let resp = Response::new_err(
842                     req.id,
843                     ErrorCode::MethodNotFound as i32,
844                     "unknown request".to_string(),
845                 );
846                 self.msg_sender.send(resp.into()).unwrap();
847             }
848         }
849     }
850 }
851
852 fn result_to_task<R>(id: RequestId, result: Result<R::Result>) -> Task
853 where
854     R: req::Request + 'static,
855     R::Params: DeserializeOwned + 'static,
856     R::Result: Serialize + 'static,
857 {
858     let response = match result {
859         Ok(resp) => Response::new_ok(id, &resp),
860         Err(e) => match e.downcast::<LspError>() {
861             Ok(lsp_error) => {
862                 if lsp_error.code == LspError::UNKNOWN_FILE {
863                     // Work-around for https://github.com/rust-analyzer/rust-analyzer/issues/1521
864                     Response::new_ok(id, ())
865                 } else {
866                     Response::new_err(id, lsp_error.code, lsp_error.message)
867                 }
868             }
869             Err(e) => {
870                 if is_canceled(&e) {
871                     Response::new_err(
872                         id,
873                         ErrorCode::ContentModified as i32,
874                         "content modified".to_string(),
875                     )
876                 } else {
877                     Response::new_err(id, ErrorCode::InternalError as i32, e.to_string())
878                 }
879             }
880         },
881     };
882     Task::Respond(response)
883 }
884
885 fn update_file_notifications_on_threadpool(
886     pool: &ThreadPool,
887     world: WorldSnapshot,
888     task_sender: Sender<Task>,
889     subscriptions: Vec<FileId>,
890 ) {
891     log::trace!("updating notifications for {:?}", subscriptions);
892     if world.config.publish_diagnostics {
893         pool.execute(move || {
894             for file_id in subscriptions {
895                 match handlers::publish_diagnostics(&world, file_id) {
896                     Err(e) => {
897                         if !is_canceled(&e) {
898                             log::error!("failed to compute diagnostics: {:?}", e);
899                         }
900                     }
901                     Ok(task) => {
902                         task_sender.send(Task::Diagnostic(task)).unwrap();
903                     }
904                 }
905             }
906         })
907     }
908 }
909
910 pub fn show_message(typ: req::MessageType, message: impl Into<String>, sender: &Sender<Message>) {
911     let message = message.into();
912     let params = req::ShowMessageParams { typ, message };
913     let not = notification_new::<req::ShowMessage>(params);
914     sender.send(not.into()).unwrap();
915 }
916
917 fn is_canceled(e: &Box<dyn std::error::Error + Send + Sync>) -> bool {
918     e.downcast_ref::<Canceled>().is_some()
919 }
920
921 fn notification_is<N: lsp_types::notification::Notification>(notification: &Notification) -> bool {
922     notification.method == N::METHOD
923 }
924
925 fn notification_cast<N>(notification: Notification) -> std::result::Result<N::Params, Notification>
926 where
927     N: lsp_types::notification::Notification,
928     N::Params: DeserializeOwned,
929 {
930     notification.extract(N::METHOD)
931 }
932
933 fn notification_new<N>(params: N::Params) -> Notification
934 where
935     N: lsp_types::notification::Notification,
936     N::Params: Serialize,
937 {
938     Notification::new(N::METHOD.to_string(), params)
939 }
940
941 fn request_new<R>(id: RequestId, params: R::Params) -> Request
942 where
943     R: lsp_types::request::Request,
944     R::Params: Serialize,
945 {
946     Request::new(id, R::METHOD.to_string(), params)
947 }