3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
34 #include "common/c_internal.h"
35 #include "common/c_packer.h"
36 #include "lua_api/l_base.h"
38 /******************************************************************************/
39 AsyncEngine::~AsyncEngine()
41 // Request all threads to stop
42 for (AsyncWorkerThread *workerThread : workerThreads) {
46 // Wake up all threads
47 for (auto it : workerThreads) {
49 jobQueueCounter.post();
52 // Wait for threads to finish
53 for (AsyncWorkerThread *workerThread : workerThreads) {
57 // Force kill all threads
58 for (AsyncWorkerThread *workerThread : workerThreads) {
64 jobQueueMutex.unlock();
65 workerThreads.clear();
68 /******************************************************************************/
69 void AsyncEngine::registerStateInitializer(StateInitializer func)
71 FATAL_ERROR_IF(initDone, "Initializer may not be registered after init");
72 stateInitializers.push_back(func);
75 /******************************************************************************/
76 void AsyncEngine::initialize(unsigned int numEngines)
80 if (numEngines == 0) {
81 // Leave one core for the main thread and one for whatever else
82 autoscaleMaxWorkers = Thread::getNumberOfProcessors();
83 if (autoscaleMaxWorkers >= 2)
84 autoscaleMaxWorkers -= 2;
85 infostream << "AsyncEngine: using at most " << autoscaleMaxWorkers
86 << " threads with automatic scaling" << std::endl;
90 for (unsigned int i = 0; i < numEngines; i++)
95 void AsyncEngine::addWorkerThread()
97 AsyncWorkerThread *toAdd = new AsyncWorkerThread(this,
98 std::string("AsyncWorker-") + itos(workerThreads.size()));
99 workerThreads.push_back(toAdd);
103 /******************************************************************************/
104 u32 AsyncEngine::queueAsyncJob(std::string &&func, std::string &¶ms,
105 const std::string &mod_origin)
107 MutexAutoLock autolock(jobQueueMutex);
108 u32 jobId = jobIdCounter++;
110 jobQueue.emplace_back();
111 auto &to_add = jobQueue.back();
113 to_add.function = std::move(func);
114 to_add.params = std::move(params);
115 to_add.mod_origin = mod_origin;
117 jobQueueCounter.post();
121 u32 AsyncEngine::queueAsyncJob(std::string &&func, PackedValue *params,
122 const std::string &mod_origin)
124 MutexAutoLock autolock(jobQueueMutex);
125 u32 jobId = jobIdCounter++;
127 jobQueue.emplace_back();
128 auto &to_add = jobQueue.back();
130 to_add.function = std::move(func);
131 to_add.params_ext.reset(params);
132 to_add.mod_origin = mod_origin;
134 jobQueueCounter.post();
138 /******************************************************************************/
139 bool AsyncEngine::getJob(LuaJobInfo *job)
141 jobQueueCounter.wait();
142 jobQueueMutex.lock();
146 if (!jobQueue.empty()) {
147 *job = std::move(jobQueue.front());
148 jobQueue.pop_front();
151 jobQueueMutex.unlock();
156 /******************************************************************************/
157 void AsyncEngine::putJobResult(LuaJobInfo &&result)
159 resultQueueMutex.lock();
160 resultQueue.emplace_back(std::move(result));
161 resultQueueMutex.unlock();
164 /******************************************************************************/
165 void AsyncEngine::step(lua_State *L)
171 void AsyncEngine::stepJobResults(lua_State *L)
173 int error_handler = PUSH_ERROR_HANDLER(L);
174 lua_getglobal(L, "core");
176 ScriptApiBase *script = ModApiBase::getScriptApiBase(L);
178 MutexAutoLock autolock(resultQueueMutex);
179 while (!resultQueue.empty()) {
180 LuaJobInfo j = std::move(resultQueue.front());
181 resultQueue.pop_front();
183 lua_getfield(L, -1, "async_event_handler");
184 if (lua_isnil(L, -1))
185 FATAL_ERROR("Async event handler does not exist!");
186 luaL_checktype(L, -1, LUA_TFUNCTION);
188 lua_pushinteger(L, j.id);
190 script_unpack(L, j.result_ext.get());
192 lua_pushlstring(L, j.result.data(), j.result.size());
195 const char *origin = j.mod_origin.empty() ? nullptr : j.mod_origin.c_str();
196 script->setOriginDirect(origin);
197 int result = lua_pcall(L, 2, 0, error_handler);
199 script_error(L, result, origin, "<async>");
202 lua_pop(L, 2); // Pop core and error handler
205 void AsyncEngine::stepAutoscale()
207 if (workerThreads.size() >= autoscaleMaxWorkers)
210 MutexAutoLock autolock(jobQueueMutex);
212 // 2) If the timer elapsed, check again
213 if (autoscaleTimer && porting::getTimeMs() >= autoscaleTimer) {
215 // Determine overlap with previous snapshot
217 for (const auto &it : jobQueue)
218 n += autoscaleSeenJobs.count(it.id);
219 autoscaleSeenJobs.clear();
220 infostream << "AsyncEngine: " << n << " jobs were still waiting after 1s" << std::endl;
221 // Start this many new threads
222 while (workerThreads.size() < autoscaleMaxWorkers && n > 0) {
229 // 1) Check if there's anything in the queue
230 if (!autoscaleTimer && !jobQueue.empty()) {
231 // Take a snapshot of all jobs we have seen
232 for (const auto &it : jobQueue)
233 autoscaleSeenJobs.emplace(it.id);
234 // and set a timer for 1 second
235 autoscaleTimer = porting::getTimeMs() + 1000;
239 /******************************************************************************/
240 bool AsyncEngine::prepareEnvironment(lua_State* L, int top)
242 for (StateInitializer &stateInitializer : stateInitializers) {
243 stateInitializer(L, top);
246 auto *script = ModApiBase::getScriptApiBase(L);
248 script->loadMod(Server::getBuiltinLuaPath() + DIR_DELIM + "init.lua",
250 script->checkSetByBuiltin();
251 } catch (const ModError &e) {
252 errorstream << "Execution of async base environment failed: "
253 << e.what() << std::endl;
254 FATAL_ERROR("Execution of async base environment failed");
257 // Load per mod stuff
259 const auto &list = server->m_async_init_files;
261 for (auto &it : list)
262 script->loadMod(it.second, it.first);
263 } catch (const ModError &e) {
264 errorstream << "Failed to load mod script inside async environment." << std::endl;
265 server->setAsyncFatalError(e.what());
273 /******************************************************************************/
274 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
275 const std::string &name) :
276 ScriptApiBase(ScriptingType::Async),
278 jobDispatcher(jobDispatcher)
280 lua_State *L = getStack();
282 if (jobDispatcher->server) {
283 setGameDef(jobDispatcher->server);
285 if (g_settings->getBool("secure.enable_security"))
286 initializeSecurity();
289 // Prepare job lua environment
290 lua_getglobal(L, "core");
291 int top = lua_gettop(L);
293 // Push builtin initialization type
294 lua_pushstring(L, jobDispatcher->server ? "async_game" : "async");
295 lua_setglobal(L, "INIT");
297 if (!jobDispatcher->prepareEnvironment(L, top)) {
298 // can't throw from here so we're stuck with this
303 /******************************************************************************/
304 AsyncWorkerThread::~AsyncWorkerThread()
306 sanity_check(!isRunning());
309 /******************************************************************************/
310 void* AsyncWorkerThread::run()
315 lua_State *L = getStack();
317 int error_handler = PUSH_ERROR_HANDLER(L);
319 auto report_error = [this] (const ModError &e) {
320 if (jobDispatcher->server)
321 jobDispatcher->server->setAsyncFatalError(e.what());
323 errorstream << e.what() << std::endl;
326 lua_getglobal(L, "core");
327 if (lua_isnil(L, -1)) {
328 FATAL_ERROR("Unable to find core within async environment!");
333 while (!stopRequested()) {
335 if (!jobDispatcher->getJob(&j) || stopRequested())
338 const bool use_ext = !!j.params_ext;
340 lua_getfield(L, -1, "job_processor");
341 if (lua_isnil(L, -1))
342 FATAL_ERROR("Unable to get async job processor!");
343 luaL_checktype(L, -1, LUA_TFUNCTION);
345 if (luaL_loadbuffer(L, j.function.data(), j.function.size(), "=(async)")) {
346 errorstream << "ASYNC WORKER: Unable to deserialize function" << std::endl;
350 script_unpack(L, j.params_ext.get());
352 lua_pushlstring(L, j.params.data(), j.params.size());
355 setOriginDirect(j.mod_origin.empty() ? nullptr : j.mod_origin.c_str());
356 int result = lua_pcall(L, 2, 1, error_handler);
359 scriptError(result, "<async>");
360 } catch (const ModError &e) {
367 j.result_ext.reset(script_pack(L, -1));
368 } catch (const ModError &e) {
374 const char *retval = lua_tolstring(L, -1, &length);
375 j.result.assign(retval, length);
379 lua_pop(L, 1); // Pop retval
383 jobDispatcher->putJobResult(std::move(j));
386 lua_pop(L, 2); // Pop core and error handler