aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bin/quassel-client.rs2
-rw-r--r--src/protocol/frame/mod.rs87
-rw-r--r--src/tests/frame.rs241
-rw-r--r--src/tests/mod.rs15
4 files changed, 297 insertions, 48 deletions
diff --git a/src/bin/quassel-client.rs b/src/bin/quassel-client.rs
index 542d265..ce450b6 100644
--- a/src/bin/quassel-client.rs
+++ b/src/bin/quassel-client.rs
@@ -13,7 +13,7 @@ async fn main() -> Result<(), Error> {
"localhost",
4242,
false,
- false,
+ true,
).await.unwrap();
client.run().await;
diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs
index b8296aa..8c5a8d3 100644
--- a/src/protocol/frame/mod.rs
+++ b/src/protocol/frame/mod.rs
@@ -1,7 +1,7 @@
-use std::error::Error as StdError;
-use std::io::{self, Cursor};
use std::convert::TryInto;
+use std::error::Error as StdError;
use std::fmt;
+use std::io::{self, Cursor};
use bytes::{Buf, BufMut, BytesMut};
@@ -10,8 +10,8 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite};
use flate2::Compress;
-use flate2::Decompress;
use flate2::Compression;
+use flate2::Decompress;
use flate2::FlushCompress;
use flate2::FlushDecompress;
@@ -61,6 +61,10 @@ impl QuasselCodec {
Builder::new()
}
+ pub fn max_frame_length(&self) -> usize {
+ self.builder.max_frame_len
+ }
+
pub fn compression(&self) -> bool {
self.builder.compression
}
@@ -78,20 +82,19 @@ impl QuasselCodec {
}
fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> {
-// let head_len = self.builder.num_head_bytes();
- let field_len = 4;
+ let head_len = 4;
- if src.len() < field_len {
+ if src.len() < head_len {
// Not enough data
return Ok(None);
}
- let n = {
+ let field_len = {
let mut src = Cursor::new(&mut *src);
- let n = src.get_uint(field_len);
+ let field_len = src.get_uint(head_len);
- if n > self.builder.max_frame_len as u64 {
+ if field_len > self.builder.max_frame_len as u64 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
QuasselCodecError { _priv: () },
@@ -99,18 +102,17 @@ impl QuasselCodec {
}
// The check above ensures there is no overflow
- let n = n as usize;
- n
+ field_len as usize
};
// Strip header
- let _ = src.split_to(4);
+ let _ = src.split_to(head_len);
// Ensure that the buffer has enough space to read the incoming
// payload
- src.reserve(n);
+ src.reserve(field_len);
- Ok(Some(n))
+ Ok(Some(field_len))
}
fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
@@ -124,30 +126,40 @@ impl QuasselCodec {
}
}
-
impl Decoder for QuasselCodec {
type Item = BytesMut;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
- let mut buf = vec![0; src.len() * 2];
-
- println!("src: {:?}", &src[..]);
+ // Create Unified Buffer for compressed and not compressed datastream
+ let mut buf: &mut BytesMut = &mut BytesMut::new();
if self.builder.compression == true {
+ // Buffer to shove uncompressed stream into
+ let mut msg = Vec::with_capacity(self.builder.max_frame_len);
+
let before_in = self.decomp.total_in();
let before_out = self.decomp.total_out();
- self.decomp.decompress(&src, &mut buf, FlushDecompress::None)?;
+
+ self.decomp
+ .decompress_vec(&src, &mut msg, FlushDecompress::None)?;
+ // Clear the src buffer, decompress() only peeks at content.
+ // without this we will endlessly loop over the same frame.
+ src.clear();
+
let after_in = self.decomp.total_in();
let after_out = self.decomp.total_out();
- buf.truncate((after_out - before_out).try_into().unwrap());
+ let len = (after_out - before_out).try_into().unwrap();
+
+ // Reserve length of uncompressed stream
+ // and put bytes into there
+ buf.reserve(len);
+ buf.put(&msg[..]);
} else {
- buf = src.to_vec();
+ buf = src;
}
- let buf = &mut BytesMut::from(&buf[..]);
-
let n = match self.state {
DecodeState::Head => match self.decode_head(buf)? {
Some(n) => {
@@ -200,7 +212,7 @@ impl Encoder for QuasselCodec {
buf.extend_from_slice(&data[..]);
if self.builder.compression {
- let mut cbuf: Vec<u8> = vec![0; 4+n];
+ let mut cbuf: Vec<u8> = vec![0; 4 + n];
let before_in = self.comp.total_in();
let before_out = self.comp.total_out();
self.comp.compress(buf, &mut cbuf, FlushCompress::Full)?;
@@ -223,7 +235,6 @@ impl Default for QuasselCodec {
}
}
-
// ===== impl Builder =====
impl Builder {
@@ -234,14 +245,10 @@ impl Builder {
///
/// ```
/// # use tokio::io::AsyncRead;
- /// use tokio_util::codec::QuasselCodec;
+ /// use libquassel::protocol::frame::QuasselCodec;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// QuasselCodec::builder()
- /// .length_field_offset(0)
- /// .length_field_length(2)
- /// .length_adjustment(0)
- /// .num_skip(0)
/// .new_read(io);
/// # }
/// # pub fn main() {}
@@ -281,7 +288,7 @@ impl Builder {
///
/// ```
/// # use tokio::io::AsyncRead;
- /// use tokio_util::codec::QuasselCodec;
+ /// use libquassel::protocol::frame::QuasselCodec;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// QuasselCodec::builder()
@@ -300,13 +307,9 @@ impl Builder {
/// # Examples
///
/// ```
- /// use tokio_util::codec::QuasselCodec;
+ /// use libquassel::protocol::frame::QuasselCodec;
/// # pub fn main() {
/// QuasselCodec::builder()
- /// .length_field_offset(0)
- /// .length_field_length(2)
- /// .length_adjustment(0)
- /// .num_skip(0)
/// .new_codec();
/// # }
/// ```
@@ -325,14 +328,10 @@ impl Builder {
///
/// ```
/// # use tokio::io::AsyncRead;
- /// use tokio_util::codec::QuasselCodec;
+ /// use libquassel::protocol::frame::QuasselCodec;
///
/// # fn bind_read<T: AsyncRead>(io: T) {
/// QuasselCodec::builder()
- /// .length_field_offset(0)
- /// .length_field_length(2)
- /// .length_adjustment(0)
- /// .num_skip(0)
/// .new_read(io);
/// # }
/// # pub fn main() {}
@@ -350,10 +349,9 @@ impl Builder {
///
/// ```
/// # use tokio::io::AsyncWrite;
- /// # use tokio_util::codec::QuasselCodec;
+ /// # use libquassel::protocol::frame::QuasselCodec;
/// # fn write_frame<T: AsyncWrite>(io: T) {
/// QuasselCodec::builder()
- /// .length_field_length(2)
/// .new_write(io);
/// # }
/// # pub fn main() {}
@@ -371,11 +369,10 @@ impl Builder {
///
/// ```
/// # use tokio::io::{AsyncRead, AsyncWrite};
- /// # use tokio_util::codec::QuasselCodec;
+ /// # use libquassel::protocol::frame::QuasselCodec;
/// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) {
/// # let _ =
/// QuasselCodec::builder()
- /// .length_field_length(2)
/// .new_framed(io);
/// # }
/// # pub fn main() {}
diff --git a/src/tests/frame.rs b/src/tests/frame.rs
new file mode 100644
index 0000000..0bc87ad
--- /dev/null
+++ b/src/tests/frame.rs
@@ -0,0 +1,241 @@
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio_test::task;
+use tokio_test::{
+ assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
+};
+use tokio_util::codec::*;
+
+use bytes::{BufMut, Bytes, BytesMut};
+use futures::{pin_mut, Sink, Stream};
+
+use std::collections::VecDeque;
+use std::io;
+use std::pin::Pin;
+use std::task::Poll::*;
+use std::task::{Context, Poll};
+
+use flate2::Compress;
+use flate2::Compression;
+use flate2::Decompress;
+use flate2::FlushCompress;
+use flate2::FlushDecompress;
+
+use crate::protocol::frame::QuasselCodec;
+
+macro_rules! mock {
+ ($($x:expr,)*) => {{
+ let mut v = VecDeque::new();
+ v.extend(vec![$($x),*]);
+ Mock { calls: v }
+ }};
+}
+
+macro_rules! assert_next_eq {
+ ($io:ident, $expect:expr) => {{
+ task::spawn(()).enter(|cx, _| {
+ let res = assert_ready!($io.as_mut().poll_next(cx));
+ match res {
+ Some(Ok(v)) => assert_eq!(v, $expect.as_ref()),
+ Some(Err(e)) => panic!("error = {:?}", e),
+ None => panic!("none"),
+ }
+ });
+ }};
+}
+
+macro_rules! assert_next_pending {
+ ($io:ident) => {{
+ task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
+ Ready(Some(Ok(v))) => panic!("value = {:?}", v),
+ Ready(Some(Err(e))) => panic!("error = {:?}", e),
+ Ready(None) => panic!("done"),
+ Pending => {}
+ });
+ }};
+}
+
+macro_rules! assert_next_err {
+ ($io:ident) => {{
+ task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
+ Ready(Some(Ok(v))) => panic!("value = {:?}", v),
+ Ready(Some(Err(_))) => {}
+ Ready(None) => panic!("done"),
+ Pending => panic!("pending"),
+ });
+ }};
+}
+
+macro_rules! assert_done {
+ ($io:ident) => {{
+ task::spawn(()).enter(|cx, _| {
+ let res = assert_ready!($io.as_mut().poll_next(cx));
+ match res {
+ Some(Ok(v)) => panic!("value = {:?}", v),
+ Some(Err(e)) => panic!("error = {:?}", e),
+ None => {}
+ }
+ });
+ }};
+}
+
+// ======================
+// ===== Test =====
+// ======================
+
+#[test]
+pub fn read_single_frame() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x00\x00\x00\x09abcdefghi"),
+ },
+ QuasselCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+#[test]
+pub fn read_multi_frame() {
+ let mut d: Vec<u8> = vec![];
+ d.extend_from_slice(b"\x00\x00\x00\x09abcdefghi");
+ d.extend_from_slice(b"\x00\x00\x00\x03123");
+ d.extend_from_slice(b"\x00\x00\x00\x0bhello world");
+
+ let io = FramedRead::new(
+ mock! {
+ data(&d),
+ },
+ QuasselCodec::new(),
+ );
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_next_eq!(io, b"123");
+ assert_next_eq!(io, b"hello world");
+ assert_done!(io);
+}
+
+#[test]
+pub fn read_single_frame_compressed() {
+ let io = FramedRead::new(
+ mock! {
+ data(b"\x78\x9c\x63\x60\x60\xe0\x4c\x4c\x4a\x4e\x49\x4d\x4b\xcf\xc8\x04\x00\x11\xec\x03\x97"),
+ },
+ QuasselCodec::builder().compression(true).new_codec(),
+ );
+ pin_mut!(io);
+
+ assert_next_eq!(io, b"abcdefghi");
+ assert_done!(io);
+}
+
+// TODO shit doens't work for whatever reason
+// #[test]
+// pub fn read_multi_frame_compressed() {
+// let io = FramedRead::new(
+// mock! {
+// data(
+// b"\x78\x9c\x63\x60\x60\xe0\x4c\x4c\x4a\x4e\x49\x4d\x4b\xcf\xc8\x04\x00\x11\xec\x03\x97\x78\x9c\x63\x60\x60\x60\x36\x34\x32\x06\x00\x01\x3d\x00\x9a\x78\x9c\x63\x60\x60\xe0\xce\x48\xcd\xc9\xc9\x57\x28\xcf\x2f\xca\x49\x01\x00\x1a\x93\x04\x68",
+// ),
+// },
+// QuasselCodec::builder().compression(true).new_codec(),
+// );
+// pin_mut!(io);
+//
+// assert_next_eq!(io, b"abcdefghi");
+// assert_next_eq!(io, b"123");
+// assert_next_eq!(io, b"hello world");
+// assert_done!(io);
+// }
+
+// ======================
+// ===== Test utils =====
+// ======================
+
+struct Mock {
+ calls: VecDeque<Poll<io::Result<Op>>>,
+}
+
+enum Op {
+ Data(Vec<u8>),
+ Flush,
+}
+
+use self::Op::*;
+
+impl AsyncRead for Mock {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ dst: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ match self.calls.pop_front() {
+ Some(Ready(Ok(Op::Data(data)))) => {
+ debug_assert!(dst.len() >= data.len());
+ dst[..data.len()].copy_from_slice(&data[..]);
+ Ready(Ok(data.len()))
+ }
+ Some(Ready(Ok(_))) => panic!(),
+ Some(Ready(Err(e))) => Ready(Err(e)),
+ Some(Pending) => Pending,
+ None => Ready(Ok(0)),
+ }
+ }
+}
+
+impl AsyncWrite for Mock {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _cx: &mut Context<'_>,
+ src: &[u8],
+ ) -> Poll<Result<usize, io::Error>> {
+ match self.calls.pop_front() {
+ Some(Ready(Ok(Op::Data(data)))) => {
+ let len = data.len();
+ assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
+ assert_eq!(&data[..], &src[..len]);
+ Ready(Ok(len))
+ }
+ Some(Ready(Ok(_))) => panic!(),
+ Some(Ready(Err(e))) => Ready(Err(e)),
+ Some(Pending) => Pending,
+ None => Ready(Ok(0)),
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ match self.calls.pop_front() {
+ Some(Ready(Ok(Op::Flush))) => Ready(Ok(())),
+ Some(Ready(Ok(_))) => panic!(),
+ Some(Ready(Err(e))) => Ready(Err(e)),
+ Some(Pending) => Pending,
+ None => Ready(Ok(())),
+ }
+ }
+
+ fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
+ Ready(Ok(()))
+ }
+}
+
+impl<'a> From<&'a [u8]> for Op {
+ fn from(src: &'a [u8]) -> Op {
+ Op::Data(src.into())
+ }
+}
+
+impl From<Vec<u8>> for Op {
+ fn from(src: Vec<u8>) -> Op {
+ Op::Data(src)
+ }
+}
+
+fn data(bytes: &[u8]) -> Poll<io::Result<Op>> {
+ Ready(Ok(bytes.into()))
+}
+
+fn flush() -> Poll<io::Result<Op>> {
+ Ready(Ok(Flush))
+}
diff --git a/src/tests/mod.rs b/src/tests/mod.rs
index 882f75c..4b030c7 100644
--- a/src/tests/mod.rs
+++ b/src/tests/mod.rs
@@ -1,4 +1,15 @@
-#[cfg(test)]
+pub mod base_types;
+
+#[allow(unused_imports)]
+#[allow(unused_macros)]
+#[allow(dead_code)]
+pub mod frame;
+
pub mod handshake_types;
+
pub mod variant_types;
-pub mod base_types;
+
+extern crate futures;
+extern crate tokio;
+extern crate tokio_test;
+extern crate tokio_util;