diff options
| author | Max Audron <audron@cocaine.farm> | 2020-04-25 19:35:29 +0200 |
|---|---|---|
| committer | Max Audron <audron@cocaine.farm> | 2020-04-25 19:35:29 +0200 |
| commit | c546e2ef6c69bb1c6a86093f3cc7b2dab20d6ac4 (patch) | |
| tree | 5f761765863f39405a3ae6e27cb865ead6be2e38 /src/client/mod.rs | |
| parent | finish FramedCodec (diff) | |
finish parsing of primitive types
Diffstat (limited to 'src/client/mod.rs')
| -rw-r--r-- | src/client/mod.rs | 230 |
1 files changed, 169 insertions, 61 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs index 4ab0601..fbb5b35 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -2,10 +2,16 @@ use std::result::Result; use std::vec::Vec; +use tokio::io::{AsyncRead, AsyncWrite}; +use core::marker::Unpin; use tokio::net::TcpStream; use tokio::prelude::*; -use tokio_util::codec::{Framed}; +use native_tls::TlsConnector; + +use tokio_tls; +use tokio_tls::TlsStream; +use tokio_util::codec::Framed; use futures_util::stream::StreamExt; use futures::SinkExt; @@ -13,102 +19,204 @@ use crate::protocol::frame::QuasselCodec; use failure::Error; +use log::{trace, debug, info, error}; + +use crate::protocol::message::ConnAck; + extern crate log; -// use log::{info, warn, debug}; -pub struct Client { - stream: Framed<TcpStream, QuasselCodec>, +pub struct Client<T: AsyncRead + AsyncWrite + Unpin> { + stream: Framed<T, QuasselCodec>, pub tls: bool, pub compression: bool, + pub state: ClientState, +} + +pub enum ClientState { + Handshake, + Connected, } -impl Client { +impl <T: AsyncRead + AsyncWrite + Unpin> Client<T> { pub async fn run(&mut self) { - // TODO while endlessly loops over same stream element + use crate::protocol::primitive::StringList; + use crate::protocol::message::handshake::ClientInit; + use crate::protocol::message::handshake::HandshakeSerialize; + + info!(target: "init", "Setting Features"); + + let mut features = StringList::new(); + features.push("SynchronizedMarkerLine".to_string()); + features.push("Authenticators".to_string()); + features.push("ExtendedFeatures".to_string()); + let client_init = ClientInit { + client_version:String::from("Rust 0.0.0"), + client_date: String::from("1579009211"), + feature_list: features, + client_features: 0x00008000, + }; + + self.stream.send(client_init.serialize().unwrap()).await.unwrap(); + + // Start event loop while let Some(msg) = self.stream.next().await { - println!("bing"); let msg = msg.unwrap(); - handle_login_message(self, &msg).await.unwrap(); + match self.state { + ClientState::Handshake => handle_login_message(self, &msg).await.unwrap(), + ClientState::Connected => handle_message(self, &msg).await.unwrap(), + } }; - } + } - pub async fn connect(address: &'static str, port: u64, tls: bool, compression: bool) -> Result<Client, Error> { - use crate::protocol::primitive::deserialize::Deserialize; - use crate::protocol::message::ConnAck; - use crate::protocol::primitive::{StringList}; - use crate::protocol::message::ClientInit; - use crate::protocol::message::handshake::HandshakeSerialize; + pub async fn connect(address: &'static str, port: u64, compression: bool) -> Result<Client<TcpStream>, Error> { + let mut stream = TcpStream::connect(format!("{}:{}", address, port)).await?; - let mut s = TcpStream::connect(format!("{}:{}", address, port)).await?; + info!(target: "init", "Establishing Connection"); + let connack = init(&mut stream, false, compression).await?; - // Set Features - let mut init: Vec<u8> = vec![]; - let mut handshake: u32 = 0x42b33f00; - if tls { - handshake |= 0x01; - } - if compression { - handshake |= 0x02; - } - let mut proto: u32 = 0x00000002; - let fin: u32 = 0x80000000; - proto |= fin; - init.extend(handshake.to_be_bytes().iter()); - init.extend(proto.to_be_bytes().iter()); - s.write(&init).await?; + debug!(target: "init", "{:?}", connack); + let codec = QuasselCodec::builder() + .compression(compression) + .new_codec(); - let mut buf = [0; 4]; - s.read(&mut buf).await?; - let (_, val) = ConnAck::parse(&buf).unwrap(); - println!("Received: {:?}", val); + let framed_stream = Framed::new(stream, codec); + + info!(target: "init", "Established Connection"); + + return Ok(Client { + stream: framed_stream, + tls: false, + compression, + state: ClientState::Handshake, + }); + } + + pub async fn connect_tls(address: &'static str, port: u64, compression: bool) -> Result<Client<TlsStream<TcpStream>>, Error> { + let mut stream: TcpStream = TcpStream::connect(format!("{}:{}", address, port)).await?; + + info!(target: "init", "Establishing Connection"); + let connack = init(&mut stream, true, compression).await?; + + debug!(target: "init", "{:?}", connack); let codec = QuasselCodec::builder() .compression(compression) .new_codec(); - let stream = Framed::new(s, codec); - let mut client = Client { - stream: stream, - tls: tls, - compression: compression, - }; + let tls_connector = tokio_tls::TlsConnector::from(TlsConnector::builder().build().unwrap()); - let mut features = StringList::new(); - features.push("SynchronizedMarkerLine".to_string()); - features.push("Authenticators".to_string()); - features.push("ExtendedFeatures".to_string()); - let client_init = ClientInit { - client_version:String::from("Rust 0.0.0"), - client_date: String::from("1579009211"), - feature_list: features, - client_features: 0x00008000, - }; + let tls_stream = tls_connector.connect(address, stream).await?; - client.stream.send(client_init.serialize()?).await?; + let framed_stream = Framed::new(tls_stream, codec); - return Ok(client); + info!(target: "init", "Established Connection"); + + return Ok(Client { + stream: framed_stream, + tls: true, + compression, + state: ClientState::Handshake, + }); } + } -pub async fn handle_login_message(client: &mut Client, buf: &[u8]) -> Result<(), Error> { +pub async fn handle_login_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Client<T>, buf: &[u8]) -> Result<(), Error> { use crate::protocol::message::ClientLogin; use crate::protocol::message::handshake::{HandshakeSerialize, HandshakeDeserialize, VariantMap}; - use crate::protocol::error::ProtocolError; use crate::util::get_msg_type; + trace!(target: "message", "Received bytes: {:x?}", buf); let (_, res) = VariantMap::parse(buf)?; - println!("res {:?}", res); + debug!(target: "init", "Received Messsage: {:#?}", res); let msgtype = get_msg_type(&res["MsgType"])?; match msgtype { "ClientInitAck" => { - let login = ClientLogin {user: "audron".to_string(), password: "audron".to_string()}; + info!(target: "init", "Initialization successfull"); + info!(target: "login", "Starting Login"); + let login = ClientLogin {user: "audron".to_string(), password: "***REMOVED***".to_string()}; client.stream.send(login.serialize()?).await?; }, - "ClientInitReject" => { println!("init failed: {:?}", res) }, - "ClientLoginAck" => { println!("login done: {:?}", res) }, - "ClientLoginReject" => { println!("login failed: {:?}", res)}, - _ => bail!(ProtocolError::WrongMsgType) + "ClientInitReject" => { + error!(target: "init", "Initialization failed: {:?}", res); + }, + "ClientLoginAck" => { + info!(target: "login", "Login successfull"); + }, + "SessionInit" => { + info!(target: "message", "Received SessionInit: {:#?}", res); + info!(target: "login", "Session Initialization finished. Switching to Connected state"); + client.state = ClientState::Connected; + } + "ClientLoginReject" => { + error!(target: "login", "Login failed: {:?}", res); + }, + _ => { + error!(target: "client", "Error: WrongMsgType: {:#?}", res); + } } return Ok(()); } + +pub async fn handle_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Client<T>, buf: &[u8]) -> Result<(), Error> { + use crate::protocol::primitive::VariantList; + use crate::protocol::primitive::deserialize::Deserialize; + use crate::protocol::primitive::serialize::Serialize; + use crate::util::get_msg_type; + + trace!(target: "message", "Received bytes: {:x?}", buf); + let (_, res) = VariantList::parse(buf)?; + debug!(target: "init", "Received Messsage: {:#?}", res); + // let msgtype = get_msg_type(&res["MsgType"])?; + // match msgtype { + // _ => { + // error!(target: "client", "Error: WrongMsgType: {:#?}", res); + // } + // } + return Ok(()); +} + +// Send the initialization message to the stream +pub async fn init(stream: &mut TcpStream, tls: bool, compression: bool) -> Result<ConnAck, Error> { + use crate::protocol::primitive::deserialize::Deserialize; + + // Buffer for our initialization + let mut init: Vec<u8> = vec![]; + + // The handshake message + let mut handshake: u32 = 0x42b33f00; + + // If TLS is enabled set the TLS bit on the handshake + if tls { + info!(target: "init", "Enabled TLS"); + handshake |= 0x01; + } + + // If COMPRESSION is enabled set the COMPRESSION bit on the handshake + if compression { + info!(target: "init", "Enabled Compression"); + handshake |= 0x02; + } + + // Select Protocol 2: Datastream + let mut proto: u32 = 0x00000002; + + // Flag proto as the last protocol + let fin: u32 = 0x80000000; + proto |= fin; + + // Add handshake and protocol to our buffer + init.extend(handshake.to_be_bytes().iter()); + init.extend(proto.to_be_bytes().iter()); + + // Send Buffer + stream.write(&init).await?; + + // Read Response + let mut buf = [0; 4]; + stream.read(&mut buf).await?; + + let (_, connack) = ConnAck::parse(&buf)?; + Ok(connack) +} |
