aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock155
-rw-r--r--Cargo.toml14
-rw-r--r--src/client/mod.rs156
-rw-r--r--src/main.rs22
-rw-r--r--src/protocol/frame/mod.rs405
-rw-r--r--src/protocol/message/handshake/types.rs23
-rw-r--r--src/protocol/mod.rs6
-rw-r--r--src/tests/handshake_types.rs31
-rw-r--r--src/util.rs2
9 files changed, 633 insertions, 181 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 1e32783..5bada13 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -42,6 +42,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "bytes"
+version = "0.4.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "bytes"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -90,8 +99,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
"miniz_oxide 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -114,11 +125,93 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
+name = "futures"
+version = "0.1.29"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "futures"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-executor 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-io 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "futures-channel"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "futures-core"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
+name = "futures-executor"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)",
+ "proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "futures-sink"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "futures-task"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "futures-util"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-io 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-macro 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)",
+ "proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)",
+ "proc-macro-nested 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "hermit-abi"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -158,10 +251,14 @@ name = "libquassel"
version = "0.1.0"
dependencies = [
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
"failure 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
"flate2 1.0.13 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-util 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@@ -269,6 +366,26 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
+name = "pin-utils"
+version = "0.1.0-alpha.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
+name = "proc-macro-hack"
+version = "0.5.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "proc-macro-nested"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
name = "proc-macro2"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -364,6 +481,16 @@ dependencies = [
]
[[package]]
+name = "tokio-io"
+version = "0.1.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "tokio-macros"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -373,6 +500,19 @@ dependencies = [
]
[[package]]
+name = "tokio-util"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "unicode-xid"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -422,6 +562,7 @@ dependencies = [
"checksum backtrace-sys 0.1.32 (registry+https://github.com/rust-lang/crates.io-index)" = "5d6575f128516de27e3ce99689419835fce9643a9b215a14d2b5b685be018491"
"checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
"checksum byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c3dd8985a7111efc5c80b44e23ecdd8c007de8ade3b96595387e812b957cf5"
+"checksum bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)" = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c"
"checksum bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "10004c15deb332055f7a4a208190aed362cf9a7c2f6ab70a305fba50e1105f38"
"checksum cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)" = "95e28fa049fda1c330bcf9d723be7663a899c4679724b34c81e9f5a326aab8cd"
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
@@ -432,7 +573,16 @@ dependencies = [
"checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3"
"checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
"checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
+"checksum futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef"
+"checksum futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b6f16056ecbb57525ff698bb955162d0cd03bee84e6241c27ff75c08d8ca5987"
+"checksum futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fcae98ca17d102fd8a3603727b9259fcf7fa4239b603d2142926189bc8999b86"
"checksum futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "79564c427afefab1dfb3298535b21eda083ef7935b4f0ecbfcb121f0aec10866"
+"checksum futures-executor 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1e274736563f686a837a0568b478bdabfeaec2dca794b5649b04e2fe1627c231"
+"checksum futures-io 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e676577d229e70952ab25f3945795ba5b16d63ca794ca9d2c860e5595d20b5ff"
+"checksum futures-macro 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "52e7c56c15537adb4f76d0b7a76ad131cb4d2f4f32d3b0bcabcbe1c7c5e87764"
+"checksum futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "171be33efae63c2d59e6dbba34186fe0d6394fb378069a76dfd80fdcffd43c16"
+"checksum futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0bae52d6b29cf440e298856fec3965ee6fa71b06aa7495178615953fd669e5f9"
+"checksum futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c0d66274fb76985d3c62c886d1da7ac4c0903a8c9f754e8fe0f35a6a6cc39e76"
"checksum hermit-abi 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "eff2656d88f158ce120947499e971d743c05dbcbed62e5bd2f38f1698bbc3772"
"checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
@@ -449,6 +599,9 @@ dependencies = [
"checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88"
"checksum num_cpus 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6"
"checksum pin-project-lite 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae"
+"checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587"
+"checksum proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)" = "ecd45702f76d6d3c75a80564378ae228a85f0b59d2f3ed43c91b4a69eb2ebfc5"
+"checksum proc-macro-nested 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "369a6ed065f249a159e06c45752c780bda2fb53c995718f9e484d08daa9eb42e"
"checksum proc-macro2 1.0.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3acb317c6ff86a4e579dfa00fc5e6cca91ecbb4e7eb2df0468805b674eb88548"
"checksum quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "053a8c8bcc71fcce321828dc897a98ab9760bef03a4fc36693c231e5b3216cfe"
"checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84"
@@ -459,7 +612,9 @@ dependencies = [
"checksum syn 1.0.14 (registry+https://github.com/rust-lang/crates.io-index)" = "af6f3550d8dff9ef7dc34d384ac6f107e5d31c8f57d9f28e0081503f547ac8f5"
"checksum synstructure 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "67656ea1dc1b41b1451851562ea232ec2e5a80242139f7e679ceccfb5d61f545"
"checksum tokio 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)" = "ffa2fdcfa937b20cb3c822a635ceecd5fc1a27a6a474527e5516aa24b8c8820a"
+"checksum tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "5090db468dad16e1a7a54c8c67280c5e4b544f3d3e018f0b913b400261f85926"
"checksum tokio-macros 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "50a61f268a3db2acee8dcab514efc813dc6dbe8a00e86076f935f94304b59a7a"
+"checksum tokio-util 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "571da51182ec208780505a32528fc5512a8fe1443ab960b3f2f3ef093cd16930"
"checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
"checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
"checksum winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6"
diff --git a/Cargo.toml b/Cargo.toml
index e853be2..82d22d4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,7 +8,17 @@ edition = "2018"
[dependencies]
log = "0.4"
+bytes = "0.5"
byteorder = "1.3.2"
-flate2 = "1.0"
-tokio = { version = "0.2", features = ["full"] }
+flate2 = { version = "1.0", features = ["tokio"], optional = true }
+tokio = { version = "0.2", features = ["full"], optional = true }
+tokio-util = { version = "0.2", features = ["codec"], optional = true }
failure = "0.1"
+futures-util = { version = "0.3", features = ["std"] }
+futures = "0.3"
+
+[features]
+framing = ["tokio", "tokio-util", "flate2"]
+client = ["tokio", "tokio-util", "framing", "futures-util"]
+
+default = []
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 213f6ff..4ab0601 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -1,67 +1,36 @@
//use std::io::BufWriter;
use std::result::Result;
use std::vec::Vec;
-use std::convert::TryInto;
-use std::io::Cursor;
-
-use flate2::Compress;
-use flate2::Decompress;
-use flate2::Compression;
-use flate2::FlushCompress;
-use flate2::FlushDecompress;
-use flate2::read::ZlibDecoder;
use tokio::net::TcpStream;
-use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::prelude::*;
+use tokio_util::codec::{Framed};
+use futures_util::stream::StreamExt;
+use futures::SinkExt;
+
+use crate::protocol::frame::QuasselCodec;
+
use failure::Error;
extern crate log;
// use log::{info, warn, debug};
-use crate::protocol::message;
-
-pub enum State {
- Handshake,
- Connected
-}
-
pub struct Client {
- stream: TcpStream,
- encoder: Compress,
- decoder: Decompress,
- state: State,
+ stream: Framed<TcpStream, QuasselCodec>,
pub tls: bool,
pub compression: bool,
}
impl Client {
- pub async fn handler(mut self) -> Result<(), Error> {
- loop {
- let mut buf: Vec<u8> = vec![0; 2048];
- match self.stream.read(&mut buf).await {
- Ok(n) => {
- buf.truncate(n);
- let mut cbuf: Vec<u8> = vec![0; n * 2];
-
- let before_in = self.decoder.total_in();
- let before_out = self.decoder.total_out();
- self.decoder.decompress(&buf, &mut cbuf, FlushDecompress::None)?;
- let after_in = self.decoder.total_in();
- let after_out = self.decoder.total_out();
-
- cbuf.truncate(after_out.try_into()?);
-
- match self.state {
- State::Handshake => handle_login_message(&mut self, &cbuf),
- State::Connected => handle_login_message(&mut self, &cbuf)
- }.await?;
- }
- Err(e) => { panic!(e) }
- }
- }
- }
+ pub async fn run(&mut self) {
+ // TODO while endlessly loops over same stream element
+ while let Some(msg) = self.stream.next().await {
+ println!("bing");
+ let msg = msg.unwrap();
+ handle_login_message(self, &msg).await.unwrap();
+ };
+ }
pub async fn connect(address: &'static str, port: u64, tls: bool, compression: bool) -> Result<Client, Error> {
use crate::protocol::primitive::deserialize::Deserialize;
@@ -94,11 +63,13 @@ impl Client {
let (_, val) = ConnAck::parse(&buf).unwrap();
println!("Received: {:?}", val);
+ let codec = QuasselCodec::builder()
+ .compression(compression)
+ .new_codec();
+ let stream = Framed::new(s, codec);
+
let mut client = Client {
- stream: s,
- state: State::Handshake,
- encoder: Compress::new(Compression::best(), true),
- decoder: Decompress::new(true),
+ stream: stream,
tls: tls,
compression: compression,
};
@@ -113,93 +84,14 @@ impl Client {
feature_list: features,
client_features: 0x00008000,
};
- write_to_stream(&mut client, &client_init.serialize()?).await?;
+
+ client.stream.send(client_init.serialize()?).await?;
return Ok(client);
}
-
-// pub fn login(&mut self, user: &'static str, pass: &'static str, client: message::ClientInit) {
-// use crate::protocol::message::handshake::{HandshakeDeserialize, HandshakeSerialize, HandshakeQRead, VariantMap};
-// use crate::protocol::message::handshake::{ClientInitAck, ClientLogin, ClientLoginAck, SessionInit};
-//
-// self.write(&client.serialize().unwrap()).unwrap();
-//
-// let mut buf: Vec<u8> = [0; 2048].to_vec();
-// let len = VariantMap::read(self, &mut buf).unwrap();
-// buf.truncate(len);
-// let res = ClientInitAck::parse(&buf).unwrap();
-// println!("res: {:?}", res);
-//
-// let login = ClientLogin {user: user.to_string(), password: pass.to_string()};
-// self.write(&login.serialize().unwrap()).unwrap();
-// println!("res: {:?}", res);
-//
-// let mut buf: Vec<u8> = [0; 2048].to_vec();
-// let len = VariantMap::read(self, &mut buf).unwrap();
-// buf.truncate(len);
-// let _res = ClientLoginAck::parse(&buf).unwrap();
-//
-// let mut buf: Vec<u8> = [0; 2048].to_vec();
-// let len = VariantMap::read(self, &mut buf).unwrap();
-// buf.truncate(len);
-// let res = SessionInit::parse(&buf).unwrap();
-//
-// println!("res: {:?}", res);
-// }
-}
-
-// impl std::io::Read for Client {
-// fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
-// let mut cbuf = [0; 2048].to_vec();
-// let read_bytes = self.tcp_stream.read(&mut cbuf)?;
-// println!("read bytes: {:?}", read_bytes);
-// cbuf.truncate(read_bytes);
-// println!("cbuf: {:?}", &cbuf[0..]);
-// let before_in = self.decoder.total_in();
-// let before_out = self.decoder.total_out();
-// self.decoder.decompress(&cbuf, buf, FlushDecompress::None)?;
-// let after_in = self.decoder.total_in();
-// let after_out = self.decoder.total_out();
-//
-// println!("in: {:?} / {:?}\nout: {:?} / {:?}", before_in, after_in, before_out, after_out);
-//
-// println!("buf: {:?}", buf);
-// return Ok(((after_in - after_out)).try_into().unwrap());
-// //
-// // let res = self.tcp_stream.read(buf);
-// // println!("buf: {:?}, total in: {:?}, total out: {:?}", buf, self.tcp_stream.total_in(), self.tcp_stream.total_out());
-// // return res;
-// }
-// }
-//
-// impl std::io::Write for Client {
-// fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
-// let mut cbuf = Vec::with_capacity(buf.len());
-// self.encoder.compress_vec(buf, &mut cbuf, FlushCompress::Finish)?;
-// return self.tcp_stream.write(&cbuf);
-// }
-//
-// fn flush(&mut self) -> Result<(), Error> {
-// return self.tcp_stream.flush();
-// }
-// }
-
-pub async fn write_to_stream(client: &mut Client, buf: &[u8]) -> Result<usize, Error> {
- let mut cbuf = vec![0; buf.len()];
- let before_in = client.encoder.total_in();
- let before_out = client.encoder.total_out();
- client.encoder.compress(buf, &mut cbuf, FlushCompress::Full)?;
- let after_in = client.encoder.total_in();
- let after_out = client.encoder.total_out();
- println!("out {:?} - {:?}", after_out, before_out);
- cbuf.truncate((after_out - before_out).try_into()?);
- println!("cbuf {:?}", cbuf);
- let i = client.stream.write(&cbuf).await?;
- return Ok(i);
}
pub async fn handle_login_message(client: &mut Client, buf: &[u8]) -> Result<(), Error> {
- use crate::protocol::primitive::{Variant, StringList};
use crate::protocol::message::ClientLogin;
use crate::protocol::message::handshake::{HandshakeSerialize, HandshakeDeserialize, VariantMap};
use crate::protocol::error::ProtocolError;
@@ -211,7 +103,7 @@ pub async fn handle_login_message(client: &mut Client, buf: &[u8]) -> Result<(),
match msgtype {
"ClientInitAck" => {
let login = ClientLogin {user: "audron".to_string(), password: "audron".to_string()};
- write_to_stream(client, &login.serialize()?).await?;
+ client.stream.send(login.serialize()?).await?;
},
"ClientInitReject" => { println!("init failed: {:?}", res) },
"ClientLoginAck" => { println!("login done: {:?}", res) },
diff --git a/src/main.rs b/src/main.rs
index d496270..5c34bae 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,8 @@
mod consts;
+
+#[cfg(features = "client")]
mod client;
+
mod protocol;
#[macro_use]
@@ -11,10 +14,6 @@ extern crate failure;
#[cfg(test)]
mod tests;
-//use util::Hex;
-use protocol::primitive::{String, StringList};
-use protocol::message::{ClientInit};
-
use failure::Error;
#[tokio::main]
@@ -24,21 +23,10 @@ async fn main() -> Result<(), Error> {
"localhost",
4242,
false,
- true,
+ false,
).await.unwrap();
- let mut features = StringList::new();
- features.push("SynchronizedMarkerLine".to_string());
- features.push("Authenticators".to_string());
- features.push("ExtendedFeatures".to_string());
- let client_init = ClientInit {
- client_version:String::from("Rust 0.0.0"),
- client_date: String::from("1579009211"),
- feature_list: features,
- client_features: 0x00008000,
- };
-
- client.handler().await?;
+ client.run().await;
// client.login("audron", "audron", client_init);
Ok(())
diff --git a/src/protocol/frame/mod.rs b/src/protocol/frame/mod.rs
new file mode 100644
index 0000000..b8296aa
--- /dev/null
+++ b/src/protocol/frame/mod.rs
@@ -0,0 +1,405 @@
+use std::error::Error as StdError;
+use std::io::{self, Cursor};
+use std::convert::TryInto;
+use std::fmt;
+
+use bytes::{Buf, BufMut, BytesMut};
+
+use tokio::io::{AsyncRead, AsyncWrite};
+
+use tokio_util::codec::{Decoder, Encoder, Framed, FramedRead, FramedWrite};
+
+use flate2::Compress;
+use flate2::Decompress;
+use flate2::Compression;
+use flate2::FlushCompress;
+use flate2::FlushDecompress;
+
+#[derive(Debug, Clone, Copy)]
+pub struct Builder {
+ // Maximum frame length
+ compression: bool,
+ compression_level: Compression,
+
+ // Maximum frame length
+ max_frame_len: usize,
+}
+
+// An error when the number of bytes read is more than max frame length.
+pub struct QuasselCodecError {
+ _priv: (),
+}
+
+#[derive(Debug)]
+pub struct QuasselCodec {
+ builder: Builder,
+ state: DecodeState,
+ comp: Compress,
+ decomp: Decompress,
+}
+
+#[derive(Debug, Clone, Copy)]
+enum DecodeState {
+ Head,
+ Data(usize),
+}
+
+impl QuasselCodec {
+ // Creates a new quassel codec with default values
+ pub fn new() -> Self {
+ Self {
+ builder: Builder::new(),
+ state: DecodeState::Head,
+ comp: Compress::new(Compression::default(), true),
+ decomp: Decompress::new(true),
+ }
+ }
+
+ /// Creates a new quassel codec builder with default configuration
+ /// values.
+ pub fn builder() -> Builder {
+ Builder::new()
+ }
+
+ pub fn compression(&self) -> bool {
+ self.builder.compression
+ }
+
+ pub fn compression_level(&self) -> Compression {
+ self.builder.compression_level
+ }
+
+ pub fn set_compression(&mut self, val: bool) {
+ self.builder.compression(val);
+ }
+
+ pub fn set_compression_level(&mut self, val: Compression) {
+ self.builder.compression_level(val);
+ }
+
+ fn decode_head(&mut self, src: &mut BytesMut) -> io::Result<Option<usize>> {
+// let head_len = self.builder.num_head_bytes();
+ let field_len = 4;
+
+ if src.len() < field_len {
+ // Not enough data
+ return Ok(None);
+ }
+
+ let n = {
+ let mut src = Cursor::new(&mut *src);
+
+ let n = src.get_uint(field_len);
+
+ if n > self.builder.max_frame_len as u64 {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ QuasselCodecError { _priv: () },
+ ));
+ }
+
+ // The check above ensures there is no overflow
+ let n = n as usize;
+ n
+ };
+
+ // Strip header
+ let _ = src.split_to(4);
+
+ // Ensure that the buffer has enough space to read the incoming
+ // payload
+ src.reserve(n);
+
+ Ok(Some(n))
+ }
+
+ fn decode_data(&self, n: usize, src: &mut BytesMut) -> io::Result<Option<BytesMut>> {
+ // At this point, the buffer has already had the required capacity
+ // reserved. All there is to do is read.
+ if src.len() < n {
+ return Ok(None);
+ }
+
+ Ok(Some(src.split_to(n)))
+ }
+}
+
+
+impl Decoder for QuasselCodec {
+ type Item = BytesMut;
+ type Error = io::Error;
+
+ fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BytesMut>, io::Error> {
+ let mut buf = vec![0; src.len() * 2];
+
+ println!("src: {:?}", &src[..]);
+
+ if self.builder.compression == true {
+ let before_in = self.decomp.total_in();
+ let before_out = self.decomp.total_out();
+ self.decomp.decompress(&src, &mut buf, FlushDecompress::None)?;
+ let after_in = self.decomp.total_in();
+ let after_out = self.decomp.total_out();
+
+ buf.truncate((after_out - before_out).try_into().unwrap());
+ } else {
+ buf = src.to_vec();
+ }
+
+ let buf = &mut BytesMut::from(&buf[..]);
+
+ let n = match self.state {
+ DecodeState::Head => match self.decode_head(buf)? {
+ Some(n) => {
+ self.state = DecodeState::Data(n);
+ n
+ }
+ None => return Ok(None),
+ },
+ DecodeState::Data(n) => n,
+ };
+
+ match self.decode_data(n, buf)? {
+ Some(data) => {
+ // Update the decode state
+ self.state = DecodeState::Head;
+
+ // Make sure the buffer has enough space to read the next head
+ buf.reserve(4);
+
+ Ok(Some(data))
+ }
+ None => Ok(None),
+ }
+ }
+}
+
+impl Encoder for QuasselCodec {
+ type Item = Vec<u8>;
+ type Error = io::Error;
+
+ fn encode(&mut self, data: Vec<u8>, dst: &mut BytesMut) -> Result<(), io::Error> {
+ let buf = &mut BytesMut::new();
+
+ let n = (&data).len();
+
+ if n > self.builder.max_frame_len {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ QuasselCodecError { _priv: () },
+ ));
+ }
+
+ // Reserve capacity in the destination buffer to fit the frame and
+ // length field (plus adjustment).
+ buf.reserve(4 + n);
+
+ buf.put_uint(n as u64, 4);
+
+ // Write the frame to the buffer
+ buf.extend_from_slice(&data[..]);
+
+ if self.builder.compression {
+ let mut cbuf: Vec<u8> = vec![0; 4+n];
+ let before_in = self.comp.total_in();
+ let before_out = self.comp.total_out();
+ self.comp.compress(buf, &mut cbuf, FlushCompress::Full)?;
+ let after_in = self.comp.total_in();
+ let after_out = self.comp.total_out();
+
+ cbuf.truncate((after_out - before_out).try_into().unwrap());
+ *dst = BytesMut::from(&cbuf[..]);
+ } else {
+ *dst = buf.clone();
+ }
+
+ Ok(())
+ }
+}
+
+impl Default for QuasselCodec {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+
+// ===== impl Builder =====
+
+impl Builder {
+ /// Creates a new length delimited codec builder with default configuration
+ /// values.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::QuasselCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// QuasselCodec::builder()
+ /// .length_field_offset(0)
+ /// .length_field_length(2)
+ /// .length_adjustment(0)
+ /// .num_skip(0)
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn new() -> Builder {
+ Builder {
+ compression: false,
+ compression_level: Compression::default(),
+ max_frame_len: 64 * 1024 * 1024,
+ }
+ }
+
+ pub fn compression(&mut self, val: bool) -> &mut Self {
+ self.compression = val;
+ self
+ }
+
+ pub fn compression_level(&mut self, val: Compression) -> &mut Self {
+ self.compression_level = val;
+ self
+ }
+
+ /// Sets the max frame length
+ ///
+ /// This configuration option applies to both encoding and decoding. The
+ /// default value is 8MB.
+ ///
+ /// When decoding, the length field read from the byte stream is checked
+ /// against this setting **before** any adjustments are applied. When
+ /// encoding, the length of the submitted payload is checked against this
+ /// setting.
+ ///
+ /// When frames exceed the max length, an `io::Error` with the custom value
+ /// of the `QuasselCodecError` type will be returned.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::QuasselCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// QuasselCodec::builder()
+ /// .max_frame_length(8 * 1024)
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn max_frame_length(&mut self, val: usize) -> &mut Self {
+ self.max_frame_len = val;
+ self
+ }
+
+ /// Create a configured length delimited `QuasselCodec`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use tokio_util::codec::QuasselCodec;
+ /// # pub fn main() {
+ /// QuasselCodec::builder()
+ /// .length_field_offset(0)
+ /// .length_field_length(2)
+ /// .length_adjustment(0)
+ /// .num_skip(0)
+ /// .new_codec();
+ /// # }
+ /// ```
+ pub fn new_codec(&self) -> QuasselCodec {
+ QuasselCodec {
+ builder: *self,
+ state: DecodeState::Head,
+ comp: Compress::new(self.compression_level, true),
+ decomp: Decompress::new(true),
+ }
+ }
+
+ /// Create a configured length delimited `FramedRead`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncRead;
+ /// use tokio_util::codec::QuasselCodec;
+ ///
+ /// # fn bind_read<T: AsyncRead>(io: T) {
+ /// QuasselCodec::builder()
+ /// .length_field_offset(0)
+ /// .length_field_length(2)
+ /// .length_adjustment(0)
+ /// .num_skip(0)
+ /// .new_read(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn new_read<T>(&self, upstream: T) -> FramedRead<T, QuasselCodec>
+ where
+ T: AsyncRead,
+ {
+ FramedRead::new(upstream, self.new_codec())
+ }
+
+ /// Create a configured length delimited `FramedWrite`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::AsyncWrite;
+ /// # use tokio_util::codec::QuasselCodec;
+ /// # fn write_frame<T: AsyncWrite>(io: T) {
+ /// QuasselCodec::builder()
+ /// .length_field_length(2)
+ /// .new_write(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn new_write<T>(&self, inner: T) -> FramedWrite<T, QuasselCodec>
+ where
+ T: AsyncWrite,
+ {
+ FramedWrite::new(inner, self.new_codec())
+ }
+
+ /// Create a configured length delimited `Framed`
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # use tokio::io::{AsyncRead, AsyncWrite};
+ /// # use tokio_util::codec::QuasselCodec;
+ /// # fn write_frame<T: AsyncRead + AsyncWrite>(io: T) {
+ /// # let _ =
+ /// QuasselCodec::builder()
+ /// .length_field_length(2)
+ /// .new_framed(io);
+ /// # }
+ /// # pub fn main() {}
+ /// ```
+ pub fn new_framed<T>(&self, inner: T) -> Framed<T, QuasselCodec>
+ where
+ T: AsyncRead + AsyncWrite,
+ {
+ Framed::new(inner, self.new_codec())
+ }
+}
+
+// ===== impl LengthDelimitedCodecError =====
+
+impl fmt::Debug for QuasselCodecError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("QuasselCodecError").finish()
+ }
+}
+
+impl fmt::Display for QuasselCodecError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.write_str("frame size too big")
+ }
+}
+
+impl StdError for QuasselCodecError {}
diff --git a/src/protocol/message/handshake/types.rs b/src/protocol/message/handshake/types.rs
index 643b376..0c70914 100644
--- a/src/protocol/message/handshake/types.rs
+++ b/src/protocol/message/handshake/types.rs
@@ -37,10 +37,8 @@ impl HandshakeSerialize for VariantMap {
res.extend(v.serialize()?);
}
- util::insert_bytes(0, &mut res, &mut [0, 0, 0, 10]);
-
- let len: i32 = res.len().try_into().unwrap();
- util::insert_bytes(0, &mut res, &mut ((len).to_be_bytes()));
+ let len: i32 = (self.len() * 2).try_into().unwrap();
+ util::insert_bytes(0, &mut res, &mut (len).to_be_bytes());
return Ok(res);
}
@@ -50,11 +48,10 @@ impl HandshakeDeserialize for VariantMap {
fn parse(b: &[u8]) -> Result<(usize, Self), Error> {
let (_, len) = i32::parse(&b[0..4])?;
- let mut pos: usize = 8;
+ let mut pos: usize = 4;
let mut map = VariantMap::new();
- let ulen: usize = len as usize;
- loop {
- if (pos) >= ulen { break; }
+
+ for _ in 0..(len / 2) {
let (nlen, name) = Variant::parse(&b[pos..])?;
pos += nlen;
@@ -76,10 +73,12 @@ impl HandshakeQRead for VariantMap {
fn read<T: Read>(s: &mut T, b: &mut [u8]) -> Result<usize, Error> {
s.read(&mut b[0..4])?;
let (_, len) = i32::parse(&b[0..4])?;
- let ulen = len as usize;
- // Read the 00 00 00 0a VariantType bytes and discard
- s.read(&mut b[4..(ulen + 4)])?;
+ let mut pos = 4;
+ for _ in 0..(len / 2) {
+ pos += Variant::read(s, &mut b[pos..])?;
+ pos += Variant::read(s, &mut b[pos..])?;
+ }
// let mut pos = 8;
// let len: usize = len as usize;
@@ -89,6 +88,6 @@ impl HandshakeQRead for VariantMap {
// pos += Variant::read(s, &mut b[pos..])?;
// }
- return Ok(ulen + 4);
+ return Ok(pos);
}
}
diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs
index 8739fd8..3630fab 100644
--- a/src/protocol/mod.rs
+++ b/src/protocol/mod.rs
@@ -1,3 +1,9 @@
pub mod message;
pub mod primitive;
+
+#[allow(dead_code)]
pub mod error;
+
+#[allow(unused_variables, dead_code)]
+#[cfg(feature = "framing")]
+pub mod frame;
diff --git a/src/tests/handshake_types.rs b/src/tests/handshake_types.rs
index 227bdad..d9368b2 100644
--- a/src/tests/handshake_types.rs
+++ b/src/tests/handshake_types.rs
@@ -5,7 +5,7 @@ use crate::protocol::primitive::{Variant};
pub fn serialize_variantmap() {
let mut test_variantmap = VariantMap::new();
test_variantmap.insert("Configured".to_string(), Variant::bool(true));
- let bytes = [0, 0, 0, 39, 0, 0, 0, 10, 0, 0, 0, 10, 0,
+ let bytes = [0, 0, 0, 2, 0, 0, 0, 10, 0,
0, 0, 0, 20, 0, 67, 0, 111, 0, 110, 0, 102, 0, 105, 0, 103, 0, 117, 0, 114, 0, 101, 0, 100,
0, 0, 0, 1, 0, 1].to_vec();
assert_eq!(
@@ -20,9 +20,7 @@ pub fn read_variantmap() {
let test_bytes: Vec<u8> = vec![
// len
- 0, 0, 0, 74,
- // var
- 0, 0, 0, 10, // 4
+ 0, 0, 0, 4, // 4
// var
0, 0, 0, 10, 0, // 5
// strlen, str
@@ -38,16 +36,14 @@ pub fn read_variantmap() {
// extra
0, 0, 0, 1];
- let mut buf: Vec<u8> = [0; 78].to_vec();
+ let mut buf: Vec<u8> = [0; 74].to_vec();
let len = VariantMap::read(&mut Cursor::new(&test_bytes), &mut buf).unwrap();
- assert_eq!(len, 78);
+ assert_eq!(len, 74);
let result_bytes: Vec<u8> = vec![
// len
- 0, 0, 0, 74,
- // var
- 0, 0, 0, 10,
+ 0, 0, 0, 4,
// var
0, 0, 0, 10, 0,
// strlen, str
@@ -65,31 +61,32 @@ pub fn read_variantmap() {
#[test]
pub fn deserialize_variantmap() {
- let test_bytes: &[u8] = &[0, 0, 0, 39, 0, 0, 0, 10, 0, 0, 0, 10, 0,
- 0, 0, 0, 20, 0, 67, 0, 111, 0, 110, 0, 102, 0, 105, 0, 103, 0, 117, 0, 114, 0, 101, 0, 100,
- 0, 0, 0, 1, 0, 1, 0, 0, 0, 1];
+ let test_bytes: &[u8] = &[0, 0, 0, 2,
+ 0, 0, 0, 10, 0,
+ 0, 0, 0, 20, 0, 67, 0, 111, 0, 110, 0, 102, 0, 105, 0, 103, 0, 117, 0, 114, 0, 101, 0, 100,
+ 0, 0, 0, 1, 0, 1,
+ 0, 0, 0, 1];
let mut test_variantmap = VariantMap::new();
test_variantmap.insert("Configured".to_string(), Variant::bool(true));
let (len, res) = VariantMap::parse(test_bytes).unwrap();
- assert_eq!(len, 43);
+ assert_eq!(len, 39);
assert_eq!(res, test_variantmap);
}
#[test]
pub fn deserialize_variantmap_utf8() {
- let test_bytes: &[u8] = &[0, 0, 0, 29, 0, 0, 0, 10, 0, 0, 0, 12, 0,
+ let test_bytes: &[u8] = &[0, 0, 0, 2,
+ 0, 0, 0, 12, 0,
0, 0, 0, 10, 67, 111, 110, 102, 105, 103, 117, 114, 101, 100,
0, 0, 0, 1, 0, 1,
-// 0, 0, 0, 10, 67, 111, 110, 102, 105, 103, 117, 114
-// 0, 0, 0, 1, 0, 1,
0, 0, 0, 1];
let mut test_variantmap = VariantMap::new();
test_variantmap.insert("Configured".to_string(), Variant::bool(true));
let (len, res) = VariantMap::parse(test_bytes).unwrap();
- assert_eq!(len, 33);
+ assert_eq!(len, 29);
assert_eq!(res, test_variantmap);
}
diff --git a/src/util.rs b/src/util.rs
index 7c26ec5..33735f1 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -24,7 +24,7 @@ macro_rules! match_variant {
}
}
-use crate::protocol::primitive::{Variant, String};
+use crate::protocol::primitive::{Variant};
use crate::protocol::error::ProtocolError;
use failure::Error;