diff options
| -rw-r--r-- | Cargo.lock | 155 | ||||
| -rw-r--r-- | Cargo.toml | 14 | ||||
| -rw-r--r-- | src/client/mod.rs | 156 | ||||
| -rw-r--r-- | src/main.rs | 22 | ||||
| -rw-r--r-- | src/protocol/frame/mod.rs | 405 | ||||
| -rw-r--r-- | src/protocol/message/handshake/types.rs | 23 | ||||
| -rw-r--r-- | src/protocol/mod.rs | 6 | ||||
| -rw-r--r-- | src/tests/handshake_types.rs | 31 | ||||
| -rw-r--r-- | src/util.rs | 2 |
9 files changed, 633 insertions, 181 deletions
@@ -42,6 +42,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "bytes" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "bytes" version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -90,8 +99,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", "miniz_oxide 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -114,11 +125,93 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "futures" +version = "0.1.29" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "futures" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-executor 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-io 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-channel" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "futures-core" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "futures-executor" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-io" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "futures-macro" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-sink" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "futures-task" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "futures-util" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-io 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-macro 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro-nested 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "hermit-abi" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -158,10 +251,14 @@ name = "libquassel" version = "0.1.0" dependencies = [ "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", "failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "flate2 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-util 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -269,6 +366,26 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "pin-utils" +version = "0.1.0-alpha.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "proc-macro-hack" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "proc-macro-nested" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "proc-macro2" version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -364,6 +481,16 @@ dependencies = [ ] [[package]] +name = "tokio-io" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "tokio-macros" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -373,6 +500,19 @@ dependencies = [ ] [[package]] +name = "tokio-util" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "unicode-xid" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -422,6 +562,7 @@ dependencies = [ "checksum backtrace-sys 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)" = "5d6575f128516de27e3ce99689419835fce9643a9b215a14d2b5b685be018491" "checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" "checksum byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c3dd8985a7111efc5c80b44e23ecdd8c007de8ade3b96595387e812b957cf5" +"checksum bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" "checksum bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "10004c15deb332055f7a4a208190aed362cf9a7c2f6ab70a305fba50e1105f38" "checksum cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)" = "95e28fa049fda1c330bcf9d723be7663a899c4679724b34c81e9f5a326aab8cd" "checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" @@ -432,7 +573,16 @@ dependencies = [ "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +"checksum futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef" +"checksum futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b6f16056ecbb57525ff698bb955162d0cd03bee84e6241c27ff75c08d8ca5987" +"checksum futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fcae98ca17d102fd8a3603727b9259fcf7fa4239b603d2142926189bc8999b86" "checksum futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "79564c427afefab1dfb3298535b21eda083ef7935b4f0ecbfcb121f0aec10866" +"checksum futures-executor 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1e274736563f686a837a0568b478bdabfeaec2dca794b5649b04e2fe1627c231" +"checksum futures-io 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e676577d229e70952ab25f3945795ba5b16d63ca794ca9d2c860e5595d20b5ff" +"checksum futures-macro 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "52e7c56c15537adb4f76d0b7a76ad131cb4d2f4f32d3b0bcabcbe1c7c5e87764" +"checksum futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "171be33efae63c2d59e6dbba34186fe0d6394fb378069a76dfd80fdcffd43c16" +"checksum futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0bae52d6b29cf440e298856fec3965ee6fa71b06aa7495178615953fd669e5f9" +"checksum futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c0d66274fb76985d3c62c886d1da7ac4c0903a8c9f754e8fe0f35a6a6cc39e76" "checksum hermit-abi 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "eff2656d88f158ce120947499e971d743c05dbcbed62e5bd2f38f1698bbc3772" "checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" @@ -449,6 +599,9 @@ dependencies = [ "checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" "checksum num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6" "checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae" +"checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" +"checksum proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)" = "ecd45702f76d6d3c75a80564378ae228a85f0b59d2f3ed43c91b4a69eb2ebfc5" +"checksum proc-macro-nested 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "369a6ed065f249a159e06c45752c780bda2fb53c995718f9e484d08daa9eb42e" "checksum proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3acb317c6ff86a4e579dfa00fc5e6cca91ecbb4e7eb2df0468805b674eb88548" "checksum quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "053a8c8bcc71fcce321828dc897a98ab9760bef03a4fc36693c231e5b3216cfe" "checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" @@ -459,7 +612,9 @@ dependencies = [ "checksum syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)" = "af6f3550d8dff9ef7dc34d384ac6f107e5d31c8f57d9f28e0081503f547ac8f5" "checksum synstructure 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "67656ea1dc1b41b1451851562ea232ec2e5a80242139f7e679ceccfb5d61f545" "checksum tokio 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)" = "ffa2fdcfa937b20cb3c822a635ceecd5fc1a27a6a474527e5516aa24b8c8820a" +"checksum tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "5090db468dad16e1a7a54c8c67280c5e4b544f3d3e018f0b913b400261f85926" "checksum tokio-macros 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "50a61f268a3db2acee8dcab514efc813dc6dbe8a00e86076f935f94304b59a7a" +"checksum tokio-util 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930" "checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6" @@ -8,7 +8,17 @@ edition = "2018" [dependencies] log = "0.4" +bytes = "0.5" byteorder = "1.3.2" -flate2 = "1.0" -tokio = { version = "0.2", features = ["full"] } +flate2 = { version = "1.0", features = ["tokio"], optional = true } +tokio = { version = "0.2", features = ["full"], optional = true } +tokio-util = { version = "0.2", features = ["codec"], optional = true } failure = "0.1" +futures-util = { version = "0.3", features = ["std"] } +futures = "0.3" + +[features] +framing = ["tokio", "tokio-util", "flate2"] +client = ["tokio", "tokio-util", "framing", "futures-util"] + +default = [] diff --git a/src/client/mod.rs b/src/client/mod.rs index 213f6ff..4ab0601 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,67 +1,36 @@ //use std::io::BufWriter; use std::result::Result; use std::vec::Vec; -use std::convert::TryInto; -use std::io::Cursor; - -use flate2::Compress; -use flate2::Decompress; -use flate2::Compression; -use flate2::FlushCompress; -use flate2::FlushDecompress; -use flate2::read::ZlibDecoder; use tokio::net::TcpStream; -use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::prelude::*; +use tokio_util::codec::{Framed}; +use futures_util::stream::StreamExt; +use futures::SinkExt; + +use crate::protocol::frame::QuasselCodec; + use failure::Error; extern crate log; // use log::{info, warn, debug}; -use crate::protocol::message; - -pub enum State { - Handshake, - Connected -} - pub struct Client { - stream: TcpStream, - encoder: Compress, - decoder: Decompress, - state: State, + stream: Framed<TcpStream, QuasselCodec>, pub tls: bool, pub compression: bool, } impl Client { - pub async fn handler(mut self) -> Result<(), Error> { - loop { - let mut buf: Vec<u8> = vec![0; 2048]; - match self.stream.read(&mut buf).await { - Ok(n) => { - buf.truncate(n); - let mut cbuf: Vec<u8> = vec![0; n * 2]; - - let before_in = self.decoder.total_in(); - let before_out = self.decoder.total_out(); - self.decoder.decompress(&buf, &mut cbuf, FlushDecompress::None)?; - let after_in = self.decoder.total_in(); - let after_out = self.decoder.total_out(); - - cbuf.truncate(after_out.try_into()?); - - match self.state { - State::Handshake => handle_login_message(&mut self, &cbuf), - State::Connected => handle_login_message(&mut self, &cbuf) - }.await?; - } - Err(e) => { panic!(e) } - } - } - } + pub async fn run(&mut self) { + // TODO while endlessly loops over same stream element + while let Some(msg) = self.stream.next().await { + println!("bing"); + let msg = msg.unwrap(); + handle_login_message(self, &msg).await.unwrap(); + }; + } pub async fn connect(address: &'static str, port: u64, tls: bool, compression: bool) -> Result<Client, Error> { use crate::protocol::primitive::deserialize::Deserialize; @@ -94,11 +63,13 @@ impl Client { let (_, val) = ConnAck::parse(&buf).unwrap(); println!("Received: {:?}", val); + let codec = QuasselCodec::builder() + .compression(compression) + .new_codec(); + let stream = Framed::new(s, codec); + let mut client = Client { - stream: s, - state: State::Handshake, - encoder: Compress::new(Compression::best(), true), - decoder: Decompress::new(true), + stream: stream, tls: tls, compression: compression, }; @@ -113,93 +84,14 @@ impl Client { feature_list: features, client_features: 0x00008000, }; - write_to_stream(&mut client, &client_init.serialize()?).await?; + + client.stream.send(client_init.serialize()?).await?; return Ok(client); } - -// pub fn login(&mut self, user: &'static str, pass: &'static str, client: message::ClientInit) { -// use crate::protocol::message::handshake::{HandshakeDeserialize, HandshakeSerialize, HandshakeQRead, VariantMap}; -// use crate::protocol::message::handshake::{ClientInitAck, ClientLogin, ClientLoginAck, SessionInit}; -// -// self.write(&client.serialize().unwrap()).unwrap(); -// -// let mut buf: Vec<u8> = [0; 2048].to_vec(); -// let len = VariantMap::read(self, &mut buf).unwrap(); -// buf.truncate(len); -// let res = ClientInitAck::parse(&buf).unwrap(); -// println!("res: {:?}", res); -// -// let login = ClientLogin {user: user.to_string(), password: pass.to_string()}; -// self.write(&login.serialize().unwrap()).unwrap(); -// println!("res: {:?}", res); -// -// let mut buf: Vec<u8> = [0; 2048].to_vec(); -// let len = VariantMap::read(self, &mut buf).unwrap(); -// buf.truncate(len); -// let _res = ClientLoginAck::parse(&buf).unwrap(); -// -// let mut buf: Vec<u8> = [0; 2048].to_vec(); -// let len = VariantMap::read(self, &mut buf).unwrap(); -// buf.truncate(len); -// let res = SessionInit::parse(&buf).unwrap(); -// -// println!("res: {:?}", res); -// } -} - -// impl std::io::Read for Client { -// fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> { -// let mut cbuf = [0; 2048].to_vec(); -// let read_bytes = self.tcp_stream.read(&mut cbuf)?; -// println!("read bytes: {:?}", read_bytes); -// cbuf.truncate(read_bytes); -// println!("cbuf: {:?}", &cbuf[0..]); -// let before_in = self.decoder.total_in(); -// let before_out = self.decoder.total_out(); -// self.decoder.decompress(&cbuf, buf, FlushDecompress::None)?; -// let after_in = self.decoder.total_in(); -// let after_out = self.decoder.total_out(); -// -// println!("in: {:?} / {:?}\nout: {:?} / {:?}", before_in, after_in, before_out, after_out); -// -// println!("buf: {:?}", buf); -// return Ok(((after_in - after_out)).try_into().unwrap()); -// // -// // let res = self.tcp_stream.read(buf); -// // println!("buf: {:?}, total in: {:?}, total out: {:?}", buf, self.tcp_stream.total_in(), self.tcp_stream.total_out()); -// // return res; -// } -// } -// -// impl std::io::Write for Client { -// fn write(&mut self, buf: &[u8]) -> Result<usize, Error> { -// let mut cbuf = Vec::with_capacity(buf.len()); -// self.encoder.compress_vec(buf, &mut cbuf, FlushCompress::Finish)?; -// return self.tcp_stream.write(&cbuf); -// } -// -// fn flush(&mut self) -> Result<(), Error> { -// return self.tcp_stream.flush(); -// } -// } - -pub async fn write_to_stream(client: &mut Client, buf: &[u8]) -> Result<usize, Error> { - let mut cbuf = vec![0; buf.len()]; - let before_in = client.encoder.total_in(); - let before_out = client.encoder.total_out(); - client.encoder.compress(buf, &mut cbuf, FlushCompress::Full)?; - let after_in = client.encoder.total_in(); - let after_out = client.encoder.total_out(); - println!("out {:?} - {:?}", after_out, before_out); - cbuf.truncate((after_out - before_out).try_into()?); - println!("cbuf {:?}", cbuf); - let i = client.stream.write(&cbuf).await?; - return Ok(i); } pub async fn handle_login_message(client: &mut Client, buf: &[u8]) -> Result<(), Error> { - use crate::protocol::primitive::{Variant, StringList}; use crate::protocol::message::ClientLogin; use crate::protocol::message::handshake::{HandshakeSerialize, HandshakeDeserialize, VariantMap}; use crate::protocol::error::ProtocolError; @@ -211,7 +103,7 @@ pub async fn handle_login_message(client: &mut Client, buf: &[u8]) -> Result<(), match msgtype { "ClientInitAck" => { let login = ClientLogin {user: "audron".to_string(), password: "audron".to_string()}; - write_to_stream(client, &login.serialize()?).await?; + client.stream.send(login.serialize()?).await?; }, "ClientInitReject" => { println!("init failed: {:?}", res) }, "ClientLoginAck" => { println!("login done: {:?}", res) }, diff --git a/src/main.rs b/src/main.rs index d496270..5c34bae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,8 @@ mod consts; + +#[cfg(features = "client")] mod client; + mod protocol; #[macro_use] @@ -11,10 +14,6 @@ extern crate failure; #[cfg(test)] mod tests; -//use util::Hex; -use protocol::primitive::{String, StringList}; -use protocol::message::{ClientInit}; - use failure::Error; #[tokio::main] @@ -24,21 +23,10 @@ async fn main() -> Result<(), Error> { "localhost", 4242, false, - true, + false, ).await.unwrap(); - let mut features = StringList::new(); - features.push("SynchronizedMarkerLine".to_string()); - features.push("Authenticators".to_string()); - features.push("ExtendedFeatures".to_string()); - let client_init = ClientInit { - client_version:String::from("Rust 0.0.0"), - client_date: String::from("1579009211"), - feature_list: features, - client_features: 0x00008000, - }; - - client.handler().await?; + client.run().await; // client.login("audron", "audron", client_init); Ok(()) diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs new file mode 100644 index 0000000..b8296aa --- /dev/null +++ b/src/protocol/frame/mod.rs @@ -0,0 +1,405 @@ +use std::error::Error as StdError; +use std::io::{self, Cursor}; +use std::convert::TryInto; +use std::fmt; + +use bytes::{Buf, BufMut, BytesMut}; + +use tokio::io::{AsyncRead, AsyncWrite}; + +use tokio_util::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite}; + +use flate2::Compress; +use flate2::Decompress; +use flate2::Compression; +use flate2::FlushCompress; +use flate2::FlushDecompress; + +#[derive(Debug, Clone, Copy)] +pub struct Builder { + // Maximum frame length + compression: bool, + compression_level: Compression, + + // Maximum frame length + max_frame_len: usize, +} + +// An error when the number of bytes read is more than max frame length. +pub struct QuasselCodecError { + _priv: (), +} + +#[derive(Debug)] +pub struct QuasselCodec { + builder: Builder, + state: DecodeState, + comp: Compress, + decomp: Decompress, +} + +#[derive(Debug, Clone, Copy)] +enum DecodeState { + Head, + Data(usize), +} + +impl QuasselCodec { + // Creates a new quassel codec with default values + pub fn new() -> Self { + Self { + builder: Builder::new(), + state: DecodeState::Head, + comp: Compress::new(Compression::default(), true), + decomp: Decompress::new(true), + } + } + + /// Creates a new quassel codec builder with default configuration + /// values. + pub fn builder() -> Builder { + Builder::new() + } + + pub fn compression(&self) -> bool { + self.builder.compression + } + + pub fn compression_level(&self) -> Compression { + self.builder.compression_level + } + + pub fn set_compression(&mut self, val: bool) { + self.builder.compression(val); + } + + pub fn set_compression_level(&mut self, val: Compression) { + self.builder.compression_level(val); + } + + fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> { +// let head_len = self.builder.num_head_bytes(); + let field_len = 4; + + if src.len() < field_len { + // Not enough data + return Ok(None); + } + + let n = { + let mut src = Cursor::new(&mut *src); + + let n = src.get_uint(field_len); + + if n > self.builder.max_frame_len as u64 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + QuasselCodecError { _priv: () }, + )); + } + + // The check above ensures there is no overflow + let n = n as usize; + n + }; + + // Strip header + let _ = src.split_to(4); + + // Ensure that the buffer has enough space to read the incoming + // payload + src.reserve(n); + + Ok(Some(n)) + } + + fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result<Option<BytesMut>> { + // At this point, the buffer has already had the required capacity + // reserved. All there is to do is read. + if src.len() < n { + return Ok(None); + } + + Ok(Some(src.split_to(n))) + } +} + + +impl Decoder for QuasselCodec { + type Item = BytesMut; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> { + let mut buf = vec![0; src.len() * 2]; + + println!("src: {:?}", &src[..]); + + if self.builder.compression == true { + let before_in = self.decomp.total_in(); + let before_out = self.decomp.total_out(); + self.decomp.decompress(&src, &mut buf, FlushDecompress::None)?; + let after_in = self.decomp.total_in(); + let after_out = self.decomp.total_out(); + + buf.truncate((after_out - before_out).try_into().unwrap()); + } else { + buf = src.to_vec(); + } + + let buf = &mut BytesMut::from(&buf[..]); + + let n = match self.state { + DecodeState::Head => match self.decode_head(buf)? { + Some(n) => { + self.state = DecodeState::Data(n); + n + } + None => return Ok(None), + }, + DecodeState::Data(n) => n, + }; + + match self.decode_data(n, buf)? { + Some(data) => { + // Update the decode state + self.state = DecodeState::Head; + + // Make sure the buffer has enough space to read the next head + buf.reserve(4); + + Ok(Some(data)) + } + None => Ok(None), + } + } +} + +impl Encoder for QuasselCodec { + type Item = Vec<u8>; + type Error = io::Error; + + fn encode(&mut self, data: Vec<u8>, dst: &mut BytesMut) -> Result<(), io::Error> { + let buf = &mut BytesMut::new(); + + let n = (&data).len(); + + if n > self.builder.max_frame_len { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + QuasselCodecError { _priv: () }, + )); + } + + // Reserve capacity in the destination buffer to fit the frame and + // length field (plus adjustment). + buf.reserve(4 + n); + + buf.put_uint(n as u64, 4); + + // Write the frame to the buffer + buf.extend_from_slice(&data[..]); + + if self.builder.compression { + let mut cbuf: Vec<u8> = vec![0; 4+n]; + let before_in = self.comp.total_in(); + let before_out = self.comp.total_out(); + self.comp.compress(buf, &mut cbuf, FlushCompress::Full)?; + let after_in = self.comp.total_in(); + let after_out = self.comp.total_out(); + + cbuf.truncate((after_out - before_out).try_into().unwrap()); + *dst = BytesMut::from(&cbuf[..]); + } else { + *dst = buf.clone(); + } + + Ok(()) + } +} + +impl Default for QuasselCodec { + fn default() -> Self { + Self::new() + } +} + + +// ===== impl Builder ===== + +impl Builder { + /// Creates a new length delimited codec builder with default configuration + /// values. + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::AsyncRead; + /// use tokio_util::codec::QuasselCodec; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// QuasselCodec::builder() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new() -> Builder { + Builder { + compression: false, + compression_level: Compression::default(), + max_frame_len: 64 * 1024 * 1024, + } + } + + pub fn compression(&mut self, val: bool) -> &mut Self { + self.compression = val; + self + } + + pub fn compression_level(&mut self, val: Compression) -> &mut Self { + self.compression_level = val; + self + } + + /// Sets the max frame length + /// + /// This configuration option applies to both encoding and decoding. The + /// default value is 8MB. + /// + /// When decoding, the length field read from the byte stream is checked + /// against this setting **before** any adjustments are applied. When + /// encoding, the length of the submitted payload is checked against this + /// setting. + /// + /// When frames exceed the max length, an `io::Error` with the custom value + /// of the `QuasselCodecError` type will be returned. + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::AsyncRead; + /// use tokio_util::codec::QuasselCodec; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// QuasselCodec::builder() + /// .max_frame_length(8 * 1024) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn max_frame_length(&mut self, val: usize) -> &mut Self { + self.max_frame_len = val; + self + } + + /// Create a configured length delimited `QuasselCodec` + /// + /// # Examples + /// + /// ``` + /// use tokio_util::codec::QuasselCodec; + /// # pub fn main() { + /// QuasselCodec::builder() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_codec(); + /// # } + /// ``` + pub fn new_codec(&self) -> QuasselCodec { + QuasselCodec { + builder: *self, + state: DecodeState::Head, + comp: Compress::new(self.compression_level, true), + decomp: Decompress::new(true), + } + } + + /// Create a configured length delimited `FramedRead` + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::AsyncRead; + /// use tokio_util::codec::QuasselCodec; + /// + /// # fn bind_read<T: AsyncRead>(io: T) { + /// QuasselCodec::builder() + /// .length_field_offset(0) + /// .length_field_length(2) + /// .length_adjustment(0) + /// .num_skip(0) + /// .new_read(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, QuasselCodec> + where + T: AsyncRead, + { + FramedRead::new(upstream, self.new_codec()) + } + + /// Create a configured length delimited `FramedWrite` + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::AsyncWrite; + /// # use tokio_util::codec::QuasselCodec; + /// # fn write_frame<T: AsyncWrite>(io: T) { + /// QuasselCodec::builder() + /// .length_field_length(2) + /// .new_write(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, QuasselCodec> + where + T: AsyncWrite, + { + FramedWrite::new(inner, self.new_codec()) + } + + /// Create a configured length delimited `Framed` + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use tokio_util::codec::QuasselCodec; + /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) { + /// # let _ = + /// QuasselCodec::builder() + /// .length_field_length(2) + /// .new_framed(io); + /// # } + /// # pub fn main() {} + /// ``` + pub fn new_framed<T>(&self, inner: T) -> Framed<T, QuasselCodec> + where + T: AsyncRead + AsyncWrite, + { + Framed::new(inner, self.new_codec()) + } +} + +// ===== impl LengthDelimitedCodecError ===== + +impl fmt::Debug for QuasselCodecError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("QuasselCodecError").finish() + } +} + +impl fmt::Display for QuasselCodecError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("frame size too big") + } +} + +impl StdError for QuasselCodecError {} diff --git a/src/protocol/message/handshake/types.rs b/src/protocol/message/handshake/types.rs index 643b376..0c70914 100644 --- a/src/protocol/message/handshake/types.rs +++ b/src/protocol/message/handshake/types.rs @@ -37,10 +37,8 @@ impl HandshakeSerialize for VariantMap { res.extend(v.serialize()?); } - util::insert_bytes(0, &mut res, &mut [0, 0, 0, 10]); - - let len: i32 = res.len().try_into().unwrap(); - util::insert_bytes(0, &mut res, &mut ((len).to_be_bytes())); + let len: i32 = (self.len() * 2).try_into().unwrap(); + util::insert_bytes(0, &mut res, &mut (len).to_be_bytes()); return Ok(res); } @@ -50,11 +48,10 @@ impl HandshakeDeserialize for VariantMap { fn parse(b: &[u8]) -> Result<(usize, Self), Error> { let (_, len) = i32::parse(&b[0..4])?; - let mut pos: usize = 8; + let mut pos: usize = 4; let mut map = VariantMap::new(); - let ulen: usize = len as usize; - loop { - if (pos) >= ulen { break; } + + for _ in 0..(len / 2) { let (nlen, name) = Variant::parse(&b[pos..])?; pos += nlen; @@ -76,10 +73,12 @@ impl HandshakeQRead for VariantMap { fn read<T: Read>(s: &mut T, b: &mut [u8]) -> Result<usize, Error> { s.read(&mut b[0..4])?; let (_, len) = i32::parse(&b[0..4])?; - let ulen = len as usize; - // Read the 00 00 00 0a VariantType bytes and discard - s.read(&mut b[4..(ulen + 4)])?; + let mut pos = 4; + for _ in 0..(len / 2) { + pos += Variant::read(s, &mut b[pos..])?; + pos += Variant::read(s, &mut b[pos..])?; + } // let mut pos = 8; // let len: usize = len as usize; @@ -89,6 +88,6 @@ impl HandshakeQRead for VariantMap { // pos += Variant::read(s, &mut b[pos..])?; // } - return Ok(ulen + 4); + return Ok(pos); } } diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 8739fd8..3630fab 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -1,3 +1,9 @@ pub mod message; pub mod primitive; + +#[allow(dead_code)] pub mod error; + +#[allow(unused_variables, dead_code)] +#[cfg(feature = "framing")] +pub mod frame; diff --git a/src/tests/handshake_types.rs b/src/tests/handshake_types.rs index 227bdad..d9368b2 100644 --- a/src/tests/handshake_types.rs +++ b/src/tests/handshake_types.rs @@ -5,7 +5,7 @@ use crate::protocol::primitive::{Variant}; pub fn serialize_variantmap() { let mut test_variantmap = VariantMap::new(); test_variantmap.insert("Configured".to_string(), Variant::bool(true)); - let bytes = [0, 0, 0, 39, 0, 0, 0, 10, 0, 0, 0, 10, 0, + let bytes = [0, 0, 0, 2, 0, 0, 0, 10, 0, 0, 0, 0, 20, 0, 67, 0, 111, 0, 110, 0, 102, 0, 105, 0, 103, 0, 117, 0, 114, 0, 101, 0, 100, 0, 0, 0, 1, 0, 1].to_vec(); assert_eq!( @@ -20,9 +20,7 @@ pub fn read_variantmap() { let test_bytes: Vec<u8> = vec![ // len - 0, 0, 0, 74, - // var - 0, 0, 0, 10, // 4 + 0, 0, 0, 4, // 4 // var 0, 0, 0, 10, 0, // 5 // strlen, str @@ -38,16 +36,14 @@ pub fn read_variantmap() { // extra 0, 0, 0, 1]; - let mut buf: Vec<u8> = [0; 78].to_vec(); + let mut buf: Vec<u8> = [0; 74].to_vec(); let len = VariantMap::read(&mut Cursor::new(&test_bytes), &mut buf).unwrap(); - assert_eq!(len, 78); + assert_eq!(len, 74); let result_bytes: Vec<u8> = vec![ // len - 0, 0, 0, 74, - // var - 0, 0, 0, 10, + 0, 0, 0, 4, // var 0, 0, 0, 10, 0, // strlen, str @@ -65,31 +61,32 @@ pub fn read_variantmap() { #[test] pub fn deserialize_variantmap() { - let test_bytes: &[u8] = &[0, 0, 0, 39, 0, 0, 0, 10, 0, 0, 0, 10, 0, - 0, 0, 0, 20, 0, 67, 0, 111, 0, 110, 0, 102, 0, 105, 0, 103, 0, 117, 0, 114, 0, 101, 0, 100, - 0, 0, 0, 1, 0, 1, 0, 0, 0, 1]; + let test_bytes: &[u8] = &[0, 0, 0, 2, + 0, 0, 0, 10, 0, + 0, 0, 0, 20, 0, 67, 0, 111, 0, 110, 0, 102, 0, 105, 0, 103, 0, 117, 0, 114, 0, 101, 0, 100, + 0, 0, 0, 1, 0, 1, + 0, 0, 0, 1]; let mut test_variantmap = VariantMap::new(); test_variantmap.insert("Configured".to_string(), Variant::bool(true)); let (len, res) = VariantMap::parse(test_bytes).unwrap(); - assert_eq!(len, 43); + assert_eq!(len, 39); assert_eq!(res, test_variantmap); } #[test] pub fn deserialize_variantmap_utf8() { - let test_bytes: &[u8] = &[0, 0, 0, 29, 0, 0, 0, 10, 0, 0, 0, 12, 0, + let test_bytes: &[u8] = &[0, 0, 0, 2, + 0, 0, 0, 12, 0, 0, 0, 0, 10, 67, 111, 110, 102, 105, 103, 117, 114, 101, 100, 0, 0, 0, 1, 0, 1, -// 0, 0, 0, 10, 67, 111, 110, 102, 105, 103, 117, 114 -// 0, 0, 0, 1, 0, 1, 0, 0, 0, 1]; let mut test_variantmap = VariantMap::new(); test_variantmap.insert("Configured".to_string(), Variant::bool(true)); let (len, res) = VariantMap::parse(test_bytes).unwrap(); - assert_eq!(len, 33); + assert_eq!(len, 29); assert_eq!(res, test_variantmap); } diff --git a/src/util.rs b/src/util.rs index 7c26ec5..33735f1 100644 --- a/src/util.rs +++ b/src/util.rs @@ -24,7 +24,7 @@ macro_rules! match_variant { } } -use crate::protocol::primitive::{Variant, String}; +use crate::protocol::primitive::{Variant}; use crate::protocol::error::ProtocolError; use failure::Error; |
