3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
34 #include "common/c_internal.h"
36 /******************************************************************************/
37 AsyncEngine::AsyncEngine() :
43 /******************************************************************************/
44 AsyncEngine::~AsyncEngine()
47 // Request all threads to stop
48 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
49 it != workerThreads.end(); it++) {
54 // Wake up all threads
55 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
56 it != workerThreads.end(); it++) {
57 jobQueueCounter.Post();
60 // Wait for threads to finish
61 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
62 it != workerThreads.end(); it++) {
66 // Force kill all threads
67 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
68 it != workerThreads.end(); it++) {
75 jobQueueMutex.Unlock();
76 workerThreads.clear();
79 /******************************************************************************/
80 bool AsyncEngine::registerFunction(const char* name, lua_CFunction func)
85 functionList[name] = func;
89 /******************************************************************************/
90 void AsyncEngine::initialize(unsigned int numEngines)
94 for (unsigned int i = 0; i < numEngines; i++) {
95 AsyncWorkerThread *toAdd = new AsyncWorkerThread(this, i);
96 workerThreads.push_back(toAdd);
101 /******************************************************************************/
102 unsigned int AsyncEngine::queueAsyncJob(std::string func, std::string params)
104 jobQueueMutex.Lock();
106 toAdd.id = jobIdCounter++;
107 toAdd.serializedFunction = func;
108 toAdd.serializedParams = params;
110 jobQueue.push_back(toAdd);
112 jobQueueCounter.Post();
114 jobQueueMutex.Unlock();
119 /******************************************************************************/
120 LuaJobInfo AsyncEngine::getJob()
122 jobQueueCounter.Wait();
123 jobQueueMutex.Lock();
126 retval.valid = false;
128 if (!jobQueue.empty()) {
129 retval = jobQueue.front();
130 jobQueue.pop_front();
133 jobQueueMutex.Unlock();
138 /******************************************************************************/
139 void AsyncEngine::putJobResult(LuaJobInfo result)
141 resultQueueMutex.Lock();
142 resultQueue.push_back(result);
143 resultQueueMutex.Unlock();
146 /******************************************************************************/
147 void AsyncEngine::step(lua_State *L, int errorhandler)
149 lua_getglobal(L, "engine");
150 resultQueueMutex.Lock();
151 while (!resultQueue.empty()) {
152 LuaJobInfo jobDone = resultQueue.front();
153 resultQueue.pop_front();
155 lua_getfield(L, -1, "async_event_handler");
157 if (lua_isnil(L, -1)) {
158 assert("Async event handler does not exist!" == 0);
161 luaL_checktype(L, -1, LUA_TFUNCTION);
163 lua_pushinteger(L, jobDone.id);
164 lua_pushlstring(L, jobDone.serializedResult.data(),
165 jobDone.serializedResult.size());
167 if (lua_pcall(L, 2, 0, errorhandler)) {
171 resultQueueMutex.Unlock();
172 lua_pop(L, 1); // Pop engine
175 /******************************************************************************/
176 void AsyncEngine::pushFinishedJobs(lua_State* L) {
178 resultQueueMutex.Lock();
180 unsigned int index = 1;
181 lua_createtable(L, resultQueue.size(), 0);
182 int top = lua_gettop(L);
184 while (!resultQueue.empty()) {
185 LuaJobInfo jobDone = resultQueue.front();
186 resultQueue.pop_front();
188 lua_createtable(L, 0, 2); // Pre-allocate space for two map fields
189 int top_lvl2 = lua_gettop(L);
191 lua_pushstring(L, "jobid");
192 lua_pushnumber(L, jobDone.id);
193 lua_settable(L, top_lvl2);
195 lua_pushstring(L, "retval");
196 lua_pushlstring(L, jobDone.serializedResult.data(),
197 jobDone.serializedResult.size());
198 lua_settable(L, top_lvl2);
200 lua_rawseti(L, top, index++);
203 resultQueueMutex.Unlock();
206 /******************************************************************************/
207 void AsyncEngine::prepareEnvironment(lua_State* L, int top)
209 for (std::map<std::string, lua_CFunction>::iterator it = functionList.begin();
210 it != functionList.end(); it++) {
211 lua_pushstring(L, it->first.c_str());
212 lua_pushcfunction(L, it->second);
213 lua_settable(L, top);
217 /******************************************************************************/
218 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
219 unsigned int threadNum) :
221 jobDispatcher(jobDispatcher),
224 lua_State *L = getStack();
228 // Prepare job lua environment
230 lua_setglobal(L, "engine");
231 lua_getglobal(L, "engine");
232 int top = lua_gettop(L);
234 lua_pushstring(L, DIR_DELIM);
235 lua_setglobal(L, "DIR_DELIM");
237 // Push builtin initialization type
238 lua_pushstring(L, "async");
239 lua_setglobal(L, "INIT");
241 jobDispatcher->prepareEnvironment(L, top);
244 /******************************************************************************/
245 AsyncWorkerThread::~AsyncWorkerThread()
247 assert(IsRunning() == false);
250 /******************************************************************************/
251 void* AsyncWorkerThread::Thread()
255 // Register thread for error logging
257 snprintf(number, sizeof(number), "%d", threadnum);
258 log_register_thread(std::string("AsyncWorkerThread_") + number);
260 porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str());
262 lua_State *L = getStack();
264 std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
265 if (!loadScript(script)) {
267 << "AsyncWorkderThread execution of async base environment failed!"
273 while (!StopRequested()) {
275 LuaJobInfo toProcess = jobDispatcher->getJob();
277 if (toProcess.valid == false || StopRequested()) {
281 lua_getglobal(L, "engine");
282 if (lua_isnil(L, -1)) {
283 errorstream << "Unable to find engine within async environment!";
287 lua_getfield(L, -1, "job_processor");
288 if (lua_isnil(L, -1)) {
289 errorstream << "Unable to get async job processor!" << std::endl;
293 luaL_checktype(L, -1, LUA_TFUNCTION);
297 toProcess.serializedFunction.data(),
298 toProcess.serializedFunction.size());
300 toProcess.serializedParams.data(),
301 toProcess.serializedParams.size());
303 if (lua_pcall(L, 2, 1, m_errorhandler)) {
305 toProcess.serializedResult = "";
309 const char *retval = lua_tolstring(L, -1, &length);
310 toProcess.serializedResult = std::string(retval, length);
313 // Pop engine, job_processor, and retval
317 jobDispatcher->putJobResult(toProcess);
319 log_deregister_thread();