]> git.lizzy.rs Git - minetest.git/blobdiff - src/script/cpp_api/s_async.cpp
[CSM] Add core.get_timeofday & core.get_day_count env calls (#5401)
[minetest.git] / src / script / cpp_api / s_async.cpp
index 603b6fe9c6fb01b231edb2011d0e9b3cf8223d77..1fb84fab66e7c83fe19dd8f1a1e7b760ad9c7e21 100644 (file)
@@ -26,6 +26,7 @@ extern "C" {
 #include "lualib.h"
 }
 
+#include "server.h"
 #include "s_async.h"
 #include "log.h"
 #include "filesys.h"
@@ -34,82 +35,103 @@ extern "C" {
 
 /******************************************************************************/
 AsyncEngine::AsyncEngine() :
-       m_initDone(false),
-       m_JobIdCounter(0)
+       initDone(false),
+       jobIdCounter(0)
 {
 }
 
 /******************************************************************************/
 AsyncEngine::~AsyncEngine()
 {
+
+       // Request all threads to stop
+       for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
+                       it != workerThreads.end(); it++) {
+               (*it)->stop();
+       }
+
+
+       // Wake up all threads
+       for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
+                       it != workerThreads.end(); it++) {
+               jobQueueCounter.post();
+       }
+
+       // Wait for threads to finish
+       for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
+                       it != workerThreads.end(); it++) {
+               (*it)->wait();
+       }
+
        // Force kill all threads
-       for (std::vector<AsyncWorkerThread*>::iterator i = m_WorkerThreads.begin();
-                       i != m_WorkerThreads.end(); i++) {
-               (*i)->Kill();
-               delete *i;
+       for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
+                       it != workerThreads.end(); it++) {
+               delete *it;
        }
 
