diff options
| author | Max Audron <audron@cocaine.farm> | 2020-01-22 16:11:38 +0100 |
|---|---|---|
| committer | Max Audron <audron@cocaine.farm> | 2020-01-22 16:11:38 +0100 |
| commit | 20da110e38ea2390bc0f44136998464fef98aefd (patch) | |
| tree | 0ddfabf9aa862f398c9c4aebfad93bab4fe1a77e /src/client | |
| parent | update (diff) | |
update
Diffstat (limited to '')
| -rw-r--r-- | src/client/mod.rs | 156 |
1 files changed, 24 insertions, 132 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs index 213f6ff..4ab0601 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,67 +1,36 @@ //use std::io::BufWriter; use std::result::Result; use std::vec::Vec; -use std::convert::TryInto; -use std::io::Cursor; - -use flate2::Compress; -use flate2::Decompress; -use flate2::Compression; -use flate2::FlushCompress; -use flate2::FlushDecompress; -use flate2::read::ZlibDecoder; use tokio::net::TcpStream; -use tokio::net::tcp::{ReadHalf, WriteHalf}; use tokio::prelude::*; +use tokio_util::codec::{Framed}; +use futures_util::stream::StreamExt; +use futures::SinkExt; + +use crate::protocol::frame::QuasselCodec; + use failure::Error; extern crate log; // use log::{info, warn, debug}; -use crate::protocol::message; - -pub enum State { - Handshake, - Connected -} - pub struct Client { - stream: TcpStream, - encoder: Compress, - decoder: Decompress, - state: State, + stream: Framed<TcpStream, QuasselCodec>, pub tls: bool, pub compression: bool, } impl Client { - pub async fn handler(mut self) -> Result<(), Error> { - loop { - let mut buf: Vec<u8> = vec![0; 2048]; - match self.stream.read(&mut buf).await { - Ok(n) => { - buf.truncate(n); - let mut cbuf: Vec<u8> = vec![0; n * 2]; - - let before_in = self.decoder.total_in(); - let before_out = self.decoder.total_out(); - self.decoder.decompress(&buf, &mut cbuf, FlushDecompress::None)?; - let after_in = self.decoder.total_in(); - let after_out = self.decoder.total_out(); - - cbuf.truncate(after_out.try_into()?); - - match self.state { - State::Handshake => handle_login_message(&mut self, &cbuf), - State::Connected => handle_login_message(&mut self, &cbuf) - }.await?; - } - Err(e) => { panic!(e) } - } - } - } + pub async fn run(&mut self) { + // TODO while endlessly loops over same stream element + while let Some(msg) = self.stream.next().await { + println!("bing"); + let msg = msg.unwrap(); + handle_login_message(self, &msg).await.unwrap(); + }; + } pub async fn connect(address: &'static str, port: u64, tls: bool, compression: bool) -> Result<Client, Error> { use crate::protocol::primitive::deserialize::Deserialize; @@ -94,11 +63,13 @@ impl Client { let (_, val) = ConnAck::parse(&buf).unwrap(); println!("Received: {:?}", val); + let codec = QuasselCodec::builder() + .compression(compression) + .new_codec(); + let stream = Framed::new(s, codec); + let mut client = Client { - stream: s, - state: State::Handshake, - encoder: Compress::new(Compression::best(), true), - decoder: Decompress::new(true), + stream: stream, tls: tls, compression: compression, }; @@ -113,93 +84,14 @@ impl Client { feature_list: features, client_features: 0x00008000, }; - write_to_stream(&mut client, &client_init.serialize()?).await?; + + client.stream.send(client_init.serialize()?).await?; return Ok(client); } - -// pub fn login(&mut self, user: &'static str, pass: &'static str, client: message::ClientInit) { -// use crate::protocol::message::handshake::{HandshakeDeserialize, HandshakeSerialize, HandshakeQRead, VariantMap}; -// use crate::protocol::message::handshake::{ClientInitAck, ClientLogin, ClientLoginAck, SessionInit}; -// -// self.write(&client.serialize().unwrap()).unwrap(); -// -// let mut buf: Vec<u8> = [0; 2048].to_vec(); -// let len = VariantMap::read(self, &mut buf).unwrap(); -// buf.truncate(len); -// let res = ClientInitAck::parse(&buf).unwrap(); -// println!("res: {:?}", res); -// -// let login = ClientLogin {user: user.to_string(), password: pass.to_string()}; -// self.write(&login.serialize().unwrap()).unwrap(); -// println!("res: {:?}", res); -// -// let mut buf: Vec<u8> = [0; 2048].to_vec(); -// let len = VariantMap::read(self, &mut buf).unwrap(); -// buf.truncate(len); -// let _res = ClientLoginAck::parse(&buf).unwrap(); -// -// let mut buf: Vec<u8> = [0; 2048].to_vec(); -// let len = VariantMap::read(self, &mut buf).unwrap(); -// buf.truncate(len); -// let res = SessionInit::parse(&buf).unwrap(); -// -// println!("res: {:?}", res); -// } -} - -// impl std::io::Read for Client { -// fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> { -// let mut cbuf = [0; 2048].to_vec(); -// let read_bytes = self.tcp_stream.read(&mut cbuf)?; -// println!("read bytes: {:?}", read_bytes); -// cbuf.truncate(read_bytes); -// println!("cbuf: {:?}", &cbuf[0..]); -// let before_in = self.decoder.total_in(); -// let before_out = self.decoder.total_out(); -// self.decoder.decompress(&cbuf, buf, FlushDecompress::None)?; -// let after_in = self.decoder.total_in(); -// let after_out = self.decoder.total_out(); -// -// println!("in: {:?} / {:?}\nout: {:?} / {:?}", before_in, after_in, before_out, after_out); -// -// println!("buf: {:?}", buf); -// return Ok(((after_in - after_out)).try_into().unwrap()); -// // -// // let res = self.tcp_stream.read(buf); -// // println!("buf: {:?}, total in: {:?}, total out: {:?}", buf, self.tcp_stream.total_in(), self.tcp_stream.total_out()); -// // return res; -// } -// } -// -// impl std::io::Write for Client { -// fn write(&mut self, buf: &[u8]) -> Result<usize, Error> { -// let mut cbuf = Vec::with_capacity(buf.len()); -// self.encoder.compress_vec(buf, &mut cbuf, FlushCompress::Finish)?; -// return self.tcp_stream.write(&cbuf); -// } -// -// fn flush(&mut self) -> Result<(), Error> { -// return self.tcp_stream.flush(); -// } -// } - -pub async fn write_to_stream(client: &mut Client, buf: &[u8]) -> Result<usize, Error> { - let mut cbuf = vec![0; buf.len()]; - let before_in = client.encoder.total_in(); - let before_out = client.encoder.total_out(); - client.encoder.compress(buf, &mut cbuf, FlushCompress::Full)?; - let after_in = client.encoder.total_in(); - let after_out = client.encoder.total_out(); - println!("out {:?} - {:?}", after_out, before_out); - cbuf.truncate((after_out - before_out).try_into()?); - println!("cbuf {:?}", cbuf); - let i = client.stream.write(&cbuf).await?; - return Ok(i); } pub async fn handle_login_message(client: &mut Client, buf: &[u8]) -> Result<(), Error> { - use crate::protocol::primitive::{Variant, StringList}; use crate::protocol::message::ClientLogin; use crate::protocol::message::handshake::{HandshakeSerialize, HandshakeDeserialize, VariantMap}; use crate::protocol::error::ProtocolError; @@ -211,7 +103,7 @@ pub async fn handle_login_message(client: &mut Client, buf: &[u8]) -> Result<(), match msgtype { "ClientInitAck" => { let login = ClientLogin {user: "audron".to_string(), password: "audron".to_string()}; - write_to_stream(client, &login.serialize()?).await?; + client.stream.send(login.serialize()?).await?; }, "ClientInitReject" => { println!("init failed: {:?}", res) }, "ClientLoginAck" => { println!("login done: {:?}", res) }, |
