1use std::{error, fmt, io, net, time};
7
8use crate::client::Transport;
9use crate::{Request, Response};
10
/// Transport that sends each request over its own TCP connection.
#[derive(Debug, Clone)]
pub struct TcpTransport {
    /// Address of the server to connect to.
    pub addr: net::SocketAddr,
    /// Timeout applied to both reads and writes on the socket.
    ///
    /// `None` leaves the socket without a read/write timeout.
    pub timeout: Option<time::Duration>,
}
19
20impl TcpTransport {
21 pub fn new(addr: net::SocketAddr) -> TcpTransport {
23 TcpTransport {
24 addr,
25 timeout: None,
26 }
27 }
28
29 fn request<R>(&self, req: impl serde::Serialize) -> Result<R, Error>
30 where
31 R: for<'a> serde::de::Deserialize<'a>,
32 {
33 let mut sock = net::TcpStream::connect(self.addr)?;
34 sock.set_read_timeout(self.timeout)?;
35 sock.set_write_timeout(self.timeout)?;
36
37 serde_json::to_writer(&mut sock, &req)?;
38
39 let resp: R = serde_json::Deserializer::from_reader(&mut sock)
41 .into_iter()
42 .next()
43 .ok_or(Error::Timeout)??;
44 Ok(resp)
45 }
46}
47
48impl Transport for TcpTransport {
49 fn send_request(&self, req: Request) -> Result<Response, crate::Error> {
50 Ok(self.request(req)?)
51 }
52
53 fn send_batch(&self, reqs: &[Request]) -> Result<Vec<Response>, crate::Error> {
54 Ok(self.request(reqs)?)
55 }
56
57 fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result {
58 write!(f, "{}", self.addr)
59 }
60}
61
62#[derive(Debug)]
64pub enum Error {
65 SocketError(io::Error),
67 Timeout,
69 Json(serde_json::Error),
71}
72
73impl fmt::Display for Error {
74 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
75 use Error::*;
76
77 match *self {
78 SocketError(ref e) => write!(f, "couldn't connect to host: {}", e),
79 Timeout => f.write_str("didn't receive response data in time, timed out."),
80 Json(ref e) => write!(f, "JSON error: {}", e),
81 }
82 }
83}
84
85impl error::Error for Error {
86 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
87 use self::Error::*;
88
89 match *self {
90 SocketError(ref e) => Some(e),
91 Timeout => None,
92 Json(ref e) => Some(e),
93 }
94 }
95}
96
97impl From<io::Error> for Error {
98 fn from(e: io::Error) -> Self {
99 Error::SocketError(e)
100 }
101}
102
103impl From<serde_json::Error> for Error {
104 fn from(e: serde_json::Error) -> Self {
105 Error::Json(e)
106 }
107}
108
109impl From<Error> for crate::Error {
110 fn from(e: Error) -> crate::Error {
111 match e {
112 Error::Json(e) => crate::Error::Json(e),
113 e => crate::Error::Transport(Box::new(e)),
114 }
115 }
116}
117
#[cfg(test)]
mod tests {
    use std::{
        io::{Read, Write},
        thread,
    };

    use super::*;
    use crate::Client;

    /// Round-trips one request/response pair through a local TCP listener.
    #[test]
    fn sanity_check_tcp_transport() {
        // Bind to port 0 so the OS assigns a free port, then read back the
        // address that was actually bound.
        let addr: net::SocketAddr =
            net::SocketAddrV4::new(net::Ipv4Addr::new(127, 0, 0, 1), 0).into();
        let server = net::TcpListener::bind(addr).unwrap();
        let addr = server.local_addr().unwrap();
        let dummy_req = Request {
            method: "arandommethod",
            params: None,
            id: serde_json::Value::Number(4242242.into()),
            jsonrpc: Some("2.0"),
        };
        let dummy_req_ser = serde_json::to_vec(&dummy_req).unwrap();
        let dummy_resp = Response {
            result: None,
            error: None,
            id: serde_json::Value::Number(4242242.into()),
            jsonrpc: Some("2.0".into()),
        };
        let dummy_resp_ser = serde_json::to_vec(&dummy_resp).unwrap();

        // The client runs on its own thread so this thread can act as server.
        let client_thread = thread::spawn(move || {
            let transport = TcpTransport {
                addr,
                timeout: Some(time::Duration::from_secs(5)),
            };
            let client = Client::with_transport(transport);

            client.send_request(dummy_req).unwrap()
        });

        let (mut stream, _) = server.accept().unwrap();
        stream.set_read_timeout(Some(time::Duration::from_secs(5))).unwrap();
        let mut recv_req = vec![0; dummy_req_ser.len()];
        // `read_exact` errors out if the client hangs up early; a manual
        // `read` loop would spin forever on the resulting `Ok(0)`.
        stream.read_exact(&mut recv_req).unwrap();
        assert_eq!(recv_req, dummy_req_ser);

        stream.write_all(&dummy_resp_ser).unwrap();
        stream.flush().unwrap();
        let recv_resp = client_thread.join().unwrap();
        assert_eq!(serde_json::to_vec(&recv_resp).unwrap(), dummy_resp_ser);
    }
}