//! `Eth` namespace, subscriptions

use crate::api::Namespace;
use crate::helpers;
use crate::types::{BlockHeader, Filter, Log, SyncState, H256};
use crate::{error, DuplexTransport};
use futures::{
    task::{Context, Poll},
    Stream,
};
use pin_project::{pin_project, pinned_drop};
use std::marker::PhantomData;
use std::pin::Pin;

/// Subscription-related part of the `Eth` namespace.
///
/// Holds the transport `T` through which subscription requests are sent.
#[derive(Clone, Debug)]
pub struct EthSubscribe<T> {
    // Transport used to execute requests.
    transport: T,
}

impl<T> Namespace<T> for EthSubscribe<T>
where
    T: DuplexTransport,
{
    /// Builds the namespace handle around the given transport.
    fn new(transport: T) -> Self
    where
        Self: Sized,
    {
        Self { transport }
    }

    /// Borrows the transport this namespace was created with.
    fn transport(&self) -> &T {
        let Self { transport } = self;
        transport
    }
}

/// ID of a subscription, as returned from `eth_subscribe`.
///
/// Thin wrapper around the identifier string. Besides equality and ordering it also
/// implements `Hash`, so ids can be used as keys in hash-based collections as well as
/// ordered ones.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SubscriptionId(String);

impl From<String> for SubscriptionId {
    fn from(s: String) -> Self {
        SubscriptionId(s)
    }
}

/// Stream of notifications from a subscription.
///
/// Given a type deserializable from rpc::Value and a subscription id, yields items of that type as
/// notifications are delivered.
///
/// Type parameters: `T` is the duplex transport the subscription lives on, `I` is the item type
/// each notification is deserialized into.
#[pin_project(PinnedDrop)]
#[derive(Debug)]
pub struct SubscriptionStream<T: DuplexTransport, I> {
    // Transport the subscription was registered on; also used to unsubscribe.
    transport: T,
    // Identifier of the subscription this stream belongs to.
    id: SubscriptionId,
    // Raw notification stream obtained from the transport; pinned because it is polled in place.
    #[pin]
    rx: T::NotificationStream,
    // Records the item type `I` without storing a value of it.
    _marker: PhantomData<I>,
}

impl<T: DuplexTransport, I> SubscriptionStream<T, I> {
    /// Registers `id` with the transport and wraps the resulting notification stream.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the transport's `subscribe` call.
    fn new(transport: T, id: SubscriptionId) -> error::Result<Self> {
        // The id is cloned because the stream keeps its own copy for `id()` and for drop.
        let rx = transport.subscribe(id.clone())?;
        let stream = Self {
            transport,
            id,
            rx,
            _marker: PhantomData,
        };
        Ok(stream)
    }

    /// Return the ID of this subscription
    pub fn id(&self) -> &SubscriptionId {
        let Self { id, .. } = self;
        id
    }

    /// Unsubscribe from the event represented by this stream
    ///
    /// Sends `eth_unsubscribe` with this subscription's id and decodes the response as a
    /// `bool`. The stream is consumed by the call.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails or the response cannot be decoded.
    pub async fn unsubscribe(self) -> error::Result<bool> {
        let params = vec![helpers::serialize(&self.id.0)];
        let response = self.transport.execute("eth_unsubscribe", params).await?;
        helpers::decode(response)
    }
}

impl<T, I> Stream for SubscriptionStream<T, I>
where
    T: DuplexTransport,
    I: serde::de::DeserializeOwned,
{
    type Item = error::Result<I>;

    /// Polls the underlying notification stream and deserializes each delivered value into `I`.
    ///
    /// A value that fails to deserialize is yielded as `Some(Err(..))`; the end of the
    /// underlying stream is forwarded as `None`.
    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match this.rx.poll_next(ctx) {
            // Nothing delivered yet; the inner stream has registered the waker.
            Poll::Pending => Poll::Pending,
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Ready(Some(value)) => {
                let item = serde_json::from_value(value).map_err(Into::into);
                Poll::Ready(Some(item))
            }
        }
    }
}

#[pinned_drop]
impl<T, I> PinnedDrop for SubscriptionStream<T, I>
where
    T: DuplexTransport,
{
    /// Tells the transport to unsubscribe this stream's id when the stream is dropped.
    fn drop(self: Pin<&mut Self>) {
        let id = self.id.clone();
        // Best effort: drop cannot report a failure, so the result is deliberately discarded.
        let _ = self.transport.unsubscribe(id);
    }
}

impl<T: DuplexTransport> EthSubscribe<T> {
    /// Create a new heads subscription
    pub async fn subscribe_new_heads(&self) -> error::Result<SubscriptionStream<T, BlockHeader>> {
        let subscription = helpers::serialize(&&"newHeads");
        let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
        let id: String = helpers::decode(response)?;
        SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
    }

    /// Create a logs subscription
    pub async fn subscribe_logs(&self, filter: Filter) -> error::Result<SubscriptionStream<T, Log>> {
        let subscription = helpers::serialize(&&"logs");
        let filter = helpers::serialize(&filter);
        let response = self
            .transport
            .execute("eth_subscribe", vec![subscription, filter])
            .await?;
        let id: String = helpers::decode(response)?;
        SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
    }

    /// Create a pending transactions subscription
    pub async fn subscribe_new_pending_transactions(&self) -> error::Result<SubscriptionStream<T, H256>> {
        let subscription = helpers::serialize(&&"newPendingTransactions");
        let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
        let id: String = helpers::decode(response)?;
        SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
    }

    /// Create a sync status subscription
    pub async fn subscribe_syncing(&self) -> error::Result<SubscriptionStream<T, SyncState>> {
        let subscription = helpers::serialize(&&"syncing");
        let response = self.transport.execute("eth_subscribe", vec![subscription]).await?;
        let id: String = helpers::decode(response)?;
        SubscriptionStream::new(self.transport.clone(), SubscriptionId(id))
    }
}