]> git.lizzy.rs Git - minetest.git/commitdiff
Implement httpfetch module and initialize it from main()
authorKahrl <kahrl@gmx.net>
Thu, 29 Aug 2013 03:04:56 +0000 (05:04 +0200)
committerKahrl <kahrl@gmx.net>
Fri, 13 Dec 2013 17:05:10 +0000 (18:05 +0100)
Add curl_parallel_limit setting that will replace media_fetch_threads in
a later commit.

Fix a typo in MutexedQueue::pop_back() that made it impossible to compile
code that used this function. (Noticed this while implementing httpfetch.)

src/CMakeLists.txt
src/defaultsettings.cpp
src/httpfetch.cpp [new file with mode: 0644]
src/httpfetch.h [new file with mode: 0644]
src/main.cpp
src/util/container.h

index a30dc9854af3adc7c87e5c0a3c52257a5b569ef1..104e564065fa945f046f0357ff567d1d41eb929f 100644 (file)
@@ -188,7 +188,7 @@ message (STATUS "LuaJIT library: ${LUA_LIBRARY}")
 message (STATUS "LuaJIT headers: ${LUA_INCLUDE_DIR}")
 
 set(USE_LUAJIT 0)
-if(LUA_LIBRARY AND LUA_INCLUDE_DIR) 
+if(LUA_LIBRARY AND LUA_INCLUDE_DIR)
        message (STATUS "LuaJIT found.")
        set(USE_LUAJIT 1)
 else(LUA_LIBRARY AND LUA_INCLUDE_DIR)
