]> git.lizzy.rs Git - rust.git/blob - crates/ra_lsp_server/src/main_loop.rs
automatically collect garbage
[rust.git] / crates / ra_lsp_server / src / main_loop.rs
1 mod handlers;
2 mod subscriptions;
3
4 use std::{fmt, path::PathBuf, sync::Arc};
5
6 use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender};
7 use failure::{bail, format_err};
8 use failure_derive::Fail;
9 use gen_lsp_server::{
10     handle_shutdown, ErrorCode, RawMessage, RawNotification, RawRequest, RawResponse,
11 };
12 use lsp_types::NumberOrString;
13 use ra_ide_api::{Canceled, FileId, LibraryData};
14 use ra_vfs::VfsTask;
15 use rustc_hash::FxHashSet;
16 use serde::{de::DeserializeOwned, Serialize};
17 use threadpool::ThreadPool;
18
19 use crate::{
20     main_loop::subscriptions::Subscriptions,
21     project_model::workspace_loader,
22     req,
23     server_world::{ServerWorld, ServerWorldState},
24     Result,
25 };
26
27 #[derive(Debug, Fail)]
28 #[fail(
29     display = "Language Server request failed with {}. ({})",
30     code, message
31 )]
32 pub struct LspError {
33     pub code: i32,
34     pub message: String,
35 }
36
37 impl LspError {
38     pub fn new(code: i32, message: String) -> LspError {
39         LspError { code, message }
40     }
41 }
42
/// Work produced on the thread pool that the main loop must relay to the
/// client (see `on_task`).
#[derive(Debug)]
enum Task {
    /// Response to a client request; `on_task` sends it only while the
    /// request id is still in the pending set.
    Respond(RawResponse),
    /// Server-initiated notification; `on_task` always forwards it.
    Notify(RawNotification),
}
48
/// Number of worker threads in the pool created by `main_loop`. The cap on
/// concurrently indexed libraries is derived from it (`THREADPOOL_SIZE - 3`).
const THREADPOOL_SIZE: usize = 8;
50
51 pub fn main_loop(
52     internal_mode: bool,
53     ws_root: PathBuf,
54     supports_decorations: bool,
55     msg_receiver: &Receiver<RawMessage>,
56     msg_sender: &Sender<RawMessage>,
57 ) -> Result<()> {
58     let pool = ThreadPool::new(THREADPOOL_SIZE);
59     let (task_sender, task_receiver) = unbounded::<Task>();
60     let (ws_worker, ws_watcher) = workspace_loader();
61
62     ws_worker.send(ws_root.clone()).unwrap();
63     // FIXME: support dynamic workspace loading.
64     let workspaces = match ws_worker.recv().unwrap() {
65         Ok(ws) => vec![ws],
66         Err(e) => {
67             log::error!("loading workspace failed: {}", e);
68             Vec::new()
69         }
70     };
71     ws_worker.shutdown();
72     ws_watcher
73         .shutdown()
74         .map_err(|_| format_err!("ws watcher died"))?;
75     let mut state = ServerWorldState::new(ws_root.clone(), workspaces);
76
77     log::info!("server initialized, serving requests");
78
79     let mut pending_requests = FxHashSet::default();
80     let mut subs = Subscriptions::new();
81     let main_res = main_loop_inner(
82         internal_mode,
83         supports_decorations,
84         &pool,
85         msg_sender,
86         msg_receiver,
87         task_sender,
88         task_receiver.clone(),
89         &mut state,
90         &mut pending_requests,
91         &mut subs,
92     );
93
94     log::info!("waiting for tasks to finish...");
95     task_receiver
96         .into_iter()
97         .for_each(|task| on_task(task, msg_sender, &mut pending_requests));
98     log::info!("...tasks have finished");
99     log::info!("joining threadpool...");
100     drop(pool);
101     log::info!("...threadpool has finished");
102
103     let vfs = Arc::try_unwrap(state.vfs).expect("all snapshots should be dead");
104     let vfs_res = vfs.into_inner().shutdown();
105
106     main_res?;
107     vfs_res.map_err(|_| format_err!("fs watcher died"))?;
108
109     Ok(())
110 }
111
112 enum Event {
113     Msg(RawMessage),
114     Task(Task),
115     Vfs(VfsTask),
116     Lib(LibraryData),
117 }
118
119 impl fmt::Debug for Event {
120     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121         let debug_verbose_not = |not: &RawNotification, f: &mut fmt::Formatter| {
122             f.debug_struct("RawNotification")
123                 .field("method", &not.method)
124                 .finish()
125         };
126
127         match self {
128             Event::Msg(RawMessage::Notification(not)) => {
129                 if not.is::<req::DidOpenTextDocument>() || not.is::<req::DidChangeTextDocument>() {
130                     return debug_verbose_not(not, f);
131                 }
132             }
133             Event::Task(Task::Notify(not)) => {
134                 if not.is::<req::PublishDecorations>() || not.is::<req::PublishDiagnostics>() {
135                     return debug_verbose_not(not, f);
136                 }
137             }
138             Event::Task(Task::Respond(resp)) => {
139                 return f
140                     .debug_struct("RawResponse")
141                     .field("id", &resp.id)
142                     .field("error", &resp.error)
143                     .finish();
144             }
145             _ => (),
146         }
147         match self {
148             Event::Msg(it) => fmt::Debug::fmt(it, f),
149             Event::Task(it) => fmt::Debug::fmt(it, f),
150             Event::Vfs(it) => fmt::Debug::fmt(it, f),
151             Event::Lib(it) => fmt::Debug::fmt(it, f),
152         }
153     }
154 }
155
156 fn main_loop_inner(
157     internal_mode: bool,
158     supports_decorations: bool,
159     pool: &ThreadPool,
160     msg_sender: &Sender<RawMessage>,
161     msg_receiver: &Receiver<RawMessage>,
162     task_sender: Sender<Task>,
163     task_receiver: Receiver<Task>,
164     state: &mut ServerWorldState,
165     pending_requests: &mut FxHashSet<u64>,
166     subs: &mut Subscriptions,
167 ) -> Result<()> {
168     // We try not to index more than THREADPOOL_SIZE - 3 libraries at the same
169     // time to always have a thread ready to react to input.
170     let mut in_flight_libraries = 0;
171     let mut pending_libraries = Vec::new();
172
173     let (libdata_sender, libdata_receiver) = unbounded();
174     loop {
175         state.maybe_collect_garbage();
176         log::trace!("selecting");
177         let event = select! {
178             recv(msg_receiver) -> msg => match msg {
179                 Ok(msg) => Event::Msg(msg),
180                 Err(RecvError) => bail!("client exited without shutdown"),
181             },
182             recv(task_receiver) -> task => Event::Task(task.unwrap()),
183             recv(state.vfs.read().task_receiver()) -> task => match task {
184                 Ok(task) => Event::Vfs(task),
185                 Err(RecvError) => bail!("vfs died"),
186             },
187             recv(libdata_receiver) -> data => Event::Lib(data.unwrap())
188         };
189         log::info!("loop_turn = {:?}", event);
190         let start = std::time::Instant::now();
191         let mut state_changed = false;
192         match event {
193             Event::Task(task) => on_task(task, msg_sender, pending_requests),
194             Event::Vfs(task) => {
195                 state.vfs.write().handle_task(task);
196                 state_changed = true;
197             }
198             Event::Lib(lib) => {
199                 feedback(internal_mode, "library loaded", msg_sender);
200                 state.add_lib(lib);
201                 in_flight_libraries -= 1;
202             }
203             Event::Msg(msg) => match msg {
204                 RawMessage::Request(req) => {
205                     let req = match handle_shutdown(req, msg_sender) {
206                         Some(req) => req,
207                         None => return Ok(()),
208                     };
209                     match req.cast::<req::CollectGarbage>() {
210                         Ok((id, ())) => {
211                             state.collect_garbage();
212                             let resp = RawResponse::ok::<req::CollectGarbage>(id, &());
213                             msg_sender.send(RawMessage::Response(resp)).unwrap()
214                         }
215                         Err(req) => {
216                             match on_request(state, pending_requests, pool, &task_sender, req)? {
217                                 None => (),
218                                 Some(req) => {
219                                     log::error!("unknown request: {:?}", req);
220                                     let resp = RawResponse::err(
221                                         req.id,
222                                         ErrorCode::MethodNotFound as i32,
223                                         "unknown request".to_string(),
224                                     );
225                                     msg_sender.send(RawMessage::Response(resp)).unwrap()
226                                 }
227                             }
228                         }
229                     }
230                 }
231                 RawMessage::Notification(not) => {
232                     on_notification(msg_sender, state, pending_requests, subs, not)?;
233                     state_changed = true;
234                 }
235                 RawMessage::Response(resp) => log::error!("unexpected response: {:?}", resp),
236             },
237         };
238
239         pending_libraries.extend(state.process_changes());
240         while in_flight_libraries < THREADPOOL_SIZE - 3 && !pending_libraries.is_empty() {
241             let (root, files) = pending_libraries.pop().unwrap();
242             in_flight_libraries += 1;
243             let sender = libdata_sender.clone();
244             pool.execute(move || {
245                 let start = ::std::time::Instant::now();
246                 log::info!("indexing {:?} ... ", root);
247                 let data = LibraryData::prepare(root, files);
248                 log::info!("indexed {:?} {:?}", start.elapsed(), root);
249                 sender.send(data).unwrap();
250             });
251         }
252
253         if state.roots_to_scan == 0 && pending_libraries.is_empty() && in_flight_libraries == 0 {
254             feedback(internal_mode, "workspace loaded", msg_sender);
255         }
256
257         if state_changed {
258             update_file_notifications_on_threadpool(
259                 pool,
260                 state.snapshot(),
261                 supports_decorations,
262                 task_sender.clone(),
263                 subs.subscriptions(),
264             )
265         }
266         log::info!("loop_turn = {:?}", start.elapsed());
267     }
268 }
269
270 fn on_task(task: Task, msg_sender: &Sender<RawMessage>, pending_requests: &mut FxHashSet<u64>) {
271     match task {
272         Task::Respond(response) => {
273             if pending_requests.remove(&response.id) {
274                 msg_sender.send(RawMessage::Response(response)).unwrap();
275             }
276         }
277         Task::Notify(n) => {
278             msg_sender.send(RawMessage::Notification(n)).unwrap();
279         }
280     }
281 }
282
283 fn on_request(
284     world: &mut ServerWorldState,
285     pending_requests: &mut FxHashSet<u64>,
286     pool: &ThreadPool,
287     sender: &Sender<Task>,
288     req: RawRequest,
289 ) -> Result<Option<RawRequest>> {
290     let mut pool_dispatcher = PoolDispatcher {
291         req: Some(req),
292         res: None,
293         pool,
294         world,
295         sender,
296     };
297     let req = pool_dispatcher
298         .on::<req::AnalyzerStatus>(handlers::handle_analyzer_status)?
299         .on::<req::SyntaxTree>(handlers::handle_syntax_tree)?
300         .on::<req::ExtendSelection>(handlers::handle_extend_selection)?
301         .on::<req::FindMatchingBrace>(handlers::handle_find_matching_brace)?
302         .on::<req::JoinLines>(handlers::handle_join_lines)?
303         .on::<req::OnEnter>(handlers::handle_on_enter)?
304         .on::<req::OnTypeFormatting>(handlers::handle_on_type_formatting)?
305         .on::<req::DocumentSymbolRequest>(handlers::handle_document_symbol)?
306         .on::<req::WorkspaceSymbol>(handlers::handle_workspace_symbol)?
307         .on::<req::GotoDefinition>(handlers::handle_goto_definition)?
308         .on::<req::ParentModule>(handlers::handle_parent_module)?
309         .on::<req::Runnables>(handlers::handle_runnables)?
310         .on::<req::DecorationsRequest>(handlers::handle_decorations)?
311         .on::<req::Completion>(handlers::handle_completion)?
312         .on::<req::CodeActionRequest>(handlers::handle_code_action)?
313         .on::<req::CodeLensRequest>(handlers::handle_code_lens)?
314         .on::<req::FoldingRangeRequest>(handlers::handle_folding_range)?
315         .on::<req::SignatureHelpRequest>(handlers::handle_signature_help)?
316         .on::<req::HoverRequest>(handlers::handle_hover)?
317         .on::<req::PrepareRenameRequest>(handlers::handle_prepare_rename)?
318         .on::<req::Rename>(handlers::handle_rename)?
319         .on::<req::References>(handlers::handle_references)?
320         .on::<req::Formatting>(handlers::handle_formatting)?
321         .on::<req::DocumentHighlightRequest>(handlers::handle_document_highlight)?
322         .finish();
323     match req {
324         Ok(id) => {
325             let inserted = pending_requests.insert(id);
326             assert!(inserted, "duplicate request: {}", id);
327             Ok(None)
328         }
329         Err(req) => Ok(Some(req)),
330     }
331 }
332
333 fn on_notification(
334     msg_sender: &Sender<RawMessage>,
335     state: &mut ServerWorldState,
336     pending_requests: &mut FxHashSet<u64>,
337     subs: &mut Subscriptions,
338     not: RawNotification,
339 ) -> Result<()> {
340     let not = match not.cast::<req::Cancel>() {
341         Ok(params) => {
342             let id = match params.id {
343                 NumberOrString::Number(id) => id,
344                 NumberOrString::String(id) => {
345                     panic!("string id's not supported: {:?}", id);
346                 }
347             };
348             if pending_requests.remove(&id) {
349                 let response = RawResponse::err(
350                     id,
351                     ErrorCode::RequestCanceled as i32,
352                     "canceled by client".to_string(),
353                 );
354                 msg_sender.send(RawMessage::Response(response)).unwrap()
355             }
356             return Ok(());
357         }
358         Err(not) => not,
359     };
360     let not = match not.cast::<req::DidOpenTextDocument>() {
361         Ok(params) => {
362             let uri = params.text_document.uri;
363             let path = uri
364                 .to_file_path()
365                 .map_err(|()| format_err!("invalid uri: {}", uri))?;
366             if let Some(file_id) = state
367                 .vfs
368                 .write()
369                 .add_file_overlay(&path, params.text_document.text)
370             {
371                 subs.add_sub(FileId(file_id.0.into()));
372             }
373             return Ok(());
374         }
375         Err(not) => not,
376     };
377     let not = match not.cast::<req::DidChangeTextDocument>() {
378         Ok(mut params) => {
379             let uri = params.text_document.uri;
380             let path = uri
381                 .to_file_path()
382                 .map_err(|()| format_err!("invalid uri: {}", uri))?;
383             let text = params
384                 .content_changes
385                 .pop()
386                 .ok_or_else(|| format_err!("empty changes"))?
387                 .text;
388             state.vfs.write().change_file_overlay(path.as_path(), text);
389             return Ok(());
390         }
391         Err(not) => not,
392     };
393     let not = match not.cast::<req::DidCloseTextDocument>() {
394         Ok(params) => {
395             let uri = params.text_document.uri;
396             let path = uri
397                 .to_file_path()
398                 .map_err(|()| format_err!("invalid uri: {}", uri))?;
399             if let Some(file_id) = state.vfs.write().remove_file_overlay(path.as_path()) {
400                 subs.remove_sub(FileId(file_id.0.into()));
401             }
402             let params = req::PublishDiagnosticsParams {
403                 uri,
404                 diagnostics: Vec::new(),
405             };
406             let not = RawNotification::new::<req::PublishDiagnostics>(&params);
407             msg_sender.send(RawMessage::Notification(not)).unwrap();
408             return Ok(());
409         }
410         Err(not) => not,
411     };
412     log::error!("unhandled notification: {:?}", not);
413     Ok(())
414 }
415
416 struct PoolDispatcher<'a> {
417     req: Option<RawRequest>,
418     res: Option<u64>,
419     pool: &'a ThreadPool,
420     world: &'a ServerWorldState,
421     sender: &'a Sender<Task>,
422 }
423
424 impl<'a> PoolDispatcher<'a> {
425     fn on<R>(&mut self, f: fn(ServerWorld, R::Params) -> Result<R::Result>) -> Result<&mut Self>
426     where
427         R: req::Request,
428         R::Params: DeserializeOwned + Send + 'static,
429         R::Result: Serialize + 'static,
430     {
431         let req = match self.req.take() {
432             None => return Ok(self),
433             Some(req) => req,
434         };
435         match req.cast::<R>() {
436             Ok((id, params)) => {
437                 let world = self.world.snapshot();
438                 let sender = self.sender.clone();
439                 self.pool.execute(move || {
440                     let resp = match f(world, params) {
441                         Ok(resp) => RawResponse::ok::<R>(id, &resp),
442                         Err(e) => match e.downcast::<LspError>() {
443                             Ok(lsp_error) => {
444                                 RawResponse::err(id, lsp_error.code, lsp_error.message)
445                             }
446                             Err(e) => {
447                                 if is_canceled(&e) {
448                                     RawResponse::err(
449                                         id,
450                                         ErrorCode::ContentModified as i32,
451                                         "content modified".to_string(),
452                                     )
453                                 } else {
454                                     RawResponse::err(
455                                         id,
456                                         ErrorCode::InternalError as i32,
457                                         format!("{}\n{}", e, e.backtrace()),
458                                     )
459                                 }
460                             }
461                         },
462                     };
463                     let task = Task::Respond(resp);
464                     sender.send(task).unwrap();
465                 });
466                 self.res = Some(id);
467             }
468             Err(req) => self.req = Some(req),
469         }
470         Ok(self)
471     }
472
473     fn finish(&mut self) -> ::std::result::Result<u64, RawRequest> {
474         match (self.res.take(), self.req.take()) {
475             (Some(res), None) => Ok(res),
476             (None, Some(req)) => Err(req),
477             _ => unreachable!(),
478         }
479     }
480 }
481
482 fn update_file_notifications_on_threadpool(
483     pool: &ThreadPool,
484     world: ServerWorld,
485     publish_decorations: bool,
486     sender: Sender<Task>,
487     subscriptions: Vec<FileId>,
488 ) {
489     pool.execute(move || {
490         for file_id in subscriptions {
491             match handlers::publish_diagnostics(&world, file_id) {
492                 Err(e) => {
493                     if !is_canceled(&e) {
494                         log::error!("failed to compute diagnostics: {:?}", e);
495                     }
496                 }
497                 Ok(params) => {
498                     let not = RawNotification::new::<req::PublishDiagnostics>(&params);
499                     sender.send(Task::Notify(not)).unwrap();
500                 }
501             }
502             if publish_decorations {
503                 match handlers::publish_decorations(&world, file_id) {
504                     Err(e) => {
505                         if !is_canceled(&e) {
506                             log::error!("failed to compute decorations: {:?}", e);
507                         }
508                     }
509                     Ok(params) => {
510                         let not = RawNotification::new::<req::PublishDecorations>(&params);
511                         sender.send(Task::Notify(not)).unwrap();
512                     }
513                 }
514             }
515         }
516     });
517 }
518
519 fn feedback(intrnal_mode: bool, msg: &str, sender: &Sender<RawMessage>) {
520     if !intrnal_mode {
521         return;
522     }
523     let not = RawNotification::new::<req::InternalFeedback>(&msg.to_string());
524     sender.send(RawMessage::Notification(not)).unwrap();
525 }
526
527 fn is_canceled(e: &failure::Error) -> bool {
528     e.downcast_ref::<Canceled>().is_some()
529 }