]> git.lizzy.rs Git - rust.git/blob - crates/rust-analyzer/src/main_loop.rs
Merge branch 'Veetaha-feat/sync-branch'
[rust.git] / crates / rust-analyzer / src / main_loop.rs
1 //! The main loop of `rust-analyzer` responsible for dispatching LSP
2 //! requests/replies and notifications back to the client.
3 use std::{
4     env, fmt,
5     ops::Range,
6     panic,
7     sync::Arc,
8     time::{Duration, Instant},
9 };
10
11 use crossbeam_channel::{never, select, unbounded, RecvError, Sender};
12 use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
13 use lsp_types::{request::Request as _, NumberOrString, TextDocumentContentChangeEvent};
14 use ra_db::VfsPath;
15 use ra_flycheck::CheckTask;
16 use ra_ide::{Canceled, FileId, LineIndex};
17 use ra_prof::profile;
18 use ra_project_model::{PackageRoot, ProjectWorkspace};
19 use serde::{de::DeserializeOwned, Serialize};
20 use threadpool::ThreadPool;
21
22 use crate::{
23     config::{Config, FilesWatcher, LinkedProject},
24     diagnostics::DiagnosticTask,
25     from_proto,
26     global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot, Status},
27     handlers, lsp_ext,
28     request_metrics::RequestMetrics,
29     LspError, Result,
30 };
31 pub use lsp_utils::show_message;
32 use lsp_utils::{is_canceled, notification_cast, notification_is, notification_new, request_new};
33 use ra_progress::{
34     IsDone, ProgressStatus, U32Progress, U32ProgressReport, U32ProgressSource, U32ProgressStatus,
35 };
36
/// Progress token string `"rustAnalyzer/flycheck"`.
/// NOTE(review): presumably identifies flycheck progress reports sent to the client; its use is
/// not visible in this part of the file — confirm.
const FLYCHECK_PROGRESS_TOKEN: &str = "rustAnalyzer/flycheck";
/// Progress token string `"rustAnalyzer/rootsScanned"`.
/// NOTE(review): presumably identifies "roots scanned" progress reports; its use is not visible
/// in this part of the file — confirm.
const ROOTS_SCANNED_PROGRESS_TOKEN: &str = "rustAnalyzer/rootsScanned";
39
/// Runs the server's event loop until the client requests shutdown.
///
/// Loads a workspace for every entry of `config.linked_projects`, optionally asks the client
/// to watch `*.rs` files for us, builds the `GlobalState` and then repeatedly selects over the
/// incoming channels, handing each event to `loop_turn`.
///
/// After the loop ends, cancellation is requested on the analysis host, the remaining tasks
/// are drained, the thread pool is joined and the VFS is dropped.
///
/// # Errors
///
/// Returns an error if the client connection, the VFS channel or a flycheck channel is closed
/// before shutdown, or if `connection.handle_shutdown` / `loop_turn` fail.
///
/// # Panics
///
/// Panics if the VFS has pending changes before or after a loop turn, if the task channel is
/// closed, or if a snapshot still holds the VFS after the thread pool was joined.
pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
    log::info!("initial config: {:#?}", config);

    // Windows scheduler implements priority boosts: if thread waits for an
    // event (like a condvar), and event fires, priority of the thread is
    // temporary bumped. This optimization backfires in our case: each time the
    // `main_loop` schedules a task to run on a threadpool, the worker threads
    // gets a higher priority, and (on a machine with fewer cores) displaces the
    // main loop! We work-around this by marking the main loop as a
    // higher-priority thread.
    //
    // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities
    // https://docs.microsoft.com/en-us/windows/win32/procthread/priority-boosts
    // https://github.com/rust-analyzer/rust-analyzer/issues/2835
    //
    // NOTE(review): no safety justification is written down for this block; presumably both
    // calls are sound because they only operate on the current thread's handle — confirm.
    #[cfg(windows)]
    unsafe {
        use winapi::um::processthreadsapi::*;
        let thread = GetCurrentThread();
        let thread_priority_above_normal = 1;
        SetThreadPriority(thread, thread_priority_above_normal);
    }

    let mut global_state = {
        let workspaces = {
            // Tell the user when there is nothing to analyze, unless this notification is
            // disabled in the config.
            if config.linked_projects.is_empty() && config.notifications.cargo_toml_not_found {
                show_message(
                    lsp_types::MessageType::Error,
                    "rust-analyzer failed to discover workspace".to_string(),
                    &connection.sender,
                );
            };

            // A project that fails to load is logged, reported to the client and skipped;
            // the remaining projects are still loaded.
            config
                .linked_projects
                .iter()
                .filter_map(|project| match project {
                    LinkedProject::ProjectManifest(manifest) => {
                        ra_project_model::ProjectWorkspace::load(
                            manifest.clone(),
                            &config.cargo,
                            config.with_sysroot,
                        )
                        .map_err(|err| {
                            log::error!("failed to load workspace: {:#}", err);
                            show_message(
                                lsp_types::MessageType::Error,
                                format!("rust-analyzer failed to load workspace: {:#}", err),
                                &connection.sender,
                            );
                        })
                        .ok()
                    }
                    LinkedProject::InlineJsonProject(it) => {
                        Some(ra_project_model::ProjectWorkspace::Json { project: it.clone() })
                    }
                })
                .collect::<Vec<_>>()
        };

        let mut req_queue = ReqQueue::default();

        // When the client is configured as the file watcher, register a
        // `workspace/didChangeWatchedFiles` capability covering the `*.rs` files of every
        // member package root.
        if let FilesWatcher::Client = config.files.watcher {
            let registration_options = lsp_types::DidChangeWatchedFilesRegistrationOptions {
                watchers: workspaces
                    .iter()
                    .flat_map(ProjectWorkspace::to_roots)
                    .filter(PackageRoot::is_member)
                    .map(|root| format!("{}/**/*.rs", root.path().display()))
                    .map(|glob_pattern| lsp_types::FileSystemWatcher { glob_pattern, kind: None })
                    .collect(),
            };
            let registration = lsp_types::Registration {
                id: "file-watcher".to_string(),
                method: "workspace/didChangeWatchedFiles".to_string(),
                register_options: Some(serde_json::to_value(registration_options).unwrap()),
            };
            let params = lsp_types::RegistrationParams { registrations: vec![registration] };
            // The client's reply to the registration is ignored (`DO_NOTHING`).
            let request = req_queue.outgoing.register(
                lsp_types::request::RegisterCapability::METHOD.to_string(),
                params,
                DO_NOTHING,
            );
            connection.sender.send(request.into()).unwrap();
        }

        GlobalState::new(workspaces, config.lru_capacity, config, req_queue)
    };

    let pool = ThreadPool::default();
    let (task_sender, task_receiver) = unbounded::<Task>();

    log::info!("server initialized, serving requests");
    {
        // Move the sender into this scope so that it is dropped when the loop ends; the
        // drain of `task_receiver` below can only finish once every sender is gone.
        let task_sender = task_sender;
        loop {
            log::trace!("selecting");
            let event = select! {
                recv(&connection.receiver) -> msg => match msg {
                    Ok(msg) => Event::Msg(msg),
                    Err(RecvError) => return Err("client exited without shutdown".into()),
                },
                recv(task_receiver) -> task => Event::Task(task.unwrap()),
                recv(global_state.task_receiver) -> task => match task {
                    Ok(task) => Event::Vfs(task),
                    Err(RecvError) => return Err("vfs died".into()),
                },
                // Without flycheck, fall back to a channel that never delivers a message.
                recv(global_state.flycheck.as_ref().map_or(&never(), |it| &it.task_recv)) -> task => match task {
                    Ok(task) => Event::CheckWatcher(task),
                    Err(RecvError) => return Err("check watcher died".into()),
                },
                recv(global_state.flycheck_progress_receiver) -> status => match status {
                    Ok(status) => Event::ProgressReport(ProgressReport::Flycheck(status)),
                    Err(RecvError) => return Err("check watcher died".into()),
                },
                // NOTE(review): `roots_scanned_progress_receiver` is read and assigned here, but
                // no declaration of it is visible in this function — confirm where it is defined.
                recv(roots_scanned_progress_receiver) -> status => match status {
                    Ok(status) => Event::ProgressReport(ProgressReport::RootsScanned(status)),
                    Err(RecvError) => {
                        // Roots analysis has finished, we no longer need this receiver
                        roots_scanned_progress_receiver = never();
                        continue;
                    }
                }
            };
            // A shutdown request ends the loop before the event reaches `loop_turn`.
            if let Event::Msg(Message::Request(req)) = &event {
                if connection.handle_shutdown(&req)? {
                    break;
                };
            }
            // Every loop turn must start and end with no unprocessed VFS changes.
            assert!(!global_state.vfs.read().0.has_changes());
            loop_turn(&pool, &task_sender, &connection, &mut global_state, event)?;
            assert!(!global_state.vfs.read().0.has_changes());
        }
    }
    global_state.analysis_host.request_cancellation();
    log::info!("waiting for tasks to finish...");
    task_receiver.into_iter().for_each(|task| on_task(task, &connection.sender, &mut global_state));
    log::info!("...tasks have finished");
    log::info!("joining threadpool...");
    pool.join();
    drop(pool);
    log::info!("...threadpool has finished");

    // By now nothing else may hold a reference to the VFS.
    let vfs = Arc::try_unwrap(global_state.vfs).expect("all snapshots should be dead");
    drop(vfs);

    Ok(())
}
187
/// Work items sent to the main loop over the task channel and processed by `on_task`.
#[derive(Debug)]
enum Task {
    /// A response to a client request.
    Respond(Response),
    /// A notification to be sent to the client.
    Notify(Notification),
    /// A request carried as a task.
    /// NOTE(review): presumably a server-to-client request to be sent out — confirm.
    SendRequest(Request),
    /// A diagnostics update, processed by `on_diagnostic_task`.
    Diagnostic(DiagnosticTask),
}
195
/// An event picked up by one iteration of the `select!` in `main_loop`.
enum Event {
    /// A message received from the client connection.
    Msg(Message),
    /// A task received from the task channel.
    Task(Task),
    /// A message received from `global_state.task_receiver` (the VFS loader).
    Vfs(vfs::loader::Message),
    /// A task received from the flycheck task channel.
    CheckWatcher(CheckTask),
    /// A progress status update.
    ProgressReport(ProgressReport),
}
203
/// A progress status update, tagged by the channel it arrived on.
#[derive(Debug)]
enum ProgressReport {
    /// Status received from `global_state.flycheck_progress_receiver`.
    Flycheck(ProgressStatus<(), String>),
    /// Status received from the roots-scanned progress receiver.
    RootsScanned(U32ProgressStatus),
}
209
210 impl fmt::Debug for Event {
211     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
212         let debug_verbose_not = |not: &Notification, f: &mut fmt::Formatter| {
213             f.debug_struct("Notification").field("method", &not.method).finish()
214         };
215
216         match self {
217             Event::Msg(Message::Notification(not)) => {
218                 if notification_is::<lsp_types::notification::DidOpenTextDocument>(not)
219                     || notification_is::<lsp_types::notification::DidChangeTextDocument>(not)
220                 {
221                     return debug_verbose_not(not, f);
222                 }
223             }
224             Event::Task(Task::Notify(not)) => {
225                 if notification_is::<lsp_types::notification::PublishDiagnostics>(not) {
226                     return debug_verbose_not(not, f);
227                 }
228             }
229             Event::Task(Task::Respond(resp)) => {
230                 return f
231                     .debug_struct("Response")
232                     .field("id", &resp.id)
233                     .field("error", &resp.error)
234                     .finish();
235             }
236             _ => (),
237         }
238         match self {
239             Event::Msg(it) => fmt::Debug::fmt(it, f),
240             Event::Task(it) => fmt::Debug::fmt(it, f),
241             Event::Vfs(it) => fmt::Debug::fmt(it, f),
242             Event::CheckWatcher(it) => fmt::Debug::fmt(it, f),
243             Event::ProgressReport(it) => fmt::Debug::fmt(it, f),
244         }
245     }
246 }
247
/// Callback invoked with the global state and the client's response to an outgoing request.
pub(crate) type ReqHandler = fn(&mut GlobalState, Response);
/// Request queue of the server: every incoming request is stored with a
/// `(&'static str, Instant)` pair (read back in `on_task` as method name and start time),
/// every outgoing request with the `ReqHandler` to run on its response.
pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>;
/// Handler for outgoing requests whose response is ignored.
const DO_NOTHING: ReqHandler = |_, _| ();
251
/// Processes a single event of the main loop.
///
/// Dispatches `event` to the matching handler, applies the accumulated changes to the global
/// state, and, once the server is ready and something changed, schedules diagnostics updates
/// and cache priming for the open documents on the thread pool.
///
/// Loop turns that take longer than 100 ms are logged as errors; when the `RA_PROFILE`
/// environment variable is set they are also shown to the user.
///
/// # Errors
///
/// Propagates errors from `on_check_task`, `on_request` and `on_notification`.
fn loop_turn(
    pool: &ThreadPool,
    task_sender: &Sender<Task>,
    connection: &Connection,
    global_state: &mut GlobalState,
    event: Event,
) -> Result<()> {
    let loop_start = Instant::now();

    // NOTE: don't count blocking select! call as a loop-turn time
    let _p = profile("main_loop_inner/loop-turn");
    log::info!("loop turn = {:?}", event);
    let queue_count = pool.queued_count();
    if queue_count > 0 {
        log::info!("queued count = {}", queue_count);
    }

    // Set when this very turn observes the end of the initial roots scan.
    let mut became_ready = false;
    match event {
        Event::Task(task) => {
            on_task(task, &connection.sender, global_state);
            global_state.maybe_collect_garbage();
        }
        Event::Vfs(task) => match task {
            vfs::loader::Message::Loaded { files } => {
                let vfs = &mut global_state.vfs.write().0;
                for (path, contents) in files {
                    let path = VfsPath::from(path);
                    // Files open in the editor are tracked in `mem_docs`; their contents
                    // are not overwritten with what the loader read.
                    if !global_state.mem_docs.contains(&path) {
                        vfs.set_file_contents(path, contents)
                    }
                }
            }
            vfs::loader::Message::Progress { n_total, n_done } => {
                if n_done == n_total {
                    global_state.status = Status::Ready;
                    became_ready = true;
                }
                report_progress(global_state, &connection.sender, n_done, n_total, "roots scanned")
            }
        },
        Event::CheckWatcher(task) => on_check_task(task, global_state, task_sender)?,
        Event::ProgressReport(report) => {
            // NOTE(review): neither `loop_state` nor `on_progress_report` is declared in the
            // visible part of this file — confirm that both exist.
            on_progress_report(report, task_sender, loop_state, global_state)
        }
        Event::Msg(msg) => match msg {
            Message::Request(req) => {
                on_request(global_state, pool, task_sender, &connection.sender, loop_start, req)?
            }
            Message::Notification(not) => {
                on_notification(&connection.sender, global_state, not)?;
            }
            Message::Response(resp) => {
                // Run the handler that was registered together with the outgoing request.
                let handler = global_state.req_queue.outgoing.complete(resp.id.clone());
                handler(global_state, resp)
            }
        },
    };

    let state_changed = global_state.process_changes();

    // Trigger flycheck as soon as the roots have been scanned.
    if became_ready {
        if let Some(flycheck) = &global_state.flycheck {
            flycheck.update();
        }
    }

    if global_state.status == Status::Ready && (state_changed || became_ready) {
        // File ids of all documents currently tracked in `mem_docs`.
        let subscriptions = global_state
            .mem_docs
            .iter()
            .map(|path| global_state.vfs.read().0.file_id(&path).unwrap())
            .collect::<Vec<_>>();

        update_file_notifications_on_threadpool(
            pool,
            global_state.snapshot(),
            task_sender.clone(),
            subscriptions.clone(),
        );
        // Prime the caches in the background; a cancelled run is silently ignored.
        pool.execute({
            let subs = subscriptions;
            let snap = global_state.snapshot();
            move || snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ())
        });
    }

    let loop_duration = loop_start.elapsed();
    if loop_duration > Duration::from_millis(100) {
        log::error!("overly long loop turn: {:?}", loop_duration);
        if env::var("RA_PROFILE").is_ok() {
            show_message(
                lsp_types::MessageType::Error,
                format!("overly long loop turn: {:?}", loop_duration),
                &connection.sender,
            );
        }
    }

    Ok(())
}
353
354 fn on_task(task: Task, msg_sender: &Sender<Message>, global_state: &mut GlobalState) {
355     match task {
356         Task::Respond(response) => {
357             if let Some((method, start)) =
358                 global_state.req_queue.incoming.complete(response.id.clone())
359             {
360                 let duration = start.elapsed();
361                 log::info!("handled req#{} in {:?}", response.id, duration);
362                 global_state.complete_request(RequestMetrics {
363                     id: response.id.clone(),
364                     method: method.to_string(),
365                     duration,
366                 });
367                 msg_sender.send(response.into()).unwrap();
368             }
369         }
370         Task::Notify(n) => {
371             msg_sender.send(n.into()).unwrap();
372         }
373         Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, global_state),
374     }
375 }
376
/// Routes an incoming client request to its handler.
///
/// Builds a `PoolDispatcher` around the request and offers it to every supported request type
/// in turn. `on_sync` handlers run on the current thread and get mutable access to the global
/// state.
/// NOTE(review): `on` and `finish` are not visible here; presumably `on` runs the handler on
/// the thread pool and `finish` deals with a request that no handler claimed — confirm.
///
/// `request_received` is the instant at which the loop turn handling this request started.
///
/// # Errors
///
/// Propagates any error returned by the dispatcher methods.
fn on_request(
    global_state: &mut GlobalState,
    pool: &ThreadPool,
    task_sender: &Sender<Task>,
    msg_sender: &Sender<Message>,
    request_received: Instant,
    req: Request,
) -> Result<()> {
    let mut pool_dispatcher = PoolDispatcher {
        req: Some(req),
        pool,
        global_state,
        task_sender,
        msg_sender,
        request_received,
    };
    pool_dispatcher
        // Handled on the current thread.
        .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))?
        .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))?
        .on_sync::<lsp_ext::OnEnter>(|s, p| handlers::handle_on_enter(s.snapshot(), p))?
        .on_sync::<lsp_types::request::SelectionRangeRequest>(|s, p| {
            handlers::handle_selection_range(s.snapshot(), p)
        })?
        .on_sync::<lsp_ext::MatchingBrace>(|s, p| handlers::handle_matching_brace(s.snapshot(), p))?
        // Dispatched through `on`.
        .on::<lsp_ext::AnalyzerStatus>(handlers::handle_analyzer_status)?
        .on::<lsp_ext::SyntaxTree>(handlers::handle_syntax_tree)?
        .on::<lsp_ext::ExpandMacro>(handlers::handle_expand_macro)?
        .on::<lsp_ext::ParentModule>(handlers::handle_parent_module)?
        .on::<lsp_ext::Runnables>(handlers::handle_runnables)?
        .on::<lsp_ext::InlayHints>(handlers::handle_inlay_hints)?
        .on::<lsp_ext::CodeActionRequest>(handlers::handle_code_action)?
        .on::<lsp_ext::ResolveCodeActionRequest>(handlers::handle_resolve_code_action)?
        .on::<lsp_ext::HoverRequest>(handlers::handle_hover)?
        .on::<lsp_types::request::OnTypeFormatting>(handlers::handle_on_type_formatting)?
        .on::<lsp_types::request::DocumentSymbolRequest>(handlers::handle_document_symbol)?
        .on::<lsp_types::request::WorkspaceSymbol>(handlers::handle_workspace_symbol)?
        .on::<lsp_types::request::GotoDefinition>(handlers::handle_goto_definition)?
        .on::<lsp_types::request::GotoImplementation>(handlers::handle_goto_implementation)?
        .on::<lsp_types::request::GotoTypeDefinition>(handlers::handle_goto_type_definition)?
        .on::<lsp_types::request::Completion>(handlers::handle_completion)?
        .on::<lsp_types::request::CodeLensRequest>(handlers::handle_code_lens)?
        .on::<lsp_types::request::CodeLensResolve>(handlers::handle_code_lens_resolve)?
        .on::<lsp_types::request::FoldingRangeRequest>(handlers::handle_folding_range)?
        .on::<lsp_types::request::SignatureHelpRequest>(handlers::handle_signature_help)?
        .on::<lsp_types::request::PrepareRenameRequest>(handlers::handle_prepare_rename)?
        .on::<lsp_types::request::Rename>(handlers::handle_rename)?
        .on::<lsp_types::request::References>(handlers::handle_references)?
        .on::<lsp_types::request::Formatting>(handlers::handle_formatting)?
        .on::<lsp_types::request::DocumentHighlightRequest>(handlers::handle_document_highlight)?
        .on::<lsp_types::request::CallHierarchyPrepare>(handlers::handle_call_hierarchy_prepare)?
        .on::<lsp_types::request::CallHierarchyIncomingCalls>(
            handlers::handle_call_hierarchy_incoming,
        )?
        .on::<lsp_types::request::CallHierarchyOutgoingCalls>(
            handlers::handle_call_hierarchy_outgoing,
        )?
        .on::<lsp_types::request::SemanticTokensRequest>(handlers::handle_semantic_tokens)?
        .on::<lsp_types::request::SemanticTokensRangeRequest>(
            handlers::handle_semantic_tokens_range,
        )?
        .on::<lsp_ext::Ssr>(handlers::handle_ssr)?
        .finish();
    Ok(())
}
441
/// Handles a notification received from the client.
///
/// The notification is cast to each supported type in turn; the first successful cast handles
/// it and returns, a failed cast hands the notification back for the next attempt. Unknown
/// notifications whose method starts with `$/` are ignored, all other unknown ones are logged
/// as errors.
///
/// # Errors
///
/// Returns an error if the configuration request cannot be sent to the client.
///
/// # Panics
///
/// Panics if the message channel is closed, or if a `DidChangeTextDocument` refers to a
/// document that is not open, is unknown to the VFS or does not contain valid UTF-8.
fn on_notification(
    msg_sender: &Sender<Message>,
    global_state: &mut GlobalState,
    not: Notification,
) -> Result<()> {
    // Request cancellation: if the queue yields a response for the cancelled request, send it.
    let not = match notification_cast::<lsp_types::notification::Cancel>(not) {
        Ok(params) => {
            let id: RequestId = match params.id {
                NumberOrString::Number(id) => id.into(),
                NumberOrString::String(id) => id.into(),
            };
            if let Some(response) = global_state.req_queue.incoming.cancel(id) {
                msg_sender.send(response.into()).unwrap()
            }
            return Ok(());
        }
        Err(not) => not,
    };
    // Document opened: track it in `mem_docs` and store the editor's text in the VFS.
    let not = match notification_cast::<lsp_types::notification::DidOpenTextDocument>(not) {
        Ok(params) => {
            if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
                if !global_state.mem_docs.insert(path.clone()) {
                    log::error!("duplicate DidOpenTextDocument: {}", path)
                }
                global_state
                    .vfs
                    .write()
                    .0
                    .set_file_contents(path, Some(params.text_document.text.into_bytes()));
            }
            return Ok(());
        }
        Err(not) => not,
    };
    // Document changed: apply the edits to the current VFS contents and store the result.
    let not = match notification_cast::<lsp_types::notification::DidChangeTextDocument>(not) {
        Ok(params) => {
            if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
                assert!(global_state.mem_docs.contains(&path));
                let vfs = &mut global_state.vfs.write().0;
                let file_id = vfs.file_id(&path).unwrap();
                let mut text = String::from_utf8(vfs.file_contents(file_id).to_vec()).unwrap();
                apply_document_changes(&mut text, params.content_changes);
                vfs.set_file_contents(path, Some(text.into_bytes()))
            }
            return Ok(());
        }
        Err(not) => not,
    };
    // Document closed: stop tracking it, invalidate its path in the loader and publish an
    // empty diagnostics list for its URI.
    let not = match notification_cast::<lsp_types::notification::DidCloseTextDocument>(not) {
        Ok(params) => {
            if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
                if !global_state.mem_docs.remove(&path) {
                    log::error!("orphan DidCloseTextDocument: {}", path)
                }
                if let Some(path) = path.as_path() {
                    global_state.loader.invalidate(path.to_path_buf());
                }
            }
            let params = lsp_types::PublishDiagnosticsParams {
                uri: params.text_document.uri,
                diagnostics: Vec::new(),
                version: None,
            };
            let not = notification_new::<lsp_types::notification::PublishDiagnostics>(params);
            msg_sender.send(not.into()).unwrap();
            return Ok(());
        }
        Err(not) => not,
    };
    // Document saved: trigger flycheck, if there is one.
    let not = match notification_cast::<lsp_types::notification::DidSaveTextDocument>(not) {
        Ok(_params) => {
            if let Some(flycheck) = &global_state.flycheck {
                flycheck.update();
            }
            return Ok(());
        }
        Err(not) => not,
    };
    let not = match notification_cast::<lsp_types::notification::DidChangeConfiguration>(not) {
        Ok(_) => {
            // As stated in https://github.com/microsoft/language-server-protocol/issues/676,
            // this notification's parameters should be ignored and the actual config queried separately.
            let request = global_state.req_queue.outgoing.register(
                lsp_types::request::WorkspaceConfiguration::METHOD.to_string(),
                lsp_types::ConfigurationParams {
                    items: vec![lsp_types::ConfigurationItem {
                        scope_uri: None,
                        section: Some("rust-analyzer".to_string()),
                    }],
                },
                // Runs when the client answers the configuration request.
                |global_state, resp| {
                    log::debug!("config update response: '{:?}", resp);
                    let Response { error, result, .. } = resp;

                    match (error, result) {
                        (Some(err), _) => {
                            log::error!("failed to fetch the server settings: {:?}", err)
                        }
                        (None, Some(configs)) => {
                            // A single section was requested, so only the first entry of
                            // the result is used.
                            if let Some(new_config) = configs.get(0) {
                                let mut config = global_state.config.clone();
                                config.update(&new_config);
                                global_state.update_configuration(config);
                            }
                        }
                        (None, None) => {
                            log::error!("received empty server settings response from the client")
                        }
                    }
                },
            );
            msg_sender.send(request.into())?;

            return Ok(());
        }
        Err(not) => not,
    };
    // Watched files changed on disk: invalidate each changed path in the loader.
    let not = match notification_cast::<lsp_types::notification::DidChangeWatchedFiles>(not) {
        Ok(params) => {
            for change in params.changes {
                if let Ok(path) = from_proto::abs_path(&change.uri) {
                    global_state.loader.invalidate(path)
                }
            }
            return Ok(());
        }
        Err(not) => not,
    };
    // Unknown `$/...` notifications are dropped without logging.
    if not.method.starts_with("$/") {
        return Ok(());
    }
    log::error!("unhandled notification: {:?}", not);
    Ok(())
}
576
577 fn apply_document_changes(
578     old_text: &mut String,
579     content_changes: Vec<TextDocumentContentChangeEvent>,
580 ) {
581     let mut line_index = LineIndex::new(old_text);
582     // The changes we got must be applied sequentially, but can cross lines so we
583     // have to keep our line index updated.
584     // Some clients (e.g. Code) sort the ranges in reverse. As an optimization, we
585     // remember the last valid line in the index and only rebuild it if needed.
586     // The VFS will normalize the end of lines to `\n`.
587     enum IndexValid {
588         All,
589         UpToLineExclusive(u64),
590     }
591
592     impl IndexValid {
593         fn covers(&self, line: u64) -> bool {
594             match *self {
595                 IndexValid::UpToLineExclusive(to) => to > line,
596                 _ => true,
597             }
598         }
599     }
600
601     let mut index_valid = IndexValid::All;
602     for change in content_changes {
603         match change.range {
604             Some(range) => {
605                 if !index_valid.covers(range.end.line) {
606                     line_index = LineIndex::new(&old_text);
607                 }
608                 index_valid = IndexValid::UpToLineExclusive(range.start.line);
609                 let range = from_proto::text_range(&line_index, range);
610                 old_text.replace_range(Range::<usize>::from(range), &change.text);
611             }
612             None => {
613                 *old_text = change.text;
614                 index_valid = IndexValid::UpToLineExclusive(0);
615             }
616         }
617     }
618 }
619
620 fn on_check_task(
621     task: CheckTask,
622     global_state: &mut GlobalState,
623     task_sender: &Sender<Task>,
624 ) -> Result<()> {
625     match task {
626         CheckTask::ClearDiagnostics => {
627             task_sender.send(Task::Diagnostic(DiagnosticTask::ClearCheck))?;
628         }
629
630         CheckTask::AddDiagnostic { workspace_root, diagnostic } => {
631             let diagnostics = crate::diagnostics::to_proto::map_rust_diagnostic_to_lsp(
632                 &global_state.config.diagnostics,
633                 &diagnostic,
634                 &workspace_root,
635             );
636             for diag in diagnostics {
637                 let path = from_proto::vfs_path(&diag.location.uri)?;
638                 let file_id = match global_state.vfs.read().0.file_id(&path) {
639                     Some(file) => FileId(file.0),
640                     None => {
641                         log::error!("File with cargo diagnostic not found in VFS: {}", path);
642                         return Ok(());
643                     }
644                 };
645
646                 task_sender.send(Task::Diagnostic(DiagnosticTask::AddCheck(
647                     file_id,
648                     diag.diagnostic,
649                     diag.fixes.into_iter().map(|it| it.into()).collect(),
650                 )))?;
651             }
652         }
653
654         CheckTask::Status(status) => {
655             if global_state.config.client_caps.work_done_progress {
656                 let progress = match status {
657                     ra_flycheck::Status::Being => {
658                         lsp_types::WorkDoneProgress::Begin(lsp_types::WorkDoneProgressBegin {
659                             title: "Running `cargo check`".to_string(),
660                             cancellable: Some(false),
661                             message: None,
662                             percentage: None,
663                         })
664                     }
665                     ra_flycheck::Status::Progress(target) => {
666                         lsp_types::WorkDoneProgress::Report(lsp_types::WorkDoneProgressReport {
667                             cancellable: Some(false),
668                             message: Some(target),
669                             percentage: None,
670                         })
671                     }
672                     ra_flycheck::Status::End => {
673                         lsp_types::WorkDoneProgress::End(lsp_types::WorkDoneProgressEnd {
674                             message: None,
675                         })
676                     }
677                 };
678
679                 let params = lsp_types::ProgressParams {
680                     token: lsp_types::ProgressToken::String(
681                         "rustAnalyzer/cargoWatcher".to_string(),
682                     ),
683                     value: lsp_types::ProgressParamsValue::WorkDone(progress),
684                 };
685                 let not = notification_new::<lsp_types::notification::Progress>(params);
686                 task_sender.send(Task::Notify(not)).unwrap();
687             }
688         }
689     };
690
691     Ok(())
692 }
693
694 fn on_diagnostic_task(task: DiagnosticTask, msg_sender: &Sender<Message>, state: &mut GlobalState) {
695     let subscriptions = state.diagnostics.handle_task(task);
696
697     for file_id in subscriptions {
698         let url = file_id_to_url(&state.vfs.read().0, file_id);
699         let diagnostics = state.diagnostics.diagnostics_for(file_id).cloned().collect();
700         let params = lsp_types::PublishDiagnosticsParams { uri: url, diagnostics, version: None };
701         let not = notification_new::<lsp_types::notification::PublishDiagnostics>(params);
702         msg_sender.send(not.into()).unwrap();
703     }
704 }
705
706 fn report_progress(
707     global_state: &mut GlobalState,
708     sender: &Sender<Message>,
709     done: usize,
710     total: usize,
711     message: &str,
712 ) {
713     let token = lsp_types::ProgressToken::String(format!("rustAnalyzer/{}", message));
714     let message = Some(format!("{}/{} {}", done, total, message));
715     let percentage = Some(100.0 * done as f64 / total.max(1) as f64);
716     let work_done_progress = if done == 0 {
717         let work_done_progress_create = global_state.req_queue.outgoing.register(
718             lsp_types::request::WorkDoneProgressCreate::METHOD.to_string(),
719             lsp_types::WorkDoneProgressCreateParams { token: token.clone() },
720             DO_NOTHING,
721         );
722         sender.send(work_done_progress_create.into()).unwrap();
723
724         lsp_types::WorkDoneProgress::Begin(lsp_types::WorkDoneProgressBegin {
725             title: "rust-analyzer".into(),
726             cancellable: None,
727             message,
728             percentage,
729         })
730     } else if done < total {
731         lsp_types::WorkDoneProgress::Report(lsp_types::WorkDoneProgressReport {
732             cancellable: None,
733             message,
734             percentage,
735         })
736     } else {
737         assert!(done == total);
738         lsp_types::WorkDoneProgress::End(lsp_types::WorkDoneProgressEnd { message })
739     };
740     let notification =
741         notification_new::<lsp_types::notification::Progress>(lsp_types::ProgressParams {
742             token,
743             value: lsp_types::ProgressParamsValue::WorkDone(work_done_progress),
744         });
745     sender.send(notification.into()).unwrap();
746 }
747
/// Routes a single incoming request to the handler whose `R::METHOD` matches
/// it, running that handler either on the current thread (`on_sync`) or on
/// the thread pool (`on`).
struct PoolDispatcher<'a> {
    /// The request that has not been claimed yet. `parse` takes it out and
    /// puts it back when the method does not match; whatever is still here
    /// when `finish` runs is answered with a `MethodNotFound` error.
    req: Option<Request>,
    /// Thread pool on which `on` executes handlers.
    pool: &'a ThreadPool,
    /// Passed mutably to `on_sync` handlers, snapshotted for `on` handlers,
    /// and used by `parse` to register the incoming request.
    global_state: &'a mut GlobalState,
    /// Outgoing message channel; `finish` sends its error response here.
    msg_sender: &'a Sender<Message>,
    /// Cloned into each thread pool job so it can send back its `Task`.
    task_sender: &'a Sender<Task>,
    /// Timestamp registered together with the method name in the incoming
    /// request queue.
    request_received: Instant,
}
756
757 impl<'a> PoolDispatcher<'a> {
758     /// Dispatches the request onto the current thread
759     fn on_sync<R>(
760         &mut self,
761         f: fn(&mut GlobalState, R::Params) -> Result<R::Result>,
762     ) -> Result<&mut Self>
763     where
764         R: lsp_types::request::Request + 'static,
765         R::Params: DeserializeOwned + panic::UnwindSafe + 'static,
766         R::Result: Serialize + 'static,
767     {
768         let (id, params) = match self.parse::<R>() {
769             Some(it) => it,
770             None => {
771                 return Ok(self);
772             }
773         };
774         let world = panic::AssertUnwindSafe(&mut *self.global_state);
775         let task = panic::catch_unwind(move || {
776             let result = f(world.0, params);
777             result_to_task::<R>(id, result)
778         })
779         .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
780         on_task(task, self.msg_sender, self.global_state);
781         Ok(self)
782     }
783
784     /// Dispatches the request onto thread pool
785     fn on<R>(
786         &mut self,
787         f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
788     ) -> Result<&mut Self>
789     where
790         R: lsp_types::request::Request + 'static,
791         R::Params: DeserializeOwned + Send + 'static,
792         R::Result: Serialize + 'static,
793     {
794         let (id, params) = match self.parse::<R>() {
795             Some(it) => it,
796             None => {
797                 return Ok(self);
798             }
799         };
800
801         self.pool.execute({
802             let world = self.global_state.snapshot();
803             let sender = self.task_sender.clone();
804             move || {
805                 let result = f(world, params);
806                 let task = result_to_task::<R>(id, result);
807                 sender.send(task).unwrap();
808             }
809         });
810
811         Ok(self)
812     }
813
814     fn parse<R>(&mut self) -> Option<(RequestId, R::Params)>
815     where
816         R: lsp_types::request::Request + 'static,
817         R::Params: DeserializeOwned + 'static,
818     {
819         let req = self.req.take()?;
820         let (id, params) = match req.extract::<R::Params>(R::METHOD) {
821             Ok(it) => it,
822             Err(req) => {
823                 self.req = Some(req);
824                 return None;
825             }
826         };
827         self.global_state
828             .req_queue
829             .incoming
830             .register(id.clone(), (R::METHOD, self.request_received));
831         Some((id, params))
832     }
833
834     fn finish(&mut self) {
835         match self.req.take() {
836             None => (),
837             Some(req) => {
838                 log::error!("unknown request: {:?}", req);
839                 let resp = Response::new_err(
840                     req.id,
841                     ErrorCode::MethodNotFound as i32,
842                     "unknown request".to_string(),
843                 );
844                 self.msg_sender.send(resp.into()).unwrap();
845             }
846         }
847     }
848 }
849
850 fn result_to_task<R>(id: RequestId, result: Result<R::Result>) -> Task
851 where
852     R: lsp_types::request::Request + 'static,
853     R::Params: DeserializeOwned + 'static,
854     R::Result: Serialize + 'static,
855 {
856     let response = match result {
857         Ok(resp) => Response::new_ok(id, &resp),
858         Err(e) => match e.downcast::<LspError>() {
859             Ok(lsp_error) => Response::new_err(id, lsp_error.code, lsp_error.message),
860             Err(e) => {
861                 if is_canceled(&*e) {
862                     Response::new_err(
863                         id,
864                         ErrorCode::ContentModified as i32,
865                         "content modified".to_string(),
866                     )
867                 } else {
868                     Response::new_err(id, ErrorCode::InternalError as i32, e.to_string())
869                 }
870             }
871         },
872     };
873     Task::Respond(response)
874 }
875
876 fn update_file_notifications_on_threadpool(
877     pool: &ThreadPool,
878     world: GlobalStateSnapshot,
879     task_sender: Sender<Task>,
880     subscriptions: Vec<FileId>,
881 ) {
882     log::trace!("updating notifications for {:?}", subscriptions);
883     if world.config.publish_diagnostics {
884         pool.execute(move || {
885             for file_id in subscriptions {
886                 match handlers::publish_diagnostics(&world, file_id) {
887                     Err(e) => {
888                         if !is_canceled(&*e) {
889                             log::error!("failed to compute diagnostics: {:?}", e);
890                         }
891                     }
892                     Ok(task) => {
893                         task_sender.send(Task::Diagnostic(task)).unwrap();
894                     }
895                 }
896             }
897         })
898     }
899 }
900
901 pub fn show_message(
902     typ: lsp_types::MessageType,
903     message: impl Into<String>,
904     sender: &Sender<Message>,
905 ) {
906     let message = message.into();
907     let params = lsp_types::ShowMessageParams { typ, message };
908     let not = notification_new::<lsp_types::notification::ShowMessage>(params);
909     sender.send(not.into()).unwrap();
910 }
911
912 fn is_canceled(e: &Box<dyn std::error::Error + Send + Sync>) -> bool {
913     e.downcast_ref::<Canceled>().is_some()
914 }
915
916 fn notification_is<N: lsp_types::notification::Notification>(notification: &Notification) -> bool {
917     notification.method == N::METHOD
918 }
919
920 fn notification_cast<N>(notification: Notification) -> std::result::Result<N::Params, Notification>
921 where
922     N: lsp_types::notification::Notification,
923     N::Params: DeserializeOwned,
924 {
925     notification.extract(N::METHOD)
926 }
927
928 fn notification_new<N>(params: N::Params) -> Notification
929 where
930     N: lsp_types::notification::Notification,
931     N::Params: Serialize,
932 {
933     Notification::new(N::METHOD.to_string(), params)
934 }
935
#[cfg(test)]
mod tests {
    use lsp_types::{Position, Range, TextDocumentContentChangeEvent};

