]> git.lizzy.rs Git - minetest.git/blob - src/script/cpp_api/s_async.cpp
Add meshnode drawtype.
[minetest.git] / src / script / cpp_api / s_async.cpp
1 /*
2 Minetest
3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include <stdio.h>
21 #include <stdlib.h>
22
23 extern "C" {
24 #include "lua.h"
25 #include "lauxlib.h"
26 #include "lualib.h"
27 }
28
29 #include "server.h"
30 #include "s_async.h"
31 #include "log.h"
32 #include "filesys.h"
33 #include "porting.h"
34 #include "common/c_internal.h"
35
36 /******************************************************************************/
37 AsyncEngine::AsyncEngine() :
38         initDone(false),
39         jobIdCounter(0)
40 {
41 }
42
43 /******************************************************************************/
44 AsyncEngine::~AsyncEngine()
45 {
46
47         // Request all threads to stop
48         for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
49                         it != workerThreads.end(); it++) {
50                 (*it)->Stop();
51         }
52
53
54         // Wake up all threads
55         for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
56                         it != workerThreads.end(); it++) {
57                 jobQueueCounter.Post();
58         }
59
60         // Wait for threads to finish
61         for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
62                         it != workerThreads.end(); it++) {
63                 (*it)->Wait();
64         }
65
66         // Force kill all threads
67         for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
68                         it != workerThreads.end(); it++) {
69                 (*it)->Kill();
70                 delete *it;
71         }
72
73         jobQueueMutex.Lock();
74         jobQueue.clear();
75         jobQueueMutex.Unlock();
76         workerThreads.clear();
77 }
78
79 /******************************************************************************/
80 bool AsyncEngine::registerFunction(const char* name, lua_CFunction func)
81 {
82         if (initDone) {
83                 return false;
84         }
85         functionList[name] = func;
86         return true;
87 }
88
89 /******************************************************************************/
90 void AsyncEngine::initialize(unsigned int numEngines)
91 {
92         initDone = true;
93
94         for (unsigned int i = 0; i < numEngines; i++) {
95                 AsyncWorkerThread *toAdd = new AsyncWorkerThread(this, i);
96                 workerThreads.push_back(toAdd);
97                 toAdd->Start();
98         }
99 }
100
101 /******************************************************************************/
102 unsigned int AsyncEngine::queueAsyncJob(std::string func, std::string params)
103 {
104         jobQueueMutex.Lock();
105         LuaJobInfo toAdd;
106         toAdd.id = jobIdCounter++;
107         toAdd.serializedFunction = func;
108         toAdd.serializedParams = params;
109
110         jobQueue.push_back(toAdd);
111
112         jobQueueCounter.Post();
113
114         jobQueueMutex.Unlock();
115
116         return toAdd.id;
117 }
118
119 /******************************************************************************/
120 LuaJobInfo AsyncEngine::getJob()
121 {
122         jobQueueCounter.Wait();
123         jobQueueMutex.Lock();
124
125         LuaJobInfo retval;
126         retval.valid = false;
127
128         if (!jobQueue.empty()) {
129                 retval = jobQueue.front();
130                 jobQueue.pop_front();
131                 retval.valid = true;
132         }
133         jobQueueMutex.Unlock();
134
135         return retval;
136 }
137
138 /******************************************************************************/
139 void AsyncEngine::putJobResult(LuaJobInfo result)
140 {
141         resultQueueMutex.Lock();
142         resultQueue.push_back(result);
143         resultQueueMutex.Unlock();
144 }
145
146 /******************************************************************************/
147 void AsyncEngine::step(lua_State *L, int errorhandler)
148 {
149         lua_getglobal(L, "core");
150         resultQueueMutex.Lock();
151         while (!resultQueue.empty()) {
152                 LuaJobInfo jobDone = resultQueue.front();
153                 resultQueue.pop_front();
154
155                 lua_getfield(L, -1, "async_event_handler");
156
157                 if (lua_isnil(L, -1)) {
158                         assert("Async event handler does not exist!" == 0);
159                 }
160
161                 luaL_checktype(L, -1, LUA_TFUNCTION);
162
163                 lua_pushinteger(L, jobDone.id);
164                 lua_pushlstring(L, jobDone.serializedResult.data(),
165                                 jobDone.serializedResult.size());
166
167                 if (lua_pcall(L, 2, 0, errorhandler)) {
168                         script_error(L);
169                 }
170         }
171         resultQueueMutex.Unlock();
172         lua_pop(L, 1); // Pop core
173 }
174
175 /******************************************************************************/
176 void AsyncEngine::pushFinishedJobs(lua_State* L) {
177         // Result Table
178         resultQueueMutex.Lock();
179
180         unsigned int index = 1;
181         lua_createtable(L, resultQueue.size(), 0);
182         int top = lua_gettop(L);
183
184         while (!resultQueue.empty()) {
185                 LuaJobInfo jobDone = resultQueue.front();
186                 resultQueue.pop_front();
187
188                 lua_createtable(L, 0, 2);  // Pre-allocate space for two map fields
189                 int top_lvl2 = lua_gettop(L);
190
191                 lua_pushstring(L, "jobid");
192                 lua_pushnumber(L, jobDone.id);
193                 lua_settable(L, top_lvl2);
194
195                 lua_pushstring(L, "retval");
196                 lua_pushlstring(L, jobDone.serializedResult.data(),
197                         jobDone.serializedResult.size());
198                 lua_settable(L, top_lvl2);
199
200                 lua_rawseti(L, top, index++);
201         }
202
203         resultQueueMutex.Unlock();
204 }
205
206 /******************************************************************************/
207 void AsyncEngine::prepareEnvironment(lua_State* L, int top)
208 {
209         for (std::map<std::string, lua_CFunction>::iterator it = functionList.begin();
210                         it != functionList.end(); it++) {
211                 lua_pushstring(L, it->first.c_str());
212                 lua_pushcfunction(L, it->second);
213                 lua_settable(L, top);
214         }
215 }
216
217 /******************************************************************************/
218 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
219                 unsigned int threadNum) :
220         ScriptApiBase(),
221         jobDispatcher(jobDispatcher),
222         threadnum(threadNum)
223 {
224         lua_State *L = getStack();
225
226         // Prepare job lua environment
227         lua_getglobal(L, "core");
228         int top = lua_gettop(L);
229
230         // Push builtin initialization type
231         lua_pushstring(L, "async");
232         lua_setglobal(L, "INIT");
233
234         jobDispatcher->prepareEnvironment(L, top);
235 }
236
237 /******************************************************************************/
238 AsyncWorkerThread::~AsyncWorkerThread()
239 {
240         assert(IsRunning() == false);
241 }
242
243 /******************************************************************************/
244 void* AsyncWorkerThread::Thread()
245 {
246         ThreadStarted();
247
248         // Register thread for error logging
249         char number[21];
250         snprintf(number, sizeof(number), "%d", threadnum);
251         log_register_thread(std::string("AsyncWorkerThread_") + number);
252
253         porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str());
254
255         lua_State *L = getStack();
256
257         std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
258         if (!loadScript(script)) {
259                 errorstream
260                         << "AsyncWorkderThread execution of async base environment failed!"
261                         << std::endl;
262                 abort();
263         }
264
265         lua_getglobal(L, "core");
266         if (lua_isnil(L, -1)) {
267                 errorstream << "Unable to find core within async environment!";
268                 abort();
269         }
270
271         // Main loop
272         while (!StopRequested()) {
273                 // Wait for job
274                 LuaJobInfo toProcess = jobDispatcher->getJob();
275
276                 if (toProcess.valid == false || StopRequested()) {
277                         continue;
278                 }
279
280                 lua_getfield(L, -1, "job_processor");
281                 if (lua_isnil(L, -1)) {
282                         errorstream << "Unable to get async job processor!" << std::endl;
283                         abort();
284                 }
285
286                 luaL_checktype(L, -1, LUA_TFUNCTION);
287
288                 // Call it
289                 lua_pushlstring(L,
290                                 toProcess.serializedFunction.data(),
291                                 toProcess.serializedFunction.size());
292                 lua_pushlstring(L,
293                                 toProcess.serializedParams.data(),
294                                 toProcess.serializedParams.size());
295
296                 if (lua_pcall(L, 2, 1, m_errorhandler)) {
297                         scriptError();
298                         toProcess.serializedResult = "";
299                 } else {
300                         // Fetch result
301                         size_t length;
302                         const char *retval = lua_tolstring(L, -1, &length);
303                         toProcess.serializedResult = std::string(retval, length);
304                 }
305
306                 lua_pop(L, 1);  // Pop retval
307
308                 // Put job result
309                 jobDispatcher->putJobResult(toProcess);
310         }
311
312         lua_pop(L, 1);  // Pop core
313
314         log_deregister_thread();
315
316         return 0;
317 }
318