aboutsummaryrefslogtreecommitdiff
path: root/examples/statetracker/src/connect.rs
blob: 9298b154e737d6d8c40380989dfad9e2e88c53d9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use std::thread;

use druid::ExtEventSink;
use futures::StreamExt;
use libquassel::frame::QuasselCodec;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream},
};
use tokio_util::codec::Framed;
use tracing::debug;

use crate::{
    server::{ClientState, Server},
    StateTracker,
};

impl StateTracker {
    /// Starts relaying traffic between one local client and the configured server.
    ///
    /// The work happens on a freshly spawned OS thread that owns its own tokio
    /// runtime, so this method returns immediately. On that thread it:
    ///
    /// 1. connects to `server.settings.host:server.settings.port` and runs
    ///    `server.init` on that connection,
    /// 2. binds `(server.listen_host, server.listen_port)` and accepts a single
    ///    client connection,
    /// 3. consumes the client's init bytes and replies with `[0, 0, 0, 2]`,
    /// 4. wraps both sockets in an uncompressed `QuasselCodec` and drives
    ///    `Server::run` in both directions until both futures complete.
    ///
    /// `ctx` is cloned and handed to each `Server::run` call.
    ///
    /// # Panics
    ///
    /// Every setup failure (runtime creation, connect, init, bind, accept,
    /// handshake I/O) panics on the background thread; the calling thread is
    /// not affected.
    pub fn connect(&mut self, ctx: ExtEventSink) {
        debug!("starting connect");

        // Cloned so the background thread owns its own copy of the server config.
        let server = self.server.clone();

        thread::spawn(move || {
            let rt = tokio::runtime::Runtime::new().expect("failed to create tokio runtime");

            rt.block_on(async move {
                let mut s_server = TcpStream::connect(format!(
                    "{}:{}",
                    server.settings.host, server.settings.port
                ))
                .await
                .expect("failed to connect to the upstream server");

                let _connack = server
                    .init(&mut s_server)
                    .await
                    .expect("failed to initialise the upstream connection");

                let codec = QuasselCodec::builder().compression(false).new_codec();
                let framed = Framed::new(s_server, codec);
                let (s_sink, s_stream) = framed.split();

                let listener = TcpListener::bind((server.listen_host, server.listen_port))
                    .await
                    .expect("failed to bind the local listener");
                let (mut client, _) = listener
                    .accept()
                    .await
                    .expect("failed to accept a client connection");

                //
                // Setup Listener
                //

                // Scoped so the borrowed halves are dropped before `client` is
                // moved into the framed codec below.
                {
                    let (mut c_stream, mut c_sink) = client.split();

                    let mut init = [0; 12];
                    // Peek first so we only consume as many bytes as the client
                    // has actually sent (up to 12).
                    // NOTE(review): if fewer than 12 bytes have arrived yet, only
                    // the partial handshake is parsed — confirm whether the
                    // handshake should instead be read according to its wire
                    // format.
                    let n = c_stream
                        .peek(&mut init)
                        .await
                        .expect("failed to peek the client init bytes");
                    // `read` may return fewer bytes than requested; `read_exact`
                    // guarantees the peeked bytes are fully consumed so none of
                    // them leak into the framed codec.
                    c_stream
                        .read_exact(&mut init[..n])
                        .await
                        .expect("failed to read the client init bytes");
                    let init = libquassel::message::Init::parse(&init);
                    debug!("send init bytes: {:?}", init);

                    // `write` may perform a partial write; `write_all` ensures the
                    // whole 4-byte reply is sent.
                    // NOTE(review): presumably this reply selects the protocol
                    // variant — confirm against the protocol documentation.
                    c_sink
                        .write_all(&[0x0, 0x0, 0x0, 0x2])
                        .await
                        .expect("failed to send the init reply to the client");
                }

                let codec = QuasselCodec::builder().compression(false).new_codec();
                let framed = Framed::new(client, codec);
                let (c_sink, c_stream) = framed.split();

                // Start Processing

                let s_state = ClientState::Handshake;
                let c_state = ClientState::Handshake;

                // Run both directions concurrently; `join!` completes once both
                // futures have finished.
                tokio::join!(
                    Server::run(s_stream, c_sink, s_state, ctx.clone(), "server -> client"),
                    Server::run(c_stream, s_sink, c_state, ctx.clone(), "client -> server")
                );
            });
        });
    }
}