1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
use super::*;
use log::error;
use std::marker::{Send, Sync};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
impl<'a, NetworkBlock, FrontierBlock, TF, NCOps>
NetworkMessageProcessor<NetworkBlock, FrontierBlock, TF, NCOps>
where
TF: TransitionFrontier<Block = FrontierBlock>,
NCOps: NonConsensusNetworkingOps<Block = NetworkBlock>,
NetworkBlock: Send + Sync,
FrontierBlock: TryFrom<NetworkBlock>,
<FrontierBlock as TryFrom<NetworkBlock>>::Error: std::fmt::Display,
{
pub fn new(mut transition_frontier: TF, mut nonconsensus_ops: NCOps) -> Self {
let (sender, block_receiver) = mpsc::channel(1);
nonconsensus_ops.set_block_responder(sender);
let (sender, query_block_request_receiver) = mpsc::channel::<QueryBlockRequest>(1);
transition_frontier.set_block_requester(sender);
Self {
transition_frontier: RwLock::new(transition_frontier),
nonconsensus_ops: RwLock::new(nonconsensus_ops),
block_receiver: RwLock::new(block_receiver),
query_block_request_receiver: RwLock::new(query_block_request_receiver),
}
}
pub async fn transition_frontier(&'a self) -> RwLockReadGuard<'a, TF> {
self.transition_frontier.read().await
}
pub async fn nonconsensus_ops(&'a self) -> RwLockReadGuard<'a, NCOps> {
self.nonconsensus_ops.read().await
}
pub async fn nonconsensus_ops_mut(&'a self) -> RwLockWriteGuard<'a, NCOps> {
self.nonconsensus_ops.write().await
}
pub async fn run(&self) {
tokio::join!(self.run_recv_block_loop(), self.run_query_block_loop());
}
async fn run_recv_block_loop(&self) {
let mut block_receiver = self.block_receiver.write().await;
while let Some(block) = block_receiver.recv().await {
let mut transition_frontier = self.transition_frontier.write().await;
match block.try_into() {
Ok(block) => {
if let Err(err) = transition_frontier.add_block(block).await {
error!("{err}");
}
}
Err(err) => error!("{err}"),
}
}
}
async fn run_query_block_loop(&self) {
let mut query_block_request_receiver = self.query_block_request_receiver.write().await;
while let Some(request) = query_block_request_receiver.recv().await {
let mut nonconsensus_ops = self.nonconsensus_ops.write().await;
if let Err(err) = nonconsensus_ops.query_block(&request).await {
error!("{err}");
}
}
}
}