use futures::{stream, Stream, TryStreamExt};
use futures_timer::Delay;
use serde::de::DeserializeOwned;
use std::marker::PhantomData;
use std::time::Duration;
use std::{fmt, vec};
use crate::api::Namespace;
use crate::helpers;
use crate::types::{Filter, Log, H256};
use crate::{error, rpc, Transport};
fn filter_stream<T: Transport, I: DeserializeOwned>(
    base: BaseFilter<T, I>,
    poll_interval: Duration,
) -> impl Stream<Item = error::Result<I>> {
    let id = helpers::serialize(&base.id);
    stream::unfold((base, id), move |state| async move {
        let (base, id) = state;
        Delay::new(poll_interval).await;
        let response = base.transport.execute("eth_getFilterChanges", vec![id.clone()]).await;
        let items: error::Result<Option<Vec<I>>> = response.and_then(helpers::decode);
        let items = items.map(Option::unwrap_or_default);
        Some((items, (base, id)))
    })
    
    .map_ok(|items| stream::iter(items.into_iter().map(Ok)))
    .try_flatten()
    .into_stream()
}
trait FilterInterface {
    
    type Output;
    
    fn constructor() -> &'static str;
}
#[derive(Debug)]
struct LogsFilter;
impl FilterInterface for LogsFilter {
    type Output = Log;
    fn constructor() -> &'static str {
        "eth_newFilter"
    }
}
#[derive(Debug)]
struct BlocksFilter;
impl FilterInterface for BlocksFilter {
    type Output = H256;
    fn constructor() -> &'static str {
        "eth_newBlockFilter"
    }
}
#[derive(Debug)]
struct PendingTransactionsFilter;
impl FilterInterface for PendingTransactionsFilter {
    type Output = H256;
    fn constructor() -> &'static str {
        "eth_newPendingTransactionFilter"
    }
}
pub struct BaseFilter<T: Transport, I> {
    
    id: String,
    transport: T,
    item: PhantomData<I>,
}
impl<T: Transport, I: 'static> fmt::Debug for BaseFilter<T, I> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("BaseFilter")
            .field("id", &self.id)
            .field("transport", &self.transport)
            .field("item", &std::any::TypeId::of::<I>())
            .finish()
    }
}
impl<T: Transport, I> Clone for BaseFilter<T, I> {
    fn clone(&self) -> Self {
        BaseFilter {
            id: self.id.clone(),
            transport: self.transport.clone(),
            item: PhantomData::default(),
        }
    }
}
impl<T: Transport, I> BaseFilter<T, I> {
    
    pub async fn uninstall(self) -> error::Result<bool>
    where
        Self: Sized,
    {
        let id = helpers::serialize(&self.id);
        let response = self.transport.execute("eth_uninstallFilter", vec![id]).await?;
        helpers::decode(response)
    }
    
    pub fn transport(&self) -> &T {
        &self.transport
    }
}
impl<T: Transport, I: DeserializeOwned> BaseFilter<T, I> {
    
    
    pub async fn poll(&self) -> error::Result<Option<Vec<I>>> {
        let id = helpers::serialize(&self.id);
        let response = self.transport.execute("eth_getFilterChanges", vec![id]).await?;
        helpers::decode(response)
    }
    
    pub fn stream(self, poll_interval: Duration) -> impl Stream<Item = error::Result<I>> {
        filter_stream(self, poll_interval)
    }
}
impl<T: Transport> BaseFilter<T, Log> {
    
    pub async fn logs(&self) -> error::Result<Vec<Log>> {
        let id = helpers::serialize(&self.id);
        let response = self.transport.execute("eth_getFilterLogs", vec![id]).await?;
        helpers::decode(response)
    }
}
async fn create_filter<T: Transport, F: FilterInterface>(
    transport: T,
    arg: Vec<rpc::Value>,
) -> error::Result<BaseFilter<T, F::Output>> {
    let response = transport.execute(F::constructor(), arg).await?;
    let id = helpers::decode(response)?;
    Ok(BaseFilter {
        id,
        transport,
        item: PhantomData,
    })
}
#[derive(Debug, Clone)]
pub struct EthFilter<T> {
    transport: T,
}
impl<T: Transport> Namespace<T> for EthFilter<T> {
    fn new(transport: T) -> Self
    where
        Self: Sized,
    {
        EthFilter { transport }
    }
    fn transport(&self) -> &T {
        &self.transport
    }
}
impl<T: Transport> EthFilter<T> {
    
