diff options
| author | Max Audron <audron@cocaine.farm> | 2021-08-20 14:11:54 +0200 |
|---|---|---|
| committer | Max Audron <audron@cocaine.farm> | 2021-09-28 18:15:50 +0200 |
| commit | 8dba16270a31e8574aada487b2b14c27f20c63eb (patch) | |
| tree | 2aeaf3e36068d870b3143b85458b5ca57885bdcc /examples/statetracker | |
| parent | rename override_type to type (diff) | |
statetracker: first iteration of working rpc object
Diffstat (limited to '')
| -rw-r--r-- | examples/statetracker/Cargo.toml | 35 | ||||
| -rw-r--r-- | examples/statetracker/src/aliasmanager.rs | 85 | ||||
| -rw-r--r-- | examples/statetracker/src/command.rs | 8 | ||||
| -rw-r--r-- | examples/statetracker/src/connect.rs | 78 | ||||
| -rw-r--r-- | examples/statetracker/src/formatter.rs | 47 | ||||
| -rw-r--r-- | examples/statetracker/src/main.rs | 131 | ||||
| -rw-r--r-- | examples/statetracker/src/server.rs | 325 |
7 files changed, 709 insertions, 0 deletions
diff --git a/examples/statetracker/Cargo.toml b/examples/statetracker/Cargo.toml new file mode 100644 index 0000000..fa78217 --- /dev/null +++ b/examples/statetracker/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "statetracker" +version = "0.1.0" +edition = "2018" + +[dependencies] +libquassel = { path = "../../", features = ["framing", "all-quassel-features"] } + +# druid = "0.7" +druid = { git = "https://github.com/linebender/druid", features = ["im"] } +druid-widget-nursery = { path = "/home/audron/repo/github.com/linebender/druid-widget-nursery" } + +# iced = { version = "0.3", features = ["tokio"]} + +anyhow = "*" + +tracing = "0.1" +tracing-subscriber = "0.2" +tracing-futures = "0.2" + +byteorder = "1" +either = "1" +time = "0.2" +log = "*" + +tokio = { version = "1", features = ["full", "rt-multi-thread"]} +tokio-util = { version = "0.6", features = ["codec"] } +tokio-rustls = { version = "0.22" } + +bytes = { version = "1" } +flate2 = { version = "1", features = ["tokio"] } +webpki-roots = { version = "0.21" } +futures-util = { version = "0.3", features = ["std"] } +futures = { version = "0.3" } +pretty_env_logger = { version = "0.4" } diff --git a/examples/statetracker/src/aliasmanager.rs b/examples/statetracker/src/aliasmanager.rs new file mode 100644 index 0000000..9247c22 --- /dev/null +++ b/examples/statetracker/src/aliasmanager.rs @@ -0,0 +1,85 @@ +use std::sync::Arc; + +use druid::widget::Label; +use druid::{lens, Lens, Point, WidgetPod}; +use druid::{widget::Flex, Widget}; + +use libquassel::message::objects::AliasManager; + +pub struct AliasManagerWidget { + inner: WidgetPod<Arc<AliasManager>, Box<dyn Widget<Arc<AliasManager>>>>, +} + +impl AliasManagerWidget { + pub fn new() -> Self { + let widget = WidgetPod::new(Flex::column()).boxed(); + + AliasManagerWidget { inner: widget } + } +} + +impl Widget<Arc<AliasManager>> for AliasManagerWidget { + fn event( + &mut self, + ctx: &mut druid::EventCtx, + event: &druid::Event, + data: &mut Arc<AliasManager>, + env: &druid::Env, + ) { + self.inner.event(ctx, event, data, env) + } + + fn lifecycle( + &mut self, + ctx: &mut druid::LifeCycleCtx, + event: &druid::LifeCycle, + data: &Arc<AliasManager>, + env: &druid::Env, + ) { + self.inner.lifecycle(ctx, event, data, env) + } + + fn update( + &mut self, + ctx: &mut druid::UpdateCtx, + _old_data: &Arc<AliasManager>, + data: &Arc<AliasManager>, + _env: &druid::Env, + ) { + let aliases = lens!(AliasManager, aliases); + + let mut widget: Flex<Arc<AliasManager>> = Flex::column(); + + aliases.with(data, |aliases| { + for alias in aliases { + widget.add_child( + Flex::row() + .with_child(Label::new(alias.name.clone())) + .with_child(Label::new(alias.expansion.clone())), + ) + } + }); + + self.inner = WidgetPod::new(widget).boxed(); + + ctx.children_changed(); + ctx.request_layout(); + ctx.request_paint(); + } + + fn layout( + &mut self, + ctx: &mut druid::LayoutCtx, + bc: &druid::BoxConstraints, + data: &Arc<AliasManager>, + env: &druid::Env, + ) -> druid::Size { + let size = self.inner.layout(ctx, bc, data, env); + self.inner.set_origin(ctx, data, env, Point::ZERO); + return size; + } + + fn paint(&mut self, ctx: &mut druid::PaintCtx, data: &Arc<AliasManager>, env: &druid::Env) { + self.inner.paint(ctx, data, env) + } +} diff --git a/examples/statetracker/src/command.rs b/examples/statetracker/src/command.rs new file mode 100644 index 0000000..fc221c6 --- /dev/null +++ b/examples/statetracker/src/command.rs @@ -0,0 +1,8 @@ +use druid::Selector; +use libquassel::message::objects::{Alias, AliasManager}; + +pub const CONNECT: Selector = Selector::new("connect"); +pub const ADD_MESSAGE: Selector<crate::Message> = Selector::new("add_message"); + +pub const ALIASMANAGER_INIT: Selector<AliasManager> = Selector::new("aliasmanager_init"); +pub const ALIASMANAGER_ADD_ALIAS: Selector<Alias> = Selector::new("aliasmanager_add_alias"); diff --git a/examples/statetracker/src/connect.rs b/examples/statetracker/src/connect.rs new file mode 100644 index 0000000..9298b15 --- /dev/null +++ b/examples/statetracker/src/connect.rs @@ -0,0 +1,78 @@ +use std::thread; + +use druid::ExtEventSink; +use futures::StreamExt; +use libquassel::frame::QuasselCodec; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, +}; +use tokio_util::codec::Framed; +use tracing::debug; + +use crate::{ + server::{ClientState, Server}, + StateTracker, +}; + +impl StateTracker { + pub fn connect(&mut self, ctx: ExtEventSink) { + debug!("starting connect"); + + let server = self.server.clone(); + + thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async move { + let mut s_server = TcpStream::connect(format!( + "{}:{}", + server.settings.host, server.settings.port + )) + .await + .unwrap(); + + let _connack = server.init(&mut s_server).await.unwrap(); + + let codec = QuasselCodec::builder().compression(false).new_codec(); + let framed = Framed::new(s_server, codec); + let (s_sink, s_stream) = framed.split(); + + let listener = TcpListener::bind((server.listen_host, server.listen_port)) + .await + .unwrap(); + let (mut client, _) = listener.accept().await.unwrap(); + + // + // Setup Listener + // + + { + let (mut c_stream, mut c_sink) = client.split(); + + let mut init = [0; 12]; + let n = c_stream.peek(&mut init).await.unwrap(); + c_stream.read(&mut init[..n]).await.unwrap(); + let init = libquassel::message::Init::parse(&init); + debug!("send init bytes: {:?}", init); + + c_sink.write(&[0x0, 0x0, 0x0, 0x2]).await.unwrap(); + } + + let codec = QuasselCodec::builder().compression(false).new_codec(); + let framed = Framed::new(client, codec); + let (c_sink, c_stream) = framed.split(); + + // Start Processing + + let s_state = ClientState::Handshake; + let c_state = ClientState::Handshake; + + tokio::join!( + Server::run(s_stream, c_sink, s_state, ctx.clone(), "server -> client"), + Server::run(c_stream, s_sink, c_state, ctx.clone(), "client -> server") + ); + }); + }); + } +} diff --git a/examples/statetracker/src/formatter.rs b/examples/statetracker/src/formatter.rs new file mode 100644 index 0000000..4ceb68f --- /dev/null +++ b/examples/statetracker/src/formatter.rs @@ -0,0 +1,47 @@ +use druid::text::{Formatter, Validation, ValidationError}; + +pub struct U16Formatter; + +impl Formatter<u16> for U16Formatter { + fn format(&self, value: &u16) -> String { + value.to_string() + } + + fn validate_partial_input(&self, input: &str, _sel: &druid::text::Selection) -> Validation { + if input.is_empty() { + return Validation::success(); + } + + if input.len() > 6 { + return Validation::failure(U16ValidationError::WrongNumberOfCharacters); + } + + match input.parse::<u16>() { + Ok(_) => Validation::success(), + Err(err) => Validation::failure(err), + } + } + + fn value(&self, input: &str) -> Result<u16, ValidationError> { + if input.is_empty() || input.len() > 5 { + return Err(ValidationError::new( + U16ValidationError::WrongNumberOfCharacters, + )); + } + + input.parse().map_err(|err| ValidationError::new(err)) + } +} + +#[derive(Debug, Clone)] +pub enum U16ValidationError { + WrongNumberOfCharacters, +} + +impl std::fmt::Display for U16ValidationError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", std::any::type_name_of_val(self)) + } +} + +impl std::error::Error for U16ValidationError {} diff --git a/examples/statetracker/src/main.rs b/examples/statetracker/src/main.rs new file mode 100644 index 0000000..7213000 --- /dev/null +++ b/examples/statetracker/src/main.rs @@ -0,0 +1,131 @@ +#![feature(type_name_of_val)] + +use std::sync::Arc; + +use aliasmanager::AliasManagerWidget; +use druid::{ + widget::{Align, Either, Flex, Label, List, Split}, + AppDelegate, Command, +}; +use druid::{AppLauncher, Data, Env, Lens, LocalizedString, Widget, WidgetExt, WindowDesc}; +use libquassel::message::objects::AliasManager; +use tracing::debug; + +use crate::server::{Message, ServerWidget}; + +const SPACING: f64 = 10.0; + +const VERTICAL_WIDGET_SPACING: f64 = 20.0; +const WINDOW_TITLE: LocalizedString<StateTracker> = LocalizedString::new("StateTracker"); + +mod command; +mod connect; +mod formatter; +mod server; + +mod aliasmanager; + +#[derive(Clone, Data, Lens)] +struct StateTracker { + server: server::Server, + messages: Arc<Vec<server::Message>>, + alias_manager: Arc<AliasManager>, + connected: bool, + test: i32, +} + +impl StateTracker { + fn new() -> StateTracker { + StateTracker { + server: server::Server::default(), + messages: Arc::new(Vec::new()), + alias_manager: Arc::new(AliasManager { + aliases: Vec::new(), + }), + connected: false, + test: 0, + } + } + + fn widget() -> impl Widget<StateTracker> { + let either = Either::new( + |server, _env| server.connected, + Split::columns( + Flex::column() + .with_child(Label::new("AliasManager")) + .with_child(AliasManagerWidget::new().lens(StateTracker::alias_manager)), + List::new(|| { + Label::new(|item: &Message, _env: &_| format!("{:#?}", item)).padding(10.0) + }) + .scroll() + .vertical() + .lens(StateTracker::messages), + ) + .expand(), + ServerWidget::new() + .fix_width(200.0) + .lens(StateTracker::server), + ); + + let layout = Flex::column() + .with_flex_child(either, 1.0) + .with_spacer(VERTICAL_WIDGET_SPACING); + + Align::centered(layout) + } +} + +struct StateTrackerDelegate; +impl AppDelegate<StateTracker> for StateTrackerDelegate { + fn command( + &mut self, + ctx: &mut druid::DelegateCtx, + _target: druid::Target, + cmd: &Command, + data: &mut StateTracker, + _env: &Env, + ) -> druid::Handled { + if let Some(_) = cmd.get(command::CONNECT) { + debug!("got CONNECT command"); + + data.connect(ctx.get_external_handle()); + data.connected = true; + + return druid::Handled::Yes; + } else if let Some(msg) = cmd.get(command::ADD_MESSAGE) { + debug!("got ADD_MESSAGE command"); + + let list = Arc::make_mut(&mut data.messages); + list.push(msg.clone()); + } else if let Some(alias) = cmd.get(command::ALIASMANAGER_ADD_ALIAS) { + let mut alias_manager = Arc::make_mut(&mut data.alias_manager).clone(); + alias_manager.add_alias(alias.to_owned()); + data.alias_manager = Arc::new(alias_manager); + } else if let Some(alias_manager) = cmd.get(command::ALIASMANAGER_INIT) { + data.alias_manager = Arc::new(alias_manager.to_owned()); + } + + druid::Handled::No + } +} + +fn main() { + // pretty_env_logger::init(); + + // describe the main window + let main_window = WindowDesc::new(StateTracker::widget()) + .title(WINDOW_TITLE) + .window_size((400.0, 400.0)); + + // create the initial app state + let initial_state = StateTracker::new(); + + // let state = Arc::new(RwLock::new(initial_state)); + + // start the application + AppLauncher::with_window(main_window) + .log_to_console() + .delegate(StateTrackerDelegate) + .launch(initial_state) + .expect("Failed to launch application"); +} diff --git a/examples/statetracker/src/server.rs b/examples/statetracker/src/server.rs new file mode 100644 index 0000000..ac9c550 --- /dev/null +++ b/examples/statetracker/src/server.rs @@ -0,0 +1,325 @@ +use anyhow::{bail, Error}; + +use druid::{ + widget::{ + Align, Button, Checkbox, Container, Controller, ControllerHost, Flex, Label, TextBox, + }, + Command, Data, ExtEventSink, Lens, Target, Widget, WidgetExt, +}; + +use libquassel::{ + deserialize::Deserialize, + frame::QuasselCodec, + message::{self, objects, ConnAck, HandshakeMessage, Init}, +}; + +use futures::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + net::TcpStream, +}; +use tokio_util::codec::Framed; + +use tracing::{debug, info, trace}; + +use crate::{command, formatter}; + +#[derive(Clone, Debug, Data, Lens)] +pub struct ServerSettings { + pub tls: bool, + pub compression: bool, + pub host: String, + pub port: u16, + pub username: String, + pub password: String, +} + +impl Default for ServerSettings { + fn default() -> Self { + ServerSettings { + tls: false, + compression: false, + host: String::from("localhost"), + port: 4242, + username: String::default(), + password: String::default(), + } + } +} + +#[derive(Clone, Data, Lens)] +pub struct Server { + pub server_name: String, + pub listen_port: u16, + pub listen_host: String, + pub settings: ServerSettings, +} + +#[derive(Debug)] +pub enum ClientState { + Handshake, + Connected, +} + +#[derive(Clone, Debug)] +pub enum Message { + Handshake(HandshakeMessage), + SignalProxy(message::Message), +} + +impl Data for Message { + fn same(&self, other: &Self) -> bool { + if let Self::Handshake(_) = self { + if let Self::Handshake(_) = other { + return true; + } else { + return false; + } + } else if let Self::SignalProxy(_) = self { + if let Self::SignalProxy(_) = other { + return true; + } else { + return false; + } + } else { + return false; + } + } +} + +impl Server { + pub async fn init( + &self, + stream: &mut (impl AsyncRead + AsyncWrite + Unpin), + ) -> Result<ConnAck, Error> { + let init = Init::new() + .tls(self.settings.tls) + .compression(self.settings.compression); + + stream.write(&init.serialize()).await?; + + let mut buf = [0; 4]; + stream.read(&mut buf).await?; + + let (_, connack) = ConnAck::parse(&buf).unwrap(); + Ok(connack) + } + + pub async fn run( + mut stream: SplitStream<Framed<TcpStream, QuasselCodec>>, + mut sink: SplitSink<Framed<TcpStream, QuasselCodec>, Vec<u8>>, + mut state: ClientState, + ctx: ExtEventSink, + direction: &str, + ) { + // Start event loop + while let Some(msg) = stream.next().await { + let msg = msg.unwrap(); + sink.send(msg.to_vec()).await.unwrap(); + let msg = match state { + ClientState::Handshake => { + Server::handle_login_message(&msg, &mut state, direction, ctx.clone()) + .await + .unwrap() + } + ClientState::Connected => Server::handle_message(&msg, direction, ctx.clone()) + .await + .unwrap(), + }; + + ctx.submit_command(command::ADD_MESSAGE, msg, Target::Global) + .unwrap(); + } + } + + async fn handle_login_message( + buf: &[u8], + state: &mut ClientState, + direction: &str, + _ctx: ExtEventSink, + ) -> Result<Message, Error> { + use libquassel::HandshakeDeserialize; + + trace!(target: "handshakemessage", "Received bytes: {:x?}", buf); + match HandshakeMessage::parse(buf) { + Ok((_size, res)) => { + info!("{}: {:#?}", direction, res); + + match res { + HandshakeMessage::SessionInit(_) => *state = ClientState::Connected, + HandshakeMessage::ClientLogin(_) => *state = ClientState::Connected, + _ => {} + } + + return Ok(Message::Handshake(res)); + } + Err(e) => bail!("failed to parse handshake message {}", e), + } + } + + async fn handle_message( + buf: &[u8], + _direction: &str, + ctx: ExtEventSink, + ) -> Result<Message, Error> { + use libquassel::deserialize::*; + trace!(target: "message", "Received bytes: {:x?}", buf); + + match message::Message::parse(buf) { + Ok((_size, res)) => { + #[allow(unused_variables)] + match &res { + message::Message::SyncMessage(msg) => (), + message::Message::RpcCall(msg) => (), + message::Message::InitRequest(msg) => (), + message::Message::InitData(msg) => match &msg.init_data { + objects::Types::AliasManager(alias_manager) => ctx + .submit_command( + command::ALIASMANAGER_INIT, + alias_manager.clone(), + Target::Global, + ) + .unwrap(), + _ => (), + }, + message::Message::HeartBeat(msg) => (), + message::Message::HeartBeatReply(msg) => (), + } + + return Ok(Message::SignalProxy(res)); + } + Err(e) => { + bail!("failed to parse message {}", e); + } + } + } +} + +impl Default for Server { + fn default() -> Self { + Server { + server_name: String::default(), + listen_port: 4243, + listen_host: String::from("localhost"), + settings: ServerSettings::default(), + } + } +} + +impl std::fmt::Debug for Server { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut fmt = f.debug_struct("Server"); + fmt.field("settings", &self.settings); + fmt.field("name", &self.server_name).finish() + } +} + +pub struct ServerWidget {} + +impl ServerWidget { + pub fn new() -> ControllerHost<impl Widget<Server>, ServerWidget> { + let layout = Flex::column() + .with_child(Label::new("Connect").align_left()) + .with_spacer(crate::SPACING) + .with_child( + TextBox::new() + .with_placeholder("Server Name") + .expand_width() + .lens(Server::server_name), + ) + .with_spacer(crate::SPACING) + .with_child( + Container::new( + Flex::column() + .with_child( + Flex::row() + .with_flex_child( + TextBox::new() + .with_placeholder("Host") + .expand_width() + .lens(ServerSettings::host), + 2.0, + ) + .with_spacer(crate::SPACING) + .with_flex_child( + TextBox::new() + .with_formatter(formatter::U16Formatter) + .lens(ServerSettings::port), + 1.0, + ) + .expand_width(), + ) + .with_spacer(crate::SPACING) + .with_child( + Flex::row() + .with_child(Checkbox::new("TLS").lens(ServerSettings::tls)) + .with_flex_spacer(1.0) + .with_child( + Checkbox::new("Compression").lens(ServerSettings::compression), + ), + ) + .with_spacer(crate::SPACING * 2.0) + .with_child(Label::new("Login").align_left()) + .with_spacer(crate::SPACING) + .with_child( + TextBox::new() + .with_placeholder("Username") + .expand_width() + .lens(ServerSettings::username), + ) + .with_spacer(crate::SPACING) + .with_child( + TextBox::new() + .with_placeholder("Password") + .expand_width() + .lens(ServerSettings::password), + ), + ) + .lens(Server::settings), + ) + .with_spacer(crate::SPACING) + .with_child(Label::new("Listen").align_left()) + .with_spacer(crate::SPACING) + .with_child( + Flex::row() + .with_flex_child( + TextBox::new() + .with_placeholder("Address") + .expand_width() + .lens(Server::listen_host), + 2.0, + ) + .with_spacer(crate::SPACING) + .with_flex_child( + TextBox::new() + .with_placeholder("Port") + .with_formatter(formatter::U16Formatter) + .lens(Server::listen_port), + 1.0, + ) + .expand_width(), + ) + .with_spacer(crate::SPACING) + .with_child( + Button::new("Connect") + .on_click(move |ctx, _, _| { + debug!("connect button pressed, sending command"); + ctx.submit_command(Command::new( + command::CONNECT, + (), + druid::Target::Global, + )) + }) + .align_right(), + ); + + let widget = Align::centered(layout); + + ControllerHost::new(widget, ServerWidget {}) + } +} + +impl<T, W: Widget<T>> Controller<T, W> for ServerWidget {} |
