]> git.lizzy.rs Git - rust.git/commitdiff
std: Add some hacks to use libuv
authorBrian Anderson <banderson@mozilla.com>
Sun, 22 Jan 2012 01:29:52 +0000 (17:29 -0800)
committerBrian Anderson <banderson@mozilla.com>
Mon, 23 Jan 2012 04:06:58 +0000 (20:06 -0800)
mk/rt.mk
src/libstd/std.rc
src/libstd/uvtmp.rs [new file with mode: 0644]
src/rt/rust_uv.cpp
src/rt/rust_uvtmp.cpp [new file with mode: 0644]
src/rt/rustrt.def.in

index 2eae42087290badf1ca360df307dc4aca2b1e710..a161f298a6397467777e1cf6a0c81137ed59c3d0 100644 (file)
--- a/mk/rt.mk
+++ b/mk/rt.mk
@@ -48,6 +48,7 @@ RUNTIME_CS_$(1) := \
               rt/rust_port.cpp \
               rt/rust_upcall.cpp \
               rt/rust_uv.cpp \
+              rt/rust_uvtmp.cpp \
               rt/rust_log.cpp \
               rt/rust_timer.cpp \
               rt/circular_buffer.cpp \
index f8d105ef77edb5d7a60e1b48b9e35dc91f8ae1b3..bcfe108eae52425e3f5552f3e44a96c7249963df 100644 (file)
@@ -7,7 +7,7 @@
 #[license = "MIT"];
 #[crate_type = "lib"];
 
-export fs, io, net, run, uv;
+export fs, io, net, run, uv, uvtmp;
 export c_vec, four, tri, util;
 export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap, ufind;
 export rope;
@@ -25,6 +25,7 @@ mod net;
 #[path =  "run_program.rs"]
 mod run;
 mod uv;
+mod uvtmp;
 
 
 // Utility modules
diff --git a/src/libstd/uvtmp.rs b/src/libstd/uvtmp.rs
new file mode 100644 (file)
index 0000000..e205989
--- /dev/null
@@ -0,0 +1,149 @@
+// Some temporary libuv hacks for servo
+
+#[cfg(target_os = "linux")];
+#[cfg(target_os = "macos")];
+#[cfg(target_os = "freebsd")];
+
+
+#[nolink]
+native mod rustrt {
+    fn rust_uvtmp_create_thread() -> thread;
+    fn rust_uvtmp_start_thread(thread: thread);
+    fn rust_uvtmp_join_thread(thread: thread);
+    fn rust_uvtmp_delete_thread(thread: thread);
+    fn rust_uvtmp_connect(
+        thread: thread,
+        ip: str::sbuf,
+        chan: comm::chan<iomsg>);
+    fn rust_uvtmp_close_connection(thread: thread, cd: connect_data);
+    fn rust_uvtmp_write(
+        thread: thread,
+        cd: connect_data,
+        buf: *u8,
+        len: ctypes::size_t,
+        chan: comm::chan<iomsg>);
+    fn rust_uvtmp_read_start(
+        thread: thread,
+        cd: connect_data,
+        chan: comm::chan<iomsg>);
+    fn rust_uvtmp_delete_buf(buf: *u8);
+}
+
+type thread = *ctypes::void;
+
+type connect_data = *ctypes::void;
+
+enum iomsg {
+    whatever,
+    connected(connect_data),
+    wrote(connect_data),
+    read(connect_data, *u8, ctypes::ssize_t)
+}
+
+fn create_thread() -> thread {
+    rustrt::rust_uvtmp_create_thread()
+}
+
+fn start_thread(thread: thread) {
+    rustrt::rust_uvtmp_start_thread(thread)
+}
+
+fn join_thread(thread: thread) {
+    rustrt::rust_uvtmp_join_thread(thread)
+}
+
+fn delete_thread(thread: thread) {
+    rustrt::rust_uvtmp_delete_thread(thread)
+}
+
+fn connect(thread: thread, ip: str, ch: comm::chan<iomsg>) {
+    str::as_buf(ip) {|ipbuf|
+        rustrt::rust_uvtmp_connect(thread, ipbuf, ch)
+    }
+}
+
+fn close_connection(thread: thread, cd: connect_data) {
+    rustrt::rust_uvtmp_close_connection(thread ,cd);
+}
+
+fn write(thread: thread, cd: connect_data,bytes: [u8],
+         chan: comm::chan<iomsg>) unsafe {
+    rustrt::rust_uvtmp_write(
+        thread, cd, vec::to_ptr(bytes), vec::len(bytes), chan);
+}
+
+fn read_start(thread: thread, cd: connect_data,
+              chan: comm::chan<iomsg>) {
+    rustrt::rust_uvtmp_read_start(thread, cd, chan);
+}
+
+fn delete_buf(buf: *u8) {
+    rustrt::rust_uvtmp_delete_buf(buf);
+}
+
+#[test]
+fn test_start_stop() {
+    let thread = create_thread();
+    start_thread(thread);
+    join_thread(thread);
+    delete_thread(thread);
+}
+
+#[test]
+#[ignore]
+fn test_connect() {
+    let thread = create_thread();
+    start_thread(thread);
+    let port = comm::port();
+    let chan = comm::chan(port);
+    connect(thread, "74.125.224.146", chan);
+    alt comm::recv(port) {
+      connected(cd) {
+        close_connection(thread, cd);
+      }
+    }
+    join_thread(thread);
+    delete_thread(thread);
+}
+
+#[test]
+#[ignore]
+fn test_http() {
+    let thread = create_thread();
+    start_thread(thread);
+    let port = comm::port();
+    let chan = comm::chan(port);
+    connect(thread, "74.125.224.146", chan);
+    alt comm::recv(port) {
+      connected(cd) {
+        write(thread, cd, str::bytes("GET / HTTP/1.0\n\n"), chan);
+        alt comm::recv(port) {
+          wrote(cd) {
+            read_start(thread, cd, chan);
+            let keep_going = true;
+            while keep_going {
+                alt comm::recv(port) {
+                  read(_, buf, -1) {
+                    keep_going = false;
+                    delete_buf(buf);
+                  }
+                  read(_, buf, len) {
+                    unsafe {
+                        log(error, len);
+                        let buf = vec::unsafe::from_buf(buf, len as uint);
+                        let str = str::unsafe_from_bytes(buf);
+                        #error("read something");
+                        io::println(str);
+                    }
+                    delete_buf(buf);
+                  }
+                }
+            }
+            close_connection(thread, cd);
+          }
+        }
+      }
+    }
+    join_thread(thread);
+    delete_thread(thread);
+}
\ No newline at end of file
index b339d77c043a2f61f4d75d02ab7fccd59e9608d8..9ee1b844add2c1c70b9b34c3c4e8a7d61263aaba 100644 (file)
@@ -48,3 +48,4 @@ extern "C" CDECL size_t
 rust_uv_size_of_idle_t() {
   return sizeof(uv_idle_t);
 }
