diff options
| -rw-r--r-- | Cargo.toml | 27 | ||||
| -rw-r--r-- | src/client/mod.rs | 251 | ||||
| -rw-r--r-- | src/frame/mod.rs | 3 | ||||
| -rw-r--r-- | src/lib.rs | 13 |
4 files changed, 27 insertions, 267 deletions
@@ -18,24 +18,29 @@ failure = "0.1" either = "1.5" time = "0.2" -bytes = { version = "0.5" } +default-macro = { path = "../default-macro" } + +bytes = { version = "1.0" } flate2 = { version = "1.0", features = ["tokio"], optional = true } -tokio = { version = "0.2", features = ["full"], optional = true } -tokio-util = { version = "0.2", features = ["codec"], optional = true } -tokio-tls = { version = "0.3", optional = true } -native-tls = { version = "0.2", optional = true } +tokio = { version = "1.0", features = [], optional = true } +tokio-util = { version = "0.6", features = ["codec"], optional = true } futures-util = { version = "0.3", features = ["std"], optional = true } futures = { version = "0.3", optional = true } [features] framing = ["tokio", "tokio-util", "flate2"] -client = ["tokio", "tokio-util", "tokio-tls", "native-tls", "futures", "futures-util", "framing"] default = [] +[package.metadata.docs.rs] +# document all features +all-features = true +# defines the configuration attribute `docsrs` +rustdoc-args = ["--cfg", "docsrs"] + [dev-dependencies] -futures = { version = "0.3" } -flate2 = { version = "1.0", features = ["tokio"] } -tokio = { version = "0.2", features = ["full"] } -tokio-util = { version = "0.2", features = ["codec"] } -tokio-test = { version = "0.2" } +# futures = { version = "0.3" } +# flate2 = { version = "1.0", features = ["tokio"] } +# tokio = { version = "0.3", features = ["full"] } +# tokio-util = { version = "0.5", features = ["codec"] } +# tokio-test = { version = "0.3" } diff --git a/src/client/mod.rs b/src/client/mod.rs deleted file mode 100644 index 1b6139f..0000000 --- a/src/client/mod.rs +++ /dev/null @@ -1,251 +0,0 @@ -//use std::io::BufWriter; -use std::result::Result; -use std::vec::Vec; - -use tokio::io::{AsyncRead, AsyncWrite}; -use core::marker::Unpin; -use tokio::net::TcpStream; -use tokio::prelude::*; - -use native_tls::TlsConnector; - -use tokio_tls; -use tokio_tls::TlsStream; -use tokio_util::codec::Framed; -use futures_util::stream::StreamExt; -use futures::SinkExt; - -use crate::frame::QuasselCodec; - -use failure::Error; - -use log::{trace, debug, info, error}; - -use crate::message::ConnAck; - -extern crate log; - -pub struct Client<T: AsyncRead + AsyncWrite + Unpin> { - stream: Framed<T, QuasselCodec>, - pub tls: bool, - pub compression: bool, - pub state: ClientState, - pub user: User, - pub funcs: Funcs, -} - -pub struct Funcs { - pub rpc_call: crate::message::RpcCallClient, -} - -pub struct User { - pub name: String, - pub password: String, -} - -pub enum ClientState { - Handshake, - Connected, -} - -impl <T: AsyncRead + AsyncWrite + Unpin> Client<T> { - pub async fn run(&mut self) { - use crate::primitive::StringList; - use crate::message::ClientInit; - use crate::HandshakeSerialize; - - info!(target: "init", "Setting Features"); - - let mut features = StringList::new(); - features.push("SynchronizedMarkerLine".to_string()); - features.push("Authenticators".to_string()); - features.push("ExtendedFeatures".to_string()); - features.push("BufferActivitySync".to_string()); - let client_init = ClientInit { - client_version:String::from("Rust 0.0.0"), - client_date: String::from("1579009211"), - feature_list: features, - client_features: 0x00008000, - }; - - self.stream.send(client_init.serialize().unwrap()).await.unwrap(); - - // Start event loop - while let Some(msg) = self.stream.next().await { - let msg = msg.unwrap(); - match self.state { - ClientState::Handshake => handle_login_message(self, &msg).await.unwrap(), - ClientState::Connected => handle_message(self, &msg).await.unwrap(), - } - }; - } - - pub async fn connect(address: &'static str, port: u64, compression: bool, user: User, funcs: Funcs) -> Result<Client<TcpStream>, Error> { - let mut stream = TcpStream::connect(format!("{}:{}", address, port)).await?; - - info!(target: "init", "Establishing Connection"); - let connack = init(&mut stream, false, compression).await?; - - debug!(target: "init", "{:?}", connack); - - let codec = QuasselCodec::builder() - .compression(compression) - .new_codec(); - - let framed_stream = Framed::new(stream, codec); - - info!(target: "init", "Established Connection"); - - return Ok(Client { - stream: framed_stream, - tls: false, - compression, - state: ClientState::Handshake, - user, - funcs, - }); - } - - pub async fn connect_tls(address: &'static str, port: u64, compression: bool, user: User, funcs: Funcs) -> Result<Client<TlsStream<TcpStream>>, Error> { - let mut stream: TcpStream = TcpStream::connect(format!("{}:{}", address, port)).await?; - - info!(target: "init", "Establishing Connection"); - let connack = init(&mut stream, true, compression).await?; - - debug!(target: "init", "{:?}", connack); - - let codec = QuasselCodec::builder() - .compression(compression) - .new_codec(); - - let tls_connector = tokio_tls::TlsConnector::from(TlsConnector::builder().build().unwrap()); - - let tls_stream = tls_connector.connect(address, stream).await?; - - let framed_stream = Framed::new(tls_stream, codec); - - info!(target: "init", "Established Connection"); - - return Ok(Client { - stream: framed_stream, - tls: true, - compression, - state: ClientState::Handshake, - user, - funcs, - }); - } - -} - -pub async fn handle_login_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Client<T>, buf: &[u8]) -> Result<(), Error> { - use crate::{HandshakeSerialize, HandshakeDeserialize}; - use crate::message::ClientLogin; - use crate::primitive::{VariantMap, Variant}; - - trace!(target: "message", "Received bytes: {:x?}", buf); - let (_, res) = VariantMap::parse(buf)?; - debug!(target: "init", "Received Messsage: {:#?}", res); - - let msgtype = match_variant!(&res["MsgType"], Variant::String); - match msgtype.as_str() { - "ClientInitAck" => { - info!(target: "init", "Initialization successfull"); - info!(target: "login", "Starting Login"); - let login = ClientLogin {user: client.user.name.clone(), password: client.user.password.clone()}; - client.stream.send(login.serialize()?).await?; - }, - "ClientInitReject" => { - error!(target: "init", "Initialization failed: {:?}", res); - }, - "ClientLoginAck" => { - info!(target: "login", "Login successfull"); - }, - "SessionInit" => { - info!(target: "login", "Session Initialization finished. Switching to Connected state"); - client.state = ClientState::Connected; - } - "ClientLoginReject" => { - error!(target: "login", "Login failed: {:?}", res); - }, - _ => { - error!(target: "client", "Error: WrongMsgType: {:#?}", res); - } - } - - return Ok(()); -} - -pub async fn handle_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Client<T>, buf: &[u8]) -> Result<(), Error> { - use crate::message::Message; - use crate::primitive::{VariantList, Variant}; - use crate::Deserialize; - use crate::Serialize; - - trace!(target: "message", "Received bytes: {:x?}", buf); - let (_, res) = Message::parse(buf)?; - debug!(target: "init", "Received Messsage: {:#?}", res); - - match res { - Message::SyncMessage(_) => {} - Message::RpcCall(msg) => { - match msg.slot_name.as_str() { - "2displayMsg(Message)" => { - (client.funcs.rpc_call.display_message)(match_variant!(msg.params[0], Variant::Message)); - }, - _ => {}, - } - - } - Message::InitRequest(_) => {} - Message::InitData(_) => {} - Message::HeartBeat(_) => {} - Message::HeartBeatReply(_) => {} - } - - return Ok(()); -} - -// Send the initialization message to the stream -pub async fn init(stream: &mut TcpStream, tls: bool, compression: bool) -> Result<ConnAck, Error> { - use crate::Deserialize; - - // Buffer for our initialization - let mut init: Vec<u8> = vec![]; - - // The handshake message - let mut handshake: u32 = 0x42b33f00; - - // If TLS is enabled set the TLS bit on the handshake - if tls { - info!(target: "init", "Enabled TLS"); - handshake |= 0x01; - } - - // If COMPRESSION is enabled set the COMPRESSION bit on the handshake - if compression { - info!(target: "init", "Enabled Compression"); - handshake |= 0x02; - } - - // Select Protocol 2: Datastream - let mut proto: u32 = 0x00000002; - - // Flag proto as the last protocol - let fin: u32 = 0x80000000; - proto |= fin; - - // Add handshake and protocol to our buffer - init.extend(handshake.to_be_bytes().iter()); - init.extend(proto.to_be_bytes().iter()); - - // Send Buffer - stream.write(&init).await?; - - // Read Response - let mut buf = [0; 4]; - stream.read(&mut buf).await?; - - let (_, connack) = ConnAck::parse(&buf)?; - Ok(connack) -} diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 709d3af..cbeddb0 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -195,8 +195,7 @@ impl Decoder for QuasselCodec { } } -impl Encoder for QuasselCodec { - type Item = Vec<u8>; +impl Encoder<Vec<u8>> for QuasselCodec { type Error = io::Error; fn encode(&mut self, data: Vec<u8>, dst: &mut BytesMut) -> Result<(), io::Error> { @@ -1,25 +1,32 @@ +#![feature(external_doc)] +#![feature(doc_cfg)] +#![doc(include = "../README.md")] +#[cfg_attr(docsrs, feature(doc_cfg))] #[macro_use] mod util; -#[cfg(feature = "client")] -pub mod client; - #[cfg(test)] pub mod tests; #[macro_use] extern crate failure; +/// Quassel Structures for serialization and deserialization pub mod message; + +/// Quassels QT based primitive types that make up the more complex messages pub mod primitive; pub mod session; #[allow(dead_code)] +/// Error Types pub mod error; #[allow(unused_variables, dead_code)] #[cfg(feature = "framing")] +#[cfg_attr(docsrs, doc(cfg(feature = "framing")))] +/// Framing impl to be used with [`tokio_util::codec::Framed`] pub mod frame; use failure::Error; |