-       m_JobQueueMutex.Lock();
-       m_JobQueue.clear();
-       m_JobQueueMutex.Unlock();
-       m_WorkerThreads.clear();
+       jobQueueMutex.lock();
+       jobQueue.clear();
+       jobQueueMutex.unlock();
+       workerThreads.clear();
 }
 
 /******************************************************************************/
 bool AsyncEngine::registerFunction(const char* name, lua_CFunction func)
 {
-       if (m_initDone) {
+       if (initDone) {
                return false;
        }
-       m_FunctionList[name] = func;
+
+       functionList[name] = func;
        return true;
 }
 
 /******************************************************************************/
-void AsyncEngine::Initialize(unsigned int numEngines)
+void AsyncEngine::initialize(unsigned int numEngines)
 {
-       m_initDone = true;
+       initDone = true;
 
        for (unsigned int i = 0; i < numEngines; i++) {
-               AsyncWorkerThread* toAdd = new AsyncWorkerThread(this, i);
-               m_WorkerThreads.push_back(toAdd);
-               toAdd->Start();
+               AsyncWorkerThread *toAdd = new AsyncWorkerThread(this,
+                       std::string("AsyncWorker-") + itos(i));
+               workerThreads.push_back(toAdd);
+               toAdd->start();
        }
 }
 
 /******************************************************************************/
-unsigned int AsyncEngine::doAsyncJob(std::string func, std::string params)
+unsigned int AsyncEngine::queueAsyncJob(std::string func, std::string params)
 {
-       m_JobQueueMutex.Lock();
-       LuaJobInfo toadd;
-       toadd.JobId = m_JobIdCounter++;
-       toadd.serializedFunction = func;
-       toadd.serializedParams = params;
+       jobQueueMutex.lock();
+       LuaJobInfo toAdd;
+       toAdd.id = jobIdCounter++;
+       toAdd.serializedFunction = func;
+       toAdd.serializedParams = params;
 
-       m_JobQueue.push_back(toadd);
+       jobQueue.push_back(toAdd);
 
-       m_JobQueueCounter.Post();
+       jobQueueCounter.post();
 
-       m_JobQueueMutex.Unlock();
+       jobQueueMutex.unlock();
 
-       return toadd.JobId;
+       return toAdd.id;
 }
 
 /******************************************************************************/
 LuaJobInfo AsyncEngine::getJob()
 {
-       m_JobQueueCounter.Wait();
-       m_JobQueueMutex.Lock();
+       jobQueueCounter.wait();
+       jobQueueMutex.lock();
 
        LuaJobInfo retval;
        retval.valid = false;
 
-       if (m_JobQueue.size() != 0) {
-               retval = m_JobQueue.front();
+       if (!jobQueue.empty()) {
+               retval = jobQueue.front();
+               jobQueue.pop_front();
                retval.valid = true;
-               m_JobQueue.erase(m_JobQueue.begin());
        }
-       m_JobQueueMutex.Unlock();
+       jobQueueMutex.unlock();
 
        return retval;
 }
@@ -117,75 +139,73 @@ LuaJobInfo AsyncEngine::getJob()
 /******************************************************************************/
 void AsyncEngine::putJobResult(LuaJobInfo result)
 {
-       m_ResultQueueMutex.Lock();
-       m_ResultQueue.push_back(result);
-       m_ResultQueueMutex.Unlock();
+       resultQueueMutex.lock();
+       resultQueue.push_back(result);
+       resultQueueMutex.unlock();
 }
 
 /******************************************************************************/
-void AsyncEngine::Step(lua_State *L, int errorhandler)
+void AsyncEngine::step(lua_State *L)
 {
-       lua_getglobal(L, "engine");
-       m_ResultQueueMutex.Lock();
-       while (!m_ResultQueue.empty()) {
-               LuaJobInfo jobdone = m_ResultQueue.front();
-               m_ResultQueue.erase(m_ResultQueue.begin());
+       int error_handler = PUSH_ERROR_HANDLER(L);
+       lua_getglobal(L, "core");
+       resultQueueMutex.lock();
+       while (!resultQueue.empty()) {
+               LuaJobInfo jobDone = resultQueue.front();
+               resultQueue.pop_front();
 
                lua_getfield(L, -1, "async_event_handler");
 
                if (lua_isnil(L, -1)) {
-                       assert("Async event handler does not exist!" == 0);
+                       FATAL_ERROR("Async event handler does not exist!");
                }
 
                luaL_checktype(L, -1, LUA_TFUNCTION);
 
-               lua_pushinteger(L, jobdone.JobId);
-               lua_pushlstring(L, jobdone.serializedResult.c_str(),
-                               jobdone.serializedResult.length());
+               lua_pushinteger(L, jobDone.id);
+               lua_pushlstring(L, jobDone.serializedResult.data(),
+                               jobDone.serializedResult.size());
 
-               if (lua_pcall(L, 2, 0, errorhandler)) {
-                       script_error(L);
-               }
+               PCALL_RESL(L, lua_pcall(L, 2, 0, error_handler));
        }
-       m_ResultQueueMutex.Unlock();
-       lua_pop(L, 1); // Pop engine
+       resultQueueMutex.unlock();
+       lua_pop(L, 2); // Pop core and error handler
 }
 
 /******************************************************************************/
-void AsyncEngine::PushFinishedJobs(lua_State* L) {
+void AsyncEngine::pushFinishedJobs(lua_State* L) {
        // Result Table
-       m_ResultQueueMutex.Lock();
+       MutexAutoLock l(resultQueueMutex);
 
        unsigned int index = 1;
-       lua_createtable(L, m_ResultQueue.size(), 0);
+       lua_createtable(L, resultQueue.size(), 0);
        int top = lua_gettop(L);
 
-       while (!m_ResultQueue.empty()) {
-               LuaJobInfo jobdone = m_ResultQueue.front();
-               m_ResultQueue.erase(m_ResultQueue.begin());
+       while (!resultQueue.empty()) {
+               LuaJobInfo jobDone = resultQueue.front();
+               resultQueue.pop_front();
 
-               lua_createtable(L, 0, 2);  // Pre-alocate space for two map fields
+               lua_createtable(L, 0, 2);  // Pre-allocate space for two map fields
                int top_lvl2 = lua_gettop(L);
 
                lua_pushstring(L, "jobid");
-               lua_pushnumber(L, jobdone.JobId);
+               lua_pushnumber(L, jobDone.id);
                lua_settable(L, top_lvl2);
 
                lua_pushstring(L, "retval");
-               lua_pushlstring(L, jobdone.serializedResult.data(),
-                       jobdone.serializedResult.size());
+               lua_pushlstring(L, jobDone.serializedResult.data(),
+                       jobDone.serializedResult.size());
                lua_settable(L, top_lvl2);
 
                lua_rawseti(L, top, index++);
        }
-
-       m_ResultQueueMutex.Unlock();
 }
 
 /******************************************************************************/
-void AsyncEngine::PrepareEnvironment(lua_State* L, int top) {
-       for (std::map<std::string, lua_CFunction>::iterator it = m_FunctionList.begin();
-                       it != m_FunctionList.end(); it++) {
+void AsyncEngine::prepareEnvironment(lua_State* L, int top)
+{
+       for (UNORDERED_MAP<std::string, lua_CFunction>::iterator it = functionList.begin();
+                       it != functionList.end(); it++) {
                lua_pushstring(L, it->first.c_str());
                lua_pushcfunction(L, it->second);
                lua_settable(L, top);
@@ -194,79 +214,63 @@ void AsyncEngine::PrepareEnvironment(lua_State* L, int top) {
 
 /******************************************************************************/
 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
-               unsigned int threadNum) :
+               const std::string &name) :
+       Thread(name),
        ScriptApiBase(),
-       m_JobDispatcher(jobDispatcher),
-       m_threadnum(threadNum)
+       jobDispatcher(jobDispatcher)
 {
        lua_State *L = getStack();
 
-       luaL_openlibs(L);
-
        // Prepare job lua environment
-       lua_newtable(L);
-       lua_setglobal(L, "engine");
-       lua_getglobal(L, "engine");
+       lua_getglobal(L, "core");
        int top = lua_gettop(L);
 
-       lua_pushstring(L, DIR_DELIM);
-       lua_setglobal(L, "DIR_DELIM");
-
-       lua_pushstring(L,
-                       (porting::path_share + DIR_DELIM + "builtin").c_str());
-       lua_setglobal(L, "SCRIPTDIR");
+       // Push builtin initialization type
+       lua_pushstring(L, "async");
+       lua_setglobal(L, "INIT");
 
-       m_JobDispatcher->PrepareEnvironment(L, top);
+       jobDispatcher->prepareEnvironment(L, top);
 }
 
 /******************************************************************************/
 AsyncWorkerThread::~AsyncWorkerThread()
 {
-       assert(IsRunning() == false);
+       sanity_check(!isRunning());
 }
 
 /******************************************************************************/
-void* AsyncWorkerThread::Thread()
+void* AsyncWorkerThread::run()
 {
-       ThreadStarted();
-
-       // Register thread for error logging
-       char number[21];
-       snprintf(number, sizeof(number), "%d", m_threadnum);
-       log_register_thread(std::string("AsyncWorkerThread_") + number);
+       lua_State *L = getStack();
 
-       porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str());
+       std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
+       try {
+               loadScript(script);
+       } catch (const ModError &e) {
+               errorstream << "Execution of async base environment failed: "
+                       << e.what() << std::endl;
+               FATAL_ERROR("Execution of async base environment failed");
+       }
 
-       std::string asyncscript = porting::path_share + DIR_DELIM + "builtin"
-                       + DIR_DELIM + "async_env.lua";
+       int error_handler = PUSH_ERROR_HANDLER(L);
 
-       if (!loadScript(asyncscript)) {
-               errorstream
-                       << "AsyncWorkderThread execution of async base environment failed!"
-                       << std::endl;
-               abort();
+       lua_getglobal(L, "core");
+       if (lua_isnil(L, -1)) {
+               FATAL_ERROR("Unable to find core within async environment!");
        }
 
-       lua_State *L = getStack();
        // Main loop
-       while (!StopRequested()) {
+       while (!stopRequested()) {
                // Wait for job
-               LuaJobInfo toProcess = m_JobDispatcher->getJob();
+               LuaJobInfo toProcess = jobDispatcher->getJob();
 
-               if (toProcess.valid == false || StopRequested()) {
+               if (toProcess.valid == false || stopRequested()) {
                        continue;
                }
 
-               lua_getglobal(L, "engine");
-               if (lua_isnil(L, -1)) {
-                       errorstream << "Unable to find engine within async environment!";
-                       abort();
-               }
-
                lua_getfield(L, -1, "job_processor");
                if (lua_isnil(L, -1)) {
-                       errorstream << "Unable to get async job processor!" << std::endl;
-                       abort();
+                       FATAL_ERROR("Unable to get async job processor!");
                }
 
                luaL_checktype(L, -1, LUA_TFUNCTION);
@@ -279,8 +283,9 @@ void* AsyncWorkerThread::Thread()
                                toProcess.serializedParams.data(),
                                toProcess.serializedParams.size());
 
-               if (lua_pcall(L, 2, 1, m_errorhandler)) {
-                       scriptError();
+               int result = lua_pcall(L, 2, 1, error_handler);
+               if (result) {
+                       PCALL_RES(result);
                        toProcess.serializedResult = "";
                } else {
                        // Fetch result
@@ -289,13 +294,14 @@ void* AsyncWorkerThread::Thread()
                        toProcess.serializedResult = std::string(retval, length);
                }
 
-               // Pop engine, job_processor, and retval
-               lua_pop(L, 3);
+               lua_pop(L, 1);  // Pop retval
 
                // Put job result
-               m_JobDispatcher->putJobResult(toProcess);
+               jobDispatcher->putJobResult(toProcess);
        }
-       log_deregister_thread();
+
+       lua_pop(L, 2);  // Pop core and error handler
+
        return 0;
 }