aboutsummaryrefslogtreecommitdiff
path: root/examples/statetracker/src/connect.rs
diff options
context:
space:
mode:
authorMax Audron <audron@cocaine.farm>2021-08-20 14:11:54 +0200
committerMax Audron <audron@cocaine.farm>2021-09-28 18:15:50 +0200
commit8dba16270a31e8574aada487b2b14c27f20c63eb (patch)
tree2aeaf3e36068d870b3143b85458b5ca57885bdcc /examples/statetracker/src/connect.rs
parentrename override_type to type (diff)
statetracker: first iteration of working rpc object
Diffstat (limited to 'examples/statetracker/src/connect.rs')
-rw-r--r--examples/statetracker/src/connect.rs78
1 files changed, 78 insertions, 0 deletions
diff --git a/examples/statetracker/src/connect.rs b/examples/statetracker/src/connect.rs
new file mode 100644
index 0000000..9298b15
--- /dev/null
+++ b/examples/statetracker/src/connect.rs
@@ -0,0 +1,78 @@
+use std::thread;
+
+use druid::ExtEventSink;
+use futures::StreamExt;
+use libquassel::frame::QuasselCodec;
+use tokio::{
+ io::{AsyncReadExt, AsyncWriteExt},
+ net::{TcpListener, TcpStream},
+};
+use tokio_util::codec::Framed;
+use tracing::debug;
+
+use crate::{
+ server::{ClientState, Server},
+ StateTracker,
+};
+
+impl StateTracker {
+ pub fn connect(&mut self, ctx: ExtEventSink) {
+ debug!("starting connect");
+
+ let server = self.server.clone();
+
+ thread::spawn(move || {
+ let rt = tokio::runtime::Runtime::new().unwrap();
+
+ rt.block_on(async move {
+ let mut s_server = TcpStream::connect(format!(
+ "{}:{}",
+ server.settings.host, server.settings.port
+ ))
+ .await
+ .unwrap();
+
+ let _connack = server.init(&mut s_server).await.unwrap();
+
+ let codec = QuasselCodec::builder().compression(false).new_codec();
+ let framed = Framed::new(s_server, codec);
+ let (s_sink, s_stream) = framed.split();
+
+ let listener = TcpListener::bind((server.listen_host, server.listen_port))
+ .await
+ .unwrap();
+ let (mut client, _) = listener.accept().await.unwrap();
+
+ //
+ // Setup Listener
+ //
+
+ {
+ let (mut c_stream, mut c_sink) = client.split();
+
+ let mut init = [0; 12];
+ let n = c_stream.peek(&mut init).await.unwrap();
+ c_stream.read(&mut init[..n]).await.unwrap();
+ let init = libquassel::message::Init::parse(&init);
+ debug!("send init bytes: {:?}", init);
+
+ c_sink.write(&[0x0, 0x0, 0x0, 0x2]).await.unwrap();
+ }
+
+ let codec = QuasselCodec::builder().compression(false).new_codec();
+ let framed = Framed::new(client, codec);
+ let (c_sink, c_stream) = framed.split();
+
+ // Start Processing
+
+ let s_state = ClientState::Handshake;
+ let c_state = ClientState::Handshake;
+
+ tokio::join!(
+ Server::run(s_stream, c_sink, s_state, ctx.clone(), "server -> client"),
+ Server::run(c_stream, s_sink, c_state, ctx.clone(), "client -> server")
+ );
+ });
+ });
+ }
+}