jsonrpc/
simple_tcp.rs

1// SPDX-License-Identifier: CC0-1.0
2
//! This module implements a synchronous transport over a raw [`std::net::TcpStream`].
//! Note that it does not handle Unix Domain Sockets; see `simple_uds` for those.
5
6use std::{error, fmt, io, net, time};
7
8use crate::client::Transport;
9use crate::{Request, Response};
10
/// Simple synchronous transport that connects to its peer over TCP.
#[derive(Debug, Clone)]
pub struct TcpTransport {
    /// Internet socket address the transport connects to.
    pub addr: net::SocketAddr,
    /// Timeout applied to both reads and writes on the connection.
    pub timeout: Option<time::Duration>,
}
19
20impl TcpTransport {
21    /// Creates a new `TcpTransport` without timeouts.
22    pub fn new(addr: net::SocketAddr) -> TcpTransport {
23        TcpTransport {
24            addr,
25            timeout: None,
26        }
27    }
28
29    fn request<R>(&self, req: impl serde::Serialize) -> Result<R, Error>
30    where
31        R: for<'a> serde::de::Deserialize<'a>,
32    {
33        let mut sock = net::TcpStream::connect(self.addr)?;
34        sock.set_read_timeout(self.timeout)?;
35        sock.set_write_timeout(self.timeout)?;
36
37        serde_json::to_writer(&mut sock, &req)?;
38
39        // NOTE: we don't check the id there, so it *must* be synchronous
40        let resp: R = serde_json::Deserializer::from_reader(&mut sock)
41            .into_iter()
42            .next()
43            .ok_or(Error::Timeout)??;
44        Ok(resp)
45    }
46}
47
48impl Transport for TcpTransport {
49    fn send_request(&self, req: Request) -> Result<Response, crate::Error> {
50        Ok(self.request(req)?)
51    }
52
53    fn send_batch(&self, reqs: &[Request]) -> Result<Vec<Response>, crate::Error> {
54        Ok(self.request(reqs)?)
55    }
56
57    fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result {
58        write!(f, "{}", self.addr)
59    }
60}
61
62/// Error that can occur while using the TCP transport.
63#[derive(Debug)]
64pub enum Error {
65    /// An error occurred on the socket layer.
66    SocketError(io::Error),
67    /// We didn't receive a complete response till the deadline ran out.
68    Timeout,
69    /// JSON parsing error.
70    Json(serde_json::Error),
71}
72
73impl fmt::Display for Error {
74    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
75        use Error::*;
76
77        match *self {
78            SocketError(ref e) => write!(f, "couldn't connect to host: {}", e),
79            Timeout => f.write_str("didn't receive response data in time, timed out."),
80            Json(ref e) => write!(f, "JSON error: {}", e),
81        }
82    }
83}
84
85impl error::Error for Error {
86    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
87        use self::Error::*;
88
89        match *self {
90            SocketError(ref e) => Some(e),
91            Timeout => None,
92            Json(ref e) => Some(e),
93        }
94    }
95}
96
97impl From<io::Error> for Error {
98    fn from(e: io::Error) -> Self {
99        Error::SocketError(e)
100    }
101}
102
103impl From<serde_json::Error> for Error {
104    fn from(e: serde_json::Error) -> Self {
105        Error::Json(e)
106    }
107}
108
109impl From<Error> for crate::Error {
110    fn from(e: Error) -> crate::Error {
111        match e {
112            Error::Json(e) => crate::Error::Json(e),
113            e => crate::Error::Transport(Box::new(e)),
114        }
115    }
116}
117
#[cfg(test)]
mod tests {
    use std::{
        io::{Read, Write},
        thread,
    };

    use super::*;
    use crate::Client;

    // Test a dummy request / response over a raw TCP transport
    #[test]
    fn sanity_check_tcp_transport() {
        // Bind to port 0 so the OS picks a free port, then read back the real address.
        let addr: net::SocketAddr =
            net::SocketAddrV4::new(net::Ipv4Addr::new(127, 0, 0, 1), 0).into();
        let server = net::TcpListener::bind(addr).unwrap();
        let addr = server.local_addr().unwrap();
        let dummy_req = Request {
            method: "arandommethod",
            params: None,
            id: serde_json::Value::Number(4242242.into()),
            jsonrpc: Some("2.0"),
        };
        let dummy_req_ser = serde_json::to_vec(&dummy_req).unwrap();
        let dummy_resp = Response {
            result: None,
            error: None,
            id: serde_json::Value::Number(4242242.into()),
            jsonrpc: Some("2.0".into()),
        };
        let dummy_resp_ser = serde_json::to_vec(&dummy_resp).unwrap();

        // The client runs on its own thread because `send_request` blocks until the
        // server side below has answered.
        let client_thread = thread::spawn(move || {
            let transport = TcpTransport {
                addr,
                timeout: Some(time::Duration::from_secs(5)),
            };
            let client = Client::with_transport(transport);

            client.send_request(dummy_req).unwrap()
        });

        let (mut stream, _) = server.accept().unwrap();
        stream.set_read_timeout(Some(time::Duration::from_secs(5))).unwrap();

        // `read_exact` fails on a premature EOF instead of spinning forever on zero-length
        // reads, so a client that hangs up early makes the test fail rather than hang.
        let mut recv_req = vec![0; dummy_req_ser.len()];
        stream.read_exact(&mut recv_req).unwrap();
        assert_eq!(recv_req, dummy_req_ser);

        stream.write_all(&dummy_resp_ser).unwrap();
        stream.flush().unwrap();
        let recv_resp = client_thread.join().unwrap();
        assert_eq!(serde_json::to_vec(&recv_resp).unwrap(), dummy_resp_ser);
    }
}