optval: *mut libc::c_char,
optlen: *mut libc::c_int) -> libc::c_int;
+ pub fn SetEvent(hEvent: libc::HANDLE) -> libc::BOOL;
+ pub fn WaitForMultipleObjects(nCount: libc::DWORD,
+ lpHandles: *const libc::HANDLE,
+ bWaitAll: libc::BOOL,
+ dwMilliseconds: libc::DWORD) -> libc::DWORD;
+
pub fn CancelIo(hFile: libc::HANDLE) -> libc::BOOL;
pub fn CancelIoEx(hFile: libc::HANDLE,
lpOverlapped: libc::LPOVERLAPPED) -> libc::BOOL;
use std::rt::mutex;
use std::rt::rtio;
use std::rt::rtio::{IoResult, IoError};
-use std::sync::atomics;
+use std::sync::atomic;
use super::{retry, keep_going};
use super::c;
listener: self,
reader: reader,
writer: writer,
- closed: atomics::AtomicBool::new(false),
+ closed: atomic::AtomicBool::new(false),
}),
deadline: 0,
})
listener: self,
abort: try!(os::Event::new()),
accept: accept,
- closed: atomics::AtomicBool::new(false),
+ closed: atomic::AtomicBool::new(false),
}),
deadline: 0,
})
listener: TcpListener,
reader: FileDesc,
writer: FileDesc,
- closed: atomics::AtomicBool,
+ closed: atomic::AtomicBool,
}
#[cfg(windows)]
listener: TcpListener,
abort: os::Event,
accept: os::Event,
- closed: atomics::AtomicBool,
+ closed: atomic::AtomicBool,
}
impl TcpAcceptor {
// self-pipe is never written to unless close_accept() is called.
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
- while !self.inner.closed.load(atomics::SeqCst) {
+ while !self.inner.closed.load(atomic::SeqCst) {
match retry(|| unsafe {
libc::accept(self.fd(), ptr::mut_null(), ptr::mut_null())
}) {
// stolen, so we do all of this in a loop as well.
let events = [self.inner.abort.handle(), self.inner.accept.handle()];
- while !self.inner.closed.load(atomics::SeqCst) {
+ while !self.inner.closed.load(atomic::SeqCst) {
let ms = if self.deadline == 0 {
c::WSA_INFINITE as u64
} else {
let now = ::io::timer::now();
- if self.deadline < now {0} else {now - self.deadline}
+ if self.deadline < now {0} else {self.deadline - now}
};
let ret = unsafe {
c::WSAWaitForMultipleEvents(2, events.as_ptr(), libc::FALSE,
c::WSA_WAIT_EVENT_0 => break,
n => assert_eq!(n, c::WSA_WAIT_EVENT_0 + 1),
}
- println!("woke up");
let mut wsaevents: c::WSANETWORKEVENTS = unsafe { mem::zeroed() };
let ret = unsafe {
} {
-1 if util::wouldblock() => {}
-1 => return Err(os::last_error()),
- fd => return Ok(TcpStream::new(Inner::new(fd))),
+
+ // Accepted sockets inherit the same properties as the caller,
+ // so we need to deregister our event and switch the socket back
+ // to blocking mode
+ fd => {
+ let stream = TcpStream::new(Inner::new(fd));
+ let ret = unsafe {
+ c::WSAEventSelect(fd, events[1], 0)
+ };
+ if ret != 0 { return Err(os::last_error()) }
+ try!(util::set_nonblocking(fd, false));
+ return Ok(stream)
+ }
}
}
#[cfg(unix)]
fn close_accept(&mut self) -> IoResult<()> {
- self.inner.closed.store(true, atomics::SeqCst);
+ self.inner.closed.store(true, atomic::SeqCst);
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
match fd.inner_write([0]) {
Ok(..) => Ok(()),
#[cfg(windows)]
fn close_accept(&mut self) -> IoResult<()> {
- self.inner.closed.store(true, atomics::SeqCst);
+ self.inner.closed.store(true, atomic::SeqCst);
let ret = unsafe { c::WSASetEvent(self.inner.abort.handle()) };
if ret == libc::TRUE {
Ok(())
use std::rt::mutex;
use std::rt::rtio;
use std::rt::rtio::{IoResult, IoError};
-use std::sync::atomics;
+use std::sync::atomic;
use super::retry;
use super::net;
listener: self,
reader: reader,
writer: writer,
- closed: atomics::AtomicBool::new(false),
+ closed: atomic::AtomicBool::new(false),
}),
deadline: 0,
})
listener: UnixListener,
reader: FileDesc,
writer: FileDesc,
- closed: atomics::AtomicBool,
+ closed: atomic::AtomicBool,
}
impl UnixAcceptor {
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
- while !self.inner.closed.load(atomics::SeqCst) {
+ while !self.inner.closed.load(atomic::SeqCst) {
unsafe {
let mut storage: libc::sockaddr_storage = mem::zeroed();
let storagep = &mut storage as *mut libc::sockaddr_storage;
#[cfg(unix)]
fn close_accept(&mut self) -> IoResult<()> {
- self.inner.closed.store(true, atomics::SeqCst);
+ self.inner.closed.store(true, atomic::SeqCst);
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
match fd.inner_write([0]) {
Ok(..) => Ok(()),
}
pub fn await(handle: libc::HANDLE, deadline: u64,
- overlapped: &mut libc::OVERLAPPED) -> bool {
- if deadline == 0 { return true }
+ events: &[libc::HANDLE]) -> IoResult<uint> {
+ use libc::consts::os::extra::{WAIT_FAILED, WAIT_TIMEOUT, WAIT_OBJECT_0};
// If we've got a timeout, use WaitForSingleObject in tandem with CancelIo
// to figure out if we should indeed get the result.
- let now = ::io::timer::now();
- let timeout = deadline < now || unsafe {
- let ms = (deadline - now) as libc::DWORD;
- let r = libc::WaitForSingleObject(overlapped.hEvent,
- ms);
- r != libc::WAIT_OBJECT_0
- };
- if timeout {
- unsafe { let _ = c::CancelIo(handle); }
- false
+ let ms = if deadline == 0 {
+ libc::INFINITE as u64
} else {
- true
+ let now = ::io::timer::now();
+ if deadline < now {0} else {deadline - now}
+ };
+ let ret = unsafe {
+ c::WaitForMultipleObjects(events.len() as libc::DWORD,
+ events.as_ptr(),
+ libc::FALSE,
+ ms as libc::DWORD)
+ };
+ match ret {
+ WAIT_FAILED => Err(super::last_error()),
+ WAIT_TIMEOUT => unsafe {
+ let _ = c::CancelIo(handle);
+ Err(util::timeout("operation timed out"))
+ },
+ n => Ok((n - WAIT_OBJECT_0) as uint)
}
}
drop(guard);
loop {
// Process a timeout if one is pending
- let succeeded = await(self.handle(), self.read_deadline,
- &mut overlapped);
+ let wait_succeeded = await(self.handle(), self.read_deadline,
+ [overlapped.hEvent]);
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
// If the reading half is now closed, then we're done. If we woke up
// because the writing half was closed, keep trying.
- if !succeeded {
+ if wait_succeeded.is_err() {
return Err(util::timeout("read timed out"))
}
if self.read_closed() {
})
}
// Process a timeout if one is pending
- let succeeded = await(self.handle(), self.write_deadline,
- &mut overlapped);
+ let wait_succeeded = await(self.handle(), self.write_deadline,
+ [overlapped.hEvent]);
let ret = unsafe {
libc::GetOverlappedResult(self.handle(),
&mut overlapped,
if os::errno() != libc::ERROR_OPERATION_ABORTED as uint {
return Err(super::last_error())
}
- if !succeeded {
+ if !wait_succeeded.is_ok() {
let amt = offset + bytes_written as uint;
return if amt > 0 {
Err(IoError {
listener: self,
event: try!(Event::new(true, false)),
deadline: 0,
+ inner: Arc::new(AcceptorState {
+ abort: try!(Event::new(true, false)),
+ closed: atomic::AtomicBool::new(false),
+ }),
})
}
}
}
pub struct UnixAcceptor {
+ inner: Arc<AcceptorState>,
listener: UnixListener,
event: Event,
deadline: u64,
}
+struct AcceptorState {
+ abort: Event,
+ closed: atomic::AtomicBool,
+}
+
impl UnixAcceptor {
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
// This function has some funky implementation details when working with
// using the original server pipe.
let handle = self.listener.handle;
+ // If we've had an artifical call to close_accept, be sure to never
+ // proceed in accepting new clients in the future
+ if self.inner.closed.load(atomic::SeqCst) { return Err(util::eof()) }
+
let name = try!(to_utf16(&self.listener.name));
// Once we've got a "server handle", we need to wait for a client to
if err == libc::ERROR_IO_PENDING as libc::DWORD {
// Process a timeout if one is pending
- let _ = await(handle, self.deadline, &mut overlapped);
+ let wait_succeeded = await(handle, self.deadline,
+ [self.inner.abort.handle(),
+ overlapped.hEvent]);
// This will block until the overlapped I/O is completed. The
// timeout was previously handled, so this will either block in
libc::TRUE)
};
if ret == 0 {
- err = unsafe { libc::GetLastError() };
+ if wait_succeeded.is_ok() {
+ err = unsafe { libc::GetLastError() };
+ } else {
+ return Err(util::timeout("accept timed out"))
+ }
} else {
// we succeeded, bypass the check below
err = libc::ERROR_PIPE_CONNECTED as libc::DWORD;
}
fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
- fail!()
+ let name = to_utf16(&self.listener.name).ok().unwrap();
+ box UnixAcceptor {
+ inner: self.inner.clone(),
+ event: Event::new(true, false).ok().unwrap(),
+ deadline: 0,
+ listener: UnixListener {
+ name: self.listener.name.clone(),
+ handle: unsafe {
+ let p = pipe(name.as_ptr(), false) ;
+ assert!(p != libc::INVALID_HANDLE_VALUE as libc::HANDLE);
+ p
+ },
+ },
+ } as Box<rtio::RtioUnixAcceptor + Send>
}
fn close_accept(&mut self) -> IoResult<()> {
- fail!()
+ self.inner.closed.store(true, atomic::SeqCst);
+ let ret = unsafe {
+ c::SetEvent(self.inner.abort.handle())
+ };
+ if ret == 0 {
+ Err(super::last_error())
+ } else {
+ Ok(())
+ }
}
}
c::fd_set(&mut set, fd);
max = cmp::max(max, fd + 1);
}
+ if cfg!(windows) {
+ max = fds.len() as net::sock_t;
+ }
let (read, write) = match status {
Readable => (&mut set as *mut _, ptr::mut_null()),
}
impl rtio::RtioTcpListener for TcpListener {
- fn listen(self: Box<TcpListener>)
+ fn listen(mut self: Box<TcpListener>)
-> Result<Box<rtio::RtioTcpAcceptor + Send>, IoError> {
let _m = self.fire_homing_missile();
}
impl rtio::RtioUnixListener for PipeListener {
- fn listen(self: Box<PipeListener>)
+ fn listen(mut self: Box<PipeListener>)
-> IoResult<Box<rtio::RtioUnixAcceptor + Send>> {
let _m = self.fire_homing_missile();
///
/// ```
/// # #![allow(experimental)]
- /// use std::io::TcpListener;
- /// use std::io::{Listener, Acceptor, TimedOut};
+ /// use std::io::{TcpListener, Listener, Acceptor, EndOfFile};
///
/// let mut a = TcpListener::bind("127.0.0.1", 8482).listen().unwrap();
/// let a2 = a.clone();
rx2.recv();
})
+ #[cfg(not(windows))]
iotest!(fn clone_accept_smoke() {
let addr = next_test_unix();
let l = UnixListener::bind(&addr);
});
assert!(a.accept().is_ok());
+ drop(a);
assert!(a2.accept().is_ok());
})
extern crate native;
use std::io::{TcpListener, Listener, Acceptor, EndOfFile, TcpStream};
-use std::sync::{atomics, Arc};
+use std::sync::{atomic, Arc};
use std::task::TaskBuilder;
use native::NativeTaskBuilder;
let mut l = TcpListener::bind("127.0.0.1", 0).unwrap();
let addr = l.socket_name().unwrap();
let mut a = l.listen().unwrap();
- let cnt = Arc::new(atomics::AtomicUint::new(0));
+ let cnt = Arc::new(atomic::AtomicUint::new(0));
let (tx, rx) = channel();
for _ in range(0, N) {
match a.accept() {
Ok(..) => {
mycnt += 1;
- if cnt.fetch_add(1, atomics::SeqCst) == N * M - 1 {
+ if cnt.fetch_add(1, atomic::SeqCst) == N * M - 1 {
break
}
}
assert_eq!(rx.iter().take(N - 1).count(), N - 1);
// Everything should have been accepted.
- assert_eq!(cnt.load(atomics::SeqCst), N * M);
+ assert_eq!(cnt.load(atomic::SeqCst), N * M);
}