    pub async fn create_logs_filter(self, filter: Filter) -> error::Result<BaseFilter<T, Log>> {
        let f = helpers::serialize(&filter);
        create_filter::<_, LogsFilter>(self.transport, vec![f]).await
    }
    
    pub async fn create_blocks_filter(self) -> error::Result<BaseFilter<T, H256>> {
        create_filter::<_, BlocksFilter>(self.transport, vec![]).await
    }
    
    pub async fn create_pending_transactions_filter(self) -> error::Result<BaseFilter<T, H256>> {
        create_filter::<_, PendingTransactionsFilter>(self.transport, vec![]).await
    }
}
#[cfg(test)]
mod tests {
    use super::EthFilter;
    use crate::api::Namespace;
    use crate::rpc::Value;
    use crate::transports::test::TestTransport;
    use crate::types::{Address, FilterBuilder, Log, H256};
    use futures::stream::StreamExt;
    use hex_literal::hex;
    use std::time::Duration;
    #[test]
    fn logs_filter() {
        
        let mut transport = TestTransport::default();
        transport.set_response(Value::String("0x123".into()));
        {
            let eth = EthFilter::new(&transport);
            
            let filter = FilterBuilder::default().limit(10).build();
            let filter = futures::executor::block_on(eth.create_logs_filter(filter)).unwrap();
            assert_eq!(filter.id, "0x123".to_owned());
        };
        
        transport.assert_request("eth_newFilter", &[r#"{"limit":10}"#.into()]);
        transport.assert_no_more_requests();
    }
    #[test]
    fn logs_filter_get_logs() {
        
        let log = Log {
            address: Address::from_low_u64_be(1),
            topics: vec![],
            data: hex!("").into(),
            block_hash: Some(H256::from_low_u64_be(2)),
            block_number: Some(1.into()),
            transaction_hash: Some(H256::from_low_u64_be(3)),
            transaction_index: Some(0.into()),
            log_index: Some(0.into()),
            transaction_log_index: Some(0.into()),
            log_type: Some("mined".into()),
            removed: None,
        };
        let mut transport = TestTransport::default();
        transport.set_response(Value::String("0x123".into()));
        transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));
        let result = {
            let eth = EthFilter::new(&transport);
            
            let filter = FilterBuilder::default()
                .topics(None, Some(vec![H256::from_low_u64_be(2)]), None, None)
                .build();
            let filter = futures::executor::block_on(eth.create_logs_filter(filter)).unwrap();
            assert_eq!(filter.id, "0x123".to_owned());
            futures::executor::block_on(filter.logs())
        };
        
        assert_eq!(result, Ok(vec![log]));
        transport.assert_request(
            "eth_newFilter",
            &[r#"{"topics":[null,"0x0000000000000000000000000000000000000000000000000000000000000002"]}"#.into()],
        );
        transport.assert_request("eth_getFilterLogs", &[r#""0x123""#.into()]);
        transport.assert_no_more_requests();
    }
    #[test]
    fn logs_filter_poll() {
        
        let log = Log {
            address: Address::from_low_u64_be(1),
            topics: vec![],
            data: hex!("").into(),
            block_hash: Some(H256::from_low_u64_be(2)),
            block_number: Some(1.into()),
            transaction_hash: Some(H256::from_low_u64_be(3)),
            transaction_index: Some(0.into()),
            log_index: Some(0.into()),
            transaction_log_index: Some(0.into()),
            log_type: Some("mined".into()),
            removed: None,
        };
        let mut transport = TestTransport::default();
        transport.set_response(Value::String("0x123".into()));
        transport.add_response(Value::Array(vec![serde_json::to_value(&log).unwrap()]));
        let result = {
            let eth = EthFilter::new(&transport);
            
            let filter = FilterBuilder::default()
                .address(vec![Address::from_low_u64_be(2)])
                .build();
            let filter = futures::executor::block_on(eth.create_logs_filter(filter)).unwrap();
            assert_eq!(filter.id, "0x123".to_owned());
            futures::executor::block_on(filter.poll())
        };
        
        assert_eq!(result, Ok(Some(vec![log])));
        transport.assert_request(
            "eth_newFilter",
            &[r#"{"address":"0x0000000000000000000000000000000000000002"}"#.into()],
        );
        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
        transport.assert_no_more_requests();
    }
    #[test]
    fn blocks_filter() {
        
        let mut transport = TestTransport::default();
        transport.set_response(Value::String("0x123".into()));
        {
            let eth = EthFilter::new(&transport);
            
            let filter = futures::executor::block_on(eth.create_blocks_filter()).unwrap();
            assert_eq!(filter.id, "0x123".to_owned());
        };
        
        transport.assert_request("eth_newBlockFilter", &[]);
        transport.assert_no_more_requests();
    }
    #[test]
    fn blocks_filter_poll() {
        
        let mut transport = TestTransport::default();
        transport.set_response(Value::String("0x123".into()));
        transport.add_response(Value::Array(vec![Value::String(
            r#"0x0000000000000000000000000000000000000000000000000000000000000456"#.into(),
        )]));
        let result = {
            let eth = EthFilter::new(&transport);
            
            let filter = futures::executor::block_on(eth.create_blocks_filter()).unwrap();
            assert_eq!(filter.id, "0x123".to_owned());
            futures::executor::block_on(filter.poll())
        };
        
        assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
        transport.assert_request("eth_newBlockFilter", &[]);
        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
        transport.assert_no_more_requests();
    }
    #[test]
    fn blocks_filter_stream() {
        
        let mut transport = TestTransport::default();
        transport.set_response(Value::String("0x123".into()));
        transport.add_response(Value::Array(vec![Value::String(
            r#"0x0000000000000000000000000000000000000000000000000000000000000456"#.into(),
        )]));
        transport.add_response(Value::Array(vec![
            Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000457"#.into()),
            Value::String(r#"0x0000000000000000000000000000000000000000000000000000000000000458"#.into()),
        ]));
        transport.add_response(Value::Array(vec![Value::String(
            r#"0x0000000000000000000000000000000000000000000000000000000000000459"#.into(),
        )]));
        let result: Vec<_> = {
            let eth = EthFilter::new(&transport);
            
            let filter = futures::executor::block_on(eth.create_blocks_filter()).unwrap();
            futures::executor::block_on_stream(filter.stream(Duration::from_secs(0)).boxed_local())
                .take(4)
                .collect()
        };
        
        assert_eq!(
            result,
            [0x456, 0x457, 0x458, 0x459]
                .iter()
                .copied()
                .map(H256::from_low_u64_be)
                .map(Ok)
                .collect::<Vec<_>>()
        );
        transport.assert_request("eth_newBlockFilter", &[]);
        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
    }
    #[test]
    fn pending_transactions_filter() {
        
        let mut transport = TestTransport::default();
        transport.set_response(Value::String("0x123".into()));
        {
            let eth = EthFilter::new(&transport);
            
            let filter = futures::executor::block_on(eth.create_pending_transactions_filter()).unwrap();
            assert_eq!(filter.id, "0x123".to_owned());
        };
        
        transport.assert_request("eth_newPendingTransactionFilter", &[]);
        transport.assert_no_more_requests();
    }
    #[test]
    fn create_pending_transactions_filter_poll() {
        
        let mut transport = TestTransport::default();
        transport.set_response(Value::String("0x123".into()));
        transport.add_response(Value::Array(vec![Value::String(
            r#"0x0000000000000000000000000000000000000000000000000000000000000456"#.into(),
        )]));
        let result = {
            let eth = EthFilter::new(&transport);
            
            let filter = futures::executor::block_on(eth.create_pending_transactions_filter()).unwrap();
            assert_eq!(filter.id, "0x123".to_owned());
            futures::executor::block_on(filter.poll())
        };
        
        assert_eq!(result, Ok(Some(vec![H256::from_low_u64_be(0x456)])));
        transport.assert_request("eth_newPendingTransactionFilter", &[]);
        transport.assert_request("eth_getFilterChanges", &[r#""0x123""#.into()]);
        transport.assert_no_more_requests();
    }
}