]> git.lizzy.rs Git - dragonfireclient.git/blob - src/script/cpp_api/s_async.cpp
Fix code style of async API
[dragonfireclient.git] / src / script / cpp_api / s_async.cpp
1 /*
2 Minetest
3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 extern "C" {
24 #include "lua.h"
25 #include "lauxlib.h"
26 #include "lualib.h"
27 }
28
29 #include "s_async.h"
30 #include "log.h"
31 #include "filesys.h"
32 #include "porting.h"
33 #include "common/c_internal.h"
34
35 /******************************************************************************/
36 AsyncEngine::AsyncEngine() :
37         initDone(false),
38         jobIdCounter(0)
39 {
40 }
41
42 /******************************************************************************/
43 AsyncEngine::~AsyncEngine()
44 {
45
46         // Request all threads to stop
47         for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
48                         it != workerThreads.end(); it++) {
49                 (*it)->Stop();
50         }
51
52
53         // Wake up all threads
54         for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
55                         it != workerThreads.end(); it++) {
56                 jobQueueCounter.Post();
57         }
58
59         // Wait for threads to finish
60         for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
61                         it != workerThreads.end(); it++) {
62                 (*it)->Wait();
63         }
64
65         // Force kill all threads
66         for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
67                         it != workerThreads.end(); it++) {
68                 (*it)->Kill();
69                 delete *it;
70         }
71
72         jobQueueMutex.Lock();
73         jobQueue.clear();
74         jobQueueMutex.Unlock();
75         workerThreads.clear();
76 }
77
78 /******************************************************************************/
79 bool AsyncEngine::registerFunction(const char* name, lua_CFunction func)
80 {
81         if (initDone) {
82                 return false;
83         }
84         functionList[name] = func;
85         return true;
86 }
87
88 /******************************************************************************/
89 void AsyncEngine::initialize(unsigned int numEngines)
90 {
91         initDone = true;
92
93         for (unsigned int i = 0; i < numEngines; i++) {
94                 AsyncWorkerThread *toAdd = new AsyncWorkerThread(this, i);
95                 workerThreads.push_back(toAdd);
96                 toAdd->Start();
97         }
98 }
99
100 /******************************************************************************/
101 unsigned int AsyncEngine::queueAsyncJob(std::string func, std::string params)
102 {
103         jobQueueMutex.Lock();
104         LuaJobInfo toAdd;
105         toAdd.id = jobIdCounter++;
106         toAdd.serializedFunction = func;
107         toAdd.serializedParams = params;
108
109         jobQueue.push_back(toAdd);
110
111         jobQueueCounter.Post();
112
113         jobQueueMutex.Unlock();
114
115         return toAdd.id;
116 }
117
118 /******************************************************************************/
119 LuaJobInfo AsyncEngine::getJob()
120 {
121         jobQueueCounter.Wait();
122         jobQueueMutex.Lock();
123
124         LuaJobInfo retval;
125         retval.valid = false;
126
127         if (!jobQueue.empty()) {
128                 retval = jobQueue.front();
129                 jobQueue.pop_front();
130                 retval.valid = true;
131         }
132         jobQueueMutex.Unlock();
133
134         return retval;
135 }
136
137 /******************************************************************************/
138 void AsyncEngine::putJobResult(LuaJobInfo result)
139 {
140         resultQueueMutex.Lock();
141         resultQueue.push_back(result);
142         resultQueueMutex.Unlock();
143 }
144
145 /******************************************************************************/
146 void AsyncEngine::step(lua_State *L, int errorhandler)
147 {
148         lua_getglobal(L, "engine");
149         resultQueueMutex.Lock();
150         while (!resultQueue.empty()) {
151                 LuaJobInfo jobDone = resultQueue.front();
152                 resultQueue.pop_front();
153
154                 lua_getfield(L, -1, "async_event_handler");
155
156                 if (lua_isnil(L, -1)) {
157                         assert("Async event handler does not exist!" == 0);
158                 }
159
160                 luaL_checktype(L, -1, LUA_TFUNCTION);
161
162                 lua_pushinteger(L, jobDone.id);
163                 lua_pushlstring(L, jobDone.serializedResult.data(),
164                                 jobDone.serializedResult.size());
165
166                 if (lua_pcall(L, 2, 0, errorhandler)) {
167                         script_error(L);
168                 }
169         }
170         resultQueueMutex.Unlock();
171         lua_pop(L, 1); // Pop engine
172 }
173
174 /******************************************************************************/
175 void AsyncEngine::pushFinishedJobs(lua_State* L) {
176         // Result Table
177         resultQueueMutex.Lock();
178
179         unsigned int index = 1;
180         lua_createtable(L, resultQueue.size(), 0);
181         int top = lua_gettop(L);
182
183         while (!resultQueue.empty()) {
184                 LuaJobInfo jobDone = resultQueue.front();
185                 resultQueue.pop_front();
186
187                 lua_createtable(L, 0, 2);  // Pre-allocate space for two map fields
188                 int top_lvl2 = lua_gettop(L);
189
190                 lua_pushstring(L, "jobid");
191                 lua_pushnumber(L, jobDone.id);
192                 lua_settable(L, top_lvl2);
193
194                 lua_pushstring(L, "retval");
195                 lua_pushlstring(L, jobDone.serializedResult.data(),
196                         jobDone.serializedResult.size());
197                 lua_settable(L, top_lvl2);
198
199                 lua_rawseti(L, top, index++);
200         }
201
202         resultQueueMutex.Unlock();
203 }
204
205 /******************************************************************************/
206 void AsyncEngine::prepareEnvironment(lua_State* L, int top)
207 {
208         for (std::map<std::string, lua_CFunction>::iterator it = functionList.begin();
209                         it != functionList.end(); it++) {
210                 lua_pushstring(L, it->first.c_str());
211                 lua_pushcfunction(L, it->second);
212                 lua_settable(L, top);
213         }
214 }
215
216 /******************************************************************************/
217 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
218                 unsigned int threadNum) :
219         ScriptApiBase(),
220         jobDispatcher(jobDispatcher),
221         threadnum(threadNum)
222 {
223         lua_State *L = getStack();
224
225         luaL_openlibs(L);
226
227         // Prepare job lua environment
228         lua_newtable(L);
229         lua_setglobal(L, "engine");
230         lua_getglobal(L, "engine");
231         int top = lua_gettop(L);
232
233         lua_pushstring(L, DIR_DELIM);
234         lua_setglobal(L, "DIR_DELIM");
235
236         lua_pushstring(L,
237                         (porting::path_share + DIR_DELIM + "builtin").c_str());
238         lua_setglobal(L, "SCRIPTDIR");
239
240         jobDispatcher->prepareEnvironment(L, top);
241 }
242
243 /******************************************************************************/
244 AsyncWorkerThread::~AsyncWorkerThread()
245 {
246         assert(IsRunning() == false);
247 }
248
249 /******************************************************************************/
250 void* AsyncWorkerThread::Thread()
251 {
252         ThreadStarted();
253
254         // Register thread for error logging
255         char number[21];
256         snprintf(number, sizeof(number), "%d", threadnum);
257         log_register_thread(std::string("AsyncWorkerThread_") + number);
258
259         porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str());
260
261         std::string asyncscript = porting::path_share + DIR_DELIM + "builtin"
262                         + DIR_DELIM + "async_env.lua";
263
264         if (!loadScript(asyncscript)) {
265                 errorstream
266                         << "AsyncWorkderThread execution of async base environment failed!"
267                         << std::endl;
268                 abort();
269         }
270
271         lua_State *L = getStack();
272         // Main loop
273         while (!StopRequested()) {
274                 // Wait for job
275                 LuaJobInfo toProcess = jobDispatcher->getJob();
276
277                 if (toProcess.valid == false || StopRequested()) {
278                         continue;
279                 }
280
281                 lua_getglobal(L, "engine");
282                 if (lua_isnil(L, -1)) {
283                         errorstream << "Unable to find engine within async environment!";
284                         abort();
285                 }
286
287                 lua_getfield(L, -1, "job_processor");
288                 if (lua_isnil(L, -1)) {
289                         errorstream << "Unable to get async job processor!" << std::endl;
290                         abort();
291                 }
292
293                 luaL_checktype(L, -1, LUA_TFUNCTION);
294
295                 // Call it
296                 lua_pushlstring(L,
297                                 toProcess.serializedFunction.data(),
298                                 toProcess.serializedFunction.size());
299                 lua_pushlstring(L,
300                                 toProcess.serializedParams.data(),
301                                 toProcess.serializedParams.size());
302
303                 if (lua_pcall(L, 2, 1, m_errorhandler)) {
304                         scriptError();
305                         toProcess.serializedResult = "";
306                 } else {
307                         // Fetch result
308                         size_t length;
309                         const char *retval = lua_tolstring(L, -1, &length);
310                         toProcess.serializedResult = std::string(retval, length);
311                 }
312
313                 // Pop engine, job_processor, and retval
314                 lua_pop(L, 3);
315
316                 // Put job result
317                 jobDispatcher->putJobResult(toProcess);
318         }
319         log_deregister_thread();
320         return 0;
321 }
322