]> git.lizzy.rs Git - dragonfireclient.git/blob - src/script/cpp_api/s_async.cpp
Remove dependency on marshal and many other async changes
[dragonfireclient.git] / src / script / cpp_api / s_async.cpp
1 /*
2 Minetest
3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 extern "C" {
24 #include "lua.h"
25 #include "lauxlib.h"
26 #include "lualib.h"
27 }
28
29 #include "s_async.h"
30 #include "log.h"
31 #include "filesys.h"
32 #include "porting.h"
33 #include "common/c_internal.h"
34
35 /******************************************************************************/
36 AsyncEngine::AsyncEngine() :
37         m_initDone(false),
38         m_JobIdCounter(0)
39 {
40 }
41
42 /******************************************************************************/
43 AsyncEngine::~AsyncEngine()
44 {
45         // Force kill all threads
46         for (std::vector<AsyncWorkerThread*>::iterator i = m_WorkerThreads.begin();
47                         i != m_WorkerThreads.end(); i++) {
48                 (*i)->Kill();
49                 delete *i;
50         }
51
52         m_JobQueueMutex.Lock();
53         m_JobQueue.clear();
54         m_JobQueueMutex.Unlock();
55         m_WorkerThreads.clear();
56 }
57
58 /******************************************************************************/
59 bool AsyncEngine::registerFunction(const char* name, lua_CFunction func)
60 {
61         if (m_initDone) {
62                 return false;
63         }
64         m_FunctionList[name] = func;
65         return true;
66 }
67
68 /******************************************************************************/
69 void AsyncEngine::Initialize(unsigned int numEngines)
70 {
71         m_initDone = true;
72
73         for (unsigned int i = 0; i < numEngines; i++) {
74                 AsyncWorkerThread* toAdd = new AsyncWorkerThread(this, i);
75                 m_WorkerThreads.push_back(toAdd);
76                 toAdd->Start();
77         }
78 }
79
80 /******************************************************************************/
81 unsigned int AsyncEngine::doAsyncJob(std::string func, std::string params)
82 {
83         m_JobQueueMutex.Lock();
84         LuaJobInfo toadd;
85         toadd.JobId = m_JobIdCounter++;
86         toadd.serializedFunction = func;
87         toadd.serializedParams = params;
88
89         m_JobQueue.push_back(toadd);
90
91         m_JobQueueCounter.Post();
92
93         m_JobQueueMutex.Unlock();
94
95         return toadd.JobId;
96 }
97
98 /******************************************************************************/
99 LuaJobInfo AsyncEngine::getJob()
100 {
101         m_JobQueueCounter.Wait();
102         m_JobQueueMutex.Lock();
103
104         LuaJobInfo retval;
105         retval.valid = false;
106
107         if (m_JobQueue.size() != 0) {
108                 retval = m_JobQueue.front();
109                 retval.valid = true;
110                 m_JobQueue.erase(m_JobQueue.begin());
111         }
112         m_JobQueueMutex.Unlock();
113
114         return retval;
115 }
116
117 /******************************************************************************/
118 void AsyncEngine::putJobResult(LuaJobInfo result)
119 {
120         m_ResultQueueMutex.Lock();
121         m_ResultQueue.push_back(result);
122         m_ResultQueueMutex.Unlock();
123 }
124
125 /******************************************************************************/
126 void AsyncEngine::Step(lua_State *L, int errorhandler)
127 {
128         lua_getglobal(L, "engine");
129         m_ResultQueueMutex.Lock();
130         while (!m_ResultQueue.empty()) {
131                 LuaJobInfo jobdone = m_ResultQueue.front();
132                 m_ResultQueue.erase(m_ResultQueue.begin());
133
134                 lua_getfield(L, -1, "async_event_handler");
135
136                 if (lua_isnil(L, -1)) {
137                         assert("Async event handler does not exist!" == 0);
138                 }
139
140                 luaL_checktype(L, -1, LUA_TFUNCTION);
141
142                 lua_pushinteger(L, jobdone.JobId);
143                 lua_pushlstring(L, jobdone.serializedResult.c_str(),
144                                 jobdone.serializedResult.length());
145
146                 if (lua_pcall(L, 2, 0, errorhandler)) {
147                         script_error(L);
148                 }
149         }
150         m_ResultQueueMutex.Unlock();
151         lua_pop(L, 1); // Pop engine
152 }
153
154 /******************************************************************************/
155 void AsyncEngine::PushFinishedJobs(lua_State* L) {
156         // Result Table
157         m_ResultQueueMutex.Lock();
158
159         unsigned int index = 1;
160         lua_createtable(L, m_ResultQueue.size(), 0);
161         int top = lua_gettop(L);
162
163         while (!m_ResultQueue.empty()) {
164                 LuaJobInfo jobdone = m_ResultQueue.front();
165                 m_ResultQueue.erase(m_ResultQueue.begin());
166
167                 lua_createtable(L, 0, 2);  // Pre-alocate space for two map fields
168                 int top_lvl2 = lua_gettop(L);
169
170                 lua_pushstring(L, "jobid");
171                 lua_pushnumber(L, jobdone.JobId);
172                 lua_settable(L, top_lvl2);
173
174                 lua_pushstring(L, "retval");
175                 lua_pushlstring(L, jobdone.serializedResult.data(),
176                         jobdone.serializedResult.size());
177                 lua_settable(L, top_lvl2);
178
179                 lua_rawseti(L, top, index++);
180         }
181
182         m_ResultQueueMutex.Unlock();
183 }
184
185 /******************************************************************************/
186 void AsyncEngine::PrepareEnvironment(lua_State* L, int top) {
187         for (std::map<std::string, lua_CFunction>::iterator it = m_FunctionList.begin();
188                         it != m_FunctionList.end(); it++) {
189                 lua_pushstring(L, it->first.c_str());
190                 lua_pushcfunction(L, it->second);
191                 lua_settable(L, top);
192         }
193 }
194
195 /******************************************************************************/
196 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
197                 unsigned int threadNum) :
198         ScriptApiBase(),
199         m_JobDispatcher(jobDispatcher),
200         m_threadnum(threadNum)
201 {
202         lua_State *L = getStack();
203
204         luaL_openlibs(L);
205
206         // Prepare job lua environment
207         lua_newtable(L);
208         lua_setglobal(L, "engine");
209         lua_getglobal(L, "engine");
210         int top = lua_gettop(L);
211
212         lua_pushstring(L, DIR_DELIM);
213         lua_setglobal(L, "DIR_DELIM");
214
215         lua_pushstring(L,
216                         (porting::path_share + DIR_DELIM + "builtin").c_str());
217         lua_setglobal(L, "SCRIPTDIR");
218
219         m_JobDispatcher->PrepareEnvironment(L, top);
220 }
221
222 /******************************************************************************/
223 AsyncWorkerThread::~AsyncWorkerThread()
224 {
225         assert(IsRunning() == false);
226 }
227
228 /******************************************************************************/
229 void* AsyncWorkerThread::Thread()
230 {
231         ThreadStarted();
232
233         // Register thread for error logging
234         char number[21];
235         snprintf(number, sizeof(number), "%d", m_threadnum);
236         log_register_thread(std::string("AsyncWorkerThread_") + number);
237
238         porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str());
239
240         std::string asyncscript = porting::path_share + DIR_DELIM + "builtin"
241                         + DIR_DELIM + "async_env.lua";
242
243         if (!loadScript(asyncscript)) {
244                 errorstream
245                         << "AsyncWorkderThread execution of async base environment failed!"
246                         << std::endl;
247                 abort();
248         }
249
250         lua_State *L = getStack();
251         // Main loop
252         while (!StopRequested()) {
253                 // Wait for job
254                 LuaJobInfo toProcess = m_JobDispatcher->getJob();
255
256                 if (toProcess.valid == false || StopRequested()) {
257                         continue;
258                 }
259
260                 lua_getglobal(L, "engine");
261                 if (lua_isnil(L, -1)) {
262                         errorstream << "Unable to find engine within async environment!";
263                         abort();
264                 }
265
266                 lua_getfield(L, -1, "job_processor");
267                 if (lua_isnil(L, -1)) {
268                         errorstream << "Unable to get async job processor!" << std::endl;
269                         abort();
270                 }
271
272                 luaL_checktype(L, -1, LUA_TFUNCTION);
273
274                 // Call it
275                 lua_pushlstring(L,
276                                 toProcess.serializedFunction.data(),
277                                 toProcess.serializedFunction.size());
278                 lua_pushlstring(L,
279                                 toProcess.serializedParams.data(),
280                                 toProcess.serializedParams.size());
281
282                 if (lua_pcall(L, 2, 1, m_errorhandler)) {
283                         scriptError();
284                         toProcess.serializedResult = "";
285                 } else {
286                         // Fetch result
287                         size_t length;
288                         const char *retval = lua_tolstring(L, -1, &length);
289                         toProcess.serializedResult = std::string(retval, length);
290                 }
291
292                 // Pop engine, job_processor, and retval
293                 lua_pop(L, 3);
294
295                 // Put job result
296                 m_JobDispatcher->putJobResult(toProcess);
297         }
298         log_deregister_thread();
299         return 0;
300 }
301