3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
34 #include "common/c_internal.h"
36 /******************************************************************************/
// Destructor: shuts the worker threads down in the phases named by the inline
// comments below (request stop, wake up, wait, force kill) and then empties
// workerThreads.
// NOTE(review): the embedded line numbering skips 43-46, 51-52, 56-58 and
// 62-66, so the bodies of three of the four loops and whatever precedes the
// unlock() near the end are not visible in this view -- confirm them against
// the full file before changing anything here.
37 AsyncEngine::~AsyncEngine()
40 // Request all threads to stop
41 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
42 it != workerThreads.end(); ++it) {
47 // Wake up all threads
48 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
49 it != workerThreads.end(); ++it) {
// One post per worker: getJob() waits on this same counter, so each worker
// that is waiting there gets a chance to return and notice the stop request.
50 jobQueueCounter.post();
53 // Wait for threads to finish
54 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
55 it != workerThreads.end(); ++it) {
59 // Force kill all threads
60 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
61 it != workerThreads.end(); ++it) {
// NOTE(review): the matching lock() is presumably on one of the lines that are
// not visible here -- confirm.
67 jobQueueMutex.unlock();
// Drops the (raw) worker pointers from the vector.
68 workerThreads.clear();
71 /******************************************************************************/
72 void AsyncEngine::registerStateInitializer(StateInitializer func)
74 stateInitializers.push_back(func);
77 /******************************************************************************/
// Creates `numEngines` worker threads and records them in workerThreads.
// Each worker is named "AsyncWorker-<i>" (i counted from 0) and receives a
// pointer to this engine.
// NOTE(review): lines 79-81 and 86-89 of the embedded numbering are not
// visible, so whether the workers are also started here cannot be confirmed
// from this view.
78 void AsyncEngine::initialize(unsigned int numEngines)
82 for (unsigned int i = 0; i < numEngines; i++) {
// Allocated with new and stored as a raw pointer in workerThreads.
83 AsyncWorkerThread *toAdd = new AsyncWorkerThread(this,
84 std::string("AsyncWorker-") + itos(i));
85 workerThreads.push_back(toAdd);
90 /******************************************************************************/
// Queues a job made of a serialized function and serialized parameters.
// The job gets the next value of jobIdCounter as its id, is appended to
// jobQueue, and jobQueueCounter is posted once so a waiting getJob() can
// pick it up.
// NOTE(review): the declaration of `toAdd`, the lock() matching the unlock()
// below and the return statement are on lines that are not visible here
// (93-95, 105-108) -- presumably the job id is what gets returned; confirm.
// NOTE(review): the second parameter reads as a mis-encoded `&params` (the
// body uses the name `params`); fix the encoding in the real file.
91 unsigned int AsyncEngine::queueAsyncJob(const std::string &func,
92 const std::string ¶ms)
96 toAdd.id = jobIdCounter++;
97 toAdd.serializedFunction = func;
98 toAdd.serializedParams = params;
100 jobQueue.push_back(toAdd);
// Signal that one more job is available.
102 jobQueueCounter.post();
104 jobQueueMutex.unlock();
109 /******************************************************************************/
// Waits on jobQueueCounter and then, under jobQueueMutex, removes the front
// entry of jobQueue if the queue is not empty.
// NOTE(review): the declaration of `retval`, any handling of the empty-queue
// case and the return statement are on lines that are not visible here
// (114-116, 120, 123-125). AsyncWorkerThread::run() checks `.valid` on the
// returned job, so presumably that flag is set there -- confirm.
110 LuaJobInfo AsyncEngine::getJob()
// Wait for a post() on the counter before touching the queue.
112 jobQueueCounter.wait();
113 jobQueueMutex.lock();
// The queue may be empty even after the wait: the destructor posts the
// counter once per worker without adding a job.
117 if (!jobQueue.empty()) {
118 retval = jobQueue.front();
119 jobQueue.pop_front();
122 jobQueueMutex.unlock();
127 /******************************************************************************/
128 void AsyncEngine::putJobResult(const LuaJobInfo &result)
130 resultQueueMutex.lock();
131 resultQueue.push_back(result);
132 resultQueueMutex.unlock();
135 /******************************************************************************/
136 void AsyncEngine::step(lua_State *L)
138 int error_handler = PUSH_ERROR_HANDLER(L);
139 lua_getglobal(L, "core");
140 resultQueueMutex.lock();
141 while (!resultQueue.empty()) {
142 LuaJobInfo jobDone = resultQueue.front();
143 resultQueue.pop_front();
145 lua_getfield(L, -1, "async_event_handler");
147 if (lua_isnil(L, -1)) {
148 FATAL_ERROR("Async event handler does not exist!");
151 luaL_checktype(L, -1, LUA_TFUNCTION);
153 lua_pushinteger(L, jobDone.id);
154 lua_pushlstring(L, jobDone.serializedResult.data(),
155 jobDone.serializedResult.size());
157 PCALL_RESL(L, lua_pcall(L, 2, 0, error_handler));
159 resultQueueMutex.unlock();
160 lua_pop(L, 2); // Pop core and error handler
163 /******************************************************************************/
164 void AsyncEngine::pushFinishedJobs(lua_State* L) {
166 MutexAutoLock l(resultQueueMutex);
168 unsigned int index = 1;
169 lua_createtable(L, resultQueue.size(), 0);
170 int top = lua_gettop(L);
172 while (!resultQueue.empty()) {
173 LuaJobInfo jobDone = resultQueue.front();
174 resultQueue.pop_front();
176 lua_createtable(L, 0, 2); // Pre-allocate space for two map fields
177 int top_lvl2 = lua_gettop(L);
179 lua_pushstring(L, "jobid");
180 lua_pushnumber(L, jobDone.id);
181 lua_settable(L, top_lvl2);
183 lua_pushstring(L, "retval");
184 lua_pushlstring(L, jobDone.serializedResult.data(),
185 jobDone.serializedResult.size());
186 lua_settable(L, top_lvl2);
188 lua_rawseti(L, top, index++);
192 /******************************************************************************/
// Walks every entry of stateInitializers (filled by
// registerStateInitializer()) for the given Lua state.
// NOTE(review): the loop body (lines 197-199 of the embedded numbering) is not
// visible here; presumably each initializer is invoked with (L, top) --
// confirm against the full file.
193 void AsyncEngine::prepareEnvironment(lua_State* L, int top)
195 for (std::vector<StateInitializer>::iterator it = stateInitializers.begin();
196 it != stateInitializers.end(); it++) {
201 /******************************************************************************/
// Constructs a worker bound to `jobDispatcher` and prepares its Lua state:
// the global INIT is set to the string "async" and the dispatcher's
// prepareEnvironment() is run with the stack index of the `core` global.
// NOTE(review): part of the initializer list (lines 204-205 of the embedded
// numbering) is not visible here, so how `name` is used cannot be confirmed
// from this view.
202 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
203 const std::string &name) :
206 jobDispatcher(jobDispatcher)
208 lua_State *L = getStack();
210 // Prepare job lua environment
// `top` is the stack index at which the `core` global was just pushed.
211 lua_getglobal(L, "core");
212 int top = lua_gettop(L);
214 // Push builtin initialization type
// lua_setglobal pops the string again, so `top` still refers to `core`.
215 lua_pushstring(L, "async");
216 lua_setglobal(L, "INIT");
218 jobDispatcher->prepareEnvironment(L, top);
221 /******************************************************************************/
// Destructor: asserts, through sanity_check, that the thread is no longer
// running when the object is destroyed.
222 AsyncWorkerThread::~AsyncWorkerThread()
224 sanity_check(!isRunning());
227 /******************************************************************************/
// Thread body of a worker: sets up the async base environment, then loops
// until a stop is requested, taking jobs from the dispatcher, running them
// through the Lua function `core.job_processor` and handing the result back
// with putJobResult().
// NOTE(review): several lines of this function are not visible in this view
// (for example 233-234, 254-255, 265, 268, 273-278 of the embedded numbering),
// including the try block, the Lua push calls and the branching on the pcall
// result -- confirm those parts against the full file.
228 void* AsyncWorkerThread::run()
230 lua_State *L = getStack();
// Path of init.lua inside the server's builtin Lua directory.
232 std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
// A ModError from the (not visible) try block is logged and treated as fatal.
235 } catch (const ModError &e) {
236 errorstream << "Execution of async base environment failed: "
237 << e.what() << std::endl;
238 FATAL_ERROR("Execution of async base environment failed");
// The error handler and `core` stay on the stack for the whole loop and are
// popped together at the end of the function.
241 int error_handler = PUSH_ERROR_HANDLER(L);
243 lua_getglobal(L, "core");
244 if (lua_isnil(L, -1)) {
245 FATAL_ERROR("Unable to find core within async environment!");
249 while (!stopRequested()) {
// Waits inside getJob() until the job counter is posted.
251 LuaJobInfo toProcess = jobDispatcher->getJob();
// Covers both an invalid job and a stop requested while waiting.
// NOTE(review): the body of this branch is not visible here.
253 if (!toProcess.valid || stopRequested()) {
257 lua_getfield(L, -1, "job_processor");
258 if (lua_isnil(L, -1)) {
259 FATAL_ERROR("Unable to get async job processor!");
262 luaL_checktype(L, -1, LUA_TFUNCTION);
// Data and size of the serialized function, then of the serialized
// parameters; presumably each pair is the argument list of a push call on the
// preceding, not visible, line -- confirm.
266 toProcess.serializedFunction.data(),
267 toProcess.serializedFunction.size());
269 toProcess.serializedParams.data(),
270 toProcess.serializedParams.size());
// Two arguments, one return value, errors routed through error_handler.
272 int result = lua_pcall(L, 2, 1, error_handler);
// NOTE(review): the condition selecting between the empty result below and
// the string read from the stack is not visible here.
275 toProcess.serializedResult = "";
// Copies the return value using its explicit length.
279 const char *retval = lua_tolstring(L, -1, &length);
280 toProcess.serializedResult = std::string(retval, length);
283 lua_pop(L, 1); // Pop retval
286 jobDispatcher->putJobResult(toProcess);
289 lua_pop(L, 2); // Pop core and error handler