    use super::*;

    /// Drives `apply_document_changes` through a whole-document replacement,
    /// single and batched ranged edits, multi-line deletions, and text that
    /// contains non-ASCII characters.
    #[test]
    fn test_apply_document_changes() {
        // `edits![l0, c0; l1, c1 => text, ...]` expands to a `Vec` of ranged
        // change events, one per comma-separated entry, in the given order.
        macro_rules! edits {
            [$($l0:expr, $c0:expr; $l1:expr, $c1:expr => $new:expr),+] => {
                vec![$(TextDocumentContentChangeEvent {
                    range: Some(Range {
                        start: Position { line: $l0, character: $c0 },
                        end: Position { line: $l1, character: $c1 },
                    }),
                    range_length: None,
                    text: String::from($new),
                }),+]
            };
        }

        let mut text = String::new();

        // No changes at all leave the text untouched.
        apply_document_changes(&mut text, Vec::new());
        assert_eq!(text, "");

        // A change without a range replaces the whole document.
        let replace_all = TextDocumentContentChangeEvent {
            range: None,
            range_length: None,
            text: String::from("the"),
        };
        apply_document_changes(&mut text, vec![replace_all]);
        assert_eq!(text, "the");

        // Single-line insertions and deletions.
        apply_document_changes(&mut text, edits![0, 3; 0, 3 => " quick"]);
        assert_eq!(text, "the quick");
        apply_document_changes(&mut text, edits![0, 0; 0, 4 => "", 0, 5; 0, 5 => " foxes"]);
        assert_eq!(text, "quick foxes");

        // Edits that introduce and then address further lines.
        apply_document_changes(&mut text, edits![0, 11; 0, 11 => "\ndream"]);
        assert_eq!(text, "quick foxes\ndream");
        apply_document_changes(&mut text, edits![1, 0; 1, 0 => "have "]);
        assert_eq!(text, "quick foxes\nhave dream");
        apply_document_changes(
            &mut text,
            edits![0, 0; 0, 0 => "the ", 1, 4; 1, 4 => " quiet", 1, 16; 1, 16 => "s\n"],
        );
        assert_eq!(text, "the quick foxes\nhave quiet dreams\n");
        apply_document_changes(&mut text, edits![0, 15; 0, 15 => "\n", 2, 17; 2, 17 => "\n"]);
        assert_eq!(text, "the quick foxes\n\nhave quiet dreams\n\n");
        apply_document_changes(
            &mut text,
            edits![1, 0; 1, 0 => "DREAM", 2, 0; 2, 0 => "they ", 3, 0; 3, 0 => "DON'T THEY?"],
        );
        assert_eq!(text, "the quick foxes\nDREAM\nthey have quiet dreams\nDON'T THEY?\n");

        // Deletions, one of which spans a line break.
        apply_document_changes(&mut text, edits![0, 10; 1, 5 => "", 2, 0; 2, 12 => ""]);
        assert_eq!(text, "the quick \nthey have quiet dreams\n");

        // Inserting in front of a multi-code-point character.
        text = String::from("❤️");
        apply_document_changes(&mut text, edits![0, 0; 0, 0 => "a"]);
        assert_eq!(text, "a❤️");

        // Later edits in a batch are positioned against the text produced by
        // the earlier ones, including inserted non-ASCII characters.
        text = String::from("a\nb");
        apply_document_changes(&mut text, edits![0, 1; 1, 0 => "\nțc", 0, 1; 1, 1 => "d"]);
        assert_eq!(text, "adcb");

        text = String::from("a\nb");
        apply_document_changes(&mut text, edits![0, 1; 1, 0 => "ț\nc", 0, 2; 0, 2 => "c"]);
        assert_eq!(text, "ațc\ncb");
    }
}