aboutsummaryrefslogtreecommitdiff
path: root/examples/statetracker/src/server.rs
diff options
context:
space:
mode:
authorMax Audron <audron@cocaine.farm>2021-08-20 14:11:54 +0200
committerMax Audron <audron@cocaine.farm>2021-09-28 18:15:50 +0200
commit8dba16270a31e8574aada487b2b14c27f20c63eb (patch)
tree2aeaf3e36068d870b3143b85458b5ca57885bdcc /examples/statetracker/src/server.rs
parentrename override_type to type (diff)
statetracker: first iteration of working rpc object
Diffstat (limited to '')
-rw-r--r--examples/statetracker/src/server.rs325
1 files changed, 325 insertions, 0 deletions
diff --git a/examples/statetracker/src/server.rs b/examples/statetracker/src/server.rs
new file mode 100644
index 0000000..ac9c550
--- /dev/null
+++ b/examples/statetracker/src/server.rs
@@ -0,0 +1,325 @@
+use anyhow::{bail, Error};
+
+use druid::{
+ widget::{
+ Align, Button, Checkbox, Container, Controller, ControllerHost, Flex, Label, TextBox,
+ },
+ Command, Data, ExtEventSink, Lens, Target, Widget, WidgetExt,
+};
+
+use libquassel::{
+ deserialize::Deserialize,
+ frame::QuasselCodec,
+ message::{self, objects, ConnAck, HandshakeMessage, Init},
+};
+
+use futures::{
+ stream::{SplitSink, SplitStream},
+ SinkExt, StreamExt,
+};
+use tokio::{
+ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
+ net::TcpStream,
+};
+use tokio_util::codec::Framed;
+
+use tracing::{debug, info, trace};
+
+use crate::{command, formatter};
+
+#[derive(Clone, Debug, Data, Lens)]
+pub struct ServerSettings {
+ pub tls: bool,
+ pub compression: bool,
+ pub host: String,
+ pub port: u16,
+ pub username: String,
+ pub password: String,
+}
+
+impl Default for ServerSettings {
+ fn default() -> Self {
+ ServerSettings {
+ tls: false,
+ compression: false,
+ host: String::from("localhost"),
+ port: 4242,
+ username: String::default(),
+ password: String::default(),
+ }
+ }
+}
+
+#[derive(Clone, Data, Lens)]
+pub struct Server {
+ pub server_name: String,
+ pub listen_port: u16,
+ pub listen_host: String,
+ pub settings: ServerSettings,
+}
+
+#[derive(Debug)]
+pub enum ClientState {
+ Handshake,
+ Connected,
+}
+
+#[derive(Clone, Debug)]
+pub enum Message {
+ Handshake(HandshakeMessage),
+ SignalProxy(message::Message),
+}
+
+impl Data for Message {
+ fn same(&self, other: &Self) -> bool {
+ if let Self::Handshake(_) = self {
+ if let Self::Handshake(_) = other {
+ return true;
+ } else {
+ return false;
+ }
+ } else if let Self::SignalProxy(_) = self {
+ if let Self::SignalProxy(_) = other {
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+}
+
+impl Server {
+ pub async fn init(
+ &self,
+ stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
+ ) -> Result<ConnAck, Error> {
+ let init = Init::new()
+ .tls(self.settings.tls)
+ .compression(self.settings.compression);
+
+ stream.write(&init.serialize()).await?;
+
+ let mut buf = [0; 4];
+ stream.read(&mut buf).await?;
+
+ let (_, connack) = ConnAck::parse(&buf).unwrap();
+ Ok(connack)
+ }
+
+ pub async fn run(
+ mut stream: SplitStream<Framed<TcpStream, QuasselCodec>>,
+ mut sink: SplitSink<Framed<TcpStream, QuasselCodec>, Vec<u8>>,
+ mut state: ClientState,
+ ctx: ExtEventSink,
+ direction: &str,
+ ) {
+ // Start event loop
+ while let Some(msg) = stream.next().await {
+ let msg = msg.unwrap();
+ sink.send(msg.to_vec()).await.unwrap();
+ let msg = match state {
+ ClientState::Handshake => {
+ Server::handle_login_message(&msg, &mut state, direction, ctx.clone())
+ .await
+ .unwrap()
+ }
+ ClientState::Connected => Server::handle_message(&msg, direction, ctx.clone())
+ .await
+ .unwrap(),
+ };
+
+ ctx.submit_command(command::ADD_MESSAGE, msg, Target::Global)
+ .unwrap();
+ }
+ }
+
+ async fn handle_login_message(
+ buf: &[u8],
+ state: &mut ClientState,
+ direction: &str,
+ _ctx: ExtEventSink,
+ ) -> Result<Message, Error> {
+ use libquassel::HandshakeDeserialize;
+
+ trace!(target: "handshakemessage", "Received bytes: {:x?}", buf);
+ match HandshakeMessage::parse(buf) {
+ Ok((_size, res)) => {
+ info!("{}: {:#?}", direction, res);
+
+ match res {
+ HandshakeMessage::SessionInit(_) => *state = ClientState::Connected,
+ HandshakeMessage::ClientLogin(_) => *state = ClientState::Connected,
+ _ => {}
+ }
+
+ return Ok(Message::Handshake(res));
+ }
+ Err(e) => bail!("failed to parse handshake message {}", e),
+ }
+ }
+
+ async fn handle_message(
+ buf: &[u8],
+ _direction: &str,
+ ctx: ExtEventSink,
+ ) -> Result<Message, Error> {
+ use libquassel::deserialize::*;
+ trace!(target: "message", "Received bytes: {:x?}", buf);
+
+ match message::Message::parse(buf) {
+ Ok((_size, res)) => {
+ #[allow(unused_variables)]
+ match &res {
+ message::Message::SyncMessage(msg) => (),
+ message::Message::RpcCall(msg) => (),
+ message::Message::InitRequest(msg) => (),
+ message::Message::InitData(msg) => match &msg.init_data {
+ objects::Types::AliasManager(alias_manager) => ctx
+ .submit_command(
+ command::ALIASMANAGER_INIT,
+ alias_manager.clone(),
+ Target::Global,
+ )
+ .unwrap(),
+ _ => (),
+ },
+ message::Message::HeartBeat(msg) => (),
+ message::Message::HeartBeatReply(msg) => (),
+ }
+
+ return Ok(Message::SignalProxy(res));
+ }
+ Err(e) => {
+ bail!("failed to parse message {}", e);
+ }
+ }
+ }
+}
+
+impl Default for Server {
+ fn default() -> Self {
+ Server {
+ server_name: String::default(),
+ listen_port: 4243,
+ listen_host: String::from("localhost"),
+ settings: ServerSettings::default(),
+ }
+ }
+}
+
+impl std::fmt::Debug for Server {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let mut fmt = f.debug_struct("Server");
+ fmt.field("settings", &self.settings);
+ fmt.field("name", &self.server_name).finish()
+ }
+}
+
+pub struct ServerWidget {}
+
+impl ServerWidget {
+ pub fn new() -> ControllerHost<impl Widget<Server>, ServerWidget> {
+ let layout = Flex::column()
+ .with_child(Label::new("Connect").align_left())
+ .with_spacer(crate::SPACING)
+ .with_child(
+ TextBox::new()
+ .with_placeholder("Server Name")
+ .expand_width()
+ .lens(Server::server_name),
+ )
+ .with_spacer(crate::SPACING)
+ .with_child(
+ Container::new(
+ Flex::column()
+ .with_child(
+ Flex::row()
+ .with_flex_child(
+ TextBox::new()
+ .with_placeholder("Host")
+ .expand_width()
+ .lens(ServerSettings::host),
+ 2.0,
+ )
+ .with_spacer(crate::SPACING)
+ .with_flex_child(
+ TextBox::new()
+ .with_formatter(formatter::U16Formatter)
+ .lens(ServerSettings::port),
+ 1.0,
+ )
+ .expand_width(),
+ )
+ .with_spacer(crate::SPACING)
+ .with_child(
+ Flex::row()
+ .with_child(Checkbox::new("TLS").lens(ServerSettings::tls))
+ .with_flex_spacer(1.0)
+ .with_child(
+ Checkbox::new("Compression").lens(ServerSettings::compression),
+ ),
+ )
+ .with_spacer(crate::SPACING * 2.0)
+ .with_child(Label::new("Login").align_left())
+ .with_spacer(crate::SPACING)
+ .with_child(
+ TextBox::new()
+ .with_placeholder("Username")
+ .expand_width()
+ .lens(ServerSettings::username),
+ )
+ .with_spacer(crate::SPACING)
+ .with_child(
+ TextBox::new()
+ .with_placeholder("Password")
+ .expand_width()
+ .lens(ServerSettings::password),
+ ),
+ )
+ .lens(Server::settings),
+ )
+ .with_spacer(crate::SPACING)
+ .with_child(Label::new("Listen").align_left())
+ .with_spacer(crate::SPACING)
+ .with_child(
+ Flex::row()
+ .with_flex_child(
+ TextBox::new()
+ .with_placeholder("Address")
+ .expand_width()
+ .lens(Server::listen_host),
+ 2.0,
+ )
+ .with_spacer(crate::SPACING)
+ .with_flex_child(
+ TextBox::new()
+ .with_placeholder("Port")
+ .with_formatter(formatter::U16Formatter)
+ .lens(Server::listen_port),
+ 1.0,
+ )
+ .expand_width(),
+ )
+ .with_spacer(crate::SPACING)
+ .with_child(
+ Button::new("Connect")
+ .on_click(move |ctx, _, _| {
+ debug!("connect button pressed, sending command");
+ ctx.submit_command(Command::new(
+ command::CONNECT,
+ (),
+ druid::Target::Global,
+ ))
+ })
+ .align_right(),
+ );
+
+ let widget = Align::centered(layout);
+
+ ControllerHost::new(widget, ServerWidget {})
+ }
+}
+
+impl<T, W: Widget<T>> Controller<T, W> for ServerWidget {}