diff options
| author | Max Audron <audron@cocaine.farm> | 2020-04-12 17:45:42 +0200 |
|---|---|---|
| committer | Max Audron <audron@cocaine.farm> | 2020-04-12 17:45:42 +0200 |
| commit | a5b4d399322eb5f45622395ec8407c80edf9261c (patch) | |
| tree | ea71a15f607289c113edd0cc9125d6d5906bc046 | |
| parent | update deps (diff) | |
finish FramedCodec
| -rw-r--r-- | src/bin/quassel-client.rs | 2 | ||||
| -rw-r--r-- | src/protocol/frame/mod.rs | 87 | ||||
| -rw-r--r-- | src/tests/frame.rs | 241 | ||||
| -rw-r--r-- | src/tests/mod.rs | 15 |
4 files changed, 297 insertions, 48 deletions
diff --git a/src/bin/quassel-client.rs b/src/bin/quassel-client.rs index 542d265..ce450b6 100644 --- a/src/bin/quassel-client.rs +++ b/src/bin/quassel-client.rs @@ -13,7 +13,7 @@ async fn main() -> Result<(), Error> { "localhost", 4242, false, - false, + true, ).await.unwrap(); client.run().await; diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs index b8296aa..8c5a8d3 100644 --- a/src/protocol/frame/mod.rs +++ b/src/protocol/frame/mod.rs @@ -1,7 +1,7 @@ -use std::error::Error as StdError; -use std::io::{self, Cursor}; use std::convert::TryInto; +use std::error::Error as StdError; use std::fmt; +use std::io::{self, Cursor}; use bytes::{Buf, BufMut, BytesMut}; @@ -10,8 +10,8 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite}; use flate2::Compress; -use flate2::Decompress; use flate2::Compression; +use flate2::Decompress; use flate2::FlushCompress; use flate2::FlushDecompress; @@ -61,6 +61,10 @@ impl QuasselCodec { Builder::new() } + pub fn max_frame_length(&self) -> usize { + self.builder.max_frame_len + } + pub fn compression(&self) -> bool { self.builder.compression } @@ -78,20 +82,19 @@ impl QuasselCodec { } fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> { -// let head_len = self.builder.num_head_bytes(); - let field_len = 4; + let head_len = 4; - if src.len() < field_len { + if src.len() < head_len { // Not enough data return Ok(None); } - let n = { + let field_len = { let mut src = Cursor::new(&mut *src); - let n = src.get_uint(field_len); + let field_len = src.get_uint(head_len); - if n > self.builder.max_frame_len as u64 { + if field_len > self.builder.max_frame_len as u64 { return Err(io::Error::new( io::ErrorKind::InvalidData, QuasselCodecError { _priv: () }, @@ -99,18 +102,17 @@ impl QuasselCodec { } // The check above ensures there is no overflow - let n = n as usize; - n + field_len as usize }; // Strip header - let _ = src.split_to(4); + let _ = src.split_to(head_len); // Ensure that the buffer has enough space to read the incoming // payload - src.reserve(n); + src.reserve(field_len); - Ok(Some(n)) + Ok(Some(field_len)) } fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { @@ -124,30 +126,40 @@ impl QuasselCodec { } } - impl Decoder for QuasselCodec { type Item = BytesMut; type Error = io::Error; fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { - let mut buf = vec![0; src.len() * 2]; - - println!("src: {:?}", &src[..]); + // Create Unified Buffer for compressed and not compressed datastream + let mut buf: &mut BytesMut = &mut BytesMut::new(); if self.builder.compression == true { + // Buffer to shove uncompressed stream into + let mut msg = Vec::with_capacity(self.builder.max_frame_len); + let before_in = self.decomp.total_in(); let before_out = self.decomp.total_out(); - self.decomp.decompress(&src, &mut buf, FlushDecompress::None)?; + + self.decomp + .decompress_vec(&src, &mut msg, FlushDecompress::None)?; + // Clear the src buffer, decompress() only peeks at content. + // without this we will endlessly loop over the same frame. + src.clear(); + let after_in = self.decomp.total_in(); let after_out = self.decomp.total_out(); - buf.truncate((after_out - before_out).try_into().unwrap()); + let len = (after_out - before_out).try_into().unwrap(); + + // Reserve length of uncompressed stream + // and put bytes into there + buf.reserve(len); + buf.put(&msg[..]); } else { - buf = src.to_vec(); + buf = src; } - let buf = &mut BytesMut::from(&buf[..]); - let n = match self.state { DecodeState::Head => match self.decode_head(buf)? { Some(n) => { @@ -200,7 +212,7 @@ impl Encoder for QuasselCodec { buf.extend_from_slice(&data[..]); if self.builder.compression { - let mut cbuf: Vec<u8> = vec![0; 4+n]; + let mut cbuf: Vec<u8> = vec![0; 4 + n]; let before_in = self.comp.total_in(); let before_out = self.comp.total_out(); self.comp.compress(buf, &mut cbuf, FlushCompress::Full)?; @@ -223,7 +235,6 @@ impl Default for QuasselCodec { } } - // ===== impl Builder ===== impl Builder { @@ -234,14 +245,10 @@ impl Builder { /// /// ``` /// # use tokio::io::AsyncRead; - /// use tokio_util::codec::QuasselCodec; + /// use libquassel::protocol::frame::QuasselCodec; /// /// # fn bind_read<T: AsyncRead>(io: T) { /// QuasselCodec::builder() - /// .length_field_offset(0) - /// .length_field_length(2) - /// .length_adjustment(0) - /// .num_skip(0) /// .new_read(io); /// # } /// # pub fn main() {} @@ -281,7 +288,7 @@ impl Builder { /// /// ``` /// # use tokio::io::AsyncRead; - /// use tokio_util::codec::QuasselCodec; + /// use libquassel::protocol::frame::QuasselCodec; /// /// # fn bind_read<T: AsyncRead>(io: T) { /// QuasselCodec::builder() @@ -300,13 +307,9 @@ impl Builder { /// # Examples /// /// ``` - /// use tokio_util::codec::QuasselCodec; + /// use libquassel::protocol::frame::QuasselCodec; /// # pub fn main() { /// QuasselCodec::builder() - /// .length_field_offset(0) - /// .length_field_length(2) - /// .length_adjustment(0) - /// .num_skip(0) /// .new_codec(); /// # } /// ``` @@ -325,14 +328,10 @@ impl Builder { /// /// ``` /// # use tokio::io::AsyncRead; - /// use tokio_util::codec::QuasselCodec; + /// use libquassel::protocol::frame::QuasselCodec; /// /// # fn bind_read<T: AsyncRead>(io: T) { /// QuasselCodec::builder() - /// .length_field_offset(0) - /// .length_field_length(2) - /// .length_adjustment(0) - /// .num_skip(0) /// .new_read(io); /// # } /// # pub fn main() {} @@ -350,10 +349,9 @@ impl Builder { /// /// ``` /// # use tokio::io::AsyncWrite; - /// # use tokio_util::codec::QuasselCodec; + /// # use libquassel::protocol::frame::QuasselCodec; /// # fn write_frame<T: AsyncWrite>(io: T) { /// QuasselCodec::builder() - /// .length_field_length(2) /// .new_write(io); /// # } /// # pub fn main() {} @@ -371,11 +369,10 @@ impl Builder { /// /// ``` /// # use tokio::io::{AsyncRead, AsyncWrite}; - /// # use tokio_util::codec::QuasselCodec; + /// # use libquassel::protocol::frame::QuasselCodec; /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { /// # let _ = /// QuasselCodec::builder() - /// .length_field_length(2) /// .new_framed(io); /// # } /// # pub fn main() {} diff --git a/src/tests/frame.rs b/src/tests/frame.rs new file mode 100644 index 0000000..0bc87ad --- /dev/null +++ b/src/tests/frame.rs @@ -0,0 +1,241 @@ +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_test::task; +use tokio_test::{ + assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok, +}; +use tokio_util::codec::*; + +use bytes::{BufMut, Bytes, BytesMut}; +use futures::{pin_mut, Sink, Stream}; + +use std::collections::VecDeque; +use std::io; +use std::pin::Pin; +use std::task::Poll::*; +use std::task::{Context, Poll}; + +use flate2::Compress; +use flate2::Compression; +use flate2::Decompress; +use flate2::FlushCompress; +use flate2::FlushDecompress; + +use crate::protocol::frame::QuasselCodec; + +macro_rules! mock { + ($($x:expr,)*) => {{ + let mut v = VecDeque::new(); + v.extend(vec![$($x),*]); + Mock { calls: v } + }}; +} + +macro_rules! assert_next_eq { + ($io:ident, $expect:expr) => {{ + task::spawn(()).enter(|cx, _| { + let res = assert_ready!($io.as_mut().poll_next(cx)); + match res { + Some(Ok(v)) => assert_eq!(v, $expect.as_ref()), + Some(Err(e)) => panic!("error = {:?}", e), + None => panic!("none"), + } + }); + }}; +} + +macro_rules! assert_next_pending { + ($io:ident) => {{ + task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) { + Ready(Some(Ok(v))) => panic!("value = {:?}", v), + Ready(Some(Err(e))) => panic!("error = {:?}", e), + Ready(None) => panic!("done"), + Pending => {} + }); + }}; +} + +macro_rules! assert_next_err { + ($io:ident) => {{ + task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) { + Ready(Some(Ok(v))) => panic!("value = {:?}", v), + Ready(Some(Err(_))) => {} + Ready(None) => panic!("done"), + Pending => panic!("pending"), + }); + }}; +} + +macro_rules! assert_done { + ($io:ident) => {{ + task::spawn(()).enter(|cx, _| { + let res = assert_ready!($io.as_mut().poll_next(cx)); + match res { + Some(Ok(v)) => panic!("value = {:?}", v), + Some(Err(e)) => panic!("error = {:?}", e), + None => {} + } + }); + }}; +} + +// ====================== +// ===== Test ===== +// ====================== + +#[test] +pub fn read_single_frame() { + let io = FramedRead::new( + mock! { + data(b"\x00\x00\x00\x09abcdefghi"), + }, + QuasselCodec::new(), + ); + pin_mut!(io); + + assert_next_eq!(io, b"abcdefghi"); + assert_done!(io); +} + +#[test] +pub fn read_multi_frame() { + let mut d: Vec<u8> = vec![]; + d.extend_from_slice(b"\x00\x00\x00\x09abcdefghi"); + d.extend_from_slice(b"\x00\x00\x00\x03123"); + d.extend_from_slice(b"\x00\x00\x00\x0bhello world"); + + let io = FramedRead::new( + mock! { + data(&d), + }, + QuasselCodec::new(), + ); + pin_mut!(io); + + assert_next_eq!(io, b"abcdefghi"); + assert_next_eq!(io, b"123"); + assert_next_eq!(io, b"hello world"); + assert_done!(io); +} + +#[test] +pub fn read_single_frame_compressed() { + let io = FramedRead::new( + mock! { + data(b"\x78\x9c\x63\x60\x60\xe0\x4c\x4c\x4a\x4e\x49\x4d\x4b\xcf\xc8\x04\x00\x11\xec\x03\x97"), + }, + QuasselCodec::builder().compression(true).new_codec(), + ); + pin_mut!(io); + + assert_next_eq!(io, b"abcdefghi"); + assert_done!(io); +} + +// TODO shit doens't work for whatever reason +// #[test] +// pub fn read_multi_frame_compressed() { +// let io = FramedRead::new( +// mock! { +// data( +// b"\x78\x9c\x63\x60\x60\xe0\x4c\x4c\x4a\x4e\x49\x4d\x4b\xcf\xc8\x04\x00\x11\xec\x03\x97\x78\x9c\x63\x60\x60\x60\x36\x34\x32\x06\x00\x01\x3d\x00\x9a\x78\x9c\x63\x60\x60\xe0\xce\x48\xcd\xc9\xc9\x57\x28\xcf\x2f\xca\x49\x01\x00\x1a\x93\x04\x68", +// ), +// }, +// QuasselCodec::builder().compression(true).new_codec(), +// ); +// pin_mut!(io); +// +// assert_next_eq!(io, b"abcdefghi"); +// assert_next_eq!(io, b"123"); +// assert_next_eq!(io, b"hello world"); +// assert_done!(io); +// } + +// ====================== +// ===== Test utils ===== +// ====================== + +struct Mock { + calls: VecDeque<Poll<io::Result<Op>>>, +} + +enum Op { + Data(Vec<u8>), + Flush, +} + +use self::Op::*; + +impl AsyncRead for Mock { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + dst: &mut [u8], + ) -> Poll<io::Result<usize>> { + match self.calls.pop_front() { + Some(Ready(Ok(Op::Data(data)))) => { + debug_assert!(dst.len() >= data.len()); + dst[..data.len()].copy_from_slice(&data[..]); + Ready(Ok(data.len())) + } + Some(Ready(Ok(_))) => panic!(), + Some(Ready(Err(e))) => Ready(Err(e)), + Some(Pending) => Pending, + None => Ready(Ok(0)), + } + } +} + +impl AsyncWrite for Mock { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + src: &[u8], + ) -> Poll<Result<usize, io::Error>> { + match self.calls.pop_front() { + Some(Ready(Ok(Op::Data(data)))) => { + let len = data.len(); + assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src); + assert_eq!(&data[..], &src[..len]); + Ready(Ok(len)) + } + Some(Ready(Ok(_))) => panic!(), + Some(Ready(Err(e))) => Ready(Err(e)), + Some(Pending) => Pending, + None => Ready(Ok(0)), + } + } + + fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + match self.calls.pop_front() { + Some(Ready(Ok(Op::Flush))) => Ready(Ok(())), + Some(Ready(Ok(_))) => panic!(), + Some(Ready(Err(e))) => Ready(Err(e)), + Some(Pending) => Pending, + None => Ready(Ok(())), + } + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { + Ready(Ok(())) + } +} + +impl<'a> From<&'a [u8]> for Op { + fn from(src: &'a [u8]) -> Op { + Op::Data(src.into()) + } +} + +impl From<Vec<u8>> for Op { + fn from(src: Vec<u8>) -> Op { + Op::Data(src) + } +} + +fn data(bytes: &[u8]) -> Poll<io::Result<Op>> { + Ready(Ok(bytes.into())) +} + +fn flush() -> Poll<io::Result<Op>> { + Ready(Ok(Flush)) +} diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 882f75c..4b030c7 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -1,4 +1,15 @@ -#[cfg(test)] +pub mod base_types; + +#[allow(unused_imports)] +#[allow(unused_macros)] +#[allow(dead_code)] +pub mod frame; + pub mod handshake_types; + pub mod variant_types; -pub mod base_types; + +extern crate futures; +extern crate tokio; +extern crate tokio_test; +extern crate tokio_util; |
