aboutsummaryrefslogtreecommitdiff
path: root/src/client/mod.rs
diff options
context:
space:
mode:
authorMax Audron <audron@cocaine.farm>2020-04-25 19:35:29 +0200
committerMax Audron <audron@cocaine.farm>2020-04-25 19:35:29 +0200
commitc546e2ef6c69bb1c6a86093f3cc7b2dab20d6ac4 (patch)
tree5f761765863f39405a3ae6e27cb865ead6be2e38 /src/client/mod.rs
parentfinish FramedCodec (diff)
finish parsing of primitive types
Diffstat (limited to 'src/client/mod.rs')
-rw-r--r--src/client/mod.rs230
1 files changed, 169 insertions, 61 deletions
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 4ab0601..fbb5b35 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -2,10 +2,16 @@
use std::result::Result;
use std::vec::Vec;
+use tokio::io::{AsyncRead, AsyncWrite};
+use core::marker::Unpin;
use tokio::net::TcpStream;
use tokio::prelude::*;
-use tokio_util::codec::{Framed};
+use native_tls::TlsConnector;
+
+use tokio_tls;
+use tokio_tls::TlsStream;
+use tokio_util::codec::Framed;
use futures_util::stream::StreamExt;
use futures::SinkExt;
@@ -13,102 +19,204 @@ use crate::protocol::frame::QuasselCodec;
use failure::Error;
+use log::{trace, debug, info, error};
+
+use crate::protocol::message::ConnAck;
+
extern crate log;
-// use log::{info, warn, debug};
-pub struct Client {
- stream: Framed<TcpStream, QuasselCodec>,
+pub struct Client<T: AsyncRead + AsyncWrite + Unpin> {
+ stream: Framed<T, QuasselCodec>,
pub tls: bool,
pub compression: bool,
+ pub state: ClientState,
+}
+
+pub enum ClientState {
+ Handshake,
+ Connected,
}
-impl Client {
+impl <T: AsyncRead + AsyncWrite + Unpin> Client<T> {
pub async fn run(&mut self) {
- // TODO while endlessly loops over same stream element
+ use crate::protocol::primitive::StringList;
+ use crate::protocol::message::handshake::ClientInit;
+ use crate::protocol::message::handshake::HandshakeSerialize;
+
+ info!(target: "init", "Setting Features");
+
+ let mut features = StringList::new();
+ features.push("SynchronizedMarkerLine".to_string());
+ features.push("Authenticators".to_string());
+ features.push("ExtendedFeatures".to_string());
+ let client_init = ClientInit {
+ client_version:String::from("Rust 0.0.0"),
+ client_date: String::from("1579009211"),
+ feature_list: features,
+ client_features: 0x00008000,
+ };
+
+ self.stream.send(client_init.serialize().unwrap()).await.unwrap();
+
+ // Start event loop
while let Some(msg) = self.stream.next().await {
- println!("bing");
let msg = msg.unwrap();
- handle_login_message(self, &msg).await.unwrap();
+ match self.state {
+ ClientState::Handshake => handle_login_message(self, &msg).await.unwrap(),
+ ClientState::Connected => handle_message(self, &msg).await.unwrap(),
+ }
};
- }
+ }
- pub async fn connect(address: &'static str, port: u64, tls: bool, compression: bool) -> Result<Client, Error> {
- use crate::protocol::primitive::deserialize::Deserialize;
- use crate::protocol::message::ConnAck;
- use crate::protocol::primitive::{StringList};
- use crate::protocol::message::ClientInit;
- use crate::protocol::message::handshake::HandshakeSerialize;
+ pub async fn connect(address: &'static str, port: u64, compression: bool) -> Result<Client<TcpStream>, Error> {
+ let mut stream = TcpStream::connect(format!("{}:{}", address, port)).await?;
- let mut s = TcpStream::connect(format!("{}:{}", address, port)).await?;
+ info!(target: "init", "Establishing Connection");
+ let connack = init(&mut stream, false, compression).await?;
- // Set Features
- let mut init: Vec<u8> = vec![];
- let mut handshake: u32 = 0x42b33f00;
- if tls {
- handshake |= 0x01;
- }
- if compression {
- handshake |= 0x02;
- }
- let mut proto: u32 = 0x00000002;
- let fin: u32 = 0x80000000;
- proto |= fin;
- init.extend(handshake.to_be_bytes().iter());
- init.extend(proto.to_be_bytes().iter());
- s.write(&init).await?;
+ debug!(target: "init", "{:?}", connack);
+ let codec = QuasselCodec::builder()
+ .compression(compression)
+ .new_codec();
- let mut buf = [0; 4];
- s.read(&mut buf).await?;
- let (_, val) = ConnAck::parse(&buf).unwrap();
- println!("Received: {:?}", val);
+ let framed_stream = Framed::new(stream, codec);
+
+ info!(target: "init", "Established Connection");
+
+ return Ok(Client {
+ stream: framed_stream,
+ tls: false,
+ compression,
+ state: ClientState::Handshake,
+ });
+ }
+
+ pub async fn connect_tls(address: &'static str, port: u64, compression: bool) -> Result<Client<TlsStream<TcpStream>>, Error> {
+ let mut stream: TcpStream = TcpStream::connect(format!("{}:{}", address, port)).await?;
+
+ info!(target: "init", "Establishing Connection");
+ let connack = init(&mut stream, true, compression).await?;
+
+ debug!(target: "init", "{:?}", connack);
let codec = QuasselCodec::builder()
.compression(compression)
.new_codec();
- let stream = Framed::new(s, codec);
- let mut client = Client {
- stream: stream,
- tls: tls,
- compression: compression,
- };
+ let tls_connector = tokio_tls::TlsConnector::from(TlsConnector::builder().build().unwrap());
- let mut features = StringList::new();
- features.push("SynchronizedMarkerLine".to_string());
- features.push("Authenticators".to_string());
- features.push("ExtendedFeatures".to_string());
- let client_init = ClientInit {
- client_version:String::from("Rust 0.0.0"),
- client_date: String::from("1579009211"),
- feature_list: features,
- client_features: 0x00008000,
- };
+ let tls_stream = tls_connector.connect(address, stream).await?;
- client.stream.send(client_init.serialize()?).await?;
+ let framed_stream = Framed::new(tls_stream, codec);
- return Ok(client);
+ info!(target: "init", "Established Connection");
+
+ return Ok(Client {
+ stream: framed_stream,
+ tls: true,
+ compression,
+ state: ClientState::Handshake,
+ });
}
+
}
-pub async fn handle_login_message(client: &mut Client, buf: &[u8]) -> Result<(), Error> {
+pub async fn handle_login_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Client<T>, buf: &[u8]) -> Result<(), Error> {
use crate::protocol::message::ClientLogin;
use crate::protocol::message::handshake::{HandshakeSerialize, HandshakeDeserialize, VariantMap};
- use crate::protocol::error::ProtocolError;
use crate::util::get_msg_type;
+ trace!(target: "message", "Received bytes: {:x?}", buf);
let (_, res) = VariantMap::parse(buf)?;
- println!("res {:?}", res);
+ debug!(target: "init", "Received Messsage: {:#?}", res);
let msgtype = get_msg_type(&res["MsgType"])?;
match msgtype {
"ClientInitAck" => {
- let login = ClientLogin {user: "audron".to_string(), password: "audron".to_string()};
+ info!(target: "init", "Initialization successfull");
+ info!(target: "login", "Starting Login");
+ let login = ClientLogin {user: "audron".to_string(), password: "***REMOVED***".to_string()};
client.stream.send(login.serialize()?).await?;
},
- "ClientInitReject" => { println!("init failed: {:?}", res) },
- "ClientLoginAck" => { println!("login done: {:?}", res) },
- "ClientLoginReject" => { println!("login failed: {:?}", res)},
- _ => bail!(ProtocolError::WrongMsgType)
+ "ClientInitReject" => {
+ error!(target: "init", "Initialization failed: {:?}", res);
+ },
+ "ClientLoginAck" => {
+ info!(target: "login", "Login successfull");
+ },
+ "SessionInit" => {
+ info!(target: "message", "Received SessionInit: {:#?}", res);
+ info!(target: "login", "Session Initialization finished. Switching to Connected state");
+ client.state = ClientState::Connected;
+ }
+ "ClientLoginReject" => {
+ error!(target: "login", "Login failed: {:?}", res);
+ },
+ _ => {
+ error!(target: "client", "Error: WrongMsgType: {:#?}", res);
+ }
}
return Ok(());
}
+
+pub async fn handle_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Client<T>, buf: &[u8]) -> Result<(), Error> {
+ use crate::protocol::primitive::VariantList;
+ use crate::protocol::primitive::deserialize::Deserialize;
+ use crate::protocol::primitive::serialize::Serialize;
+ use crate::util::get_msg_type;
+
+ trace!(target: "message", "Received bytes: {:x?}", buf);
+ let (_, res) = VariantList::parse(buf)?;
+ debug!(target: "init", "Received Messsage: {:#?}", res);
+ // let msgtype = get_msg_type(&res["MsgType"])?;
+ // match msgtype {
+ // _ => {
+ // error!(target: "client", "Error: WrongMsgType: {:#?}", res);
+ // }
+ // }
+ return Ok(());
+}
+
+// Send the initialization message to the stream
+pub async fn init(stream: &mut TcpStream, tls: bool, compression: bool) -> Result<ConnAck, Error> {
+ use crate::protocol::primitive::deserialize::Deserialize;
+
+ // Buffer for our initialization
+ let mut init: Vec<u8> = vec![];
+
+ // The handshake message
+ let mut handshake: u32 = 0x42b33f00;
+
+ // If TLS is enabled set the TLS bit on the handshake
+ if tls {
+ info!(target: "init", "Enabled TLS");
+ handshake |= 0x01;
+ }
+
+ // If COMPRESSION is enabled set the COMPRESSION bit on the handshake
+ if compression {
+ info!(target: "init", "Enabled Compression");
+ handshake |= 0x02;
+ }
+
+ // Select Protocol 2: Datastream
+ let mut proto: u32 = 0x00000002;
+
+ // Flag proto as the last protocol
+ let fin: u32 = 0x80000000;
+ proto |= fin;
+
+ // Add handshake and protocol to our buffer
+ init.extend(handshake.to_be_bytes().iter());
+ init.extend(proto.to_be_bytes().iter());
+
+ // Send Buffer
+ stream.write(&init).await?;
+
+ // Read Response
+ let mut buf = [0; 4];
+ stream.read(&mut buf).await?;
+
+ let (_, connack) = ConnAck::parse(&buf)?;
+ Ok(connack)
+}