3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
33 #include "common/c_internal.h"
35 /******************************************************************************/
// Default constructor of AsyncEngine.
// Only the signature line is visible in this excerpt; the member-initializer
// list and the body are not shown, so the initial member values cannot be
// stated from here.
36 AsyncEngine::AsyncEngine() :
42 /******************************************************************************/
// Destructor: tears down the worker threads stored in workerThreads.
// It makes four separate passes over the vector (request stop, wake up,
// wait, force kill) and finally empties the vector. Most of the per-thread
// calls inside the loops are not visible in this excerpt.
43 AsyncEngine::~AsyncEngine()
46 // Request all threads to stop
47 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
48 it != workerThreads.end(); it++) {
53 // Wake up all threads
54 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
55 it != workerThreads.end(); it++) {
// One Post() per worker thread. Presumably this releases workers blocked in
// getJob() on jobQueueCounter.Wait() so they can notice the stop request.
56 jobQueueCounter.Post();
59 // Wait for threads to finish
60 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
61 it != workerThreads.end(); it++) {
65 // Force kill all threads
66 for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
67 it != workerThreads.end(); it++) {
// NOTE(review): the jobQueueMutex.Lock() matching this Unlock() is not
// visible in this excerpt -- confirm it exists in the elided lines.
74 jobQueueMutex.Unlock();
// Drops the (raw) pointers from the vector. Whether the pointed-to threads
// were deleted beforehand is not visible here -- TODO confirm.
75 workerThreads.clear();
78 /******************************************************************************/
// Registers a Lua C function under the given name.
//   name: key used in functionList (later pushed as a Lua string by
//         prepareEnvironment()).
//   func: the C function stored for that key.
// Returns a bool; the return statement(s) and any precondition checks are
// not visible in this excerpt.
79 bool AsyncEngine::registerFunction(const char* name, lua_CFunction func)
// operator[] assignment: inserts the entry or overwrites an existing one
// with the same name.
84 functionList[name] = func;
88 /******************************************************************************/
// Creates the worker threads of this engine.
//   numEngines: number of AsyncWorkerThread objects to create.
// Each worker receives a pointer to this engine and its index i as thread
// number. Whether the threads are also started here is not visible in this
// excerpt.
89 void AsyncEngine::initialize(unsigned int numEngines)
93 for (unsigned int i = 0; i < numEngines; i++) {
// Heap-allocated; the raw pointer is kept in workerThreads.
94 AsyncWorkerThread *toAdd = new AsyncWorkerThread(this, i);
95 workerThreads.push_back(toAdd);
100 /******************************************************************************/
// Appends a new job to jobQueue.
//   func:   serialized function, stored in the job's serializedFunction.
//   params: serialized parameters, stored in the job's serializedParams.
// Returns an unsigned int; the return statement is not visible in this
// excerpt (presumably the id assigned to the job -- TODO confirm).
// The declaration of toAdd is also not visible here.
101 unsigned int AsyncEngine::queueAsyncJob(std::string func, std::string params)
// jobQueueMutex is held while the id counter and the queue are modified.
103 jobQueueMutex.Lock();
// Post-increment: the job gets the current counter value, then the counter
// advances for the next job.
105 toAdd.id = jobIdCounter++;
106 toAdd.serializedFunction = func;
107 toAdd.serializedParams = params;
109 jobQueue.push_back(toAdd);
// Signals jobQueueCounter once per queued job; getJob() waits on it.
111 jobQueueCounter.Post();
113 jobQueueMutex.Unlock();
118 /******************************************************************************/
// Takes the next job from the front of jobQueue.
// Blocks on jobQueueCounter.Wait() before touching the queue.
// Returns a LuaJobInfo; the declaration of retval and the return statement
// are not visible in this excerpt (presumably retval is returned -- TODO
// confirm).
119 LuaJobInfo AsyncEngine::getJob()
121 jobQueueCounter.Wait();
122 jobQueueMutex.Lock();
// Default to "no job"; retval is only overwritten when the queue is
// non-empty. The destructor also posts the counter without queueing a job,
// so an empty queue after Wait() is possible.
125 retval.valid = false;
127 if (!jobQueue.empty()) {
128 retval = jobQueue.front();
129 jobQueue.pop_front();
132 jobQueueMutex.Unlock();
137 /******************************************************************************/
// Appends a finished job to resultQueue.
//   result: the job to store (taken by value and copied into the queue).
// The push is done while holding resultQueueMutex.
138 void AsyncEngine::putJobResult(LuaJobInfo result)
140 resultQueueMutex.Lock();
141 resultQueue.push_back(result);
142 resultQueueMutex.Unlock();
145 /******************************************************************************/
// Delivers all queued job results to the Lua function
// engine.async_event_handler.
//   L:            Lua state in which the handler is looked up and called.
//   errorhandler: stack index passed to lua_pcall as its error handler.
// For every entry in resultQueue the handler is called with two arguments:
// the job id and the serialized result string.
146 void AsyncEngine::step(lua_State *L, int errorhandler)
// Pushes the global "engine" table; it stays on the stack for the whole loop.
148 lua_getglobal(L, "engine");
// NOTE(review): as far as visible, resultQueueMutex is locked here and only
// unlocked after the loop, i.e. it appears to be held while the Lua handler
// runs in lua_pcall -- confirm against the elided lines.
149 resultQueueMutex.Lock();
150 while (!resultQueue.empty()) {
151 LuaJobInfo jobDone = resultQueue.front();
152 resultQueue.pop_front();
154 lua_getfield(L, -1, "async_event_handler");
156 if (lua_isnil(L, -1)) {
// A string literal never compares equal to 0, so this assert always fails
// when reached (in builds where assert is enabled).
157 assert("Async event handler does not exist!" == 0);
160 luaL_checktype(L, -1, LUA_TFUNCTION);
162 lua_pushinteger(L, jobDone.id);
// lua_pushlstring with explicit size: the result is passed as raw bytes, so
// embedded '\0' characters are preserved.
163 lua_pushlstring(L, jobDone.serializedResult.data(),
164 jobDone.serializedResult.size());
// 2 arguments, 0 results. A non-zero return signals an error; the handling
// code is not visible in this excerpt.
166 if (lua_pcall(L, 2, 0, errorhandler)) {
170 resultQueueMutex.Unlock();
171 lua_pop(L, 1); // Pop engine
174 /******************************************************************************/
// Drains resultQueue into a new Lua array table pushed onto the stack of L.
//   L: Lua state that receives the table.
// Each array element is a table with the fields "jobid" and "retval".
// resultQueueMutex is held while the queue is drained.
175 void AsyncEngine::pushFinishedJobs(lua_State* L) {
177 resultQueueMutex.Lock();
// Lua arrays are 1-based.
179 unsigned int index = 1;
// Outer table, with array space pre-sized to the current queue length.
180 lua_createtable(L, resultQueue.size(), 0);
181 int top = lua_gettop(L);
183 while (!resultQueue.empty()) {
184 LuaJobInfo jobDone = resultQueue.front();
185 resultQueue.pop_front();
187 lua_createtable(L, 0, 2); // Pre-allocate space for two map fields
188 int top_lvl2 = lua_gettop(L);
190 lua_pushstring(L, "jobid");
// NOTE(review): step() pushes the id with lua_pushinteger, here
// lua_pushnumber is used -- confirm whether the difference is intended.
191 lua_pushnumber(L, jobDone.id);
192 lua_settable(L, top_lvl2);
194 lua_pushstring(L, "retval");
195 lua_pushlstring(L, jobDone.serializedResult.data(),
196 jobDone.serializedResult.size());
197 lua_settable(L, top_lvl2);
// Stores the per-job table (top of stack) at outer[index] and pops it.
199 lua_rawseti(L, top, index++);
202 resultQueueMutex.Unlock();
205 /******************************************************************************/
// Copies every registered function into a Lua table.
//   L:   Lua state holding the target table.
//   top: stack index of the table that receives the entries.
// For each (name, function) pair in functionList this sets
// table[name] = function.
206 void AsyncEngine::prepareEnvironment(lua_State* L, int top)
208 for (std::map<std::string, lua_CFunction>::iterator it = functionList.begin();
209 it != functionList.end(); it++) {
210 lua_pushstring(L, it->first.c_str());
211 lua_pushcfunction(L, it->second);
// Pops key and value and stores them in the table at index `top`.
212 lua_settable(L, top);
216 /******************************************************************************/
// Constructs a worker thread object and prepares its Lua environment.
//   jobDispatcher: engine this worker belongs to; stored in the member of
//                  the same name.
//   threadNum:     number of this worker (the initializer that stores it is
//                  not visible in this excerpt).
// Several statements of the body are elided here, among them the call that
// pushes the value later stored as global "engine" and the call whose
// argument is the builtin path.
217 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
218 unsigned int threadNum) :
220 jobDispatcher(jobDispatcher),
223 lua_State *L = getStack();
227 // Prepare job lua environment
// Stores the value on top of the stack as global "engine", then pushes it
// again so that `top` refers to it.
229 lua_setglobal(L, "engine");
230 lua_getglobal(L, "engine");
231 int top = lua_gettop(L);
233 lua_pushstring(L, DIR_DELIM);
234 lua_setglobal(L, "DIR_DELIM");
// Argument of an elided call; the result ends up in global "SCRIPTDIR".
237 (porting::path_share + DIR_DELIM + "builtin").c_str());
238 lua_setglobal(L, "SCRIPTDIR");
// Lets the engine fill the table at index `top` (the "engine" value pushed
// above) with its registered functions.
240 jobDispatcher->prepareEnvironment(L, top);
243 /******************************************************************************/
// Destructor. Asserts that the thread is no longer running when the object
// is destroyed; it does not stop the thread itself in the visible code.
244 AsyncWorkerThread::~AsyncWorkerThread()
246 assert(IsRunning() == false);
249 /******************************************************************************/
// Thread entry point of a worker.
// Registers the thread for logging, loads builtin/async_env.lua and then
// processes jobs from the dispatcher until a stop is requested.
// For each job, engine.job_processor is called with the serialized function
// and parameters; the result (or an empty string on error) is handed back
// through putJobResult().
// Returns void*; the return statements and several other lines (buffer
// declaration, the lua_pushlstring calls, stack cleanup) are not visible in
// this excerpt.
250 void* AsyncWorkerThread::Thread()
254 // Register thread for error logging
// NOTE(review): the constructor takes threadNum as unsigned int; if the
// member threadnum is unsigned too, "%u" would be the matching format --
// confirm the member type.
256 snprintf(number, sizeof(number), "%d", threadnum);
257 log_register_thread(std::string("AsyncWorkerThread_") + number);
259 porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str());
261 std::string asyncscript = porting::path_share + DIR_DELIM + "builtin"
262 + DIR_DELIM + "async_env.lua";
264 if (!loadScript(asyncscript)) {
// NOTE(review): "AsyncWorkderThread" in the message below looks like a typo
// for "AsyncWorkerThread"; left unchanged because it is runtime output.
266 << "AsyncWorkderThread execution of async base environment failed!"
271 lua_State *L = getStack();
273 while (!StopRequested()) {
// getJob() blocks until the dispatcher's job counter is posted.
275 LuaJobInfo toProcess = jobDispatcher->getJob();
// An invalid job or a stop request ends or skips this iteration (the branch
// body is not visible here).
277 if (toProcess.valid == false || StopRequested()) {
281 lua_getglobal(L, "engine");
282 if (lua_isnil(L, -1)) {
283 errorstream << "Unable to find engine within async environment!";
287 lua_getfield(L, -1, "job_processor");
288 if (lua_isnil(L, -1)) {
289 errorstream << "Unable to get async job processor!" << std::endl;
293 luaL_checktype(L, -1, LUA_TFUNCTION);
// Arguments of two elided calls (presumably lua_pushlstring) that push the
// serialized function and parameters as length-delimited strings.
297 toProcess.serializedFunction.data(),
298 toProcess.serializedFunction.size());
300 toProcess.serializedParams.data(),
301 toProcess.serializedParams.size());
// 2 arguments, 1 result; a non-zero return signals an error.
303 if (lua_pcall(L, 2, 1, m_errorhandler)) {
// Error path: the job gets an empty result.
305 toProcess.serializedResult = "";
// Success path: copy the returned Lua string including its length, so
// embedded '\0' characters are preserved.
// NOTE(review): lua_tolstring returns NULL for values that are neither
// string nor number -- confirm job_processor always returns a string.
309 const char *retval = lua_tolstring(L, -1, &length);
310 toProcess.serializedResult = std::string(retval, length);
313 // Pop engine, job_processor, and retval
317 jobDispatcher->putJobResult(toProcess);
319 log_deregister_thread();