aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMax Audron <audron@cocaine.farm>2020-05-09 15:34:58 +0200
committerMax Audron <audron@cocaine.farm>2020-09-26 12:03:01 +0200
commit5c1b48536d502df9ef3a5a03d7d87c40b0dc97a1 (patch)
treed77554ee22756fa14229806c0282a0f8a5d4eb2c /src
parentsplit handshake.rs (diff)
WIP: function api
Diffstat (limited to '')
-rw-r--r--src/bin/quassel-client.rs74
-rw-r--r--src/client/mod.rs80
2 files changed, 139 insertions, 15 deletions
diff --git a/src/bin/quassel-client.rs b/src/bin/quassel-client.rs
index 233d3ae..fa3fdf8 100644
--- a/src/bin/quassel-client.rs
+++ b/src/bin/quassel-client.rs
@@ -1,11 +1,21 @@
use failure::Error;
extern crate libquassel;
-use libquassel::client;
extern crate tokio;
extern crate pretty_env_logger;
+use libquassel::primitive::*;
+use libquassel::message::*;
+use libquassel::client::*;
+
+use tokio::io::{AsyncRead, AsyncWrite};
+use core::marker::Unpin;
+use futures::SinkExt;
+use std::future::Future;
+
+use log::*;
+
#[tokio::main]
async fn main() -> Result<(), Error> {
pretty_env_logger::init();
@@ -19,19 +29,69 @@ async fn main() -> Result<(), Error> {
let username = std::env::args().nth(1).expect("no username given");
let password = std::env::args().nth(2).expect("no password given");
+ let funcs = Funcs {
+ init: InitFuncs {
+ client_init_ack,
+ client_init_reject,
+ client_login_ack,
+ client_login_reject,
+ session_init
+ },
+ message: MessageFuncs {
+ sync_message,
+ rpc_call,
+ init_request,
+ init_data,
+ heart_beat,
+ heart_beat_reply
+ }
+ };
- let mut client = client::Client::<tokio_tls::TlsStream<tokio::net::TcpStream>>::connect_tls(
+ let mut client = Client::<tokio_tls::TlsStream<tokio::net::TcpStream>>::connect_tls(
"cocaine.farm",
4242,
true,
- client::User {
+ User {
name: username,
- password: password,
- }
+ password,
+ },
+ //funcs,
).await.unwrap();
client.run().await;
-// client.login("audron", "audron", client_init);
Ok(())
-} // the stream is closed here
+}
+
+async fn client_init_ack<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: &mut Client<T, F>, item: VariantMap) {
+ use libquassel::{HandshakeSerialize, HandshakeDeserialize};
+
+ info!(target: "init", "Initialization successfull");
+ info!(target: "login", "Starting Login");
+ let login = ClientLogin {user: client.user.name.clone(), password: client.user.password.clone()};
+ client.stream.send(login.serialize().unwrap()).await.unwrap();
+}
+
+async fn client_init_reject<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: VariantMap) {
+ error!(target: "init", "Initialization failed: {:?}", item);
+}
+
+async fn client_login_ack<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: VariantMap) {
+ info!(target: "login", "Login successfull");
+}
+
+async fn client_login_reject<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: VariantMap) {
+ error!(target: "login", "Login failed: {:?}", item);
+}
+
+async fn session_init<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: &mut Client<T, F>, item: VariantMap) {
+ info!(target: "login", "Session Initialization finished. Switching to Connected state");
+ client.state = ClientState::Connected;
+}
+
+async fn sync_message<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: SyncMessage) { unimplemented!() }
+async fn rpc_call<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: RpcCall) { unimplemented!() }
+async fn init_request<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: InitRequest) { unimplemented!() }
+async fn init_data<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: InitData) { unimplemented!() }
+async fn heart_beat<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: HeartBeat) { unimplemented!() }
+async fn heart_beat_reply<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: Client<T, F>, item: HeartBeatReply) { unimplemented!() }
diff --git a/src/client/mod.rs b/src/client/mod.rs
index 9f0d66c..cded8c9 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -14,6 +14,7 @@ use tokio_tls::TlsStream;
use tokio_util::codec::Framed;
use futures_util::stream::StreamExt;
use futures::SinkExt;
+use std::future::Future;
use crate::frame::QuasselCodec;
@@ -21,16 +22,22 @@ use failure::Error;
use log::{trace, debug, info, error};
-use crate::message::ConnAck;
+use crate::message::*;
+use crate::primitive::*;
extern crate log;
-pub struct Client<T: AsyncRead + AsyncWrite + Unpin> {
- stream: Framed<T, QuasselCodec>,
+pub struct Client<T, F>
+where
+ F: Future,
+ T: 'static + AsyncRead + AsyncWrite + Unpin,
+{
+ pub stream: Framed<T, QuasselCodec>,
pub tls: bool,
pub compression: bool,
pub state: ClientState,
pub user: User,
+ pub funcs: Funcs<T, F>,
}
pub struct User {
@@ -43,7 +50,7 @@ pub enum ClientState {
Connected,
}
-impl <T: AsyncRead + AsyncWrite + Unpin> Client<T> {
+impl <T: AsyncRead + AsyncWrite + Unpin, F: Future> Client<T, F> {
pub async fn run(&mut self) {
use crate::primitive::StringList;
use crate::message::ClientInit;
@@ -75,7 +82,13 @@ impl <T: AsyncRead + AsyncWrite + Unpin> Client<T> {
};
}
- pub async fn connect(address: &'static str, port: u64, compression: bool, user: User) -> Result<Client<TcpStream>, Error> {
+ pub async fn connect(
+ address: &'static str,
+ port: u64,
+ compression: bool,
+ user: User,
+ funcs: Funcs<TcpStream, impl Future>
+ ) -> Result<Client<TcpStream, impl Future>, Error> {
let mut stream = TcpStream::connect(format!("{}:{}", address, port)).await?;
info!(target: "init", "Establishing Connection");
@@ -97,10 +110,17 @@ impl <T: AsyncRead + AsyncWrite + Unpin> Client<T> {
compression,
state: ClientState::Handshake,
user,
+ funcs,
});
}
- pub async fn connect_tls(address: &'static str, port: u64, compression: bool, user: User) -> Result<Client<TlsStream<TcpStream>>, Error> {
+ pub async fn connect_tls(
+ address: &'static str,
+ port: u64,
+ compression: bool,
+ user: User,
+ funcs: Funcs<TlsStream<TcpStream>, impl Future>
+ ) -> Result<Client<TlsStream<TcpStream>, impl Future>, Error> {
let mut stream: TcpStream = TcpStream::connect(format!("{}:{}", address, port)).await?;
info!(target: "init", "Establishing Connection");
@@ -126,12 +146,13 @@ impl <T: AsyncRead + AsyncWrite + Unpin> Client<T> {
compression,
state: ClientState::Handshake,
user,
+ funcs,
});
}
}
-pub async fn handle_login_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Client<T>, buf: &[u8]) -> Result<(), Error> {
+pub async fn handle_login_message<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: &mut Client<T, F>, buf: &[u8]) -> Result<(), Error> {
use crate::{HandshakeSerialize, HandshakeDeserialize};
use crate::message::ClientLogin;
use crate::primitive::{VariantMap, Variant};
@@ -169,7 +190,7 @@ pub async fn handle_login_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mu
return Ok(());
}
-pub async fn handle_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Client<T>, buf: &[u8]) -> Result<(), Error> {
+pub async fn handle_message<T: AsyncRead + AsyncWrite + Unpin, F: Future>(client: &mut Client<T, F>, buf: &[u8]) -> Result<(), Error> {
use crate::message::Message;
use crate::primitive::VariantList;
use crate::Deserialize;
@@ -179,9 +200,52 @@ pub async fn handle_message<T: AsyncRead + AsyncWrite + Unpin>(client: &mut Clie
let (_, res) = Message::parse(buf)?;
debug!(target: "init", "Received Messsage: {:#?}", res);
+ match res {
+ Message::SyncMessage(_) => {}
+ Message::RpcCall(_) => {}
+ Message::InitRequest(_) => {}
+ Message::InitData(_) => {}
+ Message::HeartBeat(_) => {}
+ Message::HeartBeatReply(_) => {}
+ }
+
return Ok(());
}
+pub struct Funcs<T, F>
+where
+ T: 'static + AsyncRead + AsyncWrite + Unpin,
+ F: std::future::Future,
+{
+ pub init: InitFuncs<T, F>,
+ pub message: MessageFuncs<T, F>,
+}
+
+pub struct InitFuncs<T, F>
+where
+ T: 'static + AsyncRead + AsyncWrite + Unpin,
+ F: std::future::Future,
+{
+ pub client_init_ack: fn(&mut Client<T, F>, VariantMap) -> F,
+ pub client_init_reject: fn(Client<T, F>, VariantMap) -> F,
+ pub client_login_ack: fn(Client<T, F>, VariantMap) -> F,
+ pub client_login_reject: fn(Client<T, F>, VariantMap) -> F,
+ pub session_init: fn(&mut Client<T, F>, VariantMap) -> F,
+}
+
+pub struct MessageFuncs<T, F>
+where
+ T: 'static + AsyncRead + AsyncWrite + Unpin,
+ F: std::future::Future,
+{
+ pub sync_message: fn(Client<T, F>, SyncMessage) -> F,
+ pub rpc_call: fn(Client<T, F>, RpcCall) -> F,
+ pub init_request: fn(Client<T, F>, InitRequest) -> F,
+ pub init_data: fn(Client<T, F>, InitData) -> F,
+ pub heart_beat: fn(Client<T, F>, HeartBeat) -> F,
+ pub heart_beat_reply: fn(Client<T, F>, HeartBeatReply) -> F,
+}
+
// Send the initialization message to the stream
pub async fn init(stream: &mut TcpStream, tls: bool, compression: bool) -> Result<ConnAck, Error> {
use crate::Deserialize;