+
diff --git a/src/rt/rust_uvtmp.cpp b/src/rt/rust_uvtmp.cpp
new file mode 100644 (file)
index 0000000..3d4cb89
--- /dev/null
@@ -0,0 +1,346 @@
+#include <utility>
+#include <queue>
+#include <string>
+#include "rust_internal.h"
+#include "uv.h"
+
+class rust_uvtmp_thread;
+
+struct connect_data {
+    rust_uvtmp_thread *thread;
+    uv_connect_t connect;
+    uv_tcp_t tcp;
+    chan_handle chan;
+};
+
+const intptr_t connected_tag = 1;
+const intptr_t wrote_tag = 2;
+const intptr_t read_tag = 3;
+
+struct iomsg {
+    intptr_t tag;
+    union {
+       connect_data *connected_val;
+       connect_data *wrote_val;
+       struct {
+           connect_data *cd;
+           uint8_t *buf;
+           ssize_t nread;
+       } read_val;
+    } val;
+};
+
+struct write_data {
+    connect_data *cd;
+    uint8_t *buf;
+    size_t len;
+    chan_handle chan;
+};
+
+struct read_start_data {
+    connect_data *cd;
+    chan_handle chan;
+};
+
+// FIXME: Copied from rust_builtins.cpp. Could bitrot easily
+static void
+send(rust_task *task, chan_handle chan, void *data) {
+    rust_task *target_task = task->kernel->get_task_by_id(chan.task);
+    if(target_task) {
+        rust_port *port = target_task->get_port_by_id(chan.port);
+        if(port) {
+            port->send(data);
+            scoped_lock with(target_task->lock);
+            port->deref();
+        }
+        target_task->deref();
+    }
+}
+
+class rust_uvtmp_thread : public rust_thread {
+
+private:
+    rust_task *task;
+    uv_loop_t *loop;
+    uv_idle_t idle;
+    lock_and_signal lock;
+    bool stop_flag;
+    std::queue<std::pair<std::string, chan_handle> > connect_queue;
+    std::queue<connect_data*> close_connection_queue;
+    std::queue<write_data*> write_queue;
+    std::queue<read_start_data*> read_start_queue;
+
+public:
+
+    rust_uvtmp_thread() {
+       task = rust_scheduler::get_task();
+       stop_flag = false;
+       loop = uv_loop_new();
+       uv_idle_init(loop, &idle);
+       idle.data = this;
+       uv_idle_start(&idle, idle_cb);
+    }
+
+    ~rust_uvtmp_thread() {
+       uv_loop_delete(loop);
+    }
+
+    void stop() {
+       scoped_lock with(lock);
+       stop_flag = true;
+    }
+
+    void connect(char *ip, chan_handle chan) {
+       scoped_lock with(lock);
+       connect_queue.push(std::pair<std::string, chan_handle>
+                          (std::string(ip), chan));
+    }
+
+    void
+    close_connection(connect_data *cd) {
+       scoped_lock with(lock);
+       close_connection_queue.push(cd);
+    }
+
+    void
+    write(connect_data *cd, uint8_t *buf, size_t len, chan_handle chan) {
+       scoped_lock with(lock);
+       write_data *wd = new write_data();
+       wd->cd = cd;
+       wd->buf = new uint8_t[len];
+       wd->len = len;
+       wd->chan = chan;
+
+       memcpy(wd->buf, buf, len);
+
+       write_queue.push(wd);
+    }
+
+    void
+    read_start(connect_data *cd, chan_handle chan) {
+       scoped_lock with(lock);
+       read_start_data *rd = new read_start_data();
+       rd->cd = cd;
+       rd->chan = chan;
+
+       read_start_queue.push(rd);
+    }
+
+private:
+
+    virtual void
+    run() {
+       uv_run(loop);
+    }
+
+    static void
+    idle_cb(uv_idle_t* handle, int status) {
+       rust_uvtmp_thread *self = (rust_uvtmp_thread*) handle->data;
+       self->on_idle();
+    }
+
+    void
+    on_idle() {
+       scoped_lock with(lock);
+       make_new_connections();
+       close_connections();
+       write_buffers();
+       start_reads();
+       close_idle_if_stop();
+    }
+
+    void
+    make_new_connections() {
+       assert(lock.lock_held_by_current_thread());
+       while (!connect_queue.empty()) {
+           std::pair<std::string, chan_handle> pair = connect_queue.front();
+           connect_queue.pop();
+           struct sockaddr_in client_addr = uv_ip4_addr("0.0.0.0", 0);
+           struct sockaddr_in server_addr = uv_ip4_addr(pair.first.c_str(), 80);
+
+           connect_data *cd = new connect_data();
+           cd->thread = this;
+           cd->chan = pair.second;
+           cd->connect.data = cd;
+
+           uv_tcp_init(loop, &cd->tcp);
+           uv_tcp_bind(&cd->tcp, client_addr);
+
+           uv_tcp_connect(&cd->connect, &cd->tcp, server_addr, connect_cb);
+       }
+    }
+
+    static void
+    connect_cb(uv_connect_t *handle, int status) {
+       connect_data *cd = (connect_data*)handle->data;
+       cd->thread->on_connect(cd);
+    }
+
+    void
+    on_connect(connect_data *cd) {
+       iomsg msg;
+       msg.tag = connected_tag;
+       msg.val.connected_val = cd;
+
+       send(task, cd->chan, &msg);
+    }
+
+    void
+    close_connections() {
+       assert(lock.lock_held_by_current_thread());
+       while (!close_connection_queue.empty()) {
+           connect_data *cd = close_connection_queue.front();
+           close_connection_queue.pop();
+           
+           cd->tcp.data = cd;
+           
+           uv_close((uv_handle_t*)&cd->tcp, tcp_close_cb);
+       }
+    }
+
+    static void
+    tcp_close_cb(uv_handle_t *handle) {
+       connect_data *cd = (connect_data*)handle->data;
+       delete cd;
+    }
+
+    void
+    write_buffers() {
+       assert(lock.lock_held_by_current_thread());
+       while (!write_queue.empty()) {
+           write_data *wd = write_queue.front();
+           write_queue.pop();
+
+           uv_write_t *write = new uv_write_t();
+
+           write->data = wd;
+
+           uv_buf_t buf;
+           buf.base = (char*)wd->buf;
+           buf.len = wd->len;
+
+           uv_write(write, (uv_stream_t*)&wd->cd->tcp, &buf, 1, write_cb);
+       }
+    }
+
+    static void
+    write_cb(uv_write_t *handle, int status) {
+       write_data *wd = (write_data*)handle->data;
+       rust_uvtmp_thread *self = wd->cd->thread;
+       self->on_write(handle, wd);
+    }
+
+    void
+    on_write(uv_write_t *handle, write_data *wd) {
+       iomsg msg;
+       msg.tag = wrote_tag;
+       msg.val.wrote_val = wd->cd;
+
+       send(task, wd->chan, &msg);
+
+       delete [] wd->buf;
+       delete wd;
+       delete handle;
+    }
+
+    void
+    start_reads() {
+       assert (lock.lock_held_by_current_thread());
+       while (!read_start_queue.empty()) {
+           read_start_data *rd = read_start_queue.front();
+           read_start_queue.pop();
+
+           connect_data *cd = rd->cd;
+           cd->tcp.data = rd;
+
+           uv_read_start((uv_stream_t*)&cd->tcp, alloc_cb, read_cb);
+       }
+    }
+
+    static uv_buf_t
+    alloc_cb(uv_handle_t* handle, size_t size) {
+       uv_buf_t buf;
+       buf.base = new char[size];
+       buf.len = size;
+       return buf;
+    }
+
+    static void
+    read_cb(uv_stream_t *handle, ssize_t nread, uv_buf_t buf) {
+       read_start_data *rd = (read_start_data*)handle->data;
+       rust_uvtmp_thread *self = rd->cd->thread;
+       self->on_read(rd, nread, buf);
+    }
+
+    void
+    on_read(read_start_data *rd, ssize_t nread, uv_buf_t buf) {
+       iomsg msg;
+       msg.tag = read_tag;
+       msg.val.read_val.cd = rd->cd;
+       msg.val.read_val.buf = (uint8_t*)buf.base;
+       msg.val.read_val.nread = nread;
+
+       send(task, rd->chan, &msg);
+       if (nread == -1) {
+           delete rd;
+       }
+    }
+
+    void
+    close_idle_if_stop() {
+       assert(lock.lock_held_by_current_thread());
+       if (stop_flag) {
+           uv_close((uv_handle_t*)&idle, NULL);
+       }
+    }
+
+};
+
+extern "C" rust_uvtmp_thread *
+rust_uvtmp_create_thread() {
+    rust_uvtmp_thread *thread = new rust_uvtmp_thread();
+    return thread;
+}
+
+extern "C" void
+rust_uvtmp_start_thread(rust_uvtmp_thread *thread) {
+    thread->start();    
+}
+
+extern "C" void
+rust_uvtmp_join_thread(rust_uvtmp_thread *thread) {
+    thread->stop();
+    thread->join();
+}
+
+extern "C" void
+rust_uvtmp_delete_thread(rust_uvtmp_thread *thread) {
+    delete thread;
+}
+
+extern "C" void
+rust_uvtmp_connect(rust_uvtmp_thread *thread, char *ip, chan_handle *chan) {
+    thread->connect(ip, *chan);
+}
+
+extern "C" void
+rust_uvtmp_close_connection(rust_uvtmp_thread *thread, connect_data *cd) {
+  thread->close_connection(cd);
+}
+
+extern "C" void
+rust_uvtmp_write(rust_uvtmp_thread *thread, connect_data *cd,
+                uint8_t *buf, size_t len, chan_handle *chan) {
+    thread->write(cd, buf, len, *chan);
+}
+
+extern "C" void
+rust_uvtmp_read_start(rust_uvtmp_thread *thread, connect_data *cd,
+                     chan_handle *chan) {
+    thread->read_start(cd, *chan);
+}
+
+extern "C" void
+rust_uvtmp_delete_buf(uint8_t *buf) {
+    delete [] buf;
+}
index 745e4c6e34b76873d6eb14b2a9bd44013b626626..86f24a20f9620d78bc429043a64bcf63e1658d96 100644 (file)
@@ -88,3 +88,12 @@ rust_uv_unref
 rust_uv_idle_init
 rust_uv_idle_start
 rust_uv_size_of_idle_t
+rust_uvtmp_create_thread
+rust_uvtmp_start_thread
+rust_uvtmp_join_thread
+rust_uvtmp_delete_thread
+rust_uvtmp_connect
+rust_uvtmp_close_connection
+rust_uvtmp_write
+rust_uvtmp_read_start
+rust_uvtmp_delete_buf