]> git.lizzy.rs Git - rust.git/blob - crates/rust-analyzer/src/main_loop.rs
Flatten Task enum ¯\_(ツ)_/¯
[rust.git] / crates / rust-analyzer / src / main_loop.rs
1 //! The main loop of `rust-analyzer` responsible for dispatching LSP
2 //! requests/replies and notifications back to the client.
3
4 mod handlers;
5 mod subscriptions;
6 pub(crate) mod pending_requests;
7 mod lsp_utils;
8
9 use std::{
10     borrow::Cow,
11     convert::TryFrom,
12     env,
13     error::Error,
14     fmt,
15     ops::Range,
16     panic,
17     sync::Arc,
18     time::{Duration, Instant},
19 };
20
21 use crossbeam_channel::{never, select, unbounded, RecvError, Sender};
22 use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
23 use lsp_types::{DidChangeTextDocumentParams, NumberOrString, TextDocumentContentChangeEvent};
24 use ra_flycheck::CheckTask;
25 use ra_ide::{Canceled, FileId, LineIndex};
26 use ra_prof::profile;
27 use ra_project_model::{PackageRoot, ProjectWorkspace};
28 use ra_vfs::VfsTask;
29 use rustc_hash::FxHashSet;
30 use serde::{de::DeserializeOwned, Serialize};
31 use threadpool::ThreadPool;
32
33 use crate::{
34     config::{Config, FilesWatcher, LinkedProject},
35     diagnostics::DiagnosticTask,
36     from_proto,
37     global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot},
38     lsp_ext,
39     main_loop::{
40         pending_requests::{PendingRequest, PendingRequests},
41         subscriptions::Subscriptions,
42     },
43     Result,
44 };
45 pub use lsp_utils::show_message;
46 use lsp_utils::{is_canceled, notification_cast, notification_is, notification_new, request_new};
47 use ra_progress::{
48     IsDone, ProgressStatus, U32Progress, U32ProgressReport, U32ProgressSource, U32ProgressStatus,
49 };
50
51 const FLYCHECK_PROGRESS_TOKEN: &str = "rustAnalyzer/flycheck";
52 const ROOTS_SCANNED_PROGRESS_TOKEN: &str = "rustAnalyzer/rootsScanned";
53
54 #[derive(Debug)]
55 pub struct LspError {
56     pub code: i32,
57     pub message: String,
58 }
59
60 impl LspError {
61     pub const UNKNOWN_FILE: i32 = -32900;
62
63     pub fn new(code: i32, message: String) -> LspError {
64         LspError { code, message }
65     }
66 }
67
68 impl fmt::Display for LspError {
69     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
70         write!(f, "Language Server request failed with {}. ({})", self.code, self.message)
71     }
72 }
73
74 impl Error for LspError {}
75
76 pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
77     log::info!("initial config: {:#?}", config);
78
79     // Windows scheduler implements priority boosts: if thread waits for an
80     // event (like a condvar), and event fires, priority of the thread is
81     // temporary bumped. This optimization backfires in our case: each time the
82     // `main_loop` schedules a task to run on a threadpool, the worker threads
83     // gets a higher priority, and (on a machine with fewer cores) displaces the
84     // main loop! We work-around this by marking the main loop as a
85     // higher-priority thread.
86     //
87     // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities
88     // https://docs.microsoft.com/en-us/windows/win32/procthread/priority-boosts
89     // https://github.com/rust-analyzer/rust-analyzer/issues/2835
90     #[cfg(windows)]
91     unsafe {
92         use winapi::um::processthreadsapi::*;
93         let thread = GetCurrentThread();
94         let thread_priority_above_normal = 1;
95         SetThreadPriority(thread, thread_priority_above_normal);
96     }
97
98     let mut loop_state = LoopState::default();
99     let mut global_state = {
100         let workspaces = {
101             if config.linked_projects.is_empty() && config.notifications.cargo_toml_not_found {
102                 show_message(
103                     lsp_types::MessageType::Error,
104                     "rust-analyzer failed to discover workspace".to_string(),
105                     &connection.sender,
106                 );
107             };
108
109             config
110                 .linked_projects
111                 .iter()
112                 .filter_map(|project| match project {
113                     LinkedProject::ProjectManifest(manifest) => {
114                         ra_project_model::ProjectWorkspace::load(
115                             manifest.clone(),
116                             &config.cargo,
117                             config.with_sysroot,
118                         )
119                         .map_err(|err| {
120                             log::error!("failed to load workspace: {:#}", err);
121                             show_message(
122                                 lsp_types::MessageType::Error,
123                                 format!("rust-analyzer failed to load workspace: {:#}", err),
124                                 &connection.sender,
125                             );
126                         })
127                         .ok()
128                     }
129                     LinkedProject::InlineJsonProject(it) => {
130                         Some(ra_project_model::ProjectWorkspace::Json {
131                             project: it.clone(),
132                             project_location: config.root_path.clone(),
133                         })
134                     }
135                 })
136                 .collect::<Vec<_>>()
137         };
138
139         let globs = config
140             .files
141             .exclude
142             .iter()
143             .map(|glob| crate::vfs_glob::Glob::new(glob))
144             .collect::<std::result::Result<Vec<_>, _>>()?;
145
146         if let FilesWatcher::Client = config.files.watcher {
147             let registration_options = lsp_types::DidChangeWatchedFilesRegistrationOptions {
148                 watchers: workspaces
149                     .iter()
150                     .flat_map(ProjectWorkspace::to_roots)
151                     .filter(PackageRoot::is_member)
152                     .map(|root| format!("{}/**/*.rs", root.path().display()))
153                     .map(|glob_pattern| lsp_types::FileSystemWatcher { glob_pattern, kind: None })
154                     .collect(),
155             };
156             let registration = lsp_types::Registration {
157                 id: "file-watcher".to_string(),
158                 method: "workspace/didChangeWatchedFiles".to_string(),
159                 register_options: Some(serde_json::to_value(registration_options).unwrap()),
160             };
161             let params = lsp_types::RegistrationParams { registrations: vec![registration] };
162             let request = request_new::<lsp_types::request::RegisterCapability>(
163                 loop_state.next_request_id(),
164                 params,
165             );
166             connection.sender.send(request.into()).unwrap();
167         }
168
169         GlobalState::new(workspaces, config.lru_capacity, &globs, config)
170     };
171
172     loop_state.roots_total = u32::try_from(global_state.vfs.read().n_roots())
173         .expect("Wow, your project is so huge, that it cannot fit into u32...");
174
175     loop_state.roots_scanned = 0;
176     let mut roots_scanned_progress_receiver = {
177         let (recv, mut progress_src) =
178             U32ProgressSource::real_if(global_state.config.client_caps.work_done_progress);
179         loop_state.roots_progress = Some(progress_src.begin(0, loop_state.roots_total));
180         recv
181     };
182
183     let pool = ThreadPool::default();
184     let (task_sender, task_receiver) = unbounded::<Task>();
185
186     log::info!("server initialized, serving requests");
187     {
188         let task_sender = task_sender;
189         loop {
190             log::trace!("selecting");
191             let event = select! {
192                 recv(&connection.receiver) -> msg => match msg {
193                     Ok(msg) => Event::Msg(msg),
194                     Err(RecvError) => return Err("client exited without shutdown".into()),
195                 },
196                 recv(task_receiver) -> task => Event::Task(task.unwrap()),
197                 recv(global_state.task_receiver) -> task => match task {
198                     Ok(task) => Event::Vfs(task),
199                     Err(RecvError) => return Err("vfs died".into()),
200                 },
201                 recv(global_state.flycheck.as_ref().map_or(&never(), |it| &it.task_recv)) -> task => match task {
202                     Ok(task) => Event::CheckWatcher(task),
203                     Err(RecvError) => return Err("check watcher died".into()),
204                 },
205                 recv(global_state.flycheck_progress_receiver) -> status => match status {
206                     Ok(status) => Event::ProgressReport(ProgressReport::Flycheck(status)),
207                     Err(RecvError) => return Err("check watcher died".into()),
208                 },
209                 recv(roots_scanned_progress_receiver) -> status => match status {
210                     Ok(status) => Event::ProgressReport(ProgressReport::RootsScanned(status)),
211                     Err(RecvError) => {
212                         // Roots analysis has finished, we no longer need this receiver
213                         roots_scanned_progress_receiver = never();
214                         continue;
215                     }
216                 }
217             };
218             if let Event::Msg(Message::Request(req)) = &event {
219                 if connection.handle_shutdown(&req)? {
220                     break;
221                 };
222             }
223             loop_turn(&pool, &task_sender, &connection, &mut global_state, &mut loop_state, event)?;
224         }
225     }
226     global_state.analysis_host.request_cancellation();
227     log::info!("waiting for tasks to finish...");
228     task_receiver.into_iter().for_each(|task| {
229         on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut global_state)
230     });
231     log::info!("...tasks have finished");
232     log::info!("joining threadpool...");
233     pool.join();
234     drop(pool);
235     log::info!("...threadpool has finished");
236
237     let vfs = Arc::try_unwrap(global_state.vfs).expect("all snapshots should be dead");
238     drop(vfs);
239
240     Ok(())
241 }
242
243 #[derive(Debug)]
244 enum Task {
245     Respond(Response),
246     Notify(Notification),
247     SendRequest(Request),
248     Diagnostic(DiagnosticTask),
249 }
250
251 enum Event {
252     Msg(Message),
253     Task(Task),
254     Vfs(VfsTask),
255     CheckWatcher(CheckTask),
256     ProgressReport(ProgressReport),
257 }
258
259 #[derive(Debug)]
260 enum ProgressReport {
261     Flycheck(ProgressStatus<(), String>),
262     RootsScanned(U32ProgressStatus),
263 }
264
265 impl fmt::Debug for Event {
266     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
267         let debug_verbose_not = |not: &Notification, f: &mut fmt::Formatter| {
268             f.debug_struct("Notification").field("method", &not.method).finish()
269         };
270
271         match self {
272             Event::Msg(Message::Notification(not)) => {
273                 if notification_is::<lsp_types::notification::DidOpenTextDocument>(not)
274                     || notification_is::<lsp_types::notification::DidChangeTextDocument>(not)
275                 {
276                     return debug_verbose_not(not, f);
277                 }
278             }
279             Event::Task(Task::Notify(not)) => {
280                 if notification_is::<lsp_types::notification::PublishDiagnostics>(not) {
281                     return debug_verbose_not(not, f);
282                 }
283             }
284             Event::Task(Task::Respond(resp)) => {
285                 return f
286                     .debug_struct("Response")
287                     .field("id", &resp.id)
288                     .field("error", &resp.error)
289                     .finish();
290             }
291             _ => (),
292         }
293         match self {
294             Event::Msg(it) => fmt::Debug::fmt(it, f),
295             Event::Task(it) => fmt::Debug::fmt(it, f),
296             Event::Vfs(it) => fmt::Debug::fmt(it, f),
297             Event::CheckWatcher(it) => fmt::Debug::fmt(it, f),
298             Event::ProgressReport(it) => fmt::Debug::fmt(it, f),
299         }
300     }
301 }
302
303 #[derive(Default)]
304 struct LoopState {
305     next_request_id: u64,
306     pending_responses: FxHashSet<RequestId>,
307     pending_requests: PendingRequests,
308     subscriptions: Subscriptions,
309     workspace_loaded: bool,
310     roots_progress: Option<U32Progress>,
311     roots_scanned: u32,
312     roots_total: u32,
313     configuration_request_id: Option<RequestId>,
314 }
315
316 impl LoopState {
317     fn next_request_id(&mut self) -> RequestId {
318         self.next_request_id += 1;
319         let res: RequestId = self.next_request_id.into();
320         let inserted = self.pending_responses.insert(res.clone());
321         assert!(inserted);
322         res
323     }
324 }
325
326 fn loop_turn(
327     pool: &ThreadPool,
328     task_sender: &Sender<Task>,
329     connection: &Connection,
330     global_state: &mut GlobalState,
331     loop_state: &mut LoopState,
332     event: Event,
333 ) -> Result<()> {
334     let loop_start = Instant::now();
335
336     // NOTE: don't count blocking select! call as a loop-turn time
337     let _p = profile("main_loop_inner/loop-turn");
338     log::info!("loop turn = {:?}", event);
339     let queue_count = pool.queued_count();
340     if queue_count > 0 {
341         log::info!("queued count = {}", queue_count);
342     }
343
344     match event {
345         Event::Task(task) => {
346             on_task(task, &connection.sender, &mut loop_state.pending_requests, global_state);
347             global_state.maybe_collect_garbage();
348         }
349         Event::Vfs(task) => {
350             global_state.vfs.write().handle_task(task);
351         }
352         Event::CheckWatcher(task) => on_check_task(task, global_state, task_sender)?,
353         Event::ProgressReport(report) => {
354             on_progress_report(report, task_sender, loop_state, global_state)
355         }
356         Event::Msg(msg) => match msg {
357             Message::Request(req) => on_request(
358                 global_state,
359                 &mut loop_state.pending_requests,
360                 pool,
361                 task_sender,
362                 &connection.sender,
363                 loop_start,
364                 req,
365             )?,
366             Message::Notification(not) => {
367                 on_notification(&connection.sender, global_state, loop_state, not)?;
368             }
369             Message::Response(resp) => {
370                 let removed = loop_state.pending_responses.remove(&resp.id);
371                 if !removed {
372                     log::error!("unexpected response: {:?}", resp)
373                 }
374
375                 if Some(&resp.id) == loop_state.configuration_request_id.as_ref() {
376                     loop_state.configuration_request_id = None;
377                     log::debug!("config update response: '{:?}", resp);
378                     let Response { error, result, .. } = resp;
379
380                     match (error, result) {
381                         (Some(err), _) => {
382                             log::error!("failed to fetch the server settings: {:?}", err)
383                         }
384                         (None, Some(configs)) => {
385                             if let Some(new_config) = configs.get(0) {
386                                 let mut config = global_state.config.clone();
387                                 config.update(&new_config);
388                                 global_state.update_configuration(config);
389                             }
390                         }
391                         (None, None) => {
392                             log::error!("received empty server settings response from the client")
393                         }
394                     }
395                 }
396             }
397         },
398     };
399
400     let mut state_changed = global_state.process_changes(&mut loop_state.roots_scanned);
401
402     let show_progress =
403         !loop_state.workspace_loaded && global_state.config.client_caps.work_done_progress;
404
405     if !loop_state.workspace_loaded && loop_state.roots_scanned == loop_state.roots_total {
406         state_changed = true;
407         loop_state.workspace_loaded = true;
408         if let Some(flycheck) = &global_state.flycheck {
409             flycheck.update();
410         }
411     }
412
413     if show_progress {
414         if let Some(progress) = &mut loop_state.roots_progress {
415             if loop_state.workspace_loaded
416                 || progress.report(loop_state.roots_scanned) == IsDone(true)
417             {
418                 loop_state.roots_progress = None;
419             }
420         }
421     }
422
423     if state_changed && loop_state.workspace_loaded {
424         update_file_notifications_on_threadpool(
425             pool,
426             global_state.snapshot(),
427             task_sender.clone(),
428             loop_state.subscriptions.subscriptions(),
429         );
430         pool.execute({
431             let subs = loop_state.subscriptions.subscriptions();
432             let snap = global_state.snapshot();
433             move || snap.analysis().prime_caches(subs).unwrap_or_else(|_: Canceled| ())
434         });
435     }
436
437     let loop_duration = loop_start.elapsed();
438     if loop_duration > Duration::from_millis(100) {
439         log::error!("overly long loop turn: {:?}", loop_duration);
440         if env::var("RA_PROFILE").is_ok() {
441             show_message(
442                 lsp_types::MessageType::Error,
443                 format!("overly long loop turn: {:?}", loop_duration),
444                 &connection.sender,
445             );
446         }
447     }
448
449     Ok(())
450 }
451
452 fn on_progress_report(
453     report: ProgressReport,
454     task_sender: &Sender<Task>,
455     loop_state: &mut LoopState,
456     global_state: &GlobalState,
457 ) {
458     let end_report =
459         || lsp_types::WorkDoneProgress::End(lsp_types::WorkDoneProgressEnd { message: None });
460     let mut create_progress = |token: &'static str| {
461         let create_progress_req = request_new::<lsp_types::request::WorkDoneProgressCreate>(
462             loop_state.next_request_id(),
463             lsp_types::WorkDoneProgressCreateParams {
464                 token: lsp_types::ProgressToken::String(token.to_string()),
465             },
466         );
467         task_sender.send(Task::SendRequest(create_progress_req.into())).unwrap();
468     };
469
470     let (token, progress) = match report {
471         ProgressReport::Flycheck(status) => {
472             let command = global_state
473                 .config
474                 .check
475                 .as_ref()
476                 .expect("There should be config, since flycheck is active");
477
478             let progress = match status {
479                 ProgressStatus::Begin(()) => {
480                     create_progress(FLYCHECK_PROGRESS_TOKEN);
481                     lsp_types::WorkDoneProgress::Begin(lsp_types::WorkDoneProgressBegin {
482                         title: "".to_string(),
483                         cancellable: Some(false),
484                         message: Some(command.to_string()),
485                         percentage: None,
486                     })
487                 }
488                 ProgressStatus::Progress(target) => {
489                     lsp_types::WorkDoneProgress::Report(lsp_types::WorkDoneProgressReport {
490                         cancellable: Some(false),
491                         message: Some(format!("{} [{}]", command, target)),
492                         percentage: None,
493                     })
494                 }
495                 ProgressStatus::End => end_report(),
496             };
497             (FLYCHECK_PROGRESS_TOKEN, progress)
498         }
499         ProgressReport::RootsScanned(status) => {
500             fn to_message(report: &U32ProgressReport) -> String {
501                 report.to_message("analyzing the workspace", "packages")
502             }
503             let progress = match status {
504                 ProgressStatus::Begin(report) => {
505                     create_progress(ROOTS_SCANNED_PROGRESS_TOKEN);
506                     lsp_types::WorkDoneProgress::Begin(lsp_types::WorkDoneProgressBegin {
507                         title: "rust-analyzer".to_string(),
508                         cancellable: Some(false),
509                         message: Some(to_message(&report)),
510                         percentage: Some(report.percentage()),
511                     })
512                 }
513                 ProgressStatus::Progress(report) => {
514                     lsp_types::WorkDoneProgress::Report(lsp_types::WorkDoneProgressReport {
515                         cancellable: Some(false),
516                         message: Some(to_message(&report)),
517                         percentage: Some(report.percentage()),
518                     })
519                 }
520                 ProgressStatus::End => end_report(),
521             };
522             (ROOTS_SCANNED_PROGRESS_TOKEN, progress)
523         }
524     };
525     let params = lsp_types::ProgressParams {
526         token: lsp_types::ProgressToken::String(token.to_string()),
527         value: lsp_types::ProgressParamsValue::WorkDone(progress),
528     };
529     let not = notification_new::<lsp_types::notification::Progress>(params);
530     task_sender.send(Task::Notify(not.into())).unwrap()
531 }
532
533 fn on_task(
534     task: Task,
535     msg_sender: &Sender<Message>,
536     pending_requests: &mut PendingRequests,
537     state: &mut GlobalState,
538 ) {
539     match task {
540         Task::Respond(response) => {
541             if let Some(completed) = pending_requests.finish(&response.id) {
542                 log::info!("handled req#{} in {:?}", completed.id, completed.duration);
543                 state.complete_request(completed);
544                 msg_sender.send(response.into()).unwrap();
545             }
546         }
547         Task::Notify(not) => msg_sender.send(not.into()).unwrap(),
548         Task::SendRequest(req) => msg_sender.send(req.into()).unwrap(),
549         Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, state),
550     }
551 }
552
553 fn on_request(
554     global_state: &mut GlobalState,
555     pending_requests: &mut PendingRequests,
556     pool: &ThreadPool,
557     task_sender: &Sender<Task>,
558     msg_sender: &Sender<Message>,
559     request_received: Instant,
560     req: Request,
561 ) -> Result<()> {
562     let mut pool_dispatcher = PoolDispatcher {
563         req: Some(req),
564         pool,
565         global_state,
566         task_sender,
567         msg_sender,
568         pending_requests,
569         request_received,
570     };
571     pool_dispatcher
572         .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))?
573         .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))?
574         .on_sync::<lsp_ext::OnEnter>(|s, p| handlers::handle_on_enter(s.snapshot(), p))?
575         .on_sync::<lsp_types::request::SelectionRangeRequest>(|s, p| {
576             handlers::handle_selection_range(s.snapshot(), p)
577         })?
578         .on_sync::<lsp_ext::MatchingBrace>(|s, p| handlers::handle_matching_brace(s.snapshot(), p))?
579         .on::<lsp_ext::AnalyzerStatus>(handlers::handle_analyzer_status)?
580         .on::<lsp_ext::SyntaxTree>(handlers::handle_syntax_tree)?
581         .on::<lsp_ext::ExpandMacro>(handlers::handle_expand_macro)?
582         .on::<lsp_ext::ParentModule>(handlers::handle_parent_module)?
583         .on::<lsp_ext::Runnables>(handlers::handle_runnables)?
584         .on::<lsp_ext::InlayHints>(handlers::handle_inlay_hints)?
585         .on::<lsp_ext::CodeActionRequest>(handlers::handle_code_action)?
586         .on::<lsp_ext::ResolveCodeActionRequest>(handlers::handle_resolve_code_action)?
587         .on::<lsp_ext::HoverRequest>(handlers::handle_hover)?
588         .on::<lsp_types::request::OnTypeFormatting>(handlers::handle_on_type_formatting)?
589         .on::<lsp_types::request::DocumentSymbolRequest>(handlers::handle_document_symbol)?
590         .on::<lsp_types::request::WorkspaceSymbol>(handlers::handle_workspace_symbol)?
591         .on::<lsp_types::request::GotoDefinition>(handlers::handle_goto_definition)?
592         .on::<lsp_types::request::GotoImplementation>(handlers::handle_goto_implementation)?
593         .on::<lsp_types::request::GotoTypeDefinition>(handlers::handle_goto_type_definition)?
594         .on::<lsp_types::request::Completion>(handlers::handle_completion)?
595         .on::<lsp_types::request::CodeLensRequest>(handlers::handle_code_lens)?
596         .on::<lsp_types::request::CodeLensResolve>(handlers::handle_code_lens_resolve)?
597         .on::<lsp_types::request::FoldingRangeRequest>(handlers::handle_folding_range)?
598         .on::<lsp_types::request::SignatureHelpRequest>(handlers::handle_signature_help)?
599         .on::<lsp_types::request::PrepareRenameRequest>(handlers::handle_prepare_rename)?
600         .on::<lsp_types::request::Rename>(handlers::handle_rename)?
601         .on::<lsp_types::request::References>(handlers::handle_references)?
602         .on::<lsp_types::request::Formatting>(handlers::handle_formatting)?
603         .on::<lsp_types::request::DocumentHighlightRequest>(handlers::handle_document_highlight)?
604         .on::<lsp_types::request::CallHierarchyPrepare>(handlers::handle_call_hierarchy_prepare)?
605         .on::<lsp_types::request::CallHierarchyIncomingCalls>(
606             handlers::handle_call_hierarchy_incoming,
607         )?
608         .on::<lsp_types::request::CallHierarchyOutgoingCalls>(
609             handlers::handle_call_hierarchy_outgoing,
610         )?
611         .on::<lsp_types::request::SemanticTokensRequest>(handlers::handle_semantic_tokens)?
612         .on::<lsp_types::request::SemanticTokensRangeRequest>(
613             handlers::handle_semantic_tokens_range,
614         )?
615         .on::<lsp_ext::Ssr>(handlers::handle_ssr)?
616         .finish();
617     Ok(())
618 }
619
620 fn on_notification(
621     msg_sender: &Sender<Message>,
622     state: &mut GlobalState,
623     loop_state: &mut LoopState,
624     not: Notification,
625 ) -> Result<()> {
626     let not = match notification_cast::<lsp_types::notification::Cancel>(not) {
627         Ok(params) => {
628             let id: RequestId = match params.id {
629                 NumberOrString::Number(id) => id.into(),
630                 NumberOrString::String(id) => id.into(),
631             };
632             if loop_state.pending_requests.cancel(&id) {
633                 let response = Response::new_err(
634                     id,
635                     ErrorCode::RequestCanceled as i32,
636                     "canceled by client".to_string(),
637                 );
638                 msg_sender.send(response.into()).unwrap()
639             }
640             return Ok(());
641         }
642         Err(not) => not,
643     };
644     let not = match notification_cast::<lsp_types::notification::DidOpenTextDocument>(not) {
645         Ok(params) => {
646             let uri = params.text_document.uri;
647             let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
648             if let Some(file_id) =
649                 state.vfs.write().add_file_overlay(&path, params.text_document.text)
650             {
651                 loop_state.subscriptions.add_sub(FileId(file_id.0));
652             }
653             return Ok(());
654         }
655         Err(not) => not,
656     };
657     let not = match notification_cast::<lsp_types::notification::DidChangeTextDocument>(not) {
658         Ok(params) => {
659             let DidChangeTextDocumentParams { text_document, content_changes } = params;
660             let world = state.snapshot();
661             let file_id = from_proto::file_id(&world, &text_document.uri)?;
662             let line_index = world.analysis().file_line_index(file_id)?;
663             let uri = text_document.uri;
664             let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
665             state.vfs.write().change_file_overlay(&path, |old_text| {
666                 apply_document_changes(old_text, Cow::Borrowed(&line_index), content_changes);
667             });
668             return Ok(());
669         }
670         Err(not) => not,
671     };
672     let not = match notification_cast::<lsp_types::notification::DidSaveTextDocument>(not) {
673         Ok(_params) => {
674             if let Some(flycheck) = &state.flycheck {
675                 flycheck.update();
676             }
677             return Ok(());
678         }
679         Err(not) => not,
680     };
681     let not = match notification_cast::<lsp_types::notification::DidCloseTextDocument>(not) {
682         Ok(params) => {
683             let uri = params.text_document.uri;
684             let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
685             if let Some(file_id) = state.vfs.write().remove_file_overlay(path.as_path()) {
686                 loop_state.subscriptions.remove_sub(FileId(file_id.0));
687             }
688             let params =
689                 lsp_types::PublishDiagnosticsParams { uri, diagnostics: Vec::new(), version: None };
690             let not = notification_new::<lsp_types::notification::PublishDiagnostics>(params);
691             msg_sender.send(not.into()).unwrap();
692             return Ok(());
693         }
694         Err(not) => not,
695     };
696     let not = match notification_cast::<lsp_types::notification::DidChangeConfiguration>(not) {
697         Ok(_) => {
698             // As stated in https://github.com/microsoft/language-server-protocol/issues/676,
699             // this notification's parameters should be ignored and the actual config queried separately.
700             let request_id = loop_state.next_request_id();
701             let request = request_new::<lsp_types::request::WorkspaceConfiguration>(
702                 request_id.clone(),
703                 lsp_types::ConfigurationParams {
704                     items: vec![lsp_types::ConfigurationItem {
705                         scope_uri: None,
706                         section: Some("rust-analyzer".to_string()),
707                     }],
708                 },
709             );
710             msg_sender.send(request.into())?;
711             loop_state.configuration_request_id = Some(request_id);
712
713             return Ok(());
714         }
715         Err(not) => not,
716     };
717     let not = match notification_cast::<lsp_types::notification::DidChangeWatchedFiles>(not) {
718         Ok(params) => {
719             let mut vfs = state.vfs.write();
720             for change in params.changes {
721                 let uri = change.uri;
722                 let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
723                 vfs.notify_changed(path)
724             }
725             return Ok(());
726         }
727         Err(not) => not,
728     };
729     if not.method.starts_with("$/") {
730         return Ok(());
731     }
732     log::error!("unhandled notification: {:?}", not);
733     Ok(())
734 }
735
736 fn apply_document_changes(
737     old_text: &mut String,
738     mut line_index: Cow<'_, LineIndex>,
739     content_changes: Vec<TextDocumentContentChangeEvent>,
740 ) {
741     // The changes we got must be applied sequentially, but can cross lines so we
742     // have to keep our line index updated.
743     // Some clients (e.g. Code) sort the ranges in reverse. As an optimization, we
744     // remember the last valid line in the index and only rebuild it if needed.
745     // The VFS will normalize the end of lines to `\n`.
746     enum IndexValid {
747         All,
748         UpToLineExclusive(u64),
749     }
750
751     impl IndexValid {
752         fn covers(&self, line: u64) -> bool {
753             match *self {
754                 IndexValid::UpToLineExclusive(to) => to > line,
755                 _ => true,
756             }
757         }
758     }
759
760     let mut index_valid = IndexValid::All;
761     for change in content_changes {
762         match change.range {
763             Some(range) => {
764                 if !index_valid.covers(range.end.line) {
765                     line_index = Cow::Owned(LineIndex::new(&old_text));
766                 }
767                 index_valid = IndexValid::UpToLineExclusive(range.start.line);
768                 let range = from_proto::text_range(&line_index, range);
769                 old_text.replace_range(Range::<usize>::from(range), &change.text);
770             }
771             None => {
772                 *old_text = change.text;
773                 index_valid = IndexValid::UpToLineExclusive(0);
774             }
775         }
776     }
777 }
778
779 fn on_check_task(
780     task: CheckTask,
781     global_state: &mut GlobalState,
782     task_sender: &Sender<Task>,
783 ) -> Result<()> {
784     match task {
785         CheckTask::ClearDiagnostics => {
786             task_sender.send(Task::Diagnostic(DiagnosticTask::ClearCheck))?;
787         }
788
789         CheckTask::AddDiagnostic { workspace_root, diagnostic } => {
790             let diagnostics = crate::diagnostics::to_proto::map_rust_diagnostic_to_lsp(
791                 &global_state.config.diagnostics,
792                 &diagnostic,
793                 &workspace_root,
794             );
795             for diag in diagnostics {
796                 let path = diag
797                     .location
798                     .uri
799                     .to_file_path()
800                     .map_err(|()| format!("invalid uri: {}", diag.location.uri))?;
801                 let file_id = match global_state.vfs.read().path2file(&path) {
802                     Some(file) => FileId(file.0),
803                     None => {
804                         log::error!(
805                             "File with cargo diagnostic not found in VFS: {}",
806                             path.display()
807                         );
808                         return Ok(());
809                     }
810                 };
811
812                 task_sender.send(Task::Diagnostic(DiagnosticTask::AddCheck(
813                     file_id,
814                     diag.diagnostic,
815                     diag.fixes.into_iter().map(|it| it.into()).collect(),
816                 )))?;
817             }
818         }
819     };
820
821     Ok(())
822 }
823
824 fn on_diagnostic_task(task: DiagnosticTask, msg_sender: &Sender<Message>, state: &mut GlobalState) {
825     let subscriptions = state.diagnostics.handle_task(task);
826
827     for file_id in subscriptions {
828         let url = file_id_to_url(&state.vfs.read(), file_id);
829         let diagnostics = state.diagnostics.diagnostics_for(file_id).cloned().collect();
830         let params = lsp_types::PublishDiagnosticsParams { uri: url, diagnostics, version: None };
831         let not = notification_new::<lsp_types::notification::PublishDiagnostics>(params);
832         msg_sender.send(not.into()).unwrap();
833     }
834 }
835
836 struct PoolDispatcher<'a> {
837     req: Option<Request>,
838     pool: &'a ThreadPool,
839     global_state: &'a mut GlobalState,
840     pending_requests: &'a mut PendingRequests,
841     msg_sender: &'a Sender<Message>,
842     task_sender: &'a Sender<Task>,
843     request_received: Instant,
844 }
845
846 impl<'a> PoolDispatcher<'a> {
847     /// Dispatches the request onto the current thread
848     fn on_sync<R>(
849         &mut self,
850         f: fn(&mut GlobalState, R::Params) -> Result<R::Result>,
851     ) -> Result<&mut Self>
852     where
853         R: lsp_types::request::Request + 'static,
854         R::Params: DeserializeOwned + panic::UnwindSafe + 'static,
855         R::Result: Serialize + 'static,
856     {
857         let (id, params) = match self.parse::<R>() {
858             Some(it) => it,
859             None => {
860                 return Ok(self);
861             }
862         };
863         let world = panic::AssertUnwindSafe(&mut *self.global_state);
864         let task = panic::catch_unwind(move || {
865             let result = f(world.0, params);
866             result_to_task::<R>(id, result)
867         })
868         .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
869         on_task(task, self.msg_sender, self.pending_requests, self.global_state);
870         Ok(self)
871     }
872
873     /// Dispatches the request onto thread pool
874     fn on<R>(
875         &mut self,
876         f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
877     ) -> Result<&mut Self>
878     where
879         R: lsp_types::request::Request + 'static,
880         R::Params: DeserializeOwned + Send + 'static,
881         R::Result: Serialize + 'static,
882     {
883         let (id, params) = match self.parse::<R>() {
884             Some(it) => it,
885             None => {
886                 return Ok(self);
887             }
888         };
889
890         self.pool.execute({
891             let world = self.global_state.snapshot();
892             let sender = self.task_sender.clone();
893             move || {
894                 let result = f(world, params);
895                 let task = result_to_task::<R>(id, result);
896                 sender.send(task).unwrap();
897             }
898         });
899
900         Ok(self)
901     }
902
903     fn parse<R>(&mut self) -> Option<(RequestId, R::Params)>
904     where
905         R: lsp_types::request::Request + 'static,
906         R::Params: DeserializeOwned + 'static,
907     {
908         let req = self.req.take()?;
909         let (id, params) = match req.extract::<R::Params>(R::METHOD) {
910             Ok(it) => it,
911             Err(req) => {
912                 self.req = Some(req);
913                 return None;
914             }
915         };
916         self.pending_requests.start(PendingRequest {
917             id: id.clone(),
918             method: R::METHOD.to_string(),
919             received: self.request_received,
920         });
921         Some((id, params))
922     }
923
924     fn finish(&mut self) {
925         match self.req.take() {
926             None => (),
927             Some(req) => {
928                 log::error!("unknown request: {:?}", req);
929                 let resp = Response::new_err(
930                     req.id,
931                     ErrorCode::MethodNotFound as i32,
932                     "unknown request".to_string(),
933                 );
934                 self.msg_sender.send(resp.into()).unwrap();
935             }
936         }
937     }
938 }
939
940 fn result_to_task<R>(id: RequestId, result: Result<R::Result>) -> Task
941 where
942     R: lsp_types::request::Request + 'static,
943     R::Params: DeserializeOwned + 'static,
944     R::Result: Serialize + 'static,
945 {
946     let response = match result {
947         Ok(resp) => Response::new_ok(id, &resp),
948         Err(e) => match e.downcast::<LspError>() {
949             Ok(lsp_error) => {
950                 if lsp_error.code == LspError::UNKNOWN_FILE {
951                     // Work-around for https://github.com/rust-analyzer/rust-analyzer/issues/1521
952                     Response::new_ok(id, ())
953                 } else {
954                     Response::new_err(id, lsp_error.code, lsp_error.message)
955                 }
956             }
957             Err(e) => {
958                 if is_canceled(&*e) {
959                     Response::new_err(
960                         id,
961                         ErrorCode::ContentModified as i32,
962                         "content modified".to_string(),
963                     )
964                 } else {
965                     Response::new_err(id, ErrorCode::InternalError as i32, e.to_string())
966                 }
967             }
968         },
969     };
970     Task::Respond(response)
971 }
972
973 fn update_file_notifications_on_threadpool(
974     pool: &ThreadPool,
975     world: GlobalStateSnapshot,
976     task_sender: Sender<Task>,
977     subscriptions: Vec<FileId>,
978 ) {
979     log::trace!("updating notifications for {:?}", subscriptions);
980     if world.config.publish_diagnostics {
981         pool.execute(move || {
982             for file_id in subscriptions {
983                 match handlers::publish_diagnostics(&world, file_id) {
984                     Err(e) => {
985                         if !is_canceled(&*e) {
986                             log::error!("failed to compute diagnostics: {:?}", e);
987                         }
988                     }
989                     Ok(task) => {
990                         task_sender.send(Task::Diagnostic(task)).unwrap();
991                     }
992                 }
993             }
994         })
995     }
996 }
997
998 #[cfg(test)]
999 mod tests {
1000     use std::borrow::Cow;
1001
1002     use lsp_types::{Position, Range, TextDocumentContentChangeEvent};
1003     use ra_ide::LineIndex;
1004
1005     #[test]
1006     fn apply_document_changes() {
1007         fn run(text: &mut String, changes: Vec<TextDocumentContentChangeEvent>) {
1008             let line_index = Cow::Owned(LineIndex::new(&text));
1009             super::apply_document_changes(text, line_index, changes);
1010         }
1011
1012         macro_rules! c {
1013             [$($sl:expr, $sc:expr; $el:expr, $ec:expr => $text:expr),+] => {
1014                 vec![$(TextDocumentContentChangeEvent {
1015                     range: Some(Range {
1016                         start: Position { line: $sl, character: $sc },
1017                         end: Position { line: $el, character: $ec },
1018                     }),
1019                     range_length: None,
1020                     text: String::from($text),
1021                 }),+]
1022             };
1023         }
1024
1025         let mut text = String::new();
1026         run(&mut text, vec![]);
1027         assert_eq!(text, "");
1028         run(
1029             &mut text,
1030             vec![TextDocumentContentChangeEvent {
1031                 range: None,
1032                 range_length: None,
1033                 text: String::from("the"),
1034             }],
1035         );
1036         assert_eq!(text, "the");
1037         run(&mut text, c![0, 3; 0, 3 => " quick"]);
1038         assert_eq!(text, "the quick");
1039         run(&mut text, c![0, 0; 0, 4 => "", 0, 5; 0, 5 => " foxes"]);
1040         assert_eq!(text, "quick foxes");
1041         run(&mut text, c![0, 11; 0, 11 => "\ndream"]);
1042         assert_eq!(text, "quick foxes\ndream");
1043         run(&mut text, c![1, 0; 1, 0 => "have "]);
1044         assert_eq!(text, "quick foxes\nhave dream");
1045         run(&mut text, c![0, 0; 0, 0 => "the ", 1, 4; 1, 4 => " quiet", 1, 16; 1, 16 => "s\n"]);
1046         assert_eq!(text, "the quick foxes\nhave quiet dreams\n");
1047         run(&mut text, c![0, 15; 0, 15 => "\n", 2, 17; 2, 17 => "\n"]);
1048         assert_eq!(text, "the quick foxes\n\nhave quiet dreams\n\n");
1049         run(
1050             &mut text,
1051             c![1, 0; 1, 0 => "DREAM", 2, 0; 2, 0 => "they ", 3, 0; 3, 0 => "DON'T THEY?"],
1052         );
1053         assert_eq!(text, "the quick foxes\nDREAM\nthey have quiet dreams\nDON'T THEY?\n");
1054         run(&mut text, c![0, 10; 1, 5 => "", 2, 0; 2, 12 => ""]);
1055         assert_eq!(text, "the quick \nthey have quiet dreams\n");
1056
1057         text = String::from("❤️");
1058         run(&mut text, c![0, 0; 0, 0 => "a"]);
1059         assert_eq!(text, "a❤️");
1060
1061         text = String::from("a\nb");
1062         run(&mut text, c![0, 1; 1, 0 => "\nțc", 0, 1; 1, 1 => "d"]);
1063         assert_eq!(text, "adcb");
1064
1065         text = String::from("a\nb");
1066         run(&mut text, c![0, 1; 1, 0 => "ț\nc", 0, 2; 0, 2 => "c"]);
1067         assert_eq!(text, "ațc\ncb");
1068     }
1069 }