1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use crate::error::{self, Error};
use crate::rpc;
use crate::{BatchTransport, RequestId, Transport};
use futures::channel::oneshot;
use futures::{
task::{Context, Poll},
Future, FutureExt,
};
use parking_lot::Mutex;
use std::collections::BTreeMap;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
type Pending = oneshot::Sender<error::Result<rpc::Value>>;
type PendingRequests = Arc<Mutex<BTreeMap<RequestId, Pending>>>;
#[derive(Debug, Clone)]
pub struct Batch<T> {
transport: T,
pending: PendingRequests,
batch: Arc<Mutex<Vec<(RequestId, rpc::Call)>>>,
}
impl<T> Batch<T>
where
T: BatchTransport,
{
pub fn new(transport: T) -> Self {
Batch {
transport,
pending: Default::default(),
batch: Default::default(),
}
}
pub fn submit_batch(&self) -> impl Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> {
let batch = mem::replace(&mut *self.batch.lock(), vec![]);
let ids = batch.iter().map(|&(id, _)| id).collect::<Vec<_>>();
let batch = self.transport.send_batch(batch);
let pending = self.pending.clone();
async move {
let res = batch.await;
let mut pending = pending.lock();
for (idx, request_id) in ids.into_iter().enumerate() {
if let Some(rx) = pending.remove(&request_id) {
let _ = match res {
Ok(ref results) if results.len() > idx => rx.send(results[idx].clone()),
Err(ref err) => rx.send(Err(err.clone())),
_ => rx.send(Err(Error::Internal)),
};
}
}
res
}
}
}
impl<T> Transport for Batch<T>
where
T: BatchTransport,
{
type Out = SingleResult;
fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call) {
self.transport.prepare(method, params)
}
fn send(&self, id: RequestId, request: rpc::Call) -> Self::Out {
let (tx, rx) = oneshot::channel();
self.pending.lock().insert(id, tx);
self.batch.lock().push((id, request));
SingleResult(rx)
}
}
pub struct SingleResult(oneshot::Receiver<error::Result<rpc::Value>>);
impl Future for SingleResult {
type Output = error::Result<rpc::Value>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
Poll::Ready(ready!(self.0.poll_unpin(ctx)).map_err(|_| Error::Internal)?)
}
}