BufWriter { inner: Some(inner), buf: Vec::with_capacity(capacity), panicked: false }
}
+ /// Send data in our local buffer into the inner writer, looping as
+ /// necessary until either it's all been sent or an error occurs.
+ ///
+ /// Because all the data in the buffer has been reported to our owner as
+ /// "successfully written" (by returning nonzero success values from
+ /// `write`), any 0-length writes from `inner` must be reported as i/o
+ /// errors from this method.
fn flush_buf(&mut self) -> io::Result<()> {
- let mut written = 0;
- let len = self.buf.len();
- let mut ret = Ok(());
- while written < len {
+ /// Helper struct to ensure the buffer is updated after all the writes
+ /// are complete. It tracks the number of written bytes and drains them
+ /// all from the front of the buffer when dropped.
+ struct BufGuard<'a> {
+ buffer: &'a mut Vec<u8>,
+ written: usize,
+ }
+
+ impl<'a> BufGuard<'a> {
+ fn new(buffer: &'a mut Vec<u8>) -> Self {
+ Self { buffer, written: 0 }
+ }
+
+ /// The unwritten part of the buffer
+ fn remaining(&self) -> &[u8] {
+ &self.buffer[self.written..]
+ }
+
+ /// Flag some bytes as removed from the front of the buffer
+ fn consume(&mut self, amt: usize) {
+ self.written += amt;
+ }
+
+ /// true if all of the bytes have been written
+ fn done(&self) -> bool {
+ self.written >= self.buffer.len()
+ }
+ }
+
+ impl Drop for BufGuard<'_> {
+ fn drop(&mut self) {
+ if self.written > 0 {
+ self.buffer.drain(..self.written);
+ }
+ }
+ }
+
+ let mut guard = BufGuard::new(&mut self.buf);
+ let inner = self.inner.as_mut().unwrap();
+ while !guard.done() {
self.panicked = true;
- let r = self.inner.as_mut().unwrap().write(&self.buf[written..]);
+ let r = inner.write(guard.remaining());
self.panicked = false;
match r {
Ok(0) => {
- ret =
- Err(Error::new(ErrorKind::WriteZero, "failed to write the buffered data"));
- break;
+ return Err(Error::new(
+ ErrorKind::WriteZero,
+ "failed to write the buffered data",
+ ));
}
- Ok(n) => written += n,
+ Ok(n) => guard.consume(n),
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
- Err(e) => {
- ret = Err(e);
- break;
- }
+ Err(e) => return Err(e),
}
}
- if written > 0 {
- self.buf.drain(..written);
- }
- ret
+ Ok(())
+ }
+
+ /// Buffer some data without flushing it, regardless of the size of the
+ /// data. Writes as much as possible without exceeding capacity. Returns
+ /// the number of bytes written.
+ fn write_to_buf(&mut self, buf: &[u8]) -> usize {
+ let available = self.buf.capacity() - self.buf.len();
+ let amt_to_buffer = available.min(buf.len());
+ self.buf.extend_from_slice(&buf[..amt_to_buffer]);
+ amt_to_buffer
}
/// Gets a reference to the underlying writer.
if self.buf.len() + buf.len() > self.buf.capacity() {
self.flush_buf()?;
}
+ // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919
if buf.len() >= self.buf.capacity() {
self.panicked = true;
let r = self.get_mut().write(buf);
self.panicked = false;
r
} else {
- self.buf.write(buf)
+ self.buf.extend_from_slice(buf);
+ Ok(buf.len())
+ }
+ }
+
+ fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
+ // Normally, `write_all` just calls `write` in a loop. We can do better
+ // by calling `self.get_mut().write_all()` directly, which avoids
+ // round trips through the buffer in the event of a series of partial
+ // writes in some circumstances.
+ if self.buf.len() + buf.len() > self.buf.capacity() {
+ self.flush_buf()?;
+ }
+ // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919
+ if buf.len() >= self.buf.capacity() {
+ self.panicked = true;
+ let r = self.get_mut().write_all(buf);
+ self.panicked = false;
+ r
+ } else {
+ self.buf.extend_from_slice(buf);
+ Ok(())
}
}
if self.buf.len() + total_len > self.buf.capacity() {
self.flush_buf()?;
}
+ // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919
if total_len >= self.buf.capacity() {
self.panicked = true;
let r = self.get_mut().write_vectored(bufs);
self.panicked = false;
r
} else {
- self.buf.write_vectored(bufs)
+ bufs.iter().for_each(|b| self.buf.extend_from_slice(b));
+ Ok(total_len)
}
}
///
/// Seeking always writes out the internal buffer before seeking.
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
- self.flush_buf().and_then(|_| self.get_mut().seek(pos))
+ self.flush_buf()?;
+ self.get_mut().seek(pos)
}
}
}
}
+/// Private helper struct for implementing the line-buffered writing logic.
+/// This shim temporarily wraps a BufWriter, and uses its internals to
+/// implement a line-buffered writer (specifically by using the internal
+/// methods like write_to_buf and flush_buf). In this way, a more
+/// efficient abstraction can be created than one that only had access to
+/// `write` and `flush`, without needlessly duplicating a lot of the
+/// implementation details of BufWriter. This also allows existing
+/// `BufWriters` to be temporarily given line-buffering logic; this is what
+/// enables Stdout to be alternately in line-buffered or block-buffered mode.
+#[derive(Debug)]
+pub(super) struct LineWriterShim<'a, W: Write> {
+ buffer: &'a mut BufWriter<W>,
+}
+
+impl<'a, W: Write> LineWriterShim<'a, W> {
+ pub fn new(buffer: &'a mut BufWriter<W>) -> Self {
+ Self { buffer }
+ }
+
+ /// Get a mutable reference to the inner writer (that is, the writer
+ /// wrapped by the BufWriter). Be careful with this writer, as writes to
+ /// it will bypass the buffer.
+ fn inner_mut(&mut self) -> &mut W {
+ self.buffer.get_mut()
+ }
+
+ /// Get the content currently buffered in self.buffer
+ fn buffered(&self) -> &[u8] {
+ self.buffer.buffer()
+ }
+
+ /// Flush the buffer iff the last byte is a newline (indicating that an
+ /// earlier write only succeeded partially, and we want to retry flushing
+ /// the buffered line before continuing with a subsequent write)
+ fn flush_if_completed_line(&mut self) -> io::Result<()> {
+ match self.buffered().last().copied() {
+ Some(b'\n') => self.buffer.flush_buf(),
+ _ => Ok(()),
+ }
+ }
+}
+
+impl<'a, W: Write> Write for LineWriterShim<'a, W> {
+ /// Write some data into this BufReader with line buffering. This means
+ /// that, if any newlines are present in the data, the data up to the last
+ /// newline is sent directly to the underlying writer, and data after it
+ /// is buffered. Returns the number of bytes written.
+ ///
+ /// This function operates on a "best effort basis"; in keeping with the
+ /// convention of `Write::write`, it makes at most one attempt to write
+ /// new data to the underlying writer. If that write only reports a partial
+ /// success, the remaining data will be buffered.
+ ///
+ /// Because this function attempts to send completed lines to the underlying
+ /// writer, it will also flush the existing buffer if it ends with a
+ /// newline, even if the incoming data does not contain any newlines.
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ let newline_idx = match memchr::memrchr(b'\n', buf) {
+ // If there are no new newlines (that is, if this write is less than
+ // one line), just do a regular buffered write (which may flush if
+ // we exceed the inner buffer's size)
+ None => {
+ self.flush_if_completed_line()?;
+ return self.buffer.write(buf);
+ }
+ // Otherwise, arrange for the lines to be written directly to the
+ // inner writer.
+ Some(newline_idx) => newline_idx + 1,
+ };
+
+ // Flush existing content to prepare for our write. We have to do this
+ // before attempting to write `buf` in order to maintain consistency;
+ // if we add `buf` to the buffer then try to flush it all at once,
+ // we're obligated to return Ok(), which would mean suppressing any
+ // errors that occur during flush.
+ self.buffer.flush_buf()?;
+
+ // This is what we're going to try to write directly to the inner
+ // writer. The rest will be buffered, if nothing goes wrong.
+ let lines = &buf[..newline_idx];
+
+ // Write `lines` directly to the inner writer. In keeping with the
+ // `write` convention, make at most one attempt to add new (unbuffered)
+ // data. Because this write doesn't touch the BufWriter state directly,
+ // and the buffer is known to be empty, we don't need to worry about
+ // self.buffer.panicked here.
+ let flushed = self.inner_mut().write(lines)?;
+
+ // If buffer returns Ok(0), propagate that to the caller without
+ // doing additional buffering; otherwise we're just guaranteeing
+ // an "ErrorKind::WriteZero" later.
+ if flushed == 0 {
+ return Ok(0);
+ }
+
+ // Now that the write has succeeded, buffer the rest (or as much of
+ // the rest as possible). If there were any unwritten newlines, we
+ // only buffer out to the last unwritten newline that fits in the
+ // buffer; this helps prevent flushing partial lines on subsequent
+ // calls to LineWriterShim::write.
+
+ // Handle the cases in order of most-common to least-common, under
+ // the presumption that most writes succeed in totality, and that most
+ // writes are smaller than the buffer.
+ // - Is this a partial line (ie, no newlines left in the unwritten tail)
+ // - If not, does the data out to the last unwritten newline fit in
+ // the buffer?
+ // - If not, scan for the last newline that *does* fit in the buffer
+ let tail = if flushed >= newline_idx {
+ &buf[flushed..]
+ } else if newline_idx - flushed <= self.buffer.capacity() {
+ &buf[flushed..newline_idx]
+ } else {
+ let scan_area = &buf[flushed..];
+ let scan_area = &scan_area[..self.buffer.capacity()];
+ match memchr::memrchr(b'\n', scan_area) {
+ Some(newline_idx) => &scan_area[..newline_idx + 1],
+ None => scan_area,
+ }
+ };
+
+ let buffered = self.buffer.write_to_buf(tail);
+ Ok(flushed + buffered)
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.buffer.flush()
+ }
+
+ /// Write some vectored data into this BufReader with line buffering. This
+ /// means that, if any newlines are present in the data, the data up to
+ /// and including the buffer containing the last newline is sent directly
+ /// to the inner writer, and the data after it is buffered. Returns the
+ /// number of bytes written.
+ ///
+ /// This function operates on a "best effort basis"; in keeping with the
+ /// convention of `Write::write`, it makes at most one attempt to write
+ /// new data to the underlying writer.
+ ///
+ /// Because this function attempts to send completed lines to the underlying
+ /// writer, it will also flush the existing buffer if it contains any
+ /// newlines.
+ ///
+ /// Because sorting through an array of `IoSlice` can be a bit convoluted,
+ /// This method differs from write in the following ways:
+ ///
+ /// - It attempts to write the full content of all the buffers up to and
+ /// including the one containing the last newline. This means that it
+ /// may attempt to write a partial line, that buffer has data past the
+ /// newline.
+ /// - If the write only reports partial success, it does not attempt to
+ /// find the precise location of the written bytes and buffer the rest.
+ ///
+ /// If the underlying vector doesn't support vectored writing, we instead
+ /// simply write the first non-empty buffer with `write`. This way, we
+ /// get the benefits of more granular partial-line handling without losing
+ /// anything in efficiency
+ fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
+ // If there's no specialized behavior for write_vectored, just use
+ // write. This has the benefit of more granular partial-line handling.
+ if !self.is_write_vectored() {
+ return match bufs.iter().find(|buf| !buf.is_empty()) {
+ Some(buf) => self.write(buf),
+ None => Ok(0),
+ };
+ }
+
+ // Find the buffer containing the last newline
+ let last_newline_buf_idx = bufs
+ .iter()
+ .enumerate()
+ .rev()
+ .find_map(|(i, buf)| memchr::memchr(b'\n', buf).map(|_| i));
+
+ // If there are no new newlines (that is, if this write is less than
+ // one line), just do a regular buffered write
+ let last_newline_buf_idx = match last_newline_buf_idx {
+ // No newlines; just do a normal buffered write
+ None => {
+ self.flush_if_completed_line()?;
+ return self.buffer.write_vectored(bufs);
+ }
+ Some(i) => i,
+ };
+
+ // Flush existing content to prepare for our write
+ self.buffer.flush_buf()?;
+
+ // This is what we're going to try to write directly to the inner
+ // writer. The rest will be buffered, if nothing goes wrong.
+ let (lines, tail) = bufs.split_at(last_newline_buf_idx + 1);
+
+ // Write `lines` directly to the inner writer. In keeping with the
+ // `write` convention, make at most one attempt to add new (unbuffered)
+ // data. Because this write doesn't touch the BufWriter state directly,
+ // and the buffer is known to be empty, we don't need to worry about
+ // self.panicked here.
+ let flushed = self.inner_mut().write_vectored(lines)?;
+
+ // If inner returns Ok(0), propagate that to the caller without
+ // doing additional buffering; otherwise we're just guaranteeing
+ // an "ErrorKind::WriteZero" later.
+ if flushed == 0 {
+ return Ok(0);
+ }
+
+ // Don't try to reconstruct the exact amount written; just bail
+ // in the event of a partial write
+ let lines_len = lines.iter().map(|buf| buf.len()).sum();
+ if flushed < lines_len {
+ return Ok(flushed);
+ }
+
+ // Now that the write has succeeded, buffer the rest (or as much of the
+ // rest as possible)
+ let buffered: usize = tail
+ .iter()
+ .filter(|buf| !buf.is_empty())
+ .map(|buf| self.buffer.write_to_buf(buf))
+ .take_while(|&n| n > 0)
+ .sum();
+
+ Ok(flushed + buffered)
+ }
+
+ fn is_write_vectored(&self) -> bool {
+ self.buffer.is_write_vectored()
+ }
+
+ /// Write some data into this BufReader with line buffering. This means
+ /// that, if any newlines are present in the data, the data up to the last
+ /// newline is sent directly to the underlying writer, and data after it
+ /// is buffered.
+ ///
+ /// Because this function attempts to send completed lines to the underlying
+ /// writer, it will also flush the existing buffer if it contains any
+ /// newlines, even if the incoming data does not contain any newlines.
+ fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
+ match memchr::memrchr(b'\n', buf) {
+ // If there are no new newlines (that is, if this write is less than
+ // one line), just do a regular buffered write (which may flush if
+ // we exceed the inner buffer's size)
+ None => {
+ self.flush_if_completed_line()?;
+ self.buffer.write_all(buf)
+ }
+ Some(newline_idx) => {
+ let (lines, tail) = buf.split_at(newline_idx + 1);
+
+ if self.buffered().is_empty() {
+ self.inner_mut().write_all(lines)?;
+ } else {
+ // If there is any buffered data, we add the incoming lines
+ // to that buffer before flushing, which saves us at least
+ // one write call. We can't really do this with `write`,
+ // since we can't do this *and* not suppress errors *and*
+ // report a consistent state to the caller in a return
+ // value, but here in write_all it's fine.
+ self.buffer.write_all(lines)?;
+ self.buffer.flush_buf()?;
+ }
+
+ self.buffer.write_all(tail)
+ }
+ }
+ }
+}
+
/// Wraps a writer and buffers output to it, flushing whenever a newline
/// (`0x0a`, `'\n'`) is detected.
///
#[stable(feature = "rust1", since = "1.0.0")]
pub struct LineWriter<W: Write> {
inner: BufWriter<W>,
- need_flush: bool,
}
impl<W: Write> LineWriter<W> {
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn with_capacity(capacity: usize, inner: W) -> LineWriter<W> {
- LineWriter { inner: BufWriter::with_capacity(capacity, inner), need_flush: false }
+ LineWriter { inner: BufWriter::with_capacity(capacity, inner) }
}
/// Gets a reference to the underlying writer.
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn into_inner(self) -> Result<W, IntoInnerError<LineWriter<W>>> {
- self.inner.into_inner().map_err(|IntoInnerError(buf, e)| {
- IntoInnerError(LineWriter { inner: buf, need_flush: false }, e)
- })
+ self.inner
+ .into_inner()
+ .map_err(|IntoInnerError(buf, e)| IntoInnerError(LineWriter { inner: buf }, e))
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl<W: Write> Write for LineWriter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- if self.need_flush {
- self.flush()?;
- }
-
- // Find the last newline character in the buffer provided. If found then
- // we're going to write all the data up to that point and then flush,
- // otherwise we just write the whole block to the underlying writer.
- let i = match memchr::memrchr(b'\n', buf) {
- Some(i) => i,
- None => return self.inner.write(buf),
- };
-
- // Ok, we're going to write a partial amount of the data given first
- // followed by flushing the newline. After we've successfully written
- // some data then we *must* report that we wrote that data, so future
- // errors are ignored. We set our internal `need_flush` flag, though, in
- // case flushing fails and we need to try it first next time.
- let n = self.inner.write(&buf[..=i])?;
- self.need_flush = true;
- if self.flush().is_err() || n != i + 1 {
- return Ok(n);
- }
+ LineWriterShim::new(&mut self.inner).write(buf)
+ }
- // At this point we successfully wrote `i + 1` bytes and flushed it out,
- // meaning that the entire line is now flushed out on the screen. While
- // we can attempt to finish writing the rest of the data provided.
- // Remember though that we ignore errors here as we've successfully
- // written data, so we need to report that.
- match self.inner.write(&buf[i + 1..]) {
- Ok(i) => Ok(n + i),
- Err(_) => Ok(n),
- }
+ fn flush(&mut self) -> io::Result<()> {
+ self.inner.flush()
}
- // Vectored writes are very similar to the writes above, but adjusted for
- // the list of buffers that we have to write.
fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
- if self.need_flush {
- self.flush()?;
- }
+ LineWriterShim::new(&mut self.inner).write_vectored(bufs)
+ }
- // Find the last newline, and failing that write the whole buffer
- let last_newline = bufs.iter().enumerate().rev().find_map(|(i, buf)| {
- let pos = memchr::memrchr(b'\n', buf)?;
- Some((i, pos))
- });
- let (i, j) = match last_newline {
- Some(pair) => pair,
- None => return self.inner.write_vectored(bufs),
- };
- let (prefix, suffix) = bufs.split_at(i);
- let (buf, suffix) = suffix.split_at(1);
- let buf = &buf[0];
-
- // Write everything up to the last newline, flushing afterwards. Note
- // that only if we finished our entire `write_vectored` do we try the
- // subsequent
- // `write`
- let mut n = 0;
- let prefix_amt = prefix.iter().map(|i| i.len()).sum();
- if prefix_amt > 0 {
- n += self.inner.write_vectored(prefix)?;
- self.need_flush = true;
- }
- if n == prefix_amt {
- match self.inner.write(&buf[..=j]) {
- Ok(m) => n += m,
- Err(e) if n == 0 => return Err(e),
- Err(_) => return Ok(n),
- }
- self.need_flush = true;
- }
- if self.flush().is_err() || n != j + 1 + prefix_amt {
- return Ok(n);
- }
+ fn is_write_vectored(&self) -> bool {
+ self.inner.is_write_vectored()
+ }
- // ... and now write out everything remaining
- match self.inner.write(&buf[j + 1..]) {
- Ok(i) => n += i,
- Err(_) => return Ok(n),
- }
+ fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
+ LineWriterShim::new(&mut self.inner).write_all(buf)
+ }
- if suffix.iter().map(|s| s.len()).sum::<usize>() == 0 {
- return Ok(n);
- }
- match self.inner.write_vectored(suffix) {
- Ok(i) => Ok(n + i),
- Err(_) => Ok(n),
- }
+ fn write_all_vectored(&mut self, bufs: &mut [IoSlice<'_>]) -> io::Result<()> {
+ LineWriterShim::new(&mut self.inner).write_all_vectored(bufs)
}
- fn flush(&mut self) -> io::Result<()> {
- self.inner.flush()?;
- self.need_flush = false;
- Ok(())
+ fn write_fmt(&mut self, fmt: fmt::Arguments<'_>) -> io::Result<()> {
+ LineWriterShim::new(&mut self.inner).write_fmt(fmt)
}
}
#[cfg(test)]
mod tests {
use crate::io::prelude::*;
- use crate::io::{self, BufReader, BufWriter, IoSlice, LineWriter, SeekFrom};
+ use crate::io::{self, BufReader, BufWriter, ErrorKind, IoSlice, LineWriter, SeekFrom};
use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::thread;
lengths: Vec<usize>,
}
+ // FIXME: rustfmt and tidy disagree about the correct formatting of this
+ // function. This leads to issues for users with editors configured to
+ // rustfmt-on-save.
impl Read for ShortReader {
fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) }
assert_eq!(v, []);
}
- #[test]
- fn test_line_buffer_fail_flush() {
- // Issue #32085
- struct FailFlushWriter<'a>(&'a mut Vec<u8>);
-
- impl Write for FailFlushWriter<'_> {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- self.0.extend_from_slice(buf);
- Ok(buf.len())
- }
- fn flush(&mut self) -> io::Result<()> {
- Err(io::Error::new(io::ErrorKind::Other, "flush failed"))
- }
- }
-
- let mut buf = Vec::new();
- {
- let mut writer = LineWriter::new(FailFlushWriter(&mut buf));
- let to_write = b"abc\ndef";
- if let Ok(written) = writer.write(to_write) {
- assert!(written < to_write.len(), "didn't flush on new line");
- // PASS
- return;
- }
- }
- assert!(buf.is_empty(), "write returned an error but wrote data");
- }
-
#[test]
fn test_line_buffer() {
let mut writer = LineWriter::new(Vec::new());
b.iter(|| BufWriter::new(io::sink()));
}
- struct AcceptOneThenFail {
- written: bool,
- flushed: bool,
+ /// A simple `Write` target, designed to be wrapped by `LineWriter` /
+ /// `BufWriter` / etc, that can have its `write` & `flush` behavior
+ /// configured
+ #[derive(Default, Clone)]
+ struct ProgrammableSink {
+ // Writes append to this slice
+ pub buffer: Vec<u8>,
+
+ // Flush sets this flag
+ pub flushed: bool,
+
+ // If true, writes will always be an error
+ pub always_write_error: bool,
+
+ // If true, flushes will always be an error
+ pub always_flush_error: bool,
+
+ // If set, only up to this number of bytes will be written in a single
+ // call to `write`
+ pub accept_prefix: Option<usize>,
+
+ // If set, counts down with each write, and writes return an error
+ // when it hits 0
+ pub max_writes: Option<usize>,
+
+ // If set, attempting to write when max_writes == Some(0) will be an
+ // error; otherwise, it will return Ok(0).
+ pub error_after_max_writes: bool,
}
- impl Write for AcceptOneThenFail {
+ impl Write for ProgrammableSink {
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
- if !self.written {
- assert_eq!(data, b"a\nb\n");
- self.written = true;
- Ok(data.len())
- } else {
- Err(io::Error::new(io::ErrorKind::NotFound, "test"))
+ if self.always_write_error {
+ return Err(io::Error::new(io::ErrorKind::Other, "test - always_write_error"));
}
+
+ match self.max_writes {
+ Some(0) if self.error_after_max_writes => {
+ return Err(io::Error::new(io::ErrorKind::Other, "test - max_writes"));
+ }
+ Some(0) => return Ok(0),
+ Some(ref mut count) => *count -= 1,
+ None => {}
+ }
+
+ let len = match self.accept_prefix {
+ None => data.len(),
+ Some(prefix) => data.len().min(prefix),
+ };
+
+ let data = &data[..len];
+ self.buffer.extend_from_slice(data);
+
+ Ok(len)
}
fn flush(&mut self) -> io::Result<()> {
- assert!(self.written);
- assert!(!self.flushed);
- self.flushed = true;
- Err(io::Error::new(io::ErrorKind::Other, "test"))
+ if self.always_flush_error {
+ Err(io::Error::new(io::ErrorKind::Other, "test - always_flush_error"))
+ } else {
+ self.flushed = true;
+ Ok(())
+ }
}
}
+ /// Previously the `LineWriter` could successfully write some bytes but
+ /// then fail to report that it has done so. Additionally, an erroneous
+ /// flush after a successful write was permanently ignored.
+ ///
+ /// Test that a line writer correctly reports the number of written bytes,
+ /// and that it attempts to flush buffered lines from previous writes
+ /// before processing new data
+ ///
+ /// Regression test for #37807
#[test]
fn erroneous_flush_retried() {
- let a = AcceptOneThenFail { written: false, flushed: false };
+ let writer = ProgrammableSink {
+ // Only write up to 4 bytes at a time
+ accept_prefix: Some(4),
- let mut l = LineWriter::new(a);
- assert_eq!(l.write(b"a\nb\na").unwrap(), 4);
- assert!(l.get_ref().written);
- assert!(l.get_ref().flushed);
- l.get_mut().flushed = false;
+ // Accept the first two writes, then error the others
+ max_writes: Some(2),
+ error_after_max_writes: true,
+
+ ..Default::default()
+ };
- assert_eq!(l.write(b"a").unwrap_err().kind(), io::ErrorKind::Other)
+ // This should write the first 4 bytes. The rest will be buffered, out
+ // to the last newline.
+ let mut writer = LineWriter::new(writer);
+ assert_eq!(writer.write(b"a\nb\nc\nd\ne").unwrap(), 8);
+
+ // This write should attempt to flush "c\nd\n", then buffer "e". No
+ // errors should happen here because no further writes should be
+ // attempted against `writer`.
+ assert_eq!(writer.write(b"e").unwrap(), 1);
+ assert_eq!(&writer.get_ref().buffer, b"a\nb\nc\nd\n");
}
#[test]
0,
);
assert_eq!(a.write_vectored(&[IoSlice::new(b"a\nb"),]).unwrap(), 3);
- assert_eq!(a.get_ref(), b"\nabaca\n");
+ assert_eq!(a.get_ref(), b"\nabaca\nb");
}
#[test]
fn line_vectored_partial_and_errors() {
+ use crate::collections::VecDeque;
+
enum Call {
Write { inputs: Vec<&'static [u8]>, output: io::Result<usize> },
Flush { output: io::Result<()> },
}
+
+ #[derive(Default)]
struct Writer {
- calls: Vec<Call>,
+ calls: VecDeque<Call>,
}
impl Write for Writer {
}
fn write_vectored(&mut self, buf: &[IoSlice<'_>]) -> io::Result<usize> {
- match self.calls.pop().unwrap() {
+ match self.calls.pop_front().expect("unexpected call to write") {
Call::Write { inputs, output } => {
assert_eq!(inputs, buf.iter().map(|b| &**b).collect::<Vec<_>>());
output
}
- _ => panic!("unexpected call to write"),
+ Call::Flush { .. } => panic!("unexpected call to write; expected a flush"),
}
}
+ fn is_write_vectored(&self) -> bool {
+ true
+ }
+
fn flush(&mut self) -> io::Result<()> {
- match self.calls.pop().unwrap() {
+ match self.calls.pop_front().expect("Unexpected call to flush") {
Call::Flush { output } => output,
- _ => panic!("unexpected call to flush"),
+ Call::Write { .. } => panic!("unexpected call to flush; expected a write"),
}
}
}
}
// partial writes keep going
- let mut a = LineWriter::new(Writer { calls: Vec::new() });
+ let mut a = LineWriter::new(Writer::default());
a.write_vectored(&[IoSlice::new(&[]), IoSlice::new(b"abc")]).unwrap();
- a.get_mut().calls.push(Call::Flush { output: Ok(()) });
- a.get_mut().calls.push(Call::Write { inputs: vec![b"bcx\n"], output: Ok(4) });
- a.get_mut().calls.push(Call::Write { inputs: vec![b"abcx\n"], output: Ok(1) });
+
+ a.get_mut().calls.push_back(Call::Write { inputs: vec![b"abc"], output: Ok(1) });
+ a.get_mut().calls.push_back(Call::Write { inputs: vec![b"bc"], output: Ok(2) });
+ a.get_mut().calls.push_back(Call::Write { inputs: vec![b"x", b"\n"], output: Ok(2) });
+
a.write_vectored(&[IoSlice::new(b"x"), IoSlice::new(b"\n")]).unwrap();
- a.get_mut().calls.push(Call::Flush { output: Ok(()) });
+
+ a.get_mut().calls.push_back(Call::Flush { output: Ok(()) });
a.flush().unwrap();
// erroneous writes stop and don't write more
- a.get_mut().calls.push(Call::Write { inputs: vec![b"x\n"], output: Err(err()) });
- assert_eq!(a.write_vectored(&[IoSlice::new(b"x"), IoSlice::new(b"\na")]).unwrap(), 2);
- a.get_mut().calls.push(Call::Flush { output: Ok(()) });
- a.get_mut().calls.push(Call::Write { inputs: vec![b"x\n"], output: Ok(2) });
+ a.get_mut().calls.push_back(Call::Write { inputs: vec![b"x", b"\na"], output: Err(err()) });
+ a.get_mut().calls.push_back(Call::Flush { output: Ok(()) });
+ assert!(a.write_vectored(&[IoSlice::new(b"x"), IoSlice::new(b"\na")]).is_err());
a.flush().unwrap();
fn err() -> io::Error {
io::Error::new(io::ErrorKind::Other, "x")
}
}
+
+ /// Test that, in cases where vectored writing is not enabled, the
+ /// LineWriter uses the normal `write` call, which more-correctly handles
+ /// partial lines
+ #[test]
+ fn line_vectored_ignored() {
+ let writer = ProgrammableSink::default();
+ let mut writer = LineWriter::new(writer);
+
+ let content = [
+ IoSlice::new(&[]),
+ IoSlice::new(b"Line 1\nLine"),
+ IoSlice::new(b" 2\nLine 3\nL"),
+ IoSlice::new(&[]),
+ IoSlice::new(&[]),
+ IoSlice::new(b"ine 4"),
+ IoSlice::new(b"\nLine 5\n"),
+ ];
+
+ let count = writer.write_vectored(&content).unwrap();
+ assert_eq!(count, 11);
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\n");
+
+ let count = writer.write_vectored(&content[2..]).unwrap();
+ assert_eq!(count, 11);
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n");
+
+ let count = writer.write_vectored(&content[5..]).unwrap();
+ assert_eq!(count, 5);
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n");
+
+ let count = writer.write_vectored(&content[6..]).unwrap();
+ assert_eq!(count, 8);
+ assert_eq!(
+ writer.get_ref().buffer.as_slice(),
+ b"Line 1\nLine 2\nLine 3\nLine 4\nLine 5\n".as_ref()
+ );
+ }
+
+ /// Test that, given this input:
+ ///
+ /// Line 1\n
+ /// Line 2\n
+ /// Line 3\n
+ /// Line 4
+ ///
+ /// And given a result that only writes to midway through Line 2
+ ///
+ /// That only up to the end of Line 3 is buffered
+ ///
+ /// This behavior is desirable because it prevents flushing partial lines
+ #[test]
+ fn partial_write_buffers_line() {
+ let writer = ProgrammableSink { accept_prefix: Some(13), ..Default::default() };
+ let mut writer = LineWriter::new(writer);
+
+ assert_eq!(writer.write(b"Line 1\nLine 2\nLine 3\nLine4").unwrap(), 21);
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2");
+
+ assert_eq!(writer.write(b"Line 4").unwrap(), 6);
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\n");
+ }
+
+ /// Test that, given this input:
+ ///
+ /// Line 1\n
+ /// Line 2\n
+ /// Line 3
+ ///
+ /// And given that the full write of lines 1 and 2 was successful
+ /// That data up to Line 3 is buffered
+ #[test]
+ fn partial_line_buffered_after_line_write() {
+ let writer = ProgrammableSink::default();
+ let mut writer = LineWriter::new(writer);
+
+ assert_eq!(writer.write(b"Line 1\nLine 2\nLine 3").unwrap(), 20);
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\n");
+
+ assert!(writer.flush().is_ok());
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3");
+ }
+
+ /// Test that, given a partial line that exceeds the length of
+ /// LineBuffer's buffer (that is, without a trailing newline), that that
+ /// line is written to the inner writer
+ #[test]
+ fn long_line_flushed() {
+ let writer = ProgrammableSink::default();
+ let mut writer = LineWriter::with_capacity(5, writer);
+
+ assert_eq!(writer.write(b"0123456789").unwrap(), 10);
+ assert_eq!(&writer.get_ref().buffer, b"0123456789");
+ }
+
+ /// Test that, given a very long partial line *after* successfully
+ /// flushing a complete line, that that line is buffered unconditionally,
+ /// and no additional writes take place. This assures the property that
+ /// `write` should make at-most-one attempt to write new data.
+ #[test]
+ fn line_long_tail_not_flushed() {
+ let writer = ProgrammableSink::default();
+ let mut writer = LineWriter::with_capacity(5, writer);
+
+ // Assert that Line 1\n is flushed, and 01234 is buffered
+ assert_eq!(writer.write(b"Line 1\n0123456789").unwrap(), 12);
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\n");
+
+ // Because the buffer is full, this subsequent write will flush it
+ assert_eq!(writer.write(b"5").unwrap(), 1);
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\n01234");
+ }
+
+ /// Test that, if an attempt to pre-flush buffered data returns Ok(0),
+ /// this is propagated as an error.
+ #[test]
+ fn line_buffer_write0_error() {
+ let writer = ProgrammableSink {
+ // Accept one write, then return Ok(0) on subsequent ones
+ max_writes: Some(1),
+
+ ..Default::default()
+ };
+ let mut writer = LineWriter::new(writer);
+
+ // This should write "Line 1\n" and buffer "Partial"
+ assert_eq!(writer.write(b"Line 1\nPartial").unwrap(), 14);
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\n");
+
+ // This will attempt to flush "partial", which will return Ok(0), which
+ // needs to be an error, because we've already informed the client
+ // that we accepted the write.
+ let err = writer.write(b" Line End\n").unwrap_err();
+ assert_eq!(err.kind(), ErrorKind::WriteZero);
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\n");
+ }
+
+ /// Test that, if a write returns Ok(0) after a successful pre-flush, this
+ /// is propagated as Ok(0)
+ #[test]
+ fn line_buffer_write0_normal() {
+ let writer = ProgrammableSink {
+ // Accept two writes, then return Ok(0) on subsequent ones
+ max_writes: Some(2),
+
+ ..Default::default()
+ };
+ let mut writer = LineWriter::new(writer);
+
+ // This should write "Line 1\n" and buffer "Partial"
+ assert_eq!(writer.write(b"Line 1\nPartial").unwrap(), 14);
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\n");
+
+ // This will flush partial, which will succeed, but then return Ok(0)
+ // when flushing " Line End\n"
+ assert_eq!(writer.write(b" Line End\n").unwrap(), 0);
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\nPartial");
+ }
+
+ /// LineWriter has a custom `write_all`; make sure it works correctly
+ #[test]
+ fn line_write_all() {
+ let writer = ProgrammableSink {
+ // Only write 5 bytes at a time
+ accept_prefix: Some(5),
+ ..Default::default()
+ };
+ let mut writer = LineWriter::new(writer);
+
+ writer.write_all(b"Line 1\nLine 2\nLine 3\nLine 4\nPartial").unwrap();
+ assert_eq!(&writer.get_ref().buffer, b"Line 1\nLine 2\nLine 3\nLine 4\n");
+ writer.write_all(b" Line 5\n").unwrap();
+ assert_eq!(
+ writer.get_ref().buffer.as_slice(),
+ b"Line 1\nLine 2\nLine 3\nLine 4\nPartial Line 5\n".as_ref(),
+ );
+ }
+
+ #[test]
+ fn line_write_all_error() {
+ let writer = ProgrammableSink {
+ // Only accept up to 3 writes of up to 5 bytes each
+ accept_prefix: Some(5),
+ max_writes: Some(3),
+ ..Default::default()
+ };
+
+ let mut writer = LineWriter::new(writer);
+ let res = writer.write_all(b"Line 1\nLine 2\nLine 3\nLine 4\nPartial");
+ assert!(res.is_err());
+ // An error from write_all leaves everything in an indeterminate state,
+ // so there's nothing else to test here
+ }
+
+ /// Under certain circumstances, the old implementation of LineWriter
+ /// would try to buffer "to the last newline" but be forced to buffer
+ /// less than that, leading to inappropriate partial line writes.
+ /// Regression test for that issue.
+ #[test]
+ fn partial_multiline_buffering() {
+ let writer = ProgrammableSink {
+ // Write only up to 5 bytes at a time
+ accept_prefix: Some(5),
+ ..Default::default()
+ };
+
+ let mut writer = LineWriter::with_capacity(10, writer);
+
+ let content = b"AAAAABBBBB\nCCCCDDDDDD\nEEE";
+
+ // When content is written, LineWriter will try to write blocks A, B,
+ // C, and D. Only block A will succeed. Under the old behavior, LineWriter
+ // would then try to buffer B, C and D, but because its capacity is 10,
+ // it will only be able to buffer B and C. We don't want to buffer
+ // partial lines concurrent with whole lines, so the correct behavior
+ // is to buffer only block B (out to the newline)
+ assert_eq!(writer.write(content).unwrap(), 11);
+ assert_eq!(writer.get_ref().buffer, *b"AAAAA");
+
+ writer.flush().unwrap();
+ assert_eq!(writer.get_ref().buffer, *b"AAAAABBBBB\n");
+ }
+
+ /// Same as test_partial_multiline_buffering, but in the event NO full lines
+ /// fit in the buffer, just buffer as much as possible
+ #[test]
+ fn partial_multiline_buffering_without_full_line() {
+ let writer = ProgrammableSink {
+ // Write only up to 5 bytes at a time
+ accept_prefix: Some(5),
+ ..Default::default()
+ };
+
+ let mut writer = LineWriter::with_capacity(5, writer);
+
+ let content = b"AAAAABBBBBBBBBB\nCCCCC\nDDDDD";
+
+ // When content is written, LineWriter will try to write blocks A, B,
+ // and C. Only block A will succeed. Under the old behavior, LineWriter
+ // would then try to buffer B and C, but because its capacity is 5,
+ // it will only be able to buffer part of B. Because it's not possible
+ // for it to buffer any complete lines, it should buffer as much of B as
+ // possible
+ assert_eq!(writer.write(content).unwrap(), 10);
+ assert_eq!(writer.get_ref().buffer, *b"AAAAA");
+
+ writer.flush().unwrap();
+ assert_eq!(writer.get_ref().buffer, *b"AAAAABBBBB");
+ }
+
+ #[derive(Debug, Clone, PartialEq, Eq)]
+ enum RecordedEvent {
+ Write(String),
+ Flush,
+ }
+
+ #[derive(Debug, Clone, Default)]
+ struct WriteRecorder {
+ pub events: Vec<RecordedEvent>,
+ }
+
+ impl Write for WriteRecorder {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ use crate::str::from_utf8;
+
+ self.events.push(RecordedEvent::Write(from_utf8(buf).unwrap().to_string()));
+ Ok(buf.len())
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.events.push(RecordedEvent::Flush);
+ Ok(())
+ }
+ }
+
+ /// Test that a normal, formatted writeln only results in a single write
+ /// call to the underlying writer. A naive implementation of
+ /// LineWriter::write_all results in two writes: one of the buffered data,
+ /// and another of the final substring in the formatted set
+ #[test]
+ fn single_formatted_write() {
+ let writer = WriteRecorder::default();
+ let mut writer = LineWriter::new(writer);
+
+ // Under a naive implementation of LineWriter, this will result in two
+ // writes: "hello, world" and "!\n", because write() has to flush the
+ // buffer before attempting to write the last "!\n". write_all shouldn't
+ // have this limitation.
+ writeln!(&mut writer, "{}, {}!", "hello", "world").unwrap();
+ assert_eq!(writer.get_ref().events, [RecordedEvent::Write("hello, world!\n".to_string())]);
+ }
}