]> git.lizzy.rs Git - rust.git/blob - crates/ra_lsp_server/src/main_loop.rs
Re-implement status display using LSP 3.15 progress event
[rust.git] / crates / ra_lsp_server / src / main_loop.rs
1 //! The main loop of `ra_lsp_server` responsible for dispatching LSP requests/replies and
2 //! notifications back to the client.
3
4 mod handlers;
5 mod subscriptions;
6 pub(crate) mod pending_requests;
7
8 use std::{error::Error, fmt, panic, path::PathBuf, sync::Arc, time::Instant};
9
10 use crossbeam_channel::{select, unbounded, RecvError, Sender};
11 use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
12 use lsp_types::{ClientCapabilities, NumberOrString};
13 use ra_ide::{Canceled, FeatureFlags, FileId, LibraryData, SourceRootId};
14 use ra_prof::profile;
15 use ra_vfs::{VfsTask, Watch};
16 use relative_path::RelativePathBuf;
17 use rustc_hash::FxHashSet;
18 use serde::{de::DeserializeOwned, Serialize};
19 use threadpool::ThreadPool;
20
21 use crate::{
22     cargo_check::CheckTask,
23     main_loop::{
24         pending_requests::{PendingRequest, PendingRequests},
25         subscriptions::Subscriptions,
26     },
27     req,
28     world::{Options, WorldSnapshot, WorldState},
29     Result, ServerConfig,
30 };
31
32 const THREADPOOL_SIZE: usize = 8;
33 const MAX_IN_FLIGHT_LIBS: usize = THREADPOOL_SIZE - 3;
34
35 #[derive(Debug)]
36 pub struct LspError {
37     pub code: i32,
38     pub message: String,
39 }
40
41 impl LspError {
42     pub fn new(code: i32, message: String) -> LspError {
43         LspError { code, message }
44     }
45 }
46
47 impl fmt::Display for LspError {
48     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
49         write!(f, "Language Server request failed with {}. ({})", self.code, self.message)
50     }
51 }
52
53 impl Error for LspError {}
54
55 pub fn main_loop(
56     ws_roots: Vec<PathBuf>,
57     client_caps: ClientCapabilities,
58     config: ServerConfig,
59     connection: Connection,
60 ) -> Result<()> {
61     log::info!("server_config: {:#?}", config);
62
63     let mut loop_state = LoopState::default();
64     let mut world_state = {
65         // FIXME: support dynamic workspace loading.
66         let workspaces = {
67             let mut loaded_workspaces = Vec::new();
68             for ws_root in &ws_roots {
69                 let workspace = ra_project_model::ProjectWorkspace::discover_with_sysroot(
70                     ws_root.as_path(),
71                     config.with_sysroot,
72                     &config.cargo_features,
73                 );
74                 match workspace {
75                     Ok(workspace) => loaded_workspaces.push(workspace),
76                     Err(e) => {
77                         log::error!("loading workspace failed: {}", e);
78
79                         show_message(
80                             req::MessageType::Error,
81                             format!("rust-analyzer failed to load workspace: {}", e),
82                             &connection.sender,
83                         );
84                     }
85                 }
86             }
87             loaded_workspaces
88         };
89
90         let globs = config
91             .exclude_globs
92             .iter()
93             .map(|glob| ra_vfs_glob::Glob::new(glob))
94             .collect::<std::result::Result<Vec<_>, _>>()?;
95
96         if config.use_client_watching {
97             let registration_options = req::DidChangeWatchedFilesRegistrationOptions {
98                 watchers: workspaces
99                     .iter()
100                     .flat_map(|ws| ws.to_roots())
101                     .filter(|root| root.is_member())
102                     .map(|root| format!("{}/**/*.rs", root.path().display()))
103                     .map(|glob_pattern| req::FileSystemWatcher { glob_pattern, kind: None })
104                     .collect(),
105             };
106             let registration = req::Registration {
107                 id: "file-watcher".to_string(),
108                 method: "workspace/didChangeWatchedFiles".to_string(),
109                 register_options: Some(serde_json::to_value(registration_options).unwrap()),
110             };
111             let params = req::RegistrationParams { registrations: vec![registration] };
112             let request =
113                 request_new::<req::RegisterCapability>(loop_state.next_request_id(), params);
114             connection.sender.send(request.into()).unwrap();
115         }
116
117         let options = {
118             let text_document_caps = client_caps.text_document.as_ref();
119             Options {
120                 publish_decorations: config.publish_decorations,
121                 supports_location_link: text_document_caps
122                     .and_then(|it| it.definition)
123                     .and_then(|it| it.link_support)
124                     .unwrap_or(false),
125                 line_folding_only: text_document_caps
126                     .and_then(|it| it.folding_range.as_ref())
127                     .and_then(|it| it.line_folding_only)
128                     .unwrap_or(false),
129                 max_inlay_hint_length: config.max_inlay_hint_length,
130                 cargo_check_enable: config.cargo_check_enable,
131                 cargo_check_command: config.cargo_check_command,
132                 cargo_check_args: config.cargo_check_args,
133             }
134         };
135
136         let feature_flags = {
137             let mut ff = FeatureFlags::default();
138             for (flag, value) in config.feature_flags {
139                 if ff.set(flag.as_str(), value).is_err() {
140                     log::error!("unknown feature flag: {:?}", flag);
141                     show_message(
142                         req::MessageType::Error,
143                         format!("unknown feature flag: {:?}", flag),
144                         &connection.sender,
145                     );
146                 }
147             }
148             ff
149         };
150         log::info!("feature_flags: {:#?}", feature_flags);
151
152         WorldState::new(
153             ws_roots,
154             workspaces,
155             config.lru_capacity,
156             &globs,
157             Watch(!config.use_client_watching),
158             options,
159             feature_flags,
160         )
161     };
162
163     let pool = ThreadPool::new(THREADPOOL_SIZE);
164     let (task_sender, task_receiver) = unbounded::<Task>();
165     let (libdata_sender, libdata_receiver) = unbounded::<LibraryData>();
166
167     log::info!("server initialized, serving requests");
168     {
169         let task_sender = task_sender;
170         let libdata_sender = libdata_sender;
171         loop {
172             log::trace!("selecting");
173             let event = select! {
174                 recv(&connection.receiver) -> msg => match msg {
175                     Ok(msg) => Event::Msg(msg),
176                     Err(RecvError) => Err("client exited without shutdown")?,
177                 },
178                 recv(task_receiver) -> task => Event::Task(task.unwrap()),
179                 recv(world_state.task_receiver) -> task => match task {
180                     Ok(task) => Event::Vfs(task),
181                     Err(RecvError) => Err("vfs died")?,
182                 },
183                 recv(libdata_receiver) -> data => Event::Lib(data.unwrap()),
184                 recv(world_state.check_watcher.task_recv) -> task => Event::CheckWatcher(task.unwrap())
185             };
186             if let Event::Msg(Message::Request(req)) = &event {
187                 if connection.handle_shutdown(&req)? {
188                     break;
189                 };
190             }
191             loop_turn(
192                 &pool,
193                 &task_sender,
194                 &libdata_sender,
195                 &connection,
196                 &mut world_state,
197                 &mut loop_state,
198                 event,
199             )?;
200         }
201     }
202
203     log::info!("waiting for tasks to finish...");
204     task_receiver.into_iter().for_each(|task| {
205         on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut world_state)
206     });
207     libdata_receiver.into_iter().for_each(drop);
208     log::info!("...tasks have finished");
209     log::info!("joining threadpool...");
210     drop(pool);
211     log::info!("...threadpool has finished");
212
213     let vfs = Arc::try_unwrap(world_state.vfs).expect("all snapshots should be dead");
214     drop(vfs);
215
216     Ok(())
217 }
218
219 #[derive(Debug)]
220 enum Task {
221     Respond(Response),
222     Notify(Notification),
223 }
224
225 enum Event {
226     Msg(Message),
227     Task(Task),
228     Vfs(VfsTask),
229     Lib(LibraryData),
230     CheckWatcher(CheckTask),
231 }
232
233 impl fmt::Debug for Event {
234     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
235         let debug_verbose_not = |not: &Notification, f: &mut fmt::Formatter| {
236             f.debug_struct("Notification").field("method", &not.method).finish()
237         };
238
239         match self {
240             Event::Msg(Message::Notification(not)) => {
241                 if notification_is::<req::DidOpenTextDocument>(not)
242                     || notification_is::<req::DidChangeTextDocument>(not)
243                 {
244                     return debug_verbose_not(not, f);
245                 }
246             }
247             Event::Task(Task::Notify(not)) => {
248                 if notification_is::<req::PublishDecorations>(not)
249                     || notification_is::<req::PublishDiagnostics>(not)
250                 {
251                     return debug_verbose_not(not, f);
252                 }
253             }
254             Event::Task(Task::Respond(resp)) => {
255                 return f
256                     .debug_struct("Response")
257                     .field("id", &resp.id)
258                     .field("error", &resp.error)
259                     .finish();
260             }
261             _ => (),
262         }
263         match self {
264             Event::Msg(it) => fmt::Debug::fmt(it, f),
265             Event::Task(it) => fmt::Debug::fmt(it, f),
266             Event::Vfs(it) => fmt::Debug::fmt(it, f),
267             Event::Lib(it) => fmt::Debug::fmt(it, f),
268             Event::CheckWatcher(it) => fmt::Debug::fmt(it, f),
269         }
270     }
271 }
272
273 #[derive(Debug, Default)]
274 struct LoopState {
275     next_request_id: u64,
276     pending_responses: FxHashSet<RequestId>,
277     pending_requests: PendingRequests,
278     subscriptions: Subscriptions,
279     // We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same
280     // time to always have a thread ready to react to input.
281     in_flight_libraries: usize,
282     pending_libraries: Vec<(SourceRootId, Vec<(FileId, RelativePathBuf, Arc<String>)>)>,
283     workspace_loaded: bool,
284 }
285
286 impl LoopState {
287     fn next_request_id(&mut self) -> RequestId {
288         self.next_request_id += 1;
289         let res: RequestId = self.next_request_id.into();
290         let inserted = self.pending_responses.insert(res.clone());
291         assert!(inserted);
292         res
293     }
294 }
295
296 fn loop_turn(
297     pool: &ThreadPool,
298     task_sender: &Sender<Task>,
299     libdata_sender: &Sender<LibraryData>,
300     connection: &Connection,
301     world_state: &mut WorldState,
302     loop_state: &mut LoopState,
303     event: Event,
304 ) -> Result<()> {
305     let loop_start = Instant::now();
306
307     // NOTE: don't count blocking select! call as a loop-turn time
308     let _p = profile("main_loop_inner/loop-turn");
309     log::info!("loop turn = {:?}", event);
310     let queue_count = pool.queued_count();
311     if queue_count > 0 {
312         log::info!("queued count = {}", queue_count);
313     }
314
315     match event {
316         Event::Task(task) => {
317             on_task(task, &connection.sender, &mut loop_state.pending_requests, world_state);
318             world_state.maybe_collect_garbage();
319         }
320         Event::Vfs(task) => {
321             world_state.vfs.write().handle_task(task);
322         }
323         Event::Lib(lib) => {
324             world_state.add_lib(lib);
325             world_state.maybe_collect_garbage();
326             loop_state.in_flight_libraries -= 1;
327         }
328         Event::CheckWatcher(task) => match task {
329             CheckTask::Update(uri) => {
330                 // We manually send a diagnostic update when the watcher asks
331                 // us to, to avoid the issue of having to change the file to
332                 // receive updated diagnostics.
333                 let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
334                 if let Some(file_id) = world_state.vfs.read().path2file(&path) {
335                     let params =
336                         handlers::publish_diagnostics(&world_state.snapshot(), FileId(file_id.0))?;
337                     let not = notification_new::<req::PublishDiagnostics>(params);
338                     task_sender.send(Task::Notify(not)).unwrap();
339                 }
340             }
341             CheckTask::Status(progress) => {
342                 let params = req::ProgressParams {
343                     token: req::ProgressToken::String("rustAnalyzer/cargoWatcher".to_string()),
344                     value: req::ProgressParamsValue::WorkDone(progress),
345                 };
346                 let not = notification_new::<req::Progress>(params);
347                 task_sender.send(Task::Notify(not)).unwrap();
348             }
349         },
350         Event::Msg(msg) => match msg {
351             Message::Request(req) => on_request(
352                 world_state,
353                 &mut loop_state.pending_requests,
354                 pool,
355                 task_sender,
356                 &connection.sender,
357                 loop_start,
358                 req,
359             )?,
360             Message::Notification(not) => {
361                 on_notification(
362                     &connection.sender,
363                     world_state,
364                     &mut loop_state.pending_requests,
365                     &mut loop_state.subscriptions,
366                     not,
367                 )?;
368             }
369             Message::Response(resp) => {
370                 let removed = loop_state.pending_responses.remove(&resp.id);
371                 if !removed {
372                     log::error!("unexpected response: {:?}", resp)
373                 }
374             }
375         },
376     };
377
378     let mut state_changed = false;
379     if let Some(changes) = world_state.process_changes() {
380         state_changed = true;
381         loop_state.pending_libraries.extend(changes);
382     }
383
384     while loop_state.in_flight_libraries < MAX_IN_FLIGHT_LIBS
385         && !loop_state.pending_libraries.is_empty()
386     {
387         let (root, files) = loop_state.pending_libraries.pop().unwrap();
388         loop_state.in_flight_libraries += 1;
389         let sender = libdata_sender.clone();
390         pool.execute(move || {
391             log::info!("indexing {:?} ... ", root);
392             let _p = profile(&format!("indexed {:?}", root));
393             let data = LibraryData::prepare(root, files);
394             sender.send(data).unwrap();
395         });
396     }
397
398     if !loop_state.workspace_loaded
399         && world_state.roots_to_scan == 0
400         && loop_state.pending_libraries.is_empty()
401         && loop_state.in_flight_libraries == 0
402     {
403         loop_state.workspace_loaded = true;
404         let n_packages: usize = world_state.workspaces.iter().map(|it| it.n_packages()).sum();
405         if world_state.feature_flags().get("notifications.workspace-loaded") {
406             let msg = format!("workspace loaded, {} rust packages", n_packages);
407             show_message(req::MessageType::Info, msg, &connection.sender);
408         }
409     }
410
411     if state_changed {
412         update_file_notifications_on_threadpool(
413             pool,
414             world_state.snapshot(),
415             world_state.options.publish_decorations,
416             task_sender.clone(),
417             loop_state.subscriptions.subscriptions(),
418         )
419     }
420     Ok(())
421 }
422
423 fn on_task(
424     task: Task,
425     msg_sender: &Sender<Message>,
426     pending_requests: &mut PendingRequests,
427     state: &mut WorldState,
428 ) {
429     match task {
430         Task::Respond(response) => {
431             if let Some(completed) = pending_requests.finish(&response.id) {
432                 log::info!("handled req#{} in {:?}", completed.id, completed.duration);
433                 state.complete_request(completed);
434                 msg_sender.send(response.into()).unwrap();
435             }
436         }
437         Task::Notify(n) => {
438             msg_sender.send(n.into()).unwrap();
439         }
440     }
441 }
442
443 fn on_request(
444     world: &mut WorldState,
445     pending_requests: &mut PendingRequests,
446     pool: &ThreadPool,
447     sender: &Sender<Task>,
448     msg_sender: &Sender<Message>,
449     request_received: Instant,
450     req: Request,
451 ) -> Result<()> {
452     let mut pool_dispatcher = PoolDispatcher {
453         req: Some(req),
454         pool,
455         world,
456         sender,
457         msg_sender,
458         pending_requests,
459         request_received,
460     };
461     pool_dispatcher
462         .on_sync::<req::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))?
463         .on_sync::<req::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))?
464         .on_sync::<req::OnEnter>(|s, p| handlers::handle_on_enter(s.snapshot(), p))?
465         .on_sync::<req::SelectionRangeRequest>(|s, p| {
466             handlers::handle_selection_range(s.snapshot(), p)
467         })?
468         .on_sync::<req::FindMatchingBrace>(|s, p| {
469             handlers::handle_find_matching_brace(s.snapshot(), p)
470         })?
471         .on::<req::AnalyzerStatus>(handlers::handle_analyzer_status)?
472         .on::<req::SyntaxTree>(handlers::handle_syntax_tree)?
473         .on::<req::ExpandMacro>(handlers::handle_expand_macro)?
474         .on::<req::OnTypeFormatting>(handlers::handle_on_type_formatting)?
475         .on::<req::DocumentSymbolRequest>(handlers::handle_document_symbol)?
476         .on::<req::WorkspaceSymbol>(handlers::handle_workspace_symbol)?
477         .on::<req::GotoDefinition>(handlers::handle_goto_definition)?
478         .on::<req::GotoImplementation>(handlers::handle_goto_implementation)?
479         .on::<req::GotoTypeDefinition>(handlers::handle_goto_type_definition)?
480         .on::<req::ParentModule>(handlers::handle_parent_module)?
481         .on::<req::Runnables>(handlers::handle_runnables)?
482         .on::<req::DecorationsRequest>(handlers::handle_decorations)?
483         .on::<req::Completion>(handlers::handle_completion)?
484         .on::<req::CodeActionRequest>(handlers::handle_code_action)?
485         .on::<req::CodeLensRequest>(handlers::handle_code_lens)?
486         .on::<req::CodeLensResolve>(handlers::handle_code_lens_resolve)?
487         .on::<req::FoldingRangeRequest>(handlers::handle_folding_range)?
488         .on::<req::SignatureHelpRequest>(handlers::handle_signature_help)?
489         .on::<req::HoverRequest>(handlers::handle_hover)?
490         .on::<req::PrepareRenameRequest>(handlers::handle_prepare_rename)?
491         .on::<req::Rename>(handlers::handle_rename)?
492         .on::<req::References>(handlers::handle_references)?
493         .on::<req::Formatting>(handlers::handle_formatting)?
494         .on::<req::DocumentHighlightRequest>(handlers::handle_document_highlight)?
495         .on::<req::InlayHints>(handlers::handle_inlay_hints)?
496         .finish();
497     Ok(())
498 }
499
500 fn on_notification(
501     msg_sender: &Sender<Message>,
502     state: &mut WorldState,
503     pending_requests: &mut PendingRequests,
504     subs: &mut Subscriptions,
505     not: Notification,
506 ) -> Result<()> {
507     let not = match notification_cast::<req::Cancel>(not) {
508         Ok(params) => {
509             let id: RequestId = match params.id {
510                 NumberOrString::Number(id) => id.into(),
511                 NumberOrString::String(id) => id.into(),
512             };
513             if pending_requests.cancel(&id) {
514                 let response = Response::new_err(
515                     id,
516                     ErrorCode::RequestCanceled as i32,
517                     "canceled by client".to_string(),
518                 );
519                 msg_sender.send(response.into()).unwrap()
520             }
521             return Ok(());
522         }
523         Err(not) => not,
524     };
525     let not = match notification_cast::<req::DidOpenTextDocument>(not) {
526         Ok(params) => {
527             let uri = params.text_document.uri;
528             let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
529             if let Some(file_id) =
530                 state.vfs.write().add_file_overlay(&path, params.text_document.text)
531             {
532                 subs.add_sub(FileId(file_id.0));
533             }
534             return Ok(());
535         }
536         Err(not) => not,
537     };
538     let not = match notification_cast::<req::DidChangeTextDocument>(not) {
539         Ok(mut params) => {
540             let uri = params.text_document.uri;
541             let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
542             let text =
543                 params.content_changes.pop().ok_or_else(|| "empty changes".to_string())?.text;
544             state.vfs.write().change_file_overlay(path.as_path(), text);
545             return Ok(());
546         }
547         Err(not) => not,
548     };
549     let not = match notification_cast::<req::DidSaveTextDocument>(not) {
550         Ok(_params) => {
551             state.check_watcher.update();
552             return Ok(());
553         }
554         Err(not) => not,
555     };
556     let not = match notification_cast::<req::DidCloseTextDocument>(not) {
557         Ok(params) => {
558             let uri = params.text_document.uri;
559             let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
560             if let Some(file_id) = state.vfs.write().remove_file_overlay(path.as_path()) {
561                 subs.remove_sub(FileId(file_id.0));
562             }
563             let params =
564                 req::PublishDiagnosticsParams { uri, diagnostics: Vec::new(), version: None };
565             let not = notification_new::<req::PublishDiagnostics>(params);
566             msg_sender.send(not.into()).unwrap();
567             return Ok(());
568         }
569         Err(not) => not,
570     };
571     let not = match notification_cast::<req::DidChangeConfiguration>(not) {
572         Ok(_params) => {
573             return Ok(());
574         }
575         Err(not) => not,
576     };
577     let not = match notification_cast::<req::DidChangeWatchedFiles>(not) {
578         Ok(params) => {
579             let mut vfs = state.vfs.write();
580             for change in params.changes {
581                 let uri = change.uri;
582                 let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
583                 vfs.notify_changed(path)
584             }
585             return Ok(());
586         }
587         Err(not) => not,
588     };
589     log::error!("unhandled notification: {:?}", not);
590     Ok(())
591 }
592
593 struct PoolDispatcher<'a> {
594     req: Option<Request>,
595     pool: &'a ThreadPool,
596     world: &'a mut WorldState,
597     pending_requests: &'a mut PendingRequests,
598     msg_sender: &'a Sender<Message>,
599     sender: &'a Sender<Task>,
600     request_received: Instant,
601 }
602
603 impl<'a> PoolDispatcher<'a> {
604     /// Dispatches the request onto the current thread
605     fn on_sync<R>(
606         &mut self,
607         f: fn(&mut WorldState, R::Params) -> Result<R::Result>,
608     ) -> Result<&mut Self>
609     where
610         R: req::Request + 'static,
611         R::Params: DeserializeOwned + panic::UnwindSafe + 'static,
612         R::Result: Serialize + 'static,
613     {
614         let (id, params) = match self.parse::<R>() {
615             Some(it) => it,
616             None => {
617                 return Ok(self);
618             }
619         };
620         let world = panic::AssertUnwindSafe(&mut *self.world);
621         let task = panic::catch_unwind(move || {
622             let result = f(world.0, params);
623             result_to_task::<R>(id, result)
624         })
625         .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
626         on_task(task, self.msg_sender, self.pending_requests, self.world);
627         Ok(self)
628     }
629
630     /// Dispatches the request onto thread pool
631     fn on<R>(&mut self, f: fn(WorldSnapshot, R::Params) -> Result<R::Result>) -> Result<&mut Self>
632     where
633         R: req::Request + 'static,
634         R::Params: DeserializeOwned + Send + 'static,
635         R::Result: Serialize + 'static,
636     {
637         let (id, params) = match self.parse::<R>() {
638             Some(it) => it,
639             None => {
640                 return Ok(self);
641             }
642         };
643
644         self.pool.execute({
645             let world = self.world.snapshot();
646             let sender = self.sender.clone();
647             move || {
648                 let result = f(world, params);
649                 let task = result_to_task::<R>(id, result);
650                 sender.send(task).unwrap();
651             }
652         });
653
654         Ok(self)
655     }
656
657     fn parse<R>(&mut self) -> Option<(RequestId, R::Params)>
658     where
659         R: req::Request + 'static,
660         R::Params: DeserializeOwned + 'static,
661     {
662         let req = self.req.take()?;
663         let (id, params) = match req.extract::<R::Params>(R::METHOD) {
664             Ok(it) => it,
665             Err(req) => {
666                 self.req = Some(req);
667                 return None;
668             }
669         };
670         self.pending_requests.start(PendingRequest {
671             id: id.clone(),
672             method: R::METHOD.to_string(),
673             received: self.request_received,
674         });
675         Some((id, params))
676     }
677
678     fn finish(&mut self) {
679         match self.req.take() {
680             None => (),
681             Some(req) => {
682                 log::error!("unknown request: {:?}", req);
683                 let resp = Response::new_err(
684                     req.id,
685                     ErrorCode::MethodNotFound as i32,
686                     "unknown request".to_string(),
687                 );
688                 self.msg_sender.send(resp.into()).unwrap();
689             }
690         }
691     }
692 }
693
694 fn result_to_task<R>(id: RequestId, result: Result<R::Result>) -> Task
695 where
696     R: req::Request + 'static,
697     R::Params: DeserializeOwned + 'static,
698     R::Result: Serialize + 'static,
699 {
700     let response = match result {
701         Ok(resp) => Response::new_ok(id, &resp),
702         Err(e) => match e.downcast::<LspError>() {
703             Ok(lsp_error) => Response::new_err(id, lsp_error.code, lsp_error.message),
704             Err(e) => {
705                 if is_canceled(&e) {
706                     // FIXME: When https://github.com/Microsoft/vscode-languageserver-node/issues/457
707                     // gets fixed, we can return the proper response.
708                     // This works around the issue where "content modified" error would continuously
709                     // show an message pop-up in VsCode
710                     // Response::err(
711                     //     id,
712                     //     ErrorCode::ContentModified as i32,
713                     //     "content modified".to_string(),
714                     // )
715                     Response::new_ok(id, ())
716                 } else {
717                     Response::new_err(id, ErrorCode::InternalError as i32, e.to_string())
718                 }
719             }
720         },
721     };
722     Task::Respond(response)
723 }
724
725 fn update_file_notifications_on_threadpool(
726     pool: &ThreadPool,
727     world: WorldSnapshot,
728     publish_decorations: bool,
729     sender: Sender<Task>,
730     subscriptions: Vec<FileId>,
731 ) {
732     log::trace!("updating notifications for {:?}", subscriptions);
733     let publish_diagnostics = world.feature_flags().get("lsp.diagnostics");
734     pool.execute(move || {
735         for file_id in subscriptions {
736             if publish_diagnostics {
737                 match handlers::publish_diagnostics(&world, file_id) {
738                     Err(e) => {
739                         if !is_canceled(&e) {
740                             log::error!("failed to compute diagnostics: {:?}", e);
741                         }
742                     }
743                     Ok(params) => {
744                         let not = notification_new::<req::PublishDiagnostics>(params);
745                         sender.send(Task::Notify(not)).unwrap();
746                     }
747                 }
748             }
749             if publish_decorations {
750                 match handlers::publish_decorations(&world, file_id) {
751                     Err(e) => {
752                         if !is_canceled(&e) {
753                             log::error!("failed to compute decorations: {:?}", e);
754                         }
755                     }
756                     Ok(params) => {
757                         let not = notification_new::<req::PublishDecorations>(params);
758                         sender.send(Task::Notify(not)).unwrap();
759                     }
760                 }
761             }
762         }
763     });
764 }
765
766 pub fn show_message(typ: req::MessageType, message: impl Into<String>, sender: &Sender<Message>) {
767     let message = message.into();
768     let params = req::ShowMessageParams { typ, message };
769     let not = notification_new::<req::ShowMessage>(params);
770     sender.send(not.into()).unwrap();
771 }
772
773 fn is_canceled(e: &Box<dyn std::error::Error + Send + Sync>) -> bool {
774     e.downcast_ref::<Canceled>().is_some()
775 }
776
777 fn notification_is<N: lsp_types::notification::Notification>(notification: &Notification) -> bool {
778     notification.method == N::METHOD
779 }
780
781 fn notification_cast<N>(notification: Notification) -> std::result::Result<N::Params, Notification>
782 where
783     N: lsp_types::notification::Notification,
784     N::Params: DeserializeOwned,
785 {
786     notification.extract(N::METHOD)
787 }
788
789 fn notification_new<N>(params: N::Params) -> Notification
790 where
791     N: lsp_types::notification::Notification,
792     N::Params: Serialize,
793 {
794     Notification::new(N::METHOD.to_string(), params)
795 }
796
797 fn request_new<R>(id: RequestId, params: R::Params) -> Request
798 where
799     R: lsp_types::request::Request,
800     R::Params: Serialize,
801 {
802     Request::new(id, R::METHOD.to_string(), params)
803 }