aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml27
-rw-r--r--src/client/mod.rs251
-rw-r--r--src/frame/mod.rs3
-rw-r--r--src/lib.rs13
4 files changed, 27 insertions, 267 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 2b6ccaf..4d35ab4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,24 +18,29 @@ failure = "0.1"
either = "1.5"
time = "0.2"
-bytes = { version = "0.5" }
+default-macro = { path = "../default-macro" }
+
+bytes = { version = "1.0" }
flate2 = { version = "1.0", features = ["tokio"], optional = true }
-tokio = { version = "0.2", features = ["full"], optional = true }
-tokio-util = { version = "0.2", features = ["codec"], optional = true }
-tokio-tls = { version = "0.3", optional = true }
-native-tls = { version = "0.2", optional = true }
+tokio = { version = "1.0", features = [], optional = true }
+tokio-util = { version = "0.6", features = ["codec"], optional = true }
futures-util = { version = "0.3", features = ["std"], optional = true }
futures = { version = "0.3", optional = true }
[features]
framing = ["tokio", "tokio-util", "flate2"]
-client = ["tokio", "tokio-util", "tokio-tls", "native-tls", "futures", "futures-util", "framing"]
default = []
+[package.metadata.docs.rs]
+# document all features
+all-features = true
+# defines the configuration attribute `docsrs`
+rustdoc-args = ["--cfg", "docsrs"]
+
[dev-dependencies]
-futures = { version = "0.3" }
-flate2 = { version = "1.0", features = ["tokio"] }
-tokio = { version = "0.2", features = ["full"] }
-tokio-util = { version = "0.2", features = ["codec"] }
-tokio-test = { version = "0.2" }
+# futures = { version = "0.3" }
+# flate2 = { version = "1.0", features = ["tokio"] }
+# tokio = { version = "0.3", features = ["full"] }
+# tokio-util = { version = "0.5", features = ["codec"] }
+# tokio-test = { version = "0.3" }
diff --git a/src/client/mod.rs b/src/client/mod.rs
deleted file mode 100644
index 1b6139f..0000000
--- a/src/client/mod.rs
+++ /dev/null
@@ -1,251 +0,0 @@
-//use std::io::BufWriter;
-use std::result::Result;
-use std::vec::Vec;
-
-use tokio::io::{AsyncRead, AsyncWrite};
-use core::marker::Unpin;
-use tokio::net::TcpStream;
-use tokio::prelude::*;
-
-use native_tls::TlsConnector;
-
-use tokio_tls;
-use tokio_tls::TlsStream;
-use tokio_util::codec::Framed;
-use futures_util::stream::StreamExt;
-use futures::SinkExt;
-
-use crate::frame::QuasselCodec;
-
-use failure::Error;
-
-use log::{trace, debug, info, error};
-
-use crate::message::ConnAck;
-
-extern crate log;
-
-pub struct Client<T: AsyncRead + AsyncWrite + Unpin> {
- stream: Framed<T, QuasselCodec>,
- pub tls: bool,
- pub compression: bool,
- pub state: ClientState,
- pub user: User,
- pub funcs: Funcs,
-}
-
-pub struct Funcs {
- pub rpc_call: crate::message::RpcCallClient,
-}
-
-pub struct User {
- pub name: String,
- pub password: String,
-}
-
-pub enum ClientState {
- Handshake,
- Connected,
-}
-
-impl <T: AsyncRead + AsyncWrite + Unpin> Client<T> {
- pub async fn run(&mut self) {
- use crate::primitive::StringList;
- use crate::message::ClientInit;
- use crate::HandshakeSerialize;
-
- info!(target: "init", "Setting Features");
-
- let mut features = StringList::new();
- features.push("SynchronizedMarkerLine".to_string());
- features.push("Authenticators".to_string());
- features.push("ExtendedFeatures".to_string());
- features.push("BufferActivitySync".to_string());
- let client_init = ClientInit {
- client_version:String::from("Rust 0.0.0"),
- client_date: String::from("1579009211"),
- feature_list: features,
- client_features: 0x00008000,
- };
-
- self.stream.send(client_init.serialize().unwrap()).await.unwrap();
-
- // Start event loop
- while let Some(msg) = self.stream.next().await {
- let msg = msg.unwrap();
- match self.state {
- ClientState::Handshake => handle_login_message(self, &msg).await.unwrap(),
- ClientState::Connected => handle_message(self, &msg).await.unwrap(),
- }
- };
- }
-
- pub async fn connect(address: &'static str, port: u64, compression: bool, user: User, funcs: Funcs) -> Result<Client<TcpStream>, Error> {
- let mut stream = TcpStream::connect(format!("{}:{}", address, port)).await?;
-
- info!(target: "init", "Establishing Connection");
- let connack = init(&mut stream, false, compression).await?;
-
- debug!(target: "init", "{:?}", connack);
-
- let codec = QuasselCodec::builder()
- .compression(compression)
- .new_codec();
-
- let framed_stream = Framed::new(stream, codec);
-
- info!(target: "init", "Established Connection");
-
- return Ok(Client {
- stream: framed_stream,
- tls: false,
- compression,
- state: ClientState::Handshake,
- user,
- funcs,
- });
- }
-
- pub async fn connect_tls(address: &'static str, port: u64, compression: bool, user: User, funcs: Funcs) -> Result<Client<TlsStream<TcpStream>>, Error> {
- let mut stream: TcpStream = TcpStream::connect(format!("{}:{}", address, port)).await?;
-
- info!(target: "init", "Establishing Connection");
- let connack = init(&mut stream, true, compression).await?;
-
- debug!(target: "init", "{:?}", connack);
-
- let codec = QuasselCodec::builder()
- .compression(compression)
- .new_codec();
-
- let tls_connector = tokio_tls::TlsConnector::from(TlsConnector::builder().build().unwrap());
-
- let tls_stream = tls_connector.connect(address, stream).await?;
-
- let framed_stream = Framed::new(tls_stream, codec);
-
- info!(target: "init", "Established Connection");
-
- return Ok(Client {
- stream: framed_stream,
- tls: true,
- compression,
- state: ClientState::Handshake,
- user,
- funcs,
- });
- }
-
-}
-
-pub async fn handle_login_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Client<T>, buf: &[u8]) -> Result<(), Error> {
- use crate::{HandshakeSerialize, HandshakeDeserialize};
- use crate::message::ClientLogin;
- use crate::primitive::{VariantMap, Variant};
-
- trace!(target: "message", "Received bytes: {:x?}", buf);
- let (_, res) = VariantMap::parse(buf)?;
- debug!(target: "init", "Received Messsage: {:#?}", res);
-
- let msgtype = match_variant!(&res["MsgType"], Variant::String);
- match msgtype.as_str() {
- "ClientInitAck" => {
- info!(target: "init", "Initialization successfull");
- info!(target: "login", "Starting Login");
- let login = ClientLogin {user: client.user.name.clone(), password: client.user.password.clone()};
- client.stream.send(login.serialize()?).await?;
- },
- "ClientInitReject" => {
- error!(target: "init", "Initialization failed: {:?}", res);
- },
- "ClientLoginAck" => {
- info!(target: "login", "Login successfull");
- },
- "SessionInit" => {
- info!(target: "login", "Session Initialization finished. Switching to Connected state");
- client.state = ClientState::Connected;
- }
- "ClientLoginReject" => {
- error!(target: "login", "Login failed: {:?}", res);
- },
- _ => {
- error!(target: "client", "Error: WrongMsgType: {:#?}", res);
- }
- }
-
- return Ok(());
-}
-
-pub async fn handle_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Client<T>, buf: &[u8]) -> Result<(), Error> {
- use crate::message::Message;
- use crate::primitive::{VariantList, Variant};
- use crate::Deserialize;
- use crate::Serialize;
-
- trace!(target: "message", "Received bytes: {:x?}", buf);
- let (_, res) = Message::parse(buf)?;
- debug!(target: "init", "Received Messsage: {:#?}", res);
-
- match res {
- Message::SyncMessage(_) => {}
- Message::RpcCall(msg) => {
- match msg.slot_name.as_str() {
- "2displayMsg(Message)" => {
- (client.funcs.rpc_call.display_message)(match_variant!(msg.params[0], Variant::Message));
- },
- _ => {},
- }
-
- }
- Message::InitRequest(_) => {}
- Message::InitData(_) => {}
- Message::HeartBeat(_) => {}
- Message::HeartBeatReply(_) => {}
- }
-
- return Ok(());
-}
-
-// Send the initialization message to the stream
-pub async fn init(stream: &mut TcpStream, tls: bool, compression: bool) -> Result<ConnAck, Error> {
- use crate::Deserialize;
-
- // Buffer for our initialization
- let mut init: Vec<u8> = vec![];
-
- // The handshake message
- let mut handshake: u32 = 0x42b33f00;
-
- // If TLS is enabled set the TLS bit on the handshake
- if tls {
- info!(target: "init", "Enabled TLS");
- handshake |= 0x01;
- }
-
- // If COMPRESSION is enabled set the COMPRESSION bit on the handshake
- if compression {
- info!(target: "init", "Enabled Compression");
- handshake |= 0x02;
- }
-
- // Select Protocol 2: Datastream
- let mut proto: u32 = 0x00000002;
-
- // Flag proto as the last protocol
- let fin: u32 = 0x80000000;
- proto |= fin;
-
- // Add handshake and protocol to our buffer
- init.extend(handshake.to_be_bytes().iter());
- init.extend(proto.to_be_bytes().iter());
-
- // Send Buffer
- stream.write(&init).await?;
-
- // Read Response
- let mut buf = [0; 4];
- stream.read(&mut buf).await?;
-
- let (_, connack) = ConnAck::parse(&buf)?;
- Ok(connack)
-}
diff --git a/src/frame/mod.rs b/src/frame/mod.rs
index 709d3af..cbeddb0 100644
--- a/src/frame/mod.rs
+++ b/src/frame/mod.rs
@@ -195,8 +195,7 @@ impl Decoder for QuasselCodec {
}
}
-impl Encoder for QuasselCodec {
- type Item = Vec<u8>;
+impl Encoder<Vec<u8>> for QuasselCodec {
type Error = io::Error;
fn encode(&mut self, data: Vec<u8>, dst: &mut BytesMut) -> Result<(), io::Error> {
diff --git a/src/lib.rs b/src/lib.rs
index a11ccf2..223a2da 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,25 +1,32 @@
+#![feature(external_doc)]
+#![feature(doc_cfg)]
+#![doc(include = "../README.md")]
+#[cfg_attr(docsrs, feature(doc_cfg))]
#[macro_use]
mod util;
-#[cfg(feature = "client")]
-pub mod client;
-
#[cfg(test)]
pub mod tests;
#[macro_use]
extern crate failure;
+/// Quassel Structures for serialization and deserialization
pub mod message;
+
+/// Quassels QT based primitive types that make up the more complex messages
pub mod primitive;
pub mod session;
#[allow(dead_code)]
+/// Error Types
pub mod error;
#[allow(unused_variables, dead_code)]
#[cfg(feature = "framing")]
+#[cfg_attr(docsrs, doc(cfg(feature = "framing")))]
+/// Framing impl to be used with [`tokio_util::codec::Framed`]
pub mod frame;
use failure::Error;