aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--Cargo.lock12
-rw-r--r--Cargo.toml1
-rw-r--r--src/bin/quassel-client.rs74
-rw-r--r--src/client/mod.rs80
4 files changed, 152 insertions, 15 deletions
diff --git a/Cargo.lock b/Cargo.lock
index fc1f82b..dc9844f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -19,6 +19,16 @@ version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
+name = "async-trait"
+version = "0.1.30"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "proc-macro2 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)",
+ "quote 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "syn 1.0.18 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -355,6 +365,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
name = "libquassel"
version = "0.1.0"
dependencies = [
+ "async-trait 0.1.30 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"either 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1089,6 +1100,7 @@ dependencies = [
"checksum adler32 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2"
"checksum aho-corasick 0.7.10 (registry+https://github.com/rust-lang/crates.io-index)" = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada"
"checksum arc-swap 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "b585a98a234c46fc563103e9278c9391fde1f4e6850334da895d27edb9580f62"
+"checksum async-trait 0.1.30 (registry+https://github.com/rust-lang/crates.io-index)" = "da71fef07bc806586090247e971229289f64c210a278ee5ae419314eb386b31d"
"checksum atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
"checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
"checksum backtrace 0.3.46 (registry+https://github.com/rust-lang/crates.io-index)" = "b1e692897359247cc6bb902933361652380af0f1b7651ae5c5013407f30e109e"
diff --git a/Cargo.toml b/Cargo.toml
index fd66a39..d65ce6f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,6 +13,7 @@ byteorder = "1.3.2"
failure = "0.1"
either = "1.5"
time = "0.2"
+async-trait = "0.1"
bytes = { version = "0.5" }
flate2 = { version = "1.0", features = ["tokio"], optional = true }
diff --git a/src/bin/quassel-client.rs b/src/bin/quassel-client.rs
index 233d3ae..fa3fdf8 100644
--- a/src/bin/quassel-client.rs
+++ b/src/bin/quassel-client.rs
@@ -1,11 +1,21 @@
use failure::Error;
extern crate libquassel;
-use libquassel::client;
extern crate tokio;
extern crate pretty_env_logger;
+use libquassel::primitive::*;
+use libquassel::message::*;
+use libquassel::client::*;
+
+use tokio::io::{AsyncRead, AsyncWrite};
+use core::marker::Unpin;
+use futures::SinkExt;
+use std::future::Future;
+
+use log::*;
+
#[tokio::main]
async fn main() -> Result<(), Error> {
pretty_env_logger::init();
@@ -19,19 +29,69 @@ async fn main() -> Result<(), Error> {
let username = std::env::args().nth(1).expect("no username given");
let password = std::env::args().nth(2).expect("no password given");
+ let funcs = Funcs {
+ init: InitFuncs {
+ client_init_ack,
+ client_init_reject,
+ client_login_ack,
+ client_login_reject,
+ session_init
+ },
+ message: MessageFuncs {
+ sync_message,
+ rpc_call,
+ init_request,
+ init_data,
+ heart_beat,
+ heart_beat_reply
+ }
+ };
- let mut client = client::Client::<tokio_tls::TlsStream<tokio::net::TcpStream>>::connect_tls(
+ let mut client = Client::<tokio_tls::TlsStream<tokio::net::TcpStream>>::connect_tls(
"cocaine.farm",
4242,
true,
- client::User {
+ User {
name: username,
- password: password,
- }
+ password,
+ },
+ //funcs,
).await.unwrap();
client.run().await;
-// client.login("audron", "audron", client_init);
Ok(())
-} // the stream is closed here
+}
+
+async fn client_init_ack<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: &mut Client<T, F>, item: VariantMap) {
+ use libquassel::{HandshakeSerialize, HandshakeDeserialize};
+
+ info!(target: "init", "Initialization successfull");
+ info!(target: "login", "Starting Login");
+ let login = ClientLogin {user: client.user.name.clone(), password: client.user.password.clone()};
+ client.stream.send(login.serialize().unwrap()).await.unwrap();
+}
+
+async fn client_init_reject<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: VariantMap) {
+ error!(target: "init", "Initialization failed: {:?}", item);
+}
+
+async fn client_login_ack<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: VariantMap) {
+ info!(target: "login", "Login successfull");
+}
+
+async fn client_login_reject<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: VariantMap) {
+ error!(target: "login", "Login failed: {:?}", item);
+}
+
+async fn session_init<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: &mut Client<T, F>, item: VariantMap) {
+ info!(target: "login", "Session Initialization finished. Switching to Connected state");
+ client.state = ClientState::Connected;
+}
+
+async fn sync_message<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: SyncMessage) { unimplemented!() }
+async fn rpc_call<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: RpcCall) { unimplemented!() }
+async fn init_request<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: InitRequest) { unimplemented!() }
+async fn init_data<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: InitData) { unimplemented!() }
+async fn heart_beat<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: HeartBeat) { unimplemented!() }
+async fn heart_beat_reply<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: HeartBeatReply) { unimplemented!() }
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 9f0d66c..cded8c9 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -14,6 +14,7 @@ use tokio_tls::TlsStream;
use tokio_util::codec::Framed;
use futures_util::stream::StreamExt;
use futures::SinkExt;
+use std::future::Future;
use crate::frame::QuasselCodec;
@@ -21,16 +22,22 @@ use failure::Error;
use log::{trace, debug, info, error};
-use crate::message::ConnAck;
+use crate::message::*;
+use crate::primitive::*;
extern crate log;
-pub struct Client<T: AsyncRead + AsyncWrite + Unpin> {
- stream: Framed<T, QuasselCodec>,
+pub struct Client<T, F>
+where
+ F: Future,
+ T: 'static + AsyncRead + AsyncWrite + Unpin,
+{
+ pub stream: Framed<T, QuasselCodec>,
pub tls: bool,
pub compression: bool,
pub state: ClientState,
pub user: User,
+ pub funcs: Funcs<T, F>,
}
pub struct User {
@@ -43,7 +50,7 @@ pub enum ClientState {
Connected,
}
-impl <T: AsyncRead + AsyncWrite + Unpin> Client<T> {
+impl <T: AsyncRead + AsyncWrite + Unpin, F: Future> Client<T, F> {
pub async fn run(&mut self) {
use crate::primitive::StringList;
use crate::message::ClientInit;
@@ -75,7 +82,13 @@ impl <T: AsyncRead + AsyncWrite + Unpin> Client<T> {
};
}
- pub async fn connect(address: &'static str, port: u64, compression: bool, user: User) -> Result<Client<TcpStream>, Error> {
+ pub async fn connect(
+ address: &'static str,
+ port: u64,
+ compression: bool,
+ user: User,
+ funcs: Funcs<TcpStream, impl Future>
+ ) -> Result<Client<TcpStream, impl Future>, Error> {
let mut stream = TcpStream::connect(format!("{}:{}", address, port)).await?;
info!(target: "init", "Establishing Connection");
@@ -97,10 +110,17 @@ impl <T: AsyncRead + AsyncWrite + Unpin> Client<T> {
compression,
state: ClientState::Handshake,
user,
+ funcs,
});
}
- pub async fn connect_tls(address: &'static str, port: u64, compression: bool, user: User) -> Result<Client<TlsStream<TcpStream>>, Error> {
+ pub async fn connect_tls(
+ address: &'static str,
+ port: u64,
+ compression: bool,
+ user: User,
+ funcs: Funcs<TlsStream<TcpStream>, impl Future>
+ ) -> Result<Client<TlsStream<TcpStream>, impl Future>, Error> {
let mut stream: TcpStream = TcpStream::connect(format!("{}:{}", address, port)).await?;
info!(target: "init", "Establishing Connection");
@@ -126,12 +146,13 @@ impl <T: AsyncRead + AsyncWrite + Unpin> Client<T> {
compression,
state: ClientState::Handshake,
user,
+ funcs,
});
}
}
-pub async fn handle_login_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Client<T>, buf: &[u8]) -> Result<(), Error> {
+pub async fn handle_login_message<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: &mut Client<T, F>, buf: &[u8]) -> Result<(), Error> {
use crate::{HandshakeSerialize, HandshakeDeserialize};
use crate::message::ClientLogin;
use crate::primitive::{VariantMap, Variant};
@@ -169,7 +190,7 @@ pub async fn handle_login_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mu
return Ok(());
}
-pub async fn handle_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Client<T>, buf: &[u8]) -> Result<(), Error> {
+pub async fn handle_message<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: &mut Client<T, F>, buf: &[u8]) -> Result<(), Error> {
use crate::message::Message;
use crate::primitive::VariantList;
use crate::Deserialize;
@@ -179,9 +200,52 @@ pub async fn handle_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Clie
let (_, res) = Message::parse(buf)?;
debug!(target: "init", "Received Messsage: {:#?}", res);
+ match res {
+ Message::SyncMessage(_) => {}
+ Message::RpcCall(_) => {}
+ Message::InitRequest(_) => {}
+ Message::InitData(_) => {}
+ Message::HeartBeat(_) => {}
+ Message::HeartBeatReply(_) => {}
+ }
+
return Ok(());
}
+pub struct Funcs<T, F>
+where
+ T: 'static + AsyncRead + AsyncWrite + Unpin,
+ F: std::future::Future,
+{
+ pub init: InitFuncs<T, F>,
+ pub message: MessageFuncs<T, F>,
+}
+
+pub struct InitFuncs<T, F>
+where
+ T: 'static + AsyncRead + AsyncWrite + Unpin,
+ F: std::future::Future,
+{
+ pub client_init_ack: fn(&mut Client<T, F>, VariantMap) -> F,
+ pub client_init_reject: fn(Client<T, F>, VariantMap) -> F,
+ pub client_login_ack: fn(Client<T, F>, VariantMap) -> F,
+ pub client_login_reject: fn(Client<T, F>, VariantMap) -> F,
+ pub session_init: fn(&mut Client<T, F>, VariantMap) -> F,
+}
+
+pub struct MessageFuncs<T, F>
+where
+ T: 'static + AsyncRead + AsyncWrite + Unpin,
+ F: std::future::Future,
+{
+ pub sync_message: fn(Client<T, F>, SyncMessage) -> F,
+ pub rpc_call: fn(Client<T, F>, RpcCall) -> F,
+ pub init_request: fn(Client<T, F>, InitRequest) -> F,
+ pub init_data: fn(Client<T, F>, InitData) -> F,
+ pub heart_beat: fn(Client<T, F>, HeartBeat) -> F,
+ pub heart_beat_reply: fn(Client<T, F>, HeartBeatReply) -> F,
+}
+
// Send the initialization message to the stream
pub async fn init(stream: &mut TcpStream, tls: bool, compression: bool) -> Result<ConnAck, Error> {
use crate::Deserialize;