3 Copyright (C) 2013 sapier, <sapier AT gmx DOT net>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
33 #include "common/c_internal.h"
35 /******************************************************************************/
36 AsyncEngine::AsyncEngine() :
42 /******************************************************************************/
43 AsyncEngine::~AsyncEngine()
45 // Force kill all threads
46 for (std::vector<AsyncWorkerThread*>::iterator i = m_WorkerThreads.begin();
47 i != m_WorkerThreads.end(); i++) {
52 m_JobQueueMutex.Lock();
54 m_JobQueueMutex.Unlock();
55 m_WorkerThreads.clear();
58 /******************************************************************************/
59 bool AsyncEngine::registerFunction(const char* name, lua_CFunction func)
64 m_FunctionList[name] = func;
68 /******************************************************************************/
69 void AsyncEngine::Initialize(unsigned int numEngines)
73 for (unsigned int i = 0; i < numEngines; i++) {
74 AsyncWorkerThread* toAdd = new AsyncWorkerThread(this, i);
75 m_WorkerThreads.push_back(toAdd);
80 /******************************************************************************/
81 unsigned int AsyncEngine::doAsyncJob(std::string func, std::string params)
83 m_JobQueueMutex.Lock();
85 toadd.JobId = m_JobIdCounter++;
86 toadd.serializedFunction = func;
87 toadd.serializedParams = params;
89 m_JobQueue.push_back(toadd);
91 m_JobQueueCounter.Post();
93 m_JobQueueMutex.Unlock();
98 /******************************************************************************/
99 LuaJobInfo AsyncEngine::getJob()
101 m_JobQueueCounter.Wait();
102 m_JobQueueMutex.Lock();
105 retval.valid = false;
107 if (m_JobQueue.size() != 0) {
108 retval = m_JobQueue.front();
110 m_JobQueue.erase(m_JobQueue.begin());
112 m_JobQueueMutex.Unlock();
117 /******************************************************************************/
118 void AsyncEngine::putJobResult(LuaJobInfo result)
120 m_ResultQueueMutex.Lock();
121 m_ResultQueue.push_back(result);
122 m_ResultQueueMutex.Unlock();
125 /******************************************************************************/
126 void AsyncEngine::Step(lua_State *L, int errorhandler)
128 lua_getglobal(L, "engine");
129 m_ResultQueueMutex.Lock();
130 while (!m_ResultQueue.empty()) {
131 LuaJobInfo jobdone = m_ResultQueue.front();
132 m_ResultQueue.erase(m_ResultQueue.begin());
134 lua_getfield(L, -1, "async_event_handler");
136 if (lua_isnil(L, -1)) {
137 assert("Async event handler does not exist!" == 0);
140 luaL_checktype(L, -1, LUA_TFUNCTION);
142 lua_pushinteger(L, jobdone.JobId);
143 lua_pushlstring(L, jobdone.serializedResult.c_str(),
144 jobdone.serializedResult.length());
146 if (lua_pcall(L, 2, 0, errorhandler)) {
150 m_ResultQueueMutex.Unlock();
151 lua_pop(L, 1); // Pop engine
154 /******************************************************************************/
155 void AsyncEngine::PushFinishedJobs(lua_State* L) {
157 m_ResultQueueMutex.Lock();
159 unsigned int index = 1;
160 lua_createtable(L, m_ResultQueue.size(), 0);
161 int top = lua_gettop(L);
163 while (!m_ResultQueue.empty()) {
164 LuaJobInfo jobdone = m_ResultQueue.front();
165 m_ResultQueue.erase(m_ResultQueue.begin());
167 lua_createtable(L, 0, 2); // Pre-alocate space for two map fields
168 int top_lvl2 = lua_gettop(L);
170 lua_pushstring(L, "jobid");
171 lua_pushnumber(L, jobdone.JobId);
172 lua_settable(L, top_lvl2);
174 lua_pushstring(L, "retval");
175 lua_pushlstring(L, jobdone.serializedResult.data(),
176 jobdone.serializedResult.size());
177 lua_settable(L, top_lvl2);
179 lua_rawseti(L, top, index++);
182 m_ResultQueueMutex.Unlock();
185 /******************************************************************************/
186 void AsyncEngine::PrepareEnvironment(lua_State* L, int top) {
187 for (std::map<std::string, lua_CFunction>::iterator it = m_FunctionList.begin();
188 it != m_FunctionList.end(); it++) {
189 lua_pushstring(L, it->first.c_str());
190 lua_pushcfunction(L, it->second);
191 lua_settable(L, top);
195 /******************************************************************************/
196 AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
197 unsigned int threadNum) :
199 m_JobDispatcher(jobDispatcher),
200 m_threadnum(threadNum)
202 lua_State *L = getStack();
206 // Prepare job lua environment
208 lua_setglobal(L, "engine");
209 lua_getglobal(L, "engine");
210 int top = lua_gettop(L);
212 lua_pushstring(L, DIR_DELIM);
213 lua_setglobal(L, "DIR_DELIM");
216 (porting::path_share + DIR_DELIM + "builtin").c_str());
217 lua_setglobal(L, "SCRIPTDIR");
219 m_JobDispatcher->PrepareEnvironment(L, top);
222 /******************************************************************************/
223 AsyncWorkerThread::~AsyncWorkerThread()
225 assert(IsRunning() == false);
228 /******************************************************************************/
229 void* AsyncWorkerThread::Thread()
233 // Register thread for error logging
235 snprintf(number, sizeof(number), "%d", m_threadnum);
236 log_register_thread(std::string("AsyncWorkerThread_") + number);
238 porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str());
240 std::string asyncscript = porting::path_share + DIR_DELIM + "builtin"
241 + DIR_DELIM + "async_env.lua";
243 if (!loadScript(asyncscript)) {
245 << "AsyncWorkderThread execution of async base environment failed!"
250 lua_State *L = getStack();
252 while (!StopRequested()) {
254 LuaJobInfo toProcess = m_JobDispatcher->getJob();
256 if (toProcess.valid == false || StopRequested()) {
260 lua_getglobal(L, "engine");
261 if (lua_isnil(L, -1)) {
262 errorstream << "Unable to find engine within async environment!";
266 lua_getfield(L, -1, "job_processor");
267 if (lua_isnil(L, -1)) {
268 errorstream << "Unable to get async job processor!" << std::endl;
272 luaL_checktype(L, -1, LUA_TFUNCTION);
276 toProcess.serializedFunction.data(),
277 toProcess.serializedFunction.size());
279 toProcess.serializedParams.data(),
280 toProcess.serializedParams.size());
282 if (lua_pcall(L, 2, 1, m_errorhandler)) {
284 toProcess.serializedResult = "";
288 const char *retval = lua_tolstring(L, -1, &length);
289 toProcess.serializedResult = std::string(retval, length);
292 // Pop engine, job_processor, and retval
296 m_JobDispatcher->putJobResult(toProcess);
298 log_deregister_thread();