3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
34 #include "common/c_internal.h"
36 /******************************************************************************/
37 AsyncEngine::AsyncEngine() :
43 /******************************************************************************/
44 AsyncEngine::~AsyncEngine()
47 // Request all threads to stop
48 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
49 it != workerThreads.end(); ++it) {
54 // Wake up all threads
55 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
56 it != workerThreads.end(); ++it) {
57 jobQueueCounter.post();
60 // Wait for threads to finish
61 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
62 it != workerThreads.end(); ++it) {
66 // Force kill all threads
67 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
68 it != workerThreads.end(); ++it) {
74 jobQueueMutex.unlock();
75 workerThreads.clear();
78 /******************************************************************************/
79 void AsyncEngine::registerStateInitializer(StateInitializer func)
81 stateInitializers.push_back(func);
84 /******************************************************************************/
85 void AsyncEngine::initialize(unsigned int numEngines)
89 for (unsigned int i = 0; i < numEngines; i++) {
90 AsyncWorkerThread *toAdd = new AsyncWorkerThread(this,
91 std::string("AsyncWorker-") + itos(i));
92 workerThreads.push_back(toAdd);
97 /******************************************************************************/
98 unsigned int AsyncEngine::queueAsyncJob(const std::string &func,
99 const std::string ¶ms)
101 jobQueueMutex.lock();
103 toAdd.id = jobIdCounter++;
104 toAdd.serializedFunction = func;
105 toAdd.serializedParams = params;
107 jobQueue.push_back(toAdd);
109 jobQueueCounter.post();
111 jobQueueMutex.unlock();
116 /******************************************************************************/
117 LuaJobInfo AsyncEngine::getJob()
119 jobQueueCounter.wait();
120 jobQueueMutex.lock();
124 if (!jobQueue.empty()) {
125 retval = jobQueue.front();
126 jobQueue.pop_front();
129 jobQueueMutex.unlock();
134 /******************************************************************************/
135 void AsyncEngine::putJobResult(const LuaJobInfo &result)
137 resultQueueMutex.lock();
138 resultQueue.push_back(result);
139 resultQueueMutex.unlock();
142 /******************************************************************************/
143 void AsyncEngine::step(lua_State *L)
145 int error_handler = PUSH_ERROR_HANDLER(L);
146 lua_getglobal(L, "core");
147 resultQueueMutex.lock();
148 while (!resultQueue.empty()) {
149 LuaJobInfo jobDone = resultQueue.front();
150 resultQueue.pop_front();
152 lua_getfield(L, -1, "async_event_handler");
154 if (lua_isnil(L, -1)) {
155 FATAL_ERROR("Async event handler does not exist!");
158 luaL_checktype(L, -1, LUA_TFUNCTION);
160 lua_pushinteger(L, jobDone.id);
161 lua_pushlstring(L, jobDone.serializedResult.data(),
162 jobDone.serializedResult.size());
164 PCALL_RESL(L, lua_pcall(L, 2, 0, error_handler));
166 resultQueueMutex.unlock();
167 lua_pop(L, 2); // Pop core and error handler
170 /******************************************************************************/
171 void AsyncEngine::pushFinishedJobs(lua_State* L) {
173 MutexAutoLock l(resultQueueMutex);
175 unsigned int index = 1;
176 lua_createtable(L, resultQueue.size(), 0);
177 int top = lua_gettop(L);
179 while (!resultQueue.empty()) {
180 LuaJobInfo jobDone = resultQueue.front();
181 resultQueue.pop_front();
183 lua_createtable(L, 0, 2); // Pre-allocate space for two map fields
184 int top_lvl2 = lua_gettop(L);
186 lua_pushstring(L, "jobid");
187 lua_pushnumber(L, jobDone.id);
188 lua_settable(L, top_lvl2);
190 lua_pushstring(L, "retval");
191 lua_pushlstring(L, jobDone.serializedResult.data(),
192 jobDone.serializedResult.size());
193 lua_settable(L, top_lvl2);
195 lua_rawseti(L, top, index++);
199 /******************************************************************************/
200 void AsyncEngine::prepareEnvironment(lua_State* L, int top)
202 for (std::vector<StateInitializer>::iterator it = stateInitializers.begin();
203 it != stateInitializers.end(); it++) {
208 /******************************************************************************/
209 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
210 const std::string &name) :
213 jobDispatcher(jobDispatcher)
215 lua_State *L = getStack();
217 // Prepare job lua environment
218 lua_getglobal(L, "core");
219 int top = lua_gettop(L);
221 // Push builtin initialization type
222 lua_pushstring(L, "async");
223 lua_setglobal(L, "INIT");
225 jobDispatcher->prepareEnvironment(L, top);
228 /******************************************************************************/
229 AsyncWorkerThread::~AsyncWorkerThread()
231 sanity_check(!isRunning());
234 /******************************************************************************/
235 void* AsyncWorkerThread::run()
237 lua_State *L = getStack();
239 std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
242 } catch (const ModError &e) {
243 errorstream << "Execution of async base environment failed: "
244 << e.what() << std::endl;
245 FATAL_ERROR("Execution of async base environment failed");
248 int error_handler = PUSH_ERROR_HANDLER(L);
250 lua_getglobal(L, "core");
251 if (lua_isnil(L, -1)) {
252 FATAL_ERROR("Unable to find core within async environment!");
256 while (!stopRequested()) {
258 LuaJobInfo toProcess = jobDispatcher->getJob();
260 if (!toProcess.valid || stopRequested()) {
264 lua_getfield(L, -1, "job_processor");
265 if (lua_isnil(L, -1)) {
266 FATAL_ERROR("Unable to get async job processor!");
269 luaL_checktype(L, -1, LUA_TFUNCTION);
273 toProcess.serializedFunction.data(),
274 toProcess.serializedFunction.size());
276 toProcess.serializedParams.data(),
277 toProcess.serializedParams.size());
279 int result = lua_pcall(L, 2, 1, error_handler);
282 toProcess.serializedResult = "";
286 const char *retval = lua_tolstring(L, -1, &length);
287 toProcess.serializedResult = std::string(retval, length);
290 lua_pop(L, 1); // Pop retval
293 jobDispatcher->putJobResult(toProcess);
296 lua_pop(L, 2); // Pop core and error handler