1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//! Batching Transport

use crate::error::{self, Error};
use crate::rpc;
use crate::{BatchTransport, RequestId, Transport};
use futures::channel::oneshot;
use futures::{
    task::{Context, Poll},
    Future, FutureExt,
};
use parking_lot::Mutex;
use std::collections::BTreeMap;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;

type Pending = oneshot::Sender<error::Result<rpc::Value>>;
type PendingRequests = Arc<Mutex<BTreeMap<RequestId, Pending>>>;

/// Transport allowing to batch queries together.
#[derive(Debug, Clone)]
pub struct Batch<T> {
    transport: T,
    pending: PendingRequests,
    batch: Arc<Mutex<Vec<(RequestId, rpc::Call)>>>,
}

impl<T> Batch<T>
where
    T: BatchTransport,
{
    /// Creates new Batch transport given existing transport supporing batch requests.
    pub fn new(transport: T) -> Self {
        Batch {
            transport,
            pending: Default::default(),
            batch: Default::default(),
        }
    }

    /// Sends all requests as a batch.
    pub fn submit_batch(&self) -> impl Future<Output = error::Result<Vec<error::Result<rpc::Value>>>> {
        let batch = mem::replace(&mut *self.batch.lock(), vec![]);
        let ids = batch.iter().map(|&(id, _)| id).collect::<Vec<_>>();

        let batch = self.transport.send_batch(batch);
        let pending = self.pending.clone();

        async move {
            let res = batch.await;
            let mut pending = pending.lock();
            for (idx, request_id) in ids.into_iter().enumerate() {
                if let Some(rx) = pending.remove(&request_id) {
                    // Ignore sending error
                    let _ = match res {
                        Ok(ref results) if results.len() > idx => rx.send(results[idx].clone()),
                        Err(ref err) => rx.send(Err(err.clone())),
                        _ => rx.send(Err(Error::Internal)),
                    };
                }
            }
            res
        }
    }
}

impl<T> Transport for Batch<T>
where
    T: BatchTransport,
{
    type Out = SingleResult;

    fn prepare(&self, method: &str, params: Vec<rpc::Value>) -> (RequestId, rpc::Call) {
        self.transport.prepare(method, params)
    }

    fn send(&self, id: RequestId, request: rpc::Call) -> Self::Out {
        let (tx, rx) = oneshot::channel();
        self.pending.lock().insert(id, tx);
        self.batch.lock().push((id, request));

        SingleResult(rx)
    }
}

/// Result of calling a single method that will be part of the batch.
/// Converts `oneshot::Receiver` error into `Error::Internal`
pub struct SingleResult(oneshot::Receiver<error::Result<rpc::Value>>);

impl Future for SingleResult {
    type Output = error::Result<rpc::Value>;

    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        Poll::Ready(ready!(self.0.poll_unpin(ctx)).map_err(|_| Error::Internal)?)
    }
}