#include "lualib.h"
}
+#include "server.h"
#include "s_async.h"
#include "log.h"
#include "filesys.h"
#include "porting.h"
#include "common/c_internal.h"
-/******************************************************************************/
-AsyncEngine::AsyncEngine() :
- initDone(false),
- jobIdCounter(0)
-{
-}
-
/******************************************************************************/
AsyncEngine::~AsyncEngine()
{
// Request all threads to stop
for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
- it != workerThreads.end(); it++) {
- (*it)->Stop();
+ it != workerThreads.end(); ++it) {
+ (*it)->stop();
}
// Wake up all threads
for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
- it != workerThreads.end(); it++) {
- jobQueueCounter.Post();
+ it != workerThreads.end(); ++it) {
+ jobQueueCounter.post();
}
// Wait for threads to finish
for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
- it != workerThreads.end(); it++) {
- (*it)->Wait();
+ it != workerThreads.end(); ++it) {
+ (*it)->wait();
}
// Force kill all threads
for (std::vector<AsyncWorkerThread *>::iterator it = workerThreads.begin();
- it != workerThreads.end(); it++) {
- (*it)->Kill();
+ it != workerThreads.end(); ++it) {
delete *it;
}
- jobQueueMutex.Lock();
+ jobQueueMutex.lock();
jobQueue.clear();
- jobQueueMutex.Unlock();
+ jobQueueMutex.unlock();
workerThreads.clear();
}
/******************************************************************************/
-bool AsyncEngine::registerFunction(const char* name, lua_CFunction func)
+void AsyncEngine::registerStateInitializer(StateInitializer func)
{
- if (initDone) {
- return false;
- }
- functionList[name] = func;
- return true;
+ stateInitializers.push_back(func);
}
/******************************************************************************/
initDone = true;
for (unsigned int i = 0; i < numEngines; i++) {
- AsyncWorkerThread *toAdd = new AsyncWorkerThread(this, i);
+ AsyncWorkerThread *toAdd = new AsyncWorkerThread(this,
+ std::string("AsyncWorker-") + itos(i));
workerThreads.push_back(toAdd);
- toAdd->Start();
+ toAdd->start();
}
}
/******************************************************************************/
-unsigned int AsyncEngine::queueAsyncJob(std::string func, std::string params)
+unsigned int AsyncEngine::queueAsyncJob(const std::string &func,
+ const std::string ¶ms)
{
- jobQueueMutex.Lock();
+ jobQueueMutex.lock();
LuaJobInfo toAdd;
toAdd.id = jobIdCounter++;
toAdd.serializedFunction = func;
jobQueue.push_back(toAdd);
- jobQueueCounter.Post();
+ jobQueueCounter.post();
- jobQueueMutex.Unlock();
+ jobQueueMutex.unlock();
return toAdd.id;
}
/******************************************************************************/
LuaJobInfo AsyncEngine::getJob()
{
- jobQueueCounter.Wait();
- jobQueueMutex.Lock();
+ jobQueueCounter.wait();
+ jobQueueMutex.lock();
LuaJobInfo retval;
- retval.valid = false;
if (!jobQueue.empty()) {
retval = jobQueue.front();
jobQueue.pop_front();
retval.valid = true;
}
- jobQueueMutex.Unlock();
+ jobQueueMutex.unlock();
return retval;
}
/******************************************************************************/
-void AsyncEngine::putJobResult(LuaJobInfo result)
+void AsyncEngine::putJobResult(const LuaJobInfo &result)
{
- resultQueueMutex.Lock();
+ resultQueueMutex.lock();
resultQueue.push_back(result);
- resultQueueMutex.Unlock();
+ resultQueueMutex.unlock();
}
/******************************************************************************/
-void AsyncEngine::step(lua_State *L, int errorhandler)
+void AsyncEngine::step(lua_State *L)
{
- lua_getglobal(L, "engine");
- resultQueueMutex.Lock();
+ int error_handler = PUSH_ERROR_HANDLER(L);
+ lua_getglobal(L, "core");
+ resultQueueMutex.lock();
while (!resultQueue.empty()) {
LuaJobInfo jobDone = resultQueue.front();
resultQueue.pop_front();
lua_getfield(L, -1, "async_event_handler");
if (lua_isnil(L, -1)) {
- assert("Async event handler does not exist!" == 0);
+ FATAL_ERROR("Async event handler does not exist!");
}
luaL_checktype(L, -1, LUA_TFUNCTION);
lua_pushlstring(L, jobDone.serializedResult.data(),
jobDone.serializedResult.size());
- if (lua_pcall(L, 2, 0, errorhandler)) {
- script_error(L);
- }
+ PCALL_RESL(L, lua_pcall(L, 2, 0, error_handler));
}
- resultQueueMutex.Unlock();
- lua_pop(L, 1); // Pop engine
+ resultQueueMutex.unlock();
+ lua_pop(L, 2); // Pop core and error handler
}
/******************************************************************************/
void AsyncEngine::pushFinishedJobs(lua_State* L) {
// Result Table
- resultQueueMutex.Lock();
+ MutexAutoLock l(resultQueueMutex);
unsigned int index = 1;
lua_createtable(L, resultQueue.size(), 0);
lua_rawseti(L, top, index++);
}
-
- resultQueueMutex.Unlock();
}
/******************************************************************************/
void AsyncEngine::prepareEnvironment(lua_State* L, int top)
{
- for (std::map<std::string, lua_CFunction>::iterator it = functionList.begin();
- it != functionList.end(); it++) {
- lua_pushstring(L, it->first.c_str());
- lua_pushcfunction(L, it->second);
- lua_settable(L, top);
+ for (std::vector<StateInitializer>::iterator it = stateInitializers.begin();
+ it != stateInitializers.end(); it++) {
+ (*it)(L, top);
}
}
/******************************************************************************/
AsyncWorkerThread::AsyncWorkerThread(AsyncEngine* jobDispatcher,
- unsigned int threadNum) :
+ const std::string &name) :
+ Thread(name),
ScriptApiBase(),
- jobDispatcher(jobDispatcher),
- threadnum(threadNum)
+ jobDispatcher(jobDispatcher)
{
lua_State *L = getStack();
- luaL_openlibs(L);
-
// Prepare job lua environment
- lua_newtable(L);
- lua_setglobal(L, "engine");
- lua_getglobal(L, "engine");
+ lua_getglobal(L, "core");
int top = lua_gettop(L);
- lua_pushstring(L, DIR_DELIM);
- lua_setglobal(L, "DIR_DELIM");
-
- lua_pushstring(L,
- (porting::path_share + DIR_DELIM + "builtin").c_str());
- lua_setglobal(L, "SCRIPTDIR");
+ // Push builtin initialization type
+ lua_pushstring(L, "async");
+ lua_setglobal(L, "INIT");
jobDispatcher->prepareEnvironment(L, top);
}
/******************************************************************************/
AsyncWorkerThread::~AsyncWorkerThread()
{
- assert(IsRunning() == false);
+ sanity_check(!isRunning());
}
/******************************************************************************/
-void* AsyncWorkerThread::Thread()
+void* AsyncWorkerThread::run()
{
- ThreadStarted();
-
- // Register thread for error logging
- char number[21];
- snprintf(number, sizeof(number), "%d", threadnum);
- log_register_thread(std::string("AsyncWorkerThread_") + number);
+ lua_State *L = getStack();
- porting::setThreadName((std::string("AsyncWorkTh_") + number).c_str());
+ std::string script = getServer()->getBuiltinLuaPath() + DIR_DELIM + "init.lua";
+ try {
+ loadScript(script);
+ } catch (const ModError &e) {
+ errorstream << "Execution of async base environment failed: "
+ << e.what() << std::endl;
+ FATAL_ERROR("Execution of async base environment failed");
+ }
- std::string asyncscript = porting::path_share + DIR_DELIM + "builtin"
- + DIR_DELIM + "async_env.lua";
+ int error_handler = PUSH_ERROR_HANDLER(L);
- if (!loadScript(asyncscript)) {
- errorstream
- << "AsyncWorkderThread execution of async base environment failed!"
- << std::endl;
- abort();
+ lua_getglobal(L, "core");
+ if (lua_isnil(L, -1)) {
+ FATAL_ERROR("Unable to find core within async environment!");
}
- lua_State *L = getStack();
// Main loop
- while (!StopRequested()) {
+ while (!stopRequested()) {
// Wait for job
LuaJobInfo toProcess = jobDispatcher->getJob();
- if (toProcess.valid == false || StopRequested()) {
+ if (!toProcess.valid || stopRequested()) {
continue;
}
- lua_getglobal(L, "engine");
- if (lua_isnil(L, -1)) {
- errorstream << "Unable to find engine within async environment!";
- abort();
- }
-
lua_getfield(L, -1, "job_processor");
if (lua_isnil(L, -1)) {
- errorstream << "Unable to get async job processor!" << std::endl;
- abort();
+ FATAL_ERROR("Unable to get async job processor!");
}
luaL_checktype(L, -1, LUA_TFUNCTION);
toProcess.serializedParams.data(),
toProcess.serializedParams.size());
- if (lua_pcall(L, 2, 1, m_errorhandler)) {
- scriptError();
+ int result = lua_pcall(L, 2, 1, error_handler);
+ if (result) {
+ PCALL_RES(result);
toProcess.serializedResult = "";
} else {
// Fetch result
toProcess.serializedResult = std::string(retval, length);
}
- // Pop engine, job_processor, and retval
- lua_pop(L, 3);
+ lua_pop(L, 1); // Pop retval
// Put job result
jobDispatcher->putJobResult(toProcess);
}
- log_deregister_thread();
+
+ lua_pop(L, 2); // Pop core and error handler
+
return 0;
}