diff options
| author | Max Audron <audron@cocaine.farm> | 2021-08-20 14:11:54 +0200 |
|---|---|---|
| committer | Max Audron <audron@cocaine.farm> | 2021-09-28 17:59:22 +0200 |
| commit | 04de7e03c5ff792ba5b0a4df6ecbde18c6f25ad9 (patch) | |
| tree | fdbdf8b4dd2f9d52ab7d9da71e277862d85f1d4f /examples/statetracker/src/connect.rs | |
| parent | rename override_type to type (diff) | |
statetracker: first iteration of working rpc object
Diffstat (limited to 'examples/statetracker/src/connect.rs')
| -rw-r--r-- | examples/statetracker/src/connect.rs | 78 |
1 files changed, 78 insertions, 0 deletions
diff --git a/examples/statetracker/src/connect.rs b/examples/statetracker/src/connect.rs new file mode 100644 index 0000000..9298b15 --- /dev/null +++ b/examples/statetracker/src/connect.rs @@ -0,0 +1,78 @@ +use std::thread; + +use druid::ExtEventSink; +use futures::StreamExt; +use libquassel::frame::QuasselCodec; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, +}; +use tokio_util::codec::Framed; +use tracing::debug; + +use crate::{ + server::{ClientState, Server}, + StateTracker, +}; + +impl StateTracker { + pub fn connect(&mut self, ctx: ExtEventSink) { + debug!("starting connect"); + + let server = self.server.clone(); + + thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async move { + let mut s_server = TcpStream::connect(format!( + "{}:{}", + server.settings.host, server.settings.port + )) + .await + .unwrap(); + + let _connack = server.init(&mut s_server).await.unwrap(); + + let codec = QuasselCodec::builder().compression(false).new_codec(); + let framed = Framed::new(s_server, codec); + let (s_sink, s_stream) = framed.split(); + + let listener = TcpListener::bind((server.listen_host, server.listen_port)) + .await + .unwrap(); + let (mut client, _) = listener.accept().await.unwrap(); + + // + // Setup Listener + // + + { + let (mut c_stream, mut c_sink) = client.split(); + + let mut init = [0; 12]; + let n = c_stream.peek(&mut init).await.unwrap(); + c_stream.read(&mut init[..n]).await.unwrap(); + let init = libquassel::message::Init::parse(&init); + debug!("send init bytes: {:?}", init); + + c_sink.write(&[0x0, 0x0, 0x0, 0x2]).await.unwrap(); + } + + let codec = QuasselCodec::builder().compression(false).new_codec(); + let framed = Framed::new(client, codec); + let (c_sink, c_stream) = framed.split(); + + // Start Processing + + let s_state = ClientState::Handshake; + let c_state = ClientState::Handshake; + + tokio::join!( + Server::run(s_stream, c_sink, s_state, ctx.clone(), "server -> client"), + Server::run(c_stream, s_sink, c_state, ctx.clone(), "client -> server") + ); + }); + }); + } +} |
