aboutsummaryrefslogtreecommitdiff
path: root/src/client
diff options
context:
space:
mode:
authorMax Audron <audron@cocaine.farm>2020-01-22 16:11:38 +0100
committerMax Audron <audron@cocaine.farm>2020-01-22 16:11:38 +0100
commit20da110e38ea2390bc0f44136998464fef98aefd (patch)
tree0ddfabf9aa862f398c9c4aebfad93bab4fe1a77e /src/client
parentupdate (diff)
update
Diffstat (limited to '')
-rw-r--r--src/client/mod.rs156
1 files changed, 24 insertions, 132 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 213f6ff..4ab0601 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -1,67 +1,36 @@
//use std::io::BufWriter;
use std::result::Result;
use std::vec::Vec;
-use std::convert::TryInto;
-use std::io::Cursor;
-
-use flate2::Compress;
-use flate2::Decompress;
-use flate2::Compression;
-use flate2::FlushCompress;
-use flate2::FlushDecompress;
-use flate2::read::ZlibDecoder;
use tokio::net::TcpStream;
-use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::prelude::*;
+use tokio_util::codec::{Framed};
+use futures_util::stream::StreamExt;
+use futures::SinkExt;
+
+use crate::protocol::frame::QuasselCodec;
+
use failure::Error;
extern crate log;
// use log::{info, warn, debug};
-use crate::protocol::message;
-
-pub enum State {
- Handshake,
- Connected
-}
-
pub struct Client {
- stream: TcpStream,
- encoder: Compress,
- decoder: Decompress,
- state: State,
+ stream: Framed<TcpStream, QuasselCodec>,
pub tls: bool,
pub compression: bool,
}
impl Client {
- pub async fn handler(mut self) -> Result<(), Error> {
- loop {
- let mut buf: Vec<u8> = vec![0; 2048];
- match self.stream.read(&mut buf).await {
- Ok(n) => {
- buf.truncate(n);
- let mut cbuf: Vec<u8> = vec![0; n * 2];
-
- let before_in = self.decoder.total_in();
- let before_out = self.decoder.total_out();
- self.decoder.decompress(&buf, &mut cbuf, FlushDecompress::None)?;
- let after_in = self.decoder.total_in();
- let after_out = self.decoder.total_out();
-
- cbuf.truncate(after_out.try_into()?);
-
- match self.state {
- State::Handshake => handle_login_message(&mut self, &cbuf),
- State::Connected => handle_login_message(&mut self, &cbuf)
- }.await?;
- }
- Err(e) => { panic!(e) }
- }
- }
- }
+ pub async fn run(&mut self) {
+ // TODO while endlessly loops over same stream element
+ while let Some(msg) = self.stream.next().await {
+ println!("bing");
+ let msg = msg.unwrap();
+ handle_login_message(self, &msg).await.unwrap();
+ };
+ }
pub async fn connect(address: &'static str, port: u64, tls: bool, compression: bool) -> Result<Client, Error> {
use crate::protocol::primitive::deserialize::Deserialize;
@@ -94,11 +63,13 @@ impl Client {
let (_, val) = ConnAck::parse(&buf).unwrap();
println!("Received: {:?}", val);
+ let codec = QuasselCodec::builder()
+ .compression(compression)
+ .new_codec();
+ let stream = Framed::new(s, codec);
+
let mut client = Client {
- stream: s,
- state: State::Handshake,
- encoder: Compress::new(Compression::best(), true),
- decoder: Decompress::new(true),
+ stream: stream,
tls: tls,
compression: compression,
};
@@ -113,93 +84,14 @@ impl Client {
feature_list: features,
client_features: 0x00008000,
};
- write_to_stream(&mut client, &client_init.serialize()?).await?;
+
+ client.stream.send(client_init.serialize()?).await?;
return Ok(client);
}
-
-// pub fn login(&mut self, user: &'static str, pass: &'static str, client: message::ClientInit) {
-// use crate::protocol::message::handshake::{HandshakeDeserialize, HandshakeSerialize, HandshakeQRead, VariantMap};
-// use crate::protocol::message::handshake::{ClientInitAck, ClientLogin, ClientLoginAck, SessionInit};
-//
-// self.write(&client.serialize().unwrap()).unwrap();
-//
-// let mut buf: Vec<u8> = [0; 2048].to_vec();
-// let len = VariantMap::read(self, &mut buf).unwrap();
-// buf.truncate(len);
-// let res = ClientInitAck::parse(&buf).unwrap();
-// println!("res: {:?}", res);
-//
-// let login = ClientLogin {user: user.to_string(), password: pass.to_string()};
-// self.write(&login.serialize().unwrap()).unwrap();
-// println!("res: {:?}", res);
-//
-// let mut buf: Vec<u8> = [0; 2048].to_vec();
-// let len = VariantMap::read(self, &mut buf).unwrap();
-// buf.truncate(len);
-// let _res = ClientLoginAck::parse(&buf).unwrap();
-//
-// let mut buf: Vec<u8> = [0; 2048].to_vec();
-// let len = VariantMap::read(self, &mut buf).unwrap();
-// buf.truncate(len);
-// let res = SessionInit::parse(&buf).unwrap();
-//
-// println!("res: {:?}", res);
-// }
-}
-
-// impl std::io::Read for Client {
-// fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
-// let mut cbuf = [0; 2048].to_vec();
-// let read_bytes = self.tcp_stream.read(&mut cbuf)?;
-// println!("read bytes: {:?}", read_bytes);
-// cbuf.truncate(read_bytes);
-// println!("cbuf: {:?}", &cbuf[0..]);
-// let before_in = self.decoder.total_in();
-// let before_out = self.decoder.total_out();
-// self.decoder.decompress(&cbuf, buf, FlushDecompress::None)?;
-// let after_in = self.decoder.total_in();
-// let after_out = self.decoder.total_out();
-//
-// println!("in: {:?} / {:?}\nout: {:?} / {:?}", before_in, after_in, before_out, after_out);
-//
-// println!("buf: {:?}", buf);
-// return Ok(((after_in - after_out)).try_into().unwrap());
-// //
-// // let res = self.tcp_stream.read(buf);
-// // println!("buf: {:?}, total in: {:?}, total out: {:?}", buf, self.tcp_stream.total_in(), self.tcp_stream.total_out());
-// // return res;
-// }
-// }
-//
-// impl std::io::Write for Client {
-// fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
-// let mut cbuf = Vec::with_capacity(buf.len());
-// self.encoder.compress_vec(buf, &mut cbuf, FlushCompress::Finish)?;
-// return self.tcp_stream.write(&cbuf);
-// }
-//
-// fn flush(&mut self) -> Result<(), Error> {
-// return self.tcp_stream.flush();
-// }
-// }
-
-pub async fn write_to_stream(client: &mut Client, buf: &[u8]) -> Result<usize, Error> {
- let mut cbuf = vec![0; buf.len()];
- let before_in = client.encoder.total_in();
- let before_out = client.encoder.total_out();
- client.encoder.compress(buf, &mut cbuf, FlushCompress::Full)?;
- let after_in = client.encoder.total_in();
- let after_out = client.encoder.total_out();
- println!("out {:?} - {:?}", after_out, before_out);
- cbuf.truncate((after_out - before_out).try_into()?);
- println!("cbuf {:?}", cbuf);
- let i = client.stream.write(&cbuf).await?;
- return Ok(i);
}
pub async fn handle_login_message(client: &mut Client, buf: &[u8]) -> Result<(), Error> {
- use crate::protocol::primitive::{Variant, StringList};
use crate::protocol::message::ClientLogin;
use crate::protocol::message::handshake::{HandshakeSerialize, HandshakeDeserialize, VariantMap};
use crate::protocol::error::ProtocolError;
@@ -211,7 +103,7 @@ pub async fn handle_login_message(client: &mut Client, buf: &[u8]) -> Result<(),
match msgtype {
"ClientInitAck" => {
let login = ClientLogin {user: "audron".to_string(), password: "audron".to_string()};
- write_to_stream(client, &login.serialize()?).await?;
+ client.stream.send(login.serialize()?).await?;
},
"ClientInitReject" => { println!("init failed: {:?}", res) },
"ClientLoginAck" => { println!("login done: {:?}", res) },