]> git.lizzy.rs Git - minetest.git/blob - src/script/cpp_api/s_async.cpp
Add keybind to swap items between hands
[minetest.git] / src / script / cpp_api / s_async.cpp
1 /*
2 Minetest
3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include <cstdio>
21 #include <cstdlib>
22
23 extern "C" {
24 #include <lua.h>
25 #include <lauxlib.h>
26 #include <lualib.h>
27 }
28
29 #include "server.h"
30 #include "s_async.h"
31 #include "log.h"
32 #include "filesys.h"
33 #include "porting.h"
34 #include "common/c_internal.h"
35 #include "common/c_packer.h"
36 #include "lua_api/l_base.h"
37
38 /******************************************************************************/
39 AsyncEngine::~AsyncEngine()
40 {
41         // Request all threads to stop
42         for (AsyncWorkerThread *workerThread : workerThreads) {
43                 workerThread->stop();
44         }
45
46         // Wake up all threads
47         for (auto it : workerThreads) {
48                 (void)it;
49                 jobQueueCounter.post();
50         }
51
52         // Wait for threads to finish
53         for (AsyncWorkerThread *workerThread : workerThreads) {
54                 workerThread->wait();
55         }
56
57         // Force kill all threads
58         for (AsyncWorkerThread *workerThread : workerThreads) {
59                 delete workerThread;
60         }
61
62         jobQueueMutex.lock();
63         jobQueue.clear();
64         jobQueueMutex.unlock();
65         workerThreads.clear();
66 }
67
68 /******************************************************************************/
69 void AsyncEngine::registerStateInitializer(StateInitializer func)
70 {
71         FATAL_ERROR_IF(initDone, "Initializer may not be registered after init");
72         stateInitializers.push_back(func);
73 }
74
75 /******************************************************************************/
76 void AsyncEngine::initialize(unsigned int numEngines)
77 {
78         initDone = true;
79
80         if (numEngines == 0) {
81                 // Leave one core for the main thread and one for whatever else
82                 autoscaleMaxWorkers = Thread::getNumberOfProcessors();
83                 if (autoscaleMaxWorkers >= 2)
84                         autoscaleMaxWorkers -= 2;
85                 infostream << "AsyncEngine: using at most " << autoscaleMaxWorkers
86                         << " threads with automatic scaling" << std::endl;
87
88                 addWorkerThread();
89         } else {
90                 for (unsigned int i = 0; i < numEngines; i++)
91                         addWorkerThread();
92         }
93 }
94
95 void AsyncEngine::addWorkerThread()
96 {
97         AsyncWorkerThread *toAdd = new AsyncWorkerThread(this,
98                 std::string("AsyncWorker-") + itos(workerThreads.size()));
99         workerThreads.push_back(toAdd);
100         toAdd->start();
101 }
102
103 /******************************************************************************/
104 u32 AsyncEngine::queueAsyncJob(std::string &&func, std::string &&params,
105                 const std::string &mod_origin)
106 {
107         MutexAutoLock autolock(jobQueueMutex);
108         u32 jobId = jobIdCounter++;
109
110         jobQueue.emplace_back();
111         auto &to_add = jobQueue.back();
112         to_add.id = jobId;
113         to_add.function = std::move(func);
114         to_add.params = std::move(params);
115         to_add.mod_origin = mod_origin;
116
117         jobQueueCounter.post();
118         return jobId;
119 }
120
121 u32 AsyncEngine::queueAsyncJob(std::string &&func, PackedValue *params,
122                 const std::string &mod_origin)
123 {
124         MutexAutoLock autolock(jobQueueMutex);
125         u32 jobId = jobIdCounter++;
126
127         jobQueue.emplace_back();
128         auto &to_add = jobQueue.back();
129         to_add.id = jobId;
130         to_add.function = std::move(func);
131         to_add.params_ext.reset(params);
132         to_add.mod_origin = mod_origin;
133
134         jobQueueCounter.post();
135         return jobId;
136 }
137
138 /******************************************************************************/
139 bool AsyncEngine::getJob(LuaJobInfo *job)
140 {
141         jobQueueCounter.wait();
142         jobQueueMutex.lock();
143
144         bool retval = false;
145
146         if (!jobQueue.empty()) {
147                 *job = std::move(jobQueue.front());
148                 jobQueue.pop_front();
149                 retval = true;
150         }
151         jobQueueMutex.unlock();
152
153         return retval;
154 }
155
156 /******************************************************************************/
157 void AsyncEngine::putJobResult(LuaJobInfo &&result)
158 {
159         resultQueueMutex.lock();
160         resultQueue.emplace_back(std::move(result));
161         resultQueueMutex.unlock();
162 }
163
164 /******************************************************************************/
165 void AsyncEngine::step(lua_State *L)
166 {
167         stepJobResults(L);
168         stepAutoscale();
169 }
170
171 void AsyncEngine::stepJobResults(lua_State *L)
172 {
173         int error_handler = PUSH_ERROR_HANDLER(L);
174         lua_getglobal(L, "core");
175
176         ScriptApiBase *script = ModApiBase::getScriptApiBase(L);
177
178         MutexAutoLock autolock(resultQueueMutex);
179         while (!resultQueue.empty()) {
180                 LuaJobInfo j = std::move(resultQueue.front());
181                 resultQueue.pop_front();
182
183                 lua_getfield(L, -1, "async_event_handler");
184                 if (lua_isnil(L, -1))
185                         FATAL_ERROR("Async event handler does not exist!");
186                 luaL_checktype(L, -1, LUA_TFUNCTION);
187
188                 lua_pushinteger(L, j.id);
189                 if (j.result_ext)
190                         script_unpack(L, j.result_ext.get());
191                 else
192                         lua_pushlstring(L, j.result.data(), j.result.size());
193
194                 // Call handler
195                 const char *origin = j.mod_origin.empty() ? nullptr : j.mod_origin.c_str();
196                 script->setOriginDirect(origin);
197                 int result = lua_pcall(L, 2, 0, error_handler);
198                 if (result)
199                         script_error(L, result, origin, "<async>");
200         }
201
202         lua_pop(L, 2); // Pop core and error handler
203 }
204
205 void AsyncEngine::stepAutoscale()
206 {
207         if (workerThreads.size() >= autoscaleMaxWorkers)
208                 return;
209
210         MutexAutoLock autolock(jobQueueMutex);
211
212         // 2) If the timer elapsed, check again
213         if (autoscaleTimer && porting::getTimeMs() >= autoscaleTimer) {
214                 autoscaleTimer = 0;
215                 // Determine overlap with previous snapshot
216                 unsigned int n = 0;
217                 for (const auto &it : jobQueue)
218                         n += autoscaleSeenJobs.count(it.id);
219                 autoscaleSeenJobs.clear();
220                 infostream << "AsyncEngine: " << n << " jobs were still waiting after 1s" << std::endl;
221                 // Start this many new threads
222                 while (workerThreads.size() < autoscaleMaxWorkers && n > 0) {
223                         addWorkerThread();
224                         n--;
225                 }
226                 return;
227         }
228
229         // 1) Check if there's anything in the queue
230         if (!autoscaleTimer && !jobQueue.empty()) {
231                 // Take a snapshot of all jobs we have seen
232                 for (const auto &it : jobQueue)
233                         autoscaleSeenJobs.emplace(it.id);
234                 // and set a timer for 1 second
235                 autoscaleTimer = porting::getTimeMs() + 1000;
236         }
237 }
238
239 /******************************************************************************/
240 bool AsyncEngine::prepareEnvironment(lua_State* L, int top)
241 {
242         for (StateInitializer &stateInitializer : stateInitializers) {
243                 stateInitializer(L, top);
244         }
245
246         auto *script = ModApiBase::getScriptApiBase(L);
247         try {
248                 script->loadMod(Server::getBuiltinLuaPath() + DIR_DELIM + "init.lua",
249                         BUILTIN_MOD_NAME);
250                 script->checkSetByBuiltin();
251         } catch (const ModError &e) {
252                 errorstream << "Execution of async base environment failed: "
253                         << e.what() << std::endl;
254                 FATAL_ERROR("Execution of async base environment failed");
255         }
256
257         // Load per mod stuff
258         if (server) {
259                 const auto &list = server->m_async_init_files;
260                 try {
261                         for (auto &it : list)
262                                 script->loadMod(it.second, it.first);
263                 } catch (const ModError &e) {
264                         errorstream << "Failed to load mod script inside async environment." << std::endl;
265                         server->setAsyncFatalError(e.what());
266                         return false;
267                 }
268         }
269
270         return true;
271 }
272
273 /******************************************************************************/
274 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
275                 const std::string &name) :
276         ScriptApiBase(ScriptingType::Async),
277         Thread(name),
278         jobDispatcher(jobDispatcher)
279 {
280         lua_State *L = getStack();
281
282         if (jobDispatcher->server) {
283                 setGameDef(jobDispatcher->server);
284
285                 if (g_settings->getBool("secure.enable_security"))
286                         initializeSecurity();
287         }
288
289         // Prepare job lua environment
290         lua_getglobal(L, "core");
291         int top = lua_gettop(L);
292
293         // Push builtin initialization type
294         lua_pushstring(L, jobDispatcher->server ? "async_game" : "async");
295         lua_setglobal(L, "INIT");
296
297         if (!jobDispatcher->prepareEnvironment(L, top)) {
298                 // can't throw from here so we're stuck with this
299                 isErrored = true;
300         }
301 }
302
303 /******************************************************************************/
304 AsyncWorkerThread::~AsyncWorkerThread()
305 {
306         sanity_check(!isRunning());
307 }
308
309 /******************************************************************************/
310 void* AsyncWorkerThread::run()
311 {
312         if (isErrored)
313                 return nullptr;
314
315         lua_State *L = getStack();
316
317         int error_handler = PUSH_ERROR_HANDLER(L);
318
319         auto report_error = [this] (const ModError &e) {
320                 if (jobDispatcher->server)
321                         jobDispatcher->server->setAsyncFatalError(e.what());
322                 else
323                         errorstream << e.what() << std::endl;
324         };
325
326         lua_getglobal(L, "core");
327         if (lua_isnil(L, -1)) {
328                 FATAL_ERROR("Unable to find core within async environment!");
329         }
330
331         // Main loop
332         LuaJobInfo j;
333         while (!stopRequested()) {
334                 // Wait for job
335                 if (!jobDispatcher->getJob(&j) || stopRequested())
336                         continue;
337
338                 const bool use_ext = !!j.params_ext;
339
340                 lua_getfield(L, -1, "job_processor");
341                 if (lua_isnil(L, -1))
342                         FATAL_ERROR("Unable to get async job processor!");
343                 luaL_checktype(L, -1, LUA_TFUNCTION);
344
345                 if (luaL_loadbuffer(L, j.function.data(), j.function.size(), "=(async)")) {
346                         errorstream << "ASYNC WORKER: Unable to deserialize function" << std::endl;
347                         lua_pushnil(L);
348                 }
349                 if (use_ext)
350                         script_unpack(L, j.params_ext.get());
351                 else
352                         lua_pushlstring(L, j.params.data(), j.params.size());
353
354                 // Call it
355                 setOriginDirect(j.mod_origin.empty() ? nullptr : j.mod_origin.c_str());
356                 int result = lua_pcall(L, 2, 1, error_handler);
357                 if (result) {
358                         try {
359                                 scriptError(result, "<async>");
360                         } catch (const ModError &e) {
361                                 report_error(e);
362                         }
363                 } else {
364                         // Fetch result
365                         if (use_ext) {
366                                 try {
367                                         j.result_ext.reset(script_pack(L, -1));
368                                 } catch (const ModError &e) {
369                                         report_error(e);
370                                         result = LUA_ERRERR;
371                                 }
372                         } else {
373                                 size_t length;
374                                 const char *retval = lua_tolstring(L, -1, &length);
375                                 j.result.assign(retval, length);
376                         }
377                 }
378
379                 lua_pop(L, 1);  // Pop retval
380
381                 // Put job result
382                 if (result == 0)
383                         jobDispatcher->putJobResult(std::move(j));
384         }
385
386         lua_pop(L, 2);  // Pop core and error handler
387
388         return 0;
389 }
390