@@ -307,6 +307,7 @@ set(common_SRCS
        pathfinder.cpp
        convert_json.cpp
        gettext.cpp
+       httpfetch.cpp
        ${JTHREAD_SRCS}
        ${common_SCRIPT_SRCS}
        ${UTIL_SRCS}
@@ -500,7 +501,7 @@ if(MSVC)
        # Flags for C files (sqlite)
        # /MT = Link statically with standard library stuff
        set(CMAKE_C_FLAGS_RELEASE "/O2 /Ob2 /MT")
-       
+
        if(BUILD_SERVER)
                set_target_properties(${PROJECT_NAME}server PROPERTIES
                                COMPILE_DEFINITIONS "SERVER")
@@ -508,13 +509,13 @@ if(MSVC)
 
 else()
        # Probably GCC
-       
+
        if(WARN_ALL)
                set(RELEASE_WARNING_FLAGS "-Wall")
        else()
                set(RELEASE_WARNING_FLAGS "")
        endif()
-       
+
        if(NOT APPLE AND NOT "${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang")
                CHECK_CXX_COMPILER_FLAG("-Wno-unused-but-set-variable" HAS_UNUSED_BUT_SET_VARIABLE_WARNING)
                if(HAS_UNUSED_BUT_SET_VARIABLE_WARNING)
@@ -537,7 +538,7 @@ else()
        if(USE_GPROF)
                set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -pg")
        endif()
-       
+
        if(BUILD_SERVER)
                set_target_properties(${PROJECT_NAME}server PROPERTIES
                                COMPILE_DEFINITIONS "SERVER")
index 9b407b1c36326207f3fe58ad20807f10f80d2d0e..662717c8a98e6d60c59479c5bd42a242cb11de9f 100644 (file)
@@ -140,6 +140,7 @@ void set_default_settings(Settings *settings)
        settings->setDefault("enable_particles", "true");
 
        settings->setDefault("media_fetch_threads", "8");
+       settings->setDefault("curl_parallel_limit", "8");
 
        settings->setDefault("serverlist_url", "servers.minetest.net");
        settings->setDefault("serverlist_file", "favoriteservers.txt");
diff --git a/src/httpfetch.cpp b/src/httpfetch.cpp
new file mode 100644 (file)
index 0000000..4342a8b
--- /dev/null
@@ -0,0 +1,718 @@
+/*
+Minetest
+Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation; either version 2.1 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public License along
+with this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+*/
+
+#include "httpfetch.h"
+#include <iostream>
+#include <sstream>
+#include <list>
+#include <map>
+#include <errno.h>
+#include "jthread/jevent.h"
+#include "config.h"
+#include "exceptions.h"
+#include "debug.h"
+#include "log.h"
+#include "util/container.h"
+#include "util/thread.h"
+#include "socket.h" // for select()
+
+JMutex g_httpfetch_mutex;
+std::map<unsigned long, std::list<HTTPFetchResult> > g_httpfetch_results;
+
+static void httpfetch_deliver_result(const HTTPFetchResult &fetchresult)
+{
+       unsigned long caller = fetchresult.caller;
+       if (caller != HTTPFETCH_DISCARD) {
+               JMutexAutoLock lock(g_httpfetch_mutex);
+               g_httpfetch_results[caller].push_back(fetchresult);
+       }
+}
+
+static void httpfetch_request_clear(unsigned long caller);
+
+unsigned long httpfetch_caller_alloc()
+{
+       JMutexAutoLock lock(g_httpfetch_mutex);
+
+       // Check each caller ID except HTTPFETCH_DISCARD
+       const unsigned long discard = HTTPFETCH_DISCARD;
+       for (unsigned long caller = discard + 1; caller != discard; ++caller) {
+               std::map<unsigned long, std::list<HTTPFetchResult> >::iterator
+                       it = g_httpfetch_results.find(caller);
+               if (it == g_httpfetch_results.end()) {
+                       verbosestream<<"httpfetch_caller_alloc: allocating "
+                                       <<caller<<std::endl;
+                       // Access element to create it
+                       g_httpfetch_results[caller];
+                       return caller;
+               }
+       }
+
+       assert("httpfetch_caller_alloc: ran out of caller IDs" == 0);
+       return discard;
+}
+
+void httpfetch_caller_free(unsigned long caller)
+{
+       verbosestream<<"httpfetch_caller_free: freeing "
+                       <<caller<<std::endl;
+
+       httpfetch_request_clear(caller);
+       if (caller != HTTPFETCH_DISCARD) {
+               JMutexAutoLock lock(g_httpfetch_mutex);
+               g_httpfetch_results.erase(caller);
+       }
+}
+
+bool httpfetch_async_get(unsigned long caller, HTTPFetchResult &fetchresult)
+{
+       JMutexAutoLock lock(g_httpfetch_mutex);
+
+       // Check that caller exists
+       std::map<unsigned long, std::list<HTTPFetchResult> >::iterator
+               it = g_httpfetch_results.find(caller);
+       if (it == g_httpfetch_results.end())
+               return false;
+
+       // Check that result queue is nonempty
+       std::list<HTTPFetchResult> &callerresults = it->second;
+       if (callerresults.empty())
+               return false;
+
+       // Pop first result
+       fetchresult = callerresults.front();
+       callerresults.pop_front();
+       return true;
+}
+
+#if USE_CURL
+#include <curl/curl.h>
+
+/*
+       USE_CURL is on: use cURL based httpfetch implementation
+*/
+
+static size_t httpfetch_writefunction(
+               char *ptr, size_t size, size_t nmemb, void *userdata)
+{
+       std::ostringstream *stream = (std::ostringstream*)userdata;
+       size_t count = size * nmemb;
+       stream->write(ptr, count);
+       return count;
+}
+
+static size_t httpfetch_discardfunction(
+               char *ptr, size_t size, size_t nmemb, void *userdata)
+{
+       return size * nmemb;
+}
+
+class CurlHandlePool
+{
+       std::list<CURL*> handles;
+
+public:
+       CurlHandlePool() {}
+       ~CurlHandlePool()
+       {
+               for (std::list<CURL*>::iterator it = handles.begin();
+                               it != handles.end(); ++it) {
+                       curl_easy_cleanup(*it);
+               }
+       }
+       CURL * alloc()
+       {
+               CURL *curl;
+               if (handles.empty()) {
+                       curl = curl_easy_init();
+                       if (curl == NULL) {
+                               errorstream<<"curl_easy_init returned NULL"<<std::endl;
+                       }
+               }
+               else {
+                       curl = handles.front();
+                       handles.pop_front();
+               }
+               return curl;
+       }
+       void free(CURL *handle)
+       {
+               if (handle)
+                       handles.push_back(handle);
+       }
+};
+
+struct HTTPFetchOngoing
+{
+       CurlHandlePool *pool;
+       CURL *curl;
+       CURLM *multi;
+       HTTPFetchRequest request;
+       HTTPFetchResult result;
+       std::ostringstream oss;
+       char *post_fields;
+       struct curl_slist *httpheader;
+
+       HTTPFetchOngoing(HTTPFetchRequest request_, CurlHandlePool *pool_):
+               pool(pool_),
+               curl(NULL),
+               multi(NULL),
+               request(request_),
+               result(request_),
+               oss(std::ios::binary),
+               httpheader(NULL)
+       {
+               curl = pool->alloc();
+               if (curl != NULL) {
+                       // Set static cURL options
+                       curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
+                       curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1);
+                       curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
+                       curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 1);
+
+#if LIBCURL_VERSION_NUM >= 0x071304
+                       // Restrict protocols so that curl vulnerabilities in
+                       // other protocols don't affect us.
+                       // These settings were introduced in curl 7.19.4.
+                       long protocols =
+                               CURLPROTO_HTTP |
+                               CURLPROTO_HTTPS |
+                               CURLPROTO_FTP |
+                               CURLPROTO_FTPS;
+                       curl_easy_setopt(curl, CURLOPT_PROTOCOLS, protocols);
+                       curl_easy_setopt(curl, CURLOPT_REDIR_PROTOCOLS, protocols);
+#endif
+
+                       // Set cURL options based on HTTPFetchRequest
+                       curl_easy_setopt(curl, CURLOPT_URL,
+                                       request.url.c_str());
+                       curl_easy_setopt(curl, CURLOPT_TIMEOUT_MS,
+                                       request.timeout);
+                       curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT_MS,
+                                       request.connect_timeout);
+                       // Set up a write callback that writes to the
+                       // ostringstream ongoing->oss, unless the data
+                       // is to be discarded
+                       if (request.caller == HTTPFETCH_DISCARD) {
+                               curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
+                                               httpfetch_discardfunction);
+                               curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL);
+                       }
+                       else {
+                               curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
+                                               httpfetch_writefunction);
+                               curl_easy_setopt(curl, CURLOPT_WRITEDATA, &oss);
+                       }
+                       // Set POST (or GET) data
+                       if (request.post_fields.empty()) {
+                               curl_easy_setopt(curl, CURLOPT_HTTPGET, 1);
+                       }
+                       else {
+                               curl_easy_setopt(curl, CURLOPT_POST, 1);
+                               curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE,
+                                               request.post_fields.size());
+                               curl_easy_setopt(curl, CURLOPT_POSTFIELDS,
+                                               request.post_fields.c_str());
+                               // request.post_fields must now *never* be
+                               // modified until CURLOPT_POSTFIELDS is cleared
+                       }
+                       // Set additional HTTP headers
+                       for (size_t i = 0; i < request.extra_headers.size(); ++i) {
+                               httpheader = curl_slist_append(
+                                       httpheader,
+                                       request.extra_headers[i].c_str());
+                       }
+                       curl_easy_setopt(curl, CURLOPT_HTTPHEADER, httpheader);
+               }
+       }
+
+       CURLcode start(CURLM *multi_)
+       {
+               if (curl == NULL)
+                       return CURLE_FAILED_INIT;
+
+               if (multi_) {
+                       // Multi interface (async)
+                       CURLMcode mres = curl_multi_add_handle(multi_, curl);
+                       if (mres != CURLM_OK) {
+                               errorstream<<"curl_multi_add_handle"
+                                       <<" returned error code "<<mres
+                                       <<std::endl;
+                               return CURLE_FAILED_INIT;
+                       }
+                       multi = multi_; // store for curl_multi_remove_handle
+                       return CURLE_OK;
+               }
+               else {
+                       // Easy interface (sync)
+                       return curl_easy_perform(curl);
+               }
+       }
+
+       void complete(CURLcode res)
+       {
+               result.succeeded = (res == CURLE_OK);
+               result.timeout = (res == CURLE_OPERATION_TIMEDOUT);
+               result.data = oss.str();
+
+               // Get HTTP/FTP response code
+               result.response_code = 0;
+               if (curl != NULL) {
+                       if (curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE,
+                                       &result.response_code) != CURLE_OK) {
+                               result.response_code = 0;
+                       }
+               }
+
+               if (res != CURLE_OK) {
+                       infostream<<request.url<<" not found ("
+                               <<curl_easy_strerror(res)<<")"
+                               <<" (response code "<<result.response_code<<")"
+                               <<std::endl;
+               }
+       }
+
+       ~HTTPFetchOngoing()
+       {
+               if (multi != NULL) {
+                       CURLMcode mres = curl_multi_remove_handle(multi, curl);
+                       if (mres != CURLM_OK) {
+                               errorstream<<"curl_multi_remove_handle"
+                                       <<" returned error code "<<mres
+                                       <<std::endl;
+                       }
+               }
+
+               // Set safe options for the reusable cURL handle
+               curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
+                               httpfetch_discardfunction);
+               curl_easy_setopt(curl, CURLOPT_WRITEDATA, NULL);
+               curl_easy_setopt(curl, CURLOPT_POSTFIELDS, NULL);
+               if (httpheader != NULL) {
+                       curl_easy_setopt(curl, CURLOPT_HTTPHEADER, NULL);
+                       curl_slist_free_all(httpheader);
+               }
+
+               // Store the cURL handle for reuse
+               pool->free(curl);
+       }
+};
+
+class CurlFetchThread : public SimpleThread
+{
+protected:
+       enum RequestType {
+               RT_FETCH,
+               RT_CLEAR,
+               RT_WAKEUP,
+       };
+
+       struct Request {
+               RequestType type;
+               HTTPFetchRequest fetchrequest;
+               Event *event;
+       };
+
+       CURLM *m_multi;
+       MutexedQueue<Request> m_requests;
+       size_t m_parallel_limit;
+
+       // Variables exclusively used within thread
+       std::vector<HTTPFetchOngoing*> m_all_ongoing;
+       std::list<HTTPFetchRequest> m_queued_fetches;
+
+public:
+       CurlFetchThread(int parallel_limit)
+       {
+               if (parallel_limit >= 1)
+                       m_parallel_limit = parallel_limit;
+               else
+                       m_parallel_limit = 1;
+       }
+
+       void requestFetch(const HTTPFetchRequest &fetchrequest)
+       {
+               Request req;
+               req.type = RT_FETCH;
+               req.fetchrequest = fetchrequest;
+               req.event = NULL;
+               m_requests.push_back(req);
+       }
+
+       void requestClear(unsigned long caller, Event *event)
+       {
+               Request req;
+               req.type = RT_CLEAR;
+               req.fetchrequest.caller = caller;
+               req.event = event;
+               m_requests.push_back(req);
+       }
+
+       void requestWakeUp()
+       {
+               Request req;
+               req.type = RT_WAKEUP;
+               req.event = NULL;
+               m_requests.push_back(req);
+       }
+
+protected:
+       // Handle a request from some other thread
+       // E.g. new fetch; clear fetches for one caller; wake up
+       void processRequest(const Request &req)
+       {
+               if (req.type == RT_FETCH) {
+                       // New fetch, queue until there are less
+                       // than m_parallel_limit ongoing fetches
+                       m_queued_fetches.push_back(req.fetchrequest);
+
+                       // see processQueued() for what happens next
+
+               }
+               else if (req.type == RT_CLEAR) {
+                       unsigned long caller = req.fetchrequest.caller;
+
+                       // Abort all ongoing fetches for the caller
+                       for (std::vector<HTTPFetchOngoing*>::iterator
+                                       it = m_all_ongoing.begin();
+                                       it != m_all_ongoing.end();) {
+                               if ((*it)->request.caller == caller) {
+                                       delete (*it);
+                                       it = m_all_ongoing.erase(it);
+                               }
+                               else
+                                       ++it;
+                       }
+
+                       // Also abort all queued fetches for the caller
+                       for (std::list<HTTPFetchRequest>::iterator
+                                       it = m_queued_fetches.begin();
+                                       it != m_queued_fetches.end();) {
+                               if ((*it).caller == caller)
+                                       it = m_queued_fetches.erase(it);
+                               else
+                                       ++it;
+                       }
+               }
+               else if (req.type == RT_WAKEUP) {
+                       // Wakeup: Nothing to do, thread is awake at this point
+               }
+
+               if (req.event != NULL)
+                       req.event->signal();
+       }
+
+       // Start new ongoing fetches if m_parallel_limit allows
+       void processQueued(CurlHandlePool *pool)
+       {
+               while (m_all_ongoing.size() < m_parallel_limit &&
+                               !m_queued_fetches.empty()) {
+                       HTTPFetchRequest request = m_queued_fetches.front();
+                       m_queued_fetches.pop_front();
+
+                       // Create ongoing fetch data and make a cURL handle
+                       // Set cURL options based on HTTPFetchRequest
+                       HTTPFetchOngoing *ongoing =
+                               new HTTPFetchOngoing(request, pool);
+
+                       // Initiate the connection (curl_multi_add_handle)
+                       CURLcode res = ongoing->start(m_multi);
+                       if (res == CURLE_OK) {
+                               m_all_ongoing.push_back(ongoing);
+                       }
+                       else {
+                               ongoing->complete(res);
+                               httpfetch_deliver_result(ongoing->result);
+                               delete ongoing;
+                       }
+               }
+       }
+
+       // Process CURLMsg (indicates completion of a fetch)
+       void processCurlMessage(CURLMsg *msg)
+       {
+               // Determine which ongoing fetch the message pertains to
+               size_t i = 0;
+               bool found = false;
+               for (i = 0; i < m_all_ongoing.size(); ++i) {
+                       if (m_all_ongoing[i]->curl == msg->easy_handle) {
+                               found = true;
+                               break;
+                       }
+               }
+               if (msg->msg == CURLMSG_DONE && found) {
+                       // m_all_ongoing[i] succeeded or failed.
+                       HTTPFetchOngoing *ongoing = m_all_ongoing[i];
+                       ongoing->complete(msg->data.result);
+                       httpfetch_deliver_result(ongoing->result);
+                       delete ongoing;
+                       m_all_ongoing.erase(m_all_ongoing.begin() + i);
+               }
+       }
+
+       // Wait for a request from another thread, or timeout elapses
+       void waitForRequest(long timeout)
+       {
+               if (m_queued_fetches.empty()) {
+                       try {
+                               Request req = m_requests.pop_front(timeout);
+                               processRequest(req);
+                       }
+                       catch (ItemNotFoundException &e) {}
+               }
+       }
+
+       // Wait until some IO happens, or timeout elapses
+       void waitForIO(long timeout)
+       {
+               fd_set read_fd_set;
+               fd_set write_fd_set;
+               fd_set exc_fd_set;
+               int max_fd;
+               long select_timeout = -1;
+               struct timeval select_tv;
+               CURLMcode mres;
+
+               FD_ZERO(&read_fd_set);
+               FD_ZERO(&write_fd_set);
+               FD_ZERO(&exc_fd_set);
+
+               mres = curl_multi_fdset(m_multi, &read_fd_set,
+                               &write_fd_set, &exc_fd_set, &max_fd);
+               if (mres != CURLM_OK) {
+                       errorstream<<"curl_multi_fdset"
+                               <<" returned error code "<<mres
+                               <<std::endl;
+                       select_timeout = 0;
+               }
+
+               mres = curl_multi_timeout(m_multi, &select_timeout);
+               if (mres != CURLM_OK) {
+                       errorstream<<"curl_multi_timeout"
+                               <<" returned error code "<<mres
+                               <<std::endl;
+                       select_timeout = 0;
+               }
+
+               // Limit timeout so new requests get through
+               if (select_timeout < 0 || select_timeout > timeout)
+                       select_timeout = timeout;
+
+               if (select_timeout > 0) {
+                       select_tv.tv_sec = select_timeout / 1000;
+                       select_tv.tv_usec = (select_timeout % 1000) * 1000;
+                       int retval = select(max_fd + 1, &read_fd_set,
+                                       &write_fd_set, &exc_fd_set,
+                                       &select_tv);
+                       if (retval == -1) {
+                               #ifdef _WIN32
+                               errorstream<<"select returned error code "
+                                       <<WSAGetLastError()<<std::endl;
+                               #else
+                               errorstream<<"select returned error code "
+                                       <<errno<<std::endl;
+                               #endif
+                       }
+               }
+       }
+
+       void * Thread()
+       {
+               ThreadStarted();
+               log_register_thread("CurlFetchThread");
+               DSTACK(__FUNCTION_NAME);
+
+               CurlHandlePool pool;
+
+               m_multi = curl_multi_init();
+               if (m_multi == NULL) {
+                       errorstream<<"curl_multi_init returned NULL\n";
+                       return NULL;
+               }
+
+               assert(m_all_ongoing.empty());
+
+               while (getRun()) {
+                       BEGIN_DEBUG_EXCEPTION_HANDLER
+
+                       /*
+                               Handle new async requests
+                       */
+
+                       while (!m_requests.empty()) {
+                               Request req = m_requests.pop_front();
+                               processRequest(req);
+                       }
+                       processQueued(&pool);
+
+                       /*
+                               Handle ongoing async requests
+                       */
+
+                       int still_ongoing = 0;
+                       while (curl_multi_perform(m_multi, &still_ongoing) ==
+                                       CURLM_CALL_MULTI_PERFORM)
+                               /* noop */;
+
+                       /*
+                               Handle completed async requests
+                       */
+                       if (still_ongoing < (int) m_all_ongoing.size()) {
+                               CURLMsg *msg;
+                               int msgs_in_queue;
+                               msg = curl_multi_info_read(m_multi, &msgs_in_queue);
+                               while (msg != NULL) {
+                                       processCurlMessage(msg);
+                                       msg = curl_multi_info_read(m_multi, &msgs_in_queue);
+                               }
+                       }
+
+                       /*
+                               If there are ongoing requests, wait for data
+                               (with a timeout of 100ms so that new requests
+                               can be processed).
+
+                               If no ongoing requests, wait for a new request.
+                               (Possibly an empty request that signals
+                               that the thread should be stopped.)
+                       */
+                       if (m_all_ongoing.empty())
+                               waitForRequest(100000000);
+                       else
+                               waitForIO(100);
+
+                       END_DEBUG_EXCEPTION_HANDLER(errorstream)
+               }
+
+               // Call curl_multi_remove_handle and cleanup easy handles
+               for (size_t i = 0; i < m_all_ongoing.size(); ++i) {
+                       delete m_all_ongoing[i];
+               }
+               m_all_ongoing.clear();
+
+               m_queued_fetches.clear();
+
+               CURLMcode mres = curl_multi_cleanup(m_multi);
+               if (mres != CURLM_OK) {
+                       errorstream<<"curl_multi_cleanup"
+                               <<" returned error code "<<mres
+                               <<std::endl;
+               }
+
+               return NULL;
+       }
+};
+
+CurlFetchThread *g_httpfetch_thread = NULL;
+
+void httpfetch_init(int parallel_limit)
+{
+       verbosestream<<"httpfetch_init: parallel_limit="<<parallel_limit
+                       <<std::endl;
+
+       CURLcode res = curl_global_init(CURL_GLOBAL_DEFAULT);
+       assert(res == CURLE_OK);
+
+       g_httpfetch_thread = new CurlFetchThread(parallel_limit);
+}
+
+void httpfetch_cleanup()
+{
+       verbosestream<<"httpfetch_cleanup: cleaning up"<<std::endl;
+
+       g_httpfetch_thread->setRun(false);
+       g_httpfetch_thread->requestWakeUp();
+       g_httpfetch_thread->stop();
+       delete g_httpfetch_thread;
+
+       curl_global_cleanup();
+}
+
+void httpfetch_async(const HTTPFetchRequest &fetchrequest)
+{
+       g_httpfetch_thread->requestFetch(fetchrequest);
+       if (!g_httpfetch_thread->IsRunning())
+               g_httpfetch_thread->Start();
+}
+
+static void httpfetch_request_clear(unsigned long caller)
+{
+       if (g_httpfetch_thread->IsRunning()) {
+               Event event;
+               g_httpfetch_thread->requestClear(caller, &event);
+               event.wait();
+       }
+       else {
+               g_httpfetch_thread->requestClear(caller, NULL);
+       }
+}
+
+void httpfetch_sync(const HTTPFetchRequest &fetchrequest,
+               HTTPFetchResult &fetchresult)
+{
+       // Create ongoing fetch data and make a cURL handle
+       // Set cURL options based on HTTPFetchRequest
+       CurlHandlePool pool;
+       HTTPFetchOngoing ongoing(fetchrequest, &pool);
+       // Do the fetch (curl_easy_perform)
+       CURLcode res = ongoing.start(NULL);
+       // Update fetchresult
+       ongoing.complete(res);
+       fetchresult = ongoing.result;
+}
+
+#else  // USE_CURL
+
+/*
+       USE_CURL is off:
+
+       Dummy httpfetch implementation that always returns an error.
+*/
+
+void httpfetch_init(int parallel_limit)
+{
+}
+
+void httpfetch_cleanup()
+{
+}
+
+void httpfetch_async(const HTTPFetchRequest &fetchrequest)
+{
+       errorstream<<"httpfetch_async: unable to fetch "<<fetchrequest.url
+                       <<" because USE_CURL=0"<<std::endl;
+
+       HTTPFetchResult fetchresult(fetchrequest); // sets succeeded = false etc.
+       httpfetch_deliver_result(fetchresult);
+}
+
+static void httpfetch_request_clear(unsigned long caller)
+{
+}
+
+void httpfetch_sync(const HTTPFetchRequest &fetchrequest,
+               HTTPFetchResult &fetchresult)
+{
+       errorstream<<"httpfetch_sync: unable to fetch "<<fetchrequest.url
+                       <<" because USE_CURL=0"<<std::endl;
+
+       fetchresult = HTTPFetchResult(fetchrequest); // sets succeeded = false etc.
+}
+
+#endif  // USE_CURL
diff --git a/src/httpfetch.h b/src/httpfetch.h
new file mode 100644 (file)
index 0000000..56a198b
--- /dev/null
@@ -0,0 +1,126 @@
+/*
+Minetest
+Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
+
+This program is free software; you can redistribute it and/or modify
+it under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation; either version 2.1 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public License along
+with this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+*/
+
+#ifndef HTTPFETCH_HEADER
+#define HTTPFETCH_HEADER
+
+#include <string>
+#include <vector>
+#include "config.h"
+
+// Can be used in place of "caller" in asynchronous transfers to discard result
+// (used as default value of "caller")
+#define HTTPFETCH_DISCARD 0
+
+struct HTTPFetchRequest
+{
+       std::string url;
+
+       // Identifies the caller (for asynchronous requests)
+       // Ignored by httpfetch_sync
+       unsigned long caller;
+
+       // Some number that identifies the request
+       // (when the same caller issues multiple httpfetch_async calls)
+       unsigned long request_id;
+
+       // Timeout for the whole transfer, in milliseconds
+       long timeout;
+
+       // Timeout for the connection phase, in milliseconds
+       long connect_timeout;
+
+       // POST data (should be application/x-www-form-urlencoded
+       // unless a Content-Type header is specified in extra_headers)
+       // If this is empty a GET request is done instead.
+       std::string post_fields;
+
+       // If not empty, should contain entries such as "Accept: text/html"
+       std::vector<std::string> extra_headers;
+
+       HTTPFetchRequest()
+       {
+               url = "";
+               caller = HTTPFETCH_DISCARD;
+               request_id = 0;
+               timeout = 0;
+               connect_timeout = 0;
+       }
+};
+
+struct HTTPFetchResult
+{
+       bool succeeded;
+       bool timeout;
+       long response_code;
+       std::string data;
+       // The caller and request_id from the corresponding HTTPFetchRequest.
+       unsigned long caller;
+       unsigned long request_id;
+
+       HTTPFetchResult()
+       {
+               succeeded = false;
+               timeout = false;
+               response_code = 0;
+               data = "";
+               caller = HTTPFETCH_DISCARD;
+               request_id = 0;
+       }
+
+       HTTPFetchResult(const HTTPFetchRequest &fetchrequest)
+       {
+               succeeded = false;
+               timeout = false;
+               response_code = 0;
+               data = "";
+               caller = fetchrequest.caller;
+               request_id = fetchrequest.request_id;
+       }
+};
+
+// Initializes the httpfetch module
+void httpfetch_init(int parallel_limit);
+
+// Stops the httpfetch thread and cleans up resources
+void httpfetch_cleanup();
+
+// Starts an asynchronous HTTP fetch request
+void httpfetch_async(const HTTPFetchRequest &fetchrequest);
+
+// If any fetch for the given caller ID is complete, removes it from the
+// result queue, sets fetchresult and returns true. Otherwise returns false.
+bool httpfetch_async_get(unsigned long caller, HTTPFetchResult &fetchresult);
+
+// Allocates a caller ID for httpfetch_async
+// Not required if you want to set caller = HTTPFETCH_DISCARD
+unsigned long httpfetch_caller_alloc();
+
+// Frees a caller ID allocated with httpfetch_caller_alloc
+// Note: This can be expensive, because the httpfetch thread is told
+// to stop any ongoing fetches for the given caller.
+void httpfetch_caller_free(unsigned long caller);
+
+// Performs a synchronous HTTP request. This blocks and therefore should
+// only be used from background threads.
+void httpfetch_sync(const HTTPFetchRequest &fetchrequest,
+               HTTPFetchResult &fetchresult);
+
+
+#endif // !HTTPFETCH_HEADER
index 57c2f06a545fc5f487cc7f5e4e84a227c224a282..239d68246e636bd7ae8f4534272004a70061e5b2 100644 (file)
@@ -77,6 +77,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 #include "subgame.h"
 #include "quicktune.h"
 #include "serverlist.h"
+#include "httpfetch.h"
 #include "guiEngine.h"
 #include "mapsector.h"
 
@@ -1001,6 +1002,9 @@ int main(int argc, char *argv[])
        assert(res == CURLE_OK);
 #endif
 
+       // Initialize HTTP fetcher
+       httpfetch_init(g_settings->getS32("curl_parallel_limit"));
+
        /*
                Run unit tests
        */
@@ -1858,6 +1862,9 @@ int main(int argc, char *argv[])
                }
        }
 
+       // Stop httpfetch thread (if started)
+       httpfetch_cleanup();
+
        END_DEBUG_EXCEPTION_HANDLER(errorstream)
 
        debugstreams_deinit();
index fdd76cc598a3b36a552b31c678841101fc23f316..e83c3cd375a9b7ea4f755d4aa7f9817163974641 100644 (file)
@@ -297,7 +297,8 @@ class MutexedQueue
 
                                if(!m_list.empty())
                                {
-                                       typename std::list<T>::iterator last = m_list.back();
+                                       typename std::list<T>::iterator last = m_list.end();
+                                       last--;
                                        T t = *last;
                                        m_list.erase(last);
                                        return t;