aboutsummaryrefslogtreecommitdiff
path: root/examples/statetracker
diff options
context:
space:
mode:
authorMax Audron <audron@cocaine.farm>2021-08-20 14:11:54 +0200
committerMax Audron <audron@cocaine.farm>2021-09-28 18:15:50 +0200
commit8dba16270a31e8574aada487b2b14c27f20c63eb (patch)
tree2aeaf3e36068d870b3143b85458b5ca57885bdcc /examples/statetracker
parentrename override_type to type (diff)
statetracker: first iteration of working rpc object
Diffstat (limited to '')
-rw-r--r--examples/statetracker/Cargo.toml35
-rw-r--r--examples/statetracker/src/aliasmanager.rs85
-rw-r--r--examples/statetracker/src/command.rs8
-rw-r--r--examples/statetracker/src/connect.rs78
-rw-r--r--examples/statetracker/src/formatter.rs47
-rw-r--r--examples/statetracker/src/main.rs131
-rw-r--r--examples/statetracker/src/server.rs325
7 files changed, 709 insertions, 0 deletions
diff --git a/examples/statetracker/Cargo.toml b/examples/statetracker/Cargo.toml
new file mode 100644
index 0000000..fa78217
--- /dev/null
+++ b/examples/statetracker/Cargo.toml
@@ -0,0 +1,35 @@
+[package]
+name = "statetracker"
+version = "0.1.0"
+edition = "2018"
+
+[dependencies]
+libquassel = { path = "../../", features = ["framing", "all-quassel-features"] }
+
+# druid = "0.7"
+druid = { git = "https://github.com/linebender/druid", features = ["im"] }
+druid-widget-nursery = { path = "/home/audron/repo/github.com/linebender/druid-widget-nursery" }
+
+# iced = { version = "0.3", features = ["tokio"]}
+
+anyhow = "*"
+
+tracing = "0.1"
+tracing-subscriber = "0.2"
+tracing-futures = "0.2"
+
+byteorder = "1"
+either = "1"
+time = "0.2"
+log = "*"
+
+tokio = { version = "1", features = ["full", "rt-multi-thread"]}
+tokio-util = { version = "0.6", features = ["codec"] }
+tokio-rustls = { version = "0.22" }
+
+bytes = { version = "1" }
+flate2 = { version = "1", features = ["tokio"] }
+webpki-roots = { version = "0.21" }
+futures-util = { version = "0.3", features = ["std"] }
+futures = { version = "0.3" }
+pretty_env_logger = { version = "0.4" }
diff --git a/examples/statetracker/src/aliasmanager.rs b/examples/statetracker/src/aliasmanager.rs
new file mode 100644
index 0000000..9247c22
--- /dev/null
+++ b/examples/statetracker/src/aliasmanager.rs
@@ -0,0 +1,85 @@
+use std::sync::Arc;
+
+use druid::widget::Label;
+use druid::{lens, Lens, Point, WidgetPod};
+use druid::{widget::Flex, Widget};
+
+use libquassel::message::objects::AliasManager;
+
+pub struct AliasManagerWidget {
+ inner: WidgetPod<Arc<AliasManager>, Box<dyn Widget<Arc<AliasManager>>>>,
+}
+
+impl AliasManagerWidget {
+ pub fn new() -> Self {
+ let widget = WidgetPod::new(Flex::column()).boxed();
+
+ AliasManagerWidget { inner: widget }
+ }
+}
+
+impl Widget<Arc<AliasManager>> for AliasManagerWidget {
+ fn event(
+ &mut self,
+ ctx: &mut druid::EventCtx,
+ event: &druid::Event,
+ data: &mut Arc<AliasManager>,
+ env: &druid::Env,
+ ) {
+ self.inner.event(ctx, event, data, env)
+ }
+
+ fn lifecycle(
+ &mut self,
+ ctx: &mut druid::LifeCycleCtx,
+ event: &druid::LifeCycle,
+ data: &Arc<AliasManager>,
+ env: &druid::Env,
+ ) {
+ self.inner.lifecycle(ctx, event, data, env)
+ }
+
+ fn update(
+ &mut self,
+ ctx: &mut druid::UpdateCtx,
+ _old_data: &Arc<AliasManager>,
+ data: &Arc<AliasManager>,
+ _env: &druid::Env,
+ ) {
+ let aliases = lens!(AliasManager, aliases);
+
+ let mut widget: Flex<Arc<AliasManager>> = Flex::column();
+
+ aliases.with(data, |aliases| {
+ for alias in aliases {
+ widget.add_child(
+ Flex::row()
+ .with_child(Label::new(alias.name.clone()))
+ .with_child(Label::new(alias.expansion.clone())),
+ )
+ }
+ });
+
+ self.inner = WidgetPod::new(widget).boxed();
+
+ ctx.children_changed();
+ ctx.request_layout();
+ ctx.request_paint();
+ }
+
+ fn layout(
+ &mut self,
+ ctx: &mut druid::LayoutCtx,
+ bc: &druid::BoxConstraints,
+ data: &Arc<AliasManager>,
+ env: &druid::Env,
+ ) -> druid::Size {
+ let size = self.inner.layout(ctx, bc, data, env);
+ self.inner.set_origin(ctx, data, env, Point::ZERO);
+ return size;
+ }
+
+ fn paint(&mut self, ctx: &mut druid::PaintCtx, data: &Arc<AliasManager>, env: &druid::Env) {
+ self.inner.paint(ctx, data, env)
+ }
+}
diff --git a/examples/statetracker/src/command.rs b/examples/statetracker/src/command.rs
new file mode 100644
index 0000000..fc221c6
--- /dev/null
+++ b/examples/statetracker/src/command.rs
@@ -0,0 +1,8 @@
+use druid::Selector;
+use libquassel::message::objects::{Alias, AliasManager};
+
+pub const CONNECT: Selector = Selector::new("connect");
+pub const ADD_MESSAGE: Selector<crate::Message> = Selector::new("add_message");
+
+pub const ALIASMANAGER_INIT: Selector<AliasManager> = Selector::new("aliasmanager_init");
+pub const ALIASMANAGER_ADD_ALIAS: Selector<Alias> = Selector::new("aliasmanager_add_alias");
diff --git a/examples/statetracker/src/connect.rs b/examples/statetracker/src/connect.rs
new file mode 100644
index 0000000..9298b15
--- /dev/null
+++ b/examples/statetracker/src/connect.rs
@@ -0,0 +1,78 @@
+use std::thread;
+
+use druid::ExtEventSink;
+use futures::StreamExt;
+use libquassel::frame::QuasselCodec;
+use tokio::{
+ io::{AsyncReadExt, AsyncWriteExt},
+ net::{TcpListener, TcpStream},
+};
+use tokio_util::codec::Framed;
+use tracing::debug;
+
+use crate::{
+ server::{ClientState, Server},
+ StateTracker,
+};
+
+impl StateTracker {
+ pub fn connect(&mut self, ctx: ExtEventSink) {
+ debug!("starting connect");
+
+ let server = self.server.clone();
+
+ thread::spawn(move || {
+ let rt = tokio::runtime::Runtime::new().unwrap();
+
+ rt.block_on(async move {
+ let mut s_server = TcpStream::connect(format!(
+ "{}:{}",
+ server.settings.host, server.settings.port
+ ))
+ .await
+ .unwrap();
+
+ let _connack = server.init(&mut s_server).await.unwrap();
+
+ let codec = QuasselCodec::builder().compression(false).new_codec();
+ let framed = Framed::new(s_server, codec);
+ let (s_sink, s_stream) = framed.split();
+
+ let listener = TcpListener::bind((server.listen_host, server.listen_port))
+ .await
+ .unwrap();
+ let (mut client, _) = listener.accept().await.unwrap();
+
+ //
+ // Setup Listener
+ //
+
+ {
+ let (mut c_stream, mut c_sink) = client.split();
+
+ let mut init = [0; 12];
+ let n = c_stream.peek(&mut init).await.unwrap();
+ c_stream.read(&mut init[..n]).await.unwrap();
+ let init = libquassel::message::Init::parse(&init);
+ debug!("send init bytes: {:?}", init);
+
+ c_sink.write(&[0x0, 0x0, 0x0, 0x2]).await.unwrap();
+ }
+
+ let codec = QuasselCodec::builder().compression(false).new_codec();
+ let framed = Framed::new(client, codec);
+ let (c_sink, c_stream) = framed.split();
+
+ // Start Processing
+
+ let s_state = ClientState::Handshake;
+ let c_state = ClientState::Handshake;
+
+ tokio::join!(
+ Server::run(s_stream, c_sink, s_state, ctx.clone(), "server -> client"),
+ Server::run(c_stream, s_sink, c_state, ctx.clone(), "client -> server")
+ );
+ });
+ });
+ }
+}
diff --git a/examples/statetracker/src/formatter.rs b/examples/statetracker/src/formatter.rs
new file mode 100644
index 0000000..4ceb68f
--- /dev/null
+++ b/examples/statetracker/src/formatter.rs
@@ -0,0 +1,47 @@
+use druid::text::{Formatter, Validation, ValidationError};
+
+pub struct U16Formatter;
+
+impl Formatter<u16> for U16Formatter {
+ fn format(&self, value: &u16) -> String {
+ value.to_string()
+ }
+
+ fn validate_partial_input(&self, input: &str, _sel: &druid::text::Selection) -> Validation {
+ if input.is_empty() {
+ return Validation::success();
+ }
+
+ if input.len() > 6 {
+ return Validation::failure(U16ValidationError::WrongNumberOfCharacters);
+ }
+
+ match input.parse::<u16>() {
+ Ok(_) => Validation::success(),
+ Err(err) => Validation::failure(err),
+ }
+ }
+
+ fn value(&self, input: &str) -> Result<u16, ValidationError> {
+ if input.is_empty() || input.len() > 5 {
+ return Err(ValidationError::new(
+ U16ValidationError::WrongNumberOfCharacters,
+ ));
+ }
+
+ input.parse().map_err(|err| ValidationError::new(err))
+ }
+}
+
+#[derive(Debug, Clone)]
+pub enum U16ValidationError {
+ WrongNumberOfCharacters,
+}
+
+impl std::fmt::Display for U16ValidationError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", std::any::type_name_of_val(self))
+ }
+}
+
+impl std::error::Error for U16ValidationError {}
diff --git a/examples/statetracker/src/main.rs b/examples/statetracker/src/main.rs
new file mode 100644
index 0000000..7213000
--- /dev/null
+++ b/examples/statetracker/src/main.rs
@@ -0,0 +1,131 @@
+#![feature(type_name_of_val)]
+
+use std::sync::Arc;
+
+use aliasmanager::AliasManagerWidget;
+use druid::{
+ widget::{Align, Either, Flex, Label, List, Split},
+ AppDelegate, Command,
+};
+use druid::{AppLauncher, Data, Env, Lens, LocalizedString, Widget, WidgetExt, WindowDesc};
+use libquassel::message::objects::AliasManager;
+use tracing::debug;
+
+use crate::server::{Message, ServerWidget};
+
+const SPACING: f64 = 10.0;
+
+const VERTICAL_WIDGET_SPACING: f64 = 20.0;
+const WINDOW_TITLE: LocalizedString<StateTracker> = LocalizedString::new("StateTracker");
+
+mod command;
+mod connect;
+mod formatter;
+mod server;
+
+mod aliasmanager;
+
+#[derive(Clone, Data, Lens)]
+struct StateTracker {
+ server: server::Server,
+ messages: Arc<Vec<server::Message>>,
+ alias_manager: Arc<AliasManager>,
+ connected: bool,
+ test: i32,
+}
+
+impl StateTracker {
+ fn new() -> StateTracker {
+ StateTracker {
+ server: server::Server::default(),
+ messages: Arc::new(Vec::new()),
+ alias_manager: Arc::new(AliasManager {
+ aliases: Vec::new(),
+ }),
+ connected: false,
+ test: 0,
+ }
+ }
+
+ fn widget() -> impl Widget<StateTracker> {
+ let either = Either::new(
+ |server, _env| server.connected,
+ Split::columns(
+ Flex::column()
+ .with_child(Label::new("AliasManager"))
+ .with_child(AliasManagerWidget::new().lens(StateTracker::alias_manager)),
+ List::new(|| {
+ Label::new(|item: &Message, _env: &_| format!("{:#?}", item)).padding(10.0)
+ })
+ .scroll()
+ .vertical()
+ .lens(StateTracker::messages),
+ )
+ .expand(),
+ ServerWidget::new()
+ .fix_width(200.0)
+ .lens(StateTracker::server),
+ );
+
+ let layout = Flex::column()
+ .with_flex_child(either, 1.0)
+ .with_spacer(VERTICAL_WIDGET_SPACING);
+
+ Align::centered(layout)
+ }
+}
+
+struct StateTrackerDelegate;
+impl AppDelegate<StateTracker> for StateTrackerDelegate {
+ fn command(
+ &mut self,
+ ctx: &mut druid::DelegateCtx,
+ _target: druid::Target,
+ cmd: &Command,
+ data: &mut StateTracker,
+ _env: &Env,
+ ) -> druid::Handled {
+ if let Some(_) = cmd.get(command::CONNECT) {
+ debug!("got CONNECT command");
+
+ data.connect(ctx.get_external_handle());
+ data.connected = true;
+
+ return druid::Handled::Yes;
+ } else if let Some(msg) = cmd.get(command::ADD_MESSAGE) {
+ debug!("got ADD_MESSAGE command");
+
+ let list = Arc::make_mut(&mut data.messages);
+ list.push(msg.clone());
+ } else if let Some(alias) = cmd.get(command::ALIASMANAGER_ADD_ALIAS) {
+ let mut alias_manager = Arc::make_mut(&mut data.alias_manager).clone();
+ alias_manager.add_alias(alias.to_owned());
+ data.alias_manager = Arc::new(alias_manager);
+ } else if let Some(alias_manager) = cmd.get(command::ALIASMANAGER_INIT) {
+ data.alias_manager = Arc::new(alias_manager.to_owned());
+ }
+
+ druid::Handled::No
+ }
+}
+
+fn main() {
+ // pretty_env_logger::init();
+
+ // describe the main window
+ let main_window = WindowDesc::new(StateTracker::widget())
+ .title(WINDOW_TITLE)
+ .window_size((400.0, 400.0));
+
+ // create the initial app state
+ let initial_state = StateTracker::new();
+
+ // let state = Arc::new(RwLock::new(initial_state));
+
+ // start the application
+ AppLauncher::with_window(main_window)
+ .log_to_console()
+ .delegate(StateTrackerDelegate)
+ .launch(initial_state)
+ .expect("Failed to launch application");
+}
diff --git a/examples/statetracker/src/server.rs b/examples/statetracker/src/server.rs
new file mode 100644
index 0000000..ac9c550
--- /dev/null
+++ b/examples/statetracker/src/server.rs
@@ -0,0 +1,325 @@
+use anyhow::{bail, Error};
+
+use druid::{
+ widget::{
+ Align, Button, Checkbox, Container, Controller, ControllerHost, Flex, Label, TextBox,
+ },
+ Command, Data, ExtEventSink, Lens, Target, Widget, WidgetExt,
+};
+
+use libquassel::{
+ deserialize::Deserialize,
+ frame::QuasselCodec,
+ message::{self, objects, ConnAck, HandshakeMessage, Init},
+};
+
+use futures::{
+ stream::{SplitSink, SplitStream},
+ SinkExt, StreamExt,
+};
+use tokio::{
+ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
+ net::TcpStream,
+};
+use tokio_util::codec::Framed;
+
+use tracing::{debug, info, trace};
+
+use crate::{command, formatter};
+
+#[derive(Clone, Debug, Data, Lens)]
+pub struct ServerSettings {
+ pub tls: bool,
+ pub compression: bool,
+ pub host: String,
+ pub port: u16,
+ pub username: String,
+ pub password: String,
+}
+
+impl Default for ServerSettings {
+ fn default() -> Self {
+ ServerSettings {
+ tls: false,
+ compression: false,
+ host: String::from("localhost"),
+ port: 4242,
+ username: String::default(),
+ password: String::default(),
+ }
+ }
+}
+
+#[derive(Clone, Data, Lens)]
+pub struct Server {
+ pub server_name: String,
+ pub listen_port: u16,
+ pub listen_host: String,
+ pub settings: ServerSettings,
+}
+
+#[derive(Debug)]
+pub enum ClientState {
+ Handshake,
+ Connected,
+}
+
+#[derive(Clone, Debug)]
+pub enum Message {
+ Handshake(HandshakeMessage),
+ SignalProxy(message::Message),
+}
+
+impl Data for Message {
+ fn same(&self, other: &Self) -> bool {
+ if let Self::Handshake(_) = self {
+ if let Self::Handshake(_) = other {
+ return true;
+ } else {
+ return false;
+ }
+ } else if let Self::SignalProxy(_) = self {
+ if let Self::SignalProxy(_) = other {
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+}
+
+impl Server {
+ pub async fn init(
+ &self,
+ stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
+ ) -> Result<ConnAck, Error> {
+ let init = Init::new()
+ .tls(self.settings.tls)
+ .compression(self.settings.compression);
+
+ stream.write(&init.serialize()).await?;
+
+ let mut buf = [0; 4];
+ stream.read(&mut buf).await?;
+
+ let (_, connack) = ConnAck::parse(&buf).unwrap();
+ Ok(connack)
+ }
+
+ pub async fn run(
+ mut stream: SplitStream<Framed<TcpStream, QuasselCodec>>,
+ mut sink: SplitSink<Framed<TcpStream, QuasselCodec>, Vec<u8>>,
+ mut state: ClientState,
+ ctx: ExtEventSink,
+ direction: &str,
+ ) {
+ // Start event loop
+ while let Some(msg) = stream.next().await {
+ let msg = msg.unwrap();
+ sink.send(msg.to_vec()).await.unwrap();
+ let msg = match state {
+ ClientState::Handshake => {
+ Server::handle_login_message(&msg, &mut state, direction, ctx.clone())
+ .await
+ .unwrap()
+ }
+ ClientState::Connected => Server::handle_message(&msg, direction, ctx.clone())
+ .await
+ .unwrap(),
+ };
+
+ ctx.submit_command(command::ADD_MESSAGE, msg, Target::Global)
+ .unwrap();
+ }
+ }
+
+ async fn handle_login_message(
+ buf: &[u8],
+ state: &mut ClientState,
+ direction: &str,
+ _ctx: ExtEventSink,
+ ) -> Result<Message, Error> {
+ use libquassel::HandshakeDeserialize;
+
+ trace!(target: "handshakemessage", "Received bytes: {:x?}", buf);
+ match HandshakeMessage::parse(buf) {
+ Ok((_size, res)) => {
+ info!("{}: {:#?}", direction, res);
+
+ match res {
+ HandshakeMessage::SessionInit(_) => *state = ClientState::Connected,
+ HandshakeMessage::ClientLogin(_) => *state = ClientState::Connected,
+ _ => {}
+ }
+
+ return Ok(Message::Handshake(res));
+ }
+ Err(e) => bail!("failed to parse handshake message {}", e),
+ }
+ }
+
+ async fn handle_message(
+ buf: &[u8],
+ _direction: &str,
+ ctx: ExtEventSink,
+ ) -> Result<Message, Error> {
+ use libquassel::deserialize::*;
+ trace!(target: "message", "Received bytes: {:x?}", buf);
+
+ match message::Message::parse(buf) {
+ Ok((_size, res)) => {
+ #[allow(unused_variables)]
+ match &res {
+ message::Message::SyncMessage(msg) => (),
+ message::Message::RpcCall(msg) => (),
+ message::Message::InitRequest(msg) => (),
+ message::Message::InitData(msg) => match &msg.init_data {
+ objects::Types::AliasManager(alias_manager) => ctx
+ .submit_command(
+ command::ALIASMANAGER_INIT,
+ alias_manager.clone(),
+ Target::Global,
+ )
+ .unwrap(),
+ _ => (),
+ },
+ message::Message::HeartBeat(msg) => (),
+ message::Message::HeartBeatReply(msg) => (),
+ }
+
+ return Ok(Message::SignalProxy(res));
+ }
+ Err(e) => {
+ bail!("failed to parse message {}", e);
+ }
+ }
+ }
+}
+
+impl Default for Server {
+ fn default() -> Self {
+ Server {
+ server_name: String::default(),
+ listen_port: 4243,
+ listen_host: String::from("localhost"),
+ settings: ServerSettings::default(),
+ }
+ }
+}
+
+impl std::fmt::Debug for Server {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let mut fmt = f.debug_struct("Server");
+ fmt.field("settings", &self.settings);
+ fmt.field("name", &self.server_name).finish()
+ }
+}
+
+pub struct ServerWidget {}
+
+impl ServerWidget {
+ pub fn new() -> ControllerHost<impl Widget<Server>, ServerWidget> {
+ let layout = Flex::column()
+ .with_child(Label::new("Connect").align_left())
+ .with_spacer(crate::SPACING)
+ .with_child(
+ TextBox::new()
+ .with_placeholder("Server Name")
+ .expand_width()
+ .lens(Server::server_name),
+ )
+ .with_spacer(crate::SPACING)
+ .with_child(
+ Container::new(
+ Flex::column()
+ .with_child(
+ Flex::row()
+ .with_flex_child(
+ TextBox::new()
+ .with_placeholder("Host")
+ .expand_width()
+ .lens(ServerSettings::host),
+ 2.0,
+ )
+ .with_spacer(crate::SPACING)
+ .with_flex_child(
+ TextBox::new()
+ .with_formatter(formatter::U16Formatter)
+ .lens(ServerSettings::port),
+ 1.0,
+ )
+ .expand_width(),
+ )
+ .with_spacer(crate::SPACING)
+ .with_child(
+ Flex::row()
+ .with_child(Checkbox::new("TLS").lens(ServerSettings::tls))
+ .with_flex_spacer(1.0)
+ .with_child(
+ Checkbox::new("Compression").lens(ServerSettings::compression),
+ ),
+ )
+ .with_spacer(crate::SPACING * 2.0)
+ .with_child(Label::new("Login").align_left())
+ .with_spacer(crate::SPACING)
+ .with_child(
+ TextBox::new()
+ .with_placeholder("Username")
+ .expand_width()
+ .lens(ServerSettings::username),
+ )
+ .with_spacer(crate::SPACING)
+ .with_child(
+ TextBox::new()
+ .with_placeholder("Password")
+ .expand_width()
+ .lens(ServerSettings::password),
+ ),
+ )
+ .lens(Server::settings),
+ )
+ .with_spacer(crate::SPACING)
+ .with_child(Label::new("Listen").align_left())
+ .with_spacer(crate::SPACING)
+ .with_child(
+ Flex::row()
+ .with_flex_child(
+ TextBox::new()
+ .with_placeholder("Address")
+ .expand_width()
+ .lens(Server::listen_host),
+ 2.0,
+ )
+ .with_spacer(crate::SPACING)
+ .with_flex_child(
+ TextBox::new()
+ .with_placeholder("Port")
+ .with_formatter(formatter::U16Formatter)
+ .lens(Server::listen_port),
+ 1.0,
+ )
+ .expand_width(),
+ )
+ .with_spacer(crate::SPACING)
+ .with_child(
+ Button::new("Connect")
+ .on_click(move |ctx, _, _| {
+ debug!("connect button pressed, sending command");
+ ctx.submit_command(Command::new(
+ command::CONNECT,
+ (),
+ druid::Target::Global,
+ ))
+ })
+ .align_right(),
+ );
+
+ let widget = Align::centered(layout);
+
+ ControllerHost::new(widget, ServerWidget {})
+ }
+}
+
+impl<T, W: Widget<T>> Controller<T, W> for ServerWidget {}