1#[cfg(feature = "proxy")]
8use socks::Socks5Stream;
9use std::io::{BufRead, BufReader, Read, Write};
10#[cfg(not(jsonrpc_fuzz))]
11use std::net::TcpStream;
12use std::net::{SocketAddr, ToSocketAddrs};
13use std::sync::{Arc, Mutex, MutexGuard};
14use std::time::Duration;
15use std::{error, fmt, io, net, num};
16
17use crate::client::Transport;
18use crate::http::DEFAULT_PORT;
19#[cfg(feature = "proxy")]
20use crate::http::DEFAULT_PROXY_PORT;
21use crate::{Request, Response};
22
23const FINAL_RESP_ALLOC: u64 = 1024 * 1024 * 1024;
25
26#[cfg(not(jsonrpc_fuzz))]
27const DEFAULT_TIMEOUT: Duration = Duration::from_secs(15);
28
29#[cfg(jsonrpc_fuzz)]
30const DEFAULT_TIMEOUT: Duration = Duration::from_millis(1);
31
32#[derive(Clone, Debug)]
35pub struct SimpleHttpTransport {
36 addr: net::SocketAddr,
37 path: String,
38 timeout: Duration,
39 basic_auth: Option<String>,
41 #[cfg(feature = "proxy")]
42 proxy_addr: net::SocketAddr,
43 #[cfg(feature = "proxy")]
44 proxy_auth: Option<(String, String)>,
45 sock: Arc<Mutex<Option<BufReader<TcpStream>>>>,
46}
47
48impl Default for SimpleHttpTransport {
49 fn default() -> Self {
50 SimpleHttpTransport {
51 addr: net::SocketAddr::new(
52 net::IpAddr::V4(net::Ipv4Addr::new(127, 0, 0, 1)),
53 DEFAULT_PORT,
54 ),
55 path: "/".to_owned(),
56 timeout: DEFAULT_TIMEOUT,
57 basic_auth: None,
58 #[cfg(feature = "proxy")]
59 proxy_addr: net::SocketAddr::new(
60 net::IpAddr::V4(net::Ipv4Addr::new(127, 0, 0, 1)),
61 DEFAULT_PROXY_PORT,
62 ),
63 #[cfg(feature = "proxy")]
64 proxy_auth: None,
65 sock: Arc::new(Mutex::new(None)),
66 }
67 }
68}
69
70impl SimpleHttpTransport {
71 pub fn new() -> Self {
73 SimpleHttpTransport::default()
74 }
75
76 pub fn builder() -> Builder {
78 Builder::new()
79 }
80
81 pub fn set_url(&mut self, url: &str) -> Result<(), Error> {
83 let url = check_url(url)?;
84 self.addr = url.0;
85 self.path = url.1;
86 Ok(())
87 }
88
89 pub fn set_url_path(&mut self, path: String) {
91 self.path = path;
92 }
93
94 fn request<R>(&self, req: impl serde::Serialize) -> Result<R, Error>
95 where
96 R: for<'a> serde::de::Deserialize<'a>,
97 {
98 match self.try_request(req) {
99 Ok(response) => Ok(response),
100 Err(err) => {
101 *self.sock.lock().expect("poisoned mutex") = None;
103 Err(err)
104 }
105 }
106 }
107
108 #[cfg(feature = "proxy")]
109 fn fresh_socket(&self) -> Result<TcpStream, Error> {
110 let stream = if let Some((username, password)) = &self.proxy_auth {
111 Socks5Stream::connect_with_password(
112 self.proxy_addr,
113 self.addr,
114 username.as_str(),
115 password.as_str(),
116 )?
117 } else {
118 Socks5Stream::connect(self.proxy_addr, self.addr)?
119 };
120 Ok(stream.into_inner())
121 }
122
123 #[cfg(not(feature = "proxy"))]
124 fn fresh_socket(&self) -> Result<TcpStream, Error> {
125 let stream = TcpStream::connect_timeout(&self.addr, self.timeout)?;
126 stream.set_read_timeout(Some(self.timeout))?;
127 stream.set_write_timeout(Some(self.timeout))?;
128 Ok(stream)
129 }
130
131 fn try_request<R>(&self, req: impl serde::Serialize) -> Result<R, Error>
132 where
133 R: for<'a> serde::de::Deserialize<'a>,
134 {
135 let mut sock_lock: MutexGuard<Option<_>> = self.sock.lock().expect("poisoned mutex");
137 if sock_lock.is_none() {
138 *sock_lock = Some(BufReader::new(self.fresh_socket()?));
139 };
140 let sock: &mut BufReader<_> = sock_lock.as_mut().unwrap();
143
144 let body = serde_json::to_vec(&req)?;
146
147 let mut request_bytes = Vec::new();
148
149 request_bytes.write_all(b"POST ")?;
150 request_bytes.write_all(self.path.as_bytes())?;
151 request_bytes.write_all(b" HTTP/1.1\r\n")?;
152 request_bytes.write_all(b"host: ")?;
154 request_bytes.write_all(self.addr.to_string().as_bytes())?;
155 request_bytes.write_all(b"\r\n")?;
156 request_bytes.write_all(b"Content-Type: application/json\r\n")?;
157 request_bytes.write_all(b"Content-Length: ")?;
158 request_bytes.write_all(body.len().to_string().as_bytes())?;
159 request_bytes.write_all(b"\r\n")?;
160 if let Some(ref auth) = self.basic_auth {
161 request_bytes.write_all(b"Authorization: ")?;
162 request_bytes.write_all(auth.as_ref())?;
163 request_bytes.write_all(b"\r\n")?;
164 }
165 request_bytes.write_all(b"\r\n")?;
167 request_bytes.write_all(&body)?;
168
169 let write_success = sock.get_mut().write_all(request_bytes.as_slice()).is_ok()
171 && sock.get_mut().flush().is_ok();
172
173 if !write_success {
175 *sock.get_mut() = self.fresh_socket()?;
176 sock.get_mut().write_all(request_bytes.as_slice())?;
177 sock.get_mut().flush()?;
178 }
179
180 let mut header_buf = String::new();
182 let read_success = sock.read_line(&mut header_buf).is_ok();
183
184 if (!read_success || header_buf.is_empty()) && write_success {
187 *sock.get_mut() = self.fresh_socket()?;
188 sock.get_mut().write_all(request_bytes.as_slice())?;
189 sock.get_mut().flush()?;
190
191 sock.read_line(&mut header_buf)?;
192 }
193
194 if header_buf.len() < 12 {
195 return Err(Error::HttpResponseTooShort {
196 actual: header_buf.len(),
197 needed: 12,
198 });
199 }
200 if !header_buf.as_bytes()[..12].is_ascii() {
201 return Err(Error::HttpResponseNonAsciiHello(header_buf.as_bytes()[..12].to_vec()));
202 }
203 if !header_buf.starts_with("HTTP/1.1 ") {
204 return Err(Error::HttpResponseBadHello {
205 actual: header_buf[0..9].into(),
206 expected: "HTTP/1.1 ".into(),
207 });
208 }
209 let response_code = match header_buf[9..12].parse::<u16>() {
210 Ok(n) => n,
211 Err(e) => return Err(Error::HttpResponseBadStatus(header_buf[9..12].into(), e)),
212 };
213
214 let mut content_length = None;
216 loop {
217 header_buf.clear();
218 sock.read_line(&mut header_buf)?;
219 if header_buf == "\r\n" {
220 break;
221 }
222 header_buf.make_ascii_lowercase();
223
224 const CONTENT_LENGTH: &str = "content-length: ";
225 if let Some(s) = header_buf.strip_prefix(CONTENT_LENGTH) {
226 content_length = Some(
227 s.trim()
228 .parse::<u64>()
229 .map_err(|e| Error::HttpResponseBadContentLength(s.into(), e))?,
230 );
231 }
232
233 const TRANSFER_ENCODING: &str = "transfer-encoding: ";
234 if let Some(s) = header_buf.strip_prefix(TRANSFER_ENCODING) {
235 const CHUNKED: &str = "chunked";
236 if s.trim() == CHUNKED {
237 return Err(Error::HttpResponseChunked);
238 }
239 }
240 }
241
242 if response_code == 401 {
243 return Err(Error::HttpErrorCode(response_code));
245 }
246
247 let mut reader = match content_length {
251 None => sock.take(FINAL_RESP_ALLOC),
252 Some(n) if n > FINAL_RESP_ALLOC => {
253 return Err(Error::HttpResponseContentLengthTooLarge {
254 length: n,
255 max: FINAL_RESP_ALLOC,
256 });
257 }
258 Some(n) => sock.take(n),
259 };
260
261 match serde_json::from_reader(&mut reader) {
265 Ok(s) => {
266 if content_length.is_some() {
267 reader.bytes().count(); }
269 Ok(s)
270 }
271 Err(e) => {
272 if response_code != 200 {
274 Err(Error::HttpErrorCode(response_code))
275 } else {
276 Err(e.into())
278 }
279 }
280 }
281 }
282}
283
284fn check_url(url: &str) -> Result<(SocketAddr, String), Error> {
287 let mut fallback_port = DEFAULT_PORT;
290
291 let after_scheme = {
294 let mut split = url.splitn(2, "://");
295 let s = split.next().unwrap();
296 match split.next() {
297 None => s, Some(after) => {
299 if s == "http" {
301 fallback_port = 80;
302 } else if s == "https" {
303 fallback_port = 443;
304 } else {
305 return Err(Error::url(url, "scheme should be http or https"));
306 }
307 after
308 }
309 }
310 };
311 let (before_path, path) = {
313 if let Some(slash) = after_scheme.find('/') {
314 (&after_scheme[0..slash], &after_scheme[slash..])
315 } else {
316 (after_scheme, "/")
317 }
318 };
319 let after_auth = {
321 let mut split = before_path.splitn(2, '@');
322 let s = split.next().unwrap();
323 split.next().unwrap_or(s)
324 };
325
326 let mut addr = match after_auth.to_socket_addrs() {
330 Ok(addr) => addr,
331 Err(_) => {
332 format!("{}:{}", after_auth, fallback_port).to_socket_addrs()?
334 }
335 };
336
337 match addr.next() {
338 Some(a) => Ok((a, path.to_owned())),
339 None => Err(Error::url(url, "invalid hostname: error extracting socket address")),
340 }
341}
342
343impl Transport for SimpleHttpTransport {
344 fn send_request(&self, req: Request) -> Result<Response, crate::Error> {
345 Ok(self.request(req)?)
346 }
347
348 fn send_batch(&self, reqs: &[Request]) -> Result<Vec<Response>, crate::Error> {
349 Ok(self.request(reqs)?)
350 }
351
352 fn fmt_target(&self, f: &mut fmt::Formatter) -> fmt::Result {
353 write!(f, "http://{}:{}{}", self.addr.ip(), self.addr.port(), self.path)
354 }
355}
356
357#[derive(Clone, Debug)]
359pub struct Builder {
360 tp: SimpleHttpTransport,
361}
362
363impl Builder {
364 pub fn new() -> Builder {
366 Builder {
367 tp: SimpleHttpTransport::new(),
368 }
369 }
370
371 pub fn timeout(mut self, timeout: Duration) -> Self {
373 self.tp.timeout = timeout;
374 self
375 }
376
377 pub fn url(mut self, url: &str) -> Result<Self, Error> {
379 self.tp.set_url(url)?;
380 Ok(self)
381 }
382
383 pub fn auth<S: AsRef<str>>(mut self, user: S, pass: Option<S>) -> Self {
385 let mut auth = user.as_ref().to_owned();
386 auth.push(':');
387 if let Some(ref pass) = pass {
388 auth.push_str(pass.as_ref());
389 }
390 self.tp.basic_auth = Some(format!("Basic {}", &base64::encode(auth.as_bytes())));
391 self
392 }
393
394 pub fn cookie_auth<S: AsRef<str>>(mut self, cookie: S) -> Self {
396 self.tp.basic_auth = Some(format!("Basic {}", &base64::encode(cookie.as_ref().as_bytes())));
397 self
398 }
399
400 #[cfg(feature = "proxy")]
402 pub fn proxy_addr<S: AsRef<str>>(mut self, proxy_addr: S) -> Result<Self, Error> {
403 self.tp.proxy_addr = check_url(proxy_addr.as_ref())?.0;
405 Ok(self)
406 }
407
408 #[cfg(feature = "proxy")]
410 pub fn proxy_auth<S: AsRef<str>>(mut self, user: S, pass: S) -> Self {
411 self.tp.proxy_auth =
412 Some((user, pass)).map(|(u, p)| (u.as_ref().to_string(), p.as_ref().to_string()));
413 self
414 }
415
416 pub fn build(self) -> SimpleHttpTransport {
418 self.tp
419 }
420}
421
422impl Default for Builder {
423 fn default() -> Self {
424 Builder::new()
425 }
426}
427
428impl crate::Client {
429 pub fn simple_http(
431 url: &str,
432 user: Option<String>,
433 pass: Option<String>,
434 ) -> Result<crate::Client, Error> {
435 let mut builder = Builder::new().url(url)?;
436 if let Some(user) = user {
437 builder = builder.auth(user, pass);
438 }
439 Ok(crate::Client::with_transport(builder.build()))
440 }
441
442 #[cfg(feature = "proxy")]
444 pub fn http_proxy(
445 url: &str,
446 user: Option<String>,
447 pass: Option<String>,
448 proxy_addr: &str,
449 proxy_auth: Option<(&str, &str)>,
450 ) -> Result<crate::Client, Error> {
451 let mut builder = Builder::new().url(url)?;
452 if let Some(user) = user {
453 builder = builder.auth(user, pass);
454 }
455 builder = builder.proxy_addr(proxy_addr)?;
456 if let Some((user, pass)) = proxy_auth {
457 builder = builder.proxy_auth(user, pass);
458 }
459 let tp = builder.build();
460 Ok(crate::Client::with_transport(tp))
461 }
462}
463
464#[derive(Debug)]
466pub enum Error {
467 InvalidUrl {
469 url: String,
471 reason: &'static str,
473 },
474 SocketError(io::Error),
476 HttpResponseTooShort {
478 actual: usize,
480 needed: usize,
482 },
483 HttpResponseNonAsciiHello(Vec<u8>),
485 HttpResponseBadHello {
487 actual: String,
489 expected: String,
491 },
492 HttpResponseBadStatus(String, num::ParseIntError),
494 HttpResponseBadContentLength(String, num::ParseIntError),
496 HttpResponseContentLengthTooLarge {
498 length: u64,
500 max: u64,
502 },
503 HttpResponseChunked,
505 HttpErrorCode(u16),
507 IncompleteResponse {
509 content_length: u64,
511 n_read: u64,
513 },
514 Json(serde_json::Error),
516}
517
518impl Error {
519 fn url<U: Into<String>>(url: U, reason: &'static str) -> Error {
521 Error::InvalidUrl {
522 url: url.into(),
523 reason,
524 }
525 }
526}
527
528impl fmt::Display for Error {
529 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
530 use Error::*;
531
532 match *self {
533 InvalidUrl {
534 ref url,
535 ref reason,
536 } => write!(f, "invalid URL '{}': {}", url, reason),
537 SocketError(ref e) => write!(f, "Couldn't connect to host: {}", e),
538 HttpResponseTooShort {
539 ref actual,
540 ref needed,
541 } => {
542 write!(f, "HTTP response too short: length {}, needed {}.", actual, needed)
543 }
544 HttpResponseNonAsciiHello(ref bytes) => {
545 write!(f, "HTTP response started with non-ASCII {:?}", bytes)
546 }
547 HttpResponseBadHello {
548 ref actual,
549 ref expected,
550 } => {
551 write!(f, "HTTP response started with `{}`; expected `{}`.", actual, expected)
552 }
553 HttpResponseBadStatus(ref status, ref err) => {
554 write!(f, "HTTP response had bad status code `{}`: {}.", status, err)
555 }
556 HttpResponseBadContentLength(ref len, ref err) => {
557 write!(f, "HTTP response had bad content length `{}`: {}.", len, err)
558 }
559 HttpResponseContentLengthTooLarge {
560 length,
561 max,
562 } => {
563 write!(f, "HTTP response content length {} exceeds our max {}.", length, max)
564 }
565 HttpErrorCode(c) => write!(f, "unexpected HTTP code: {}", c),
566 IncompleteResponse {
567 content_length,
568 n_read,
569 } => {
570 write!(
571 f,
572 "read {} bytes but HTTP response content-length header was {}.",
573 n_read, content_length
574 )
575 }
576 Json(ref e) => write!(f, "JSON error: {}", e),
577 HttpResponseChunked => {
578 write!(f, "The server replied with a chunked response which is not supported")
579 }
580 }
581 }
582}
583
584impl error::Error for Error {
585 fn source(&self) -> Option<&(dyn error::Error + 'static)> {
586 use self::Error::*;
587
588 match *self {
589 InvalidUrl {
590 ..
591 }
592 | HttpResponseTooShort {
593 ..
594 }
595 | HttpResponseNonAsciiHello(..)
596 | HttpResponseBadHello {
597 ..
598 }
599 | HttpResponseBadStatus(..)
600 | HttpResponseBadContentLength(..)
601 | HttpResponseContentLengthTooLarge {
602 ..
603 }
604 | HttpErrorCode(_)
605 | IncompleteResponse {
606 ..
607 }
608 | HttpResponseChunked => None,
609 SocketError(ref e) => Some(e),
610 Json(ref e) => Some(e),
611 }
612 }
613}
614
615impl From<io::Error> for Error {
616 fn from(e: io::Error) -> Self {
617 Error::SocketError(e)
618 }
619}
620
621impl From<serde_json::Error> for Error {
622 fn from(e: serde_json::Error) -> Self {
623 Error::Json(e)
624 }
625}
626
627impl From<Error> for crate::Error {
628 fn from(e: Error) -> crate::Error {
629 match e {
630 Error::Json(e) => crate::Error::Json(e),
631 e => crate::Error::Transport(Box::new(e)),
632 }
633 }
634}
635
636#[cfg(jsonrpc_fuzz)]
638pub static FUZZ_TCP_SOCK: Mutex<Option<io::Cursor<Vec<u8>>>> = Mutex::new(None);
639
640#[cfg(jsonrpc_fuzz)]
641#[derive(Clone, Debug)]
642struct TcpStream;
643
644#[cfg(jsonrpc_fuzz)]
645mod impls {
646 use super::*;
647 impl Read for TcpStream {
648 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
649 match *FUZZ_TCP_SOCK.lock().unwrap() {
650 Some(ref mut cursor) => io::Read::read(cursor, buf),
651 None => Ok(0),
652 }
653 }
654 }
655 impl Write for TcpStream {
656 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
657 io::sink().write(buf)
658 }
659 fn flush(&mut self) -> io::Result<()> {
660 Ok(())
661 }
662 }
663
664 impl TcpStream {
665 pub fn connect_timeout(_: &SocketAddr, _: Duration) -> io::Result<Self> {
666 Ok(TcpStream)
667 }
668 pub fn set_read_timeout(&self, _: Option<Duration>) -> io::Result<()> {
669 Ok(())
670 }
671 pub fn set_write_timeout(&self, _: Option<Duration>) -> io::Result<()> {
672 Ok(())
673 }
674 }
675}
676
677#[cfg(test)]
678mod tests {
679 use std::net;
680 #[cfg(feature = "proxy")]
681 use std::str::FromStr;
682
683 use super::*;
684 use crate::Client;
685
686 #[test]
687 fn test_urls() {
688 let addr: net::SocketAddr = ("localhost", 22).to_socket_addrs().unwrap().next().unwrap();
689 let urls = [
690 "localhost:22",
691 "http://localhost:22/",
692 "https://localhost:22/walletname/stuff?it=working",
693 "http://me:weak@localhost:22/wallet",
694 ];
695 for u in &urls {
696 let tp = Builder::new().url(u).unwrap().build();
697 assert_eq!(tp.addr, addr);
698 }
699
700 let addr: net::SocketAddr = ("localhost", 80).to_socket_addrs().unwrap().next().unwrap();
702 let tp = Builder::new().url("http://localhost/").unwrap().build();
703 assert_eq!(tp.addr, addr);
704 let addr: net::SocketAddr = ("localhost", 443).to_socket_addrs().unwrap().next().unwrap();
705 let tp = Builder::new().url("https://localhost/").unwrap().build();
706 assert_eq!(tp.addr, addr);
707 let addr: net::SocketAddr =
708 ("localhost", super::DEFAULT_PORT).to_socket_addrs().unwrap().next().unwrap();
709 let tp = Builder::new().url("localhost").unwrap().build();
710 assert_eq!(tp.addr, addr);
711
712 let valid_urls = [
713 "localhost",
714 "127.0.0.1:8080",
715 "http://127.0.0.1:8080/",
716 "http://127.0.0.1:8080/rpc/test",
717 "https://127.0.0.1/rpc/test",
718 "http://[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:8300",
719 "http://[2001:0db8:85a3:0000:0000:8a2e:0370:7334]",
720 ];
721 for u in &valid_urls {
722 let (addr, path) = check_url(u).unwrap();
723 let builder = Builder::new().url(u).unwrap_or_else(|_| panic!("error for: {}", u));
724 assert_eq!(builder.tp.addr, addr);
725 assert_eq!(builder.tp.path, path);
726 assert_eq!(builder.tp.timeout, DEFAULT_TIMEOUT);
727 assert_eq!(builder.tp.basic_auth, None);
728 #[cfg(feature = "proxy")]
729 assert_eq!(builder.tp.proxy_addr, SocketAddr::from_str("127.0.0.1:9050").unwrap());
730 }
731
732 let invalid_urls = [
733 "127.0.0.1.0:8080",
734 "httpx://127.0.0.1:8080/",
735 "ftp://127.0.0.1:8080/rpc/test",
736 "http://127.0.0./rpc/test",
737 ];
739 for u in &invalid_urls {
740 if let Ok(b) = Builder::new().url(u) {
741 let tp = b.build();
742 panic!("expected error for url {}, got {:?}", u, tp);
743 }
744 }
745 }
746
747 #[test]
748 fn construct() {
749 let tp = Builder::new()
750 .timeout(Duration::from_millis(100))
751 .url("localhost:22")
752 .unwrap()
753 .auth("user", None)
754 .build();
755 let _ = Client::with_transport(tp);
756
757 let _ = Client::simple_http("localhost:22", None, None).unwrap();
758 }
759
760 #[cfg(feature = "proxy")]
761 #[test]
762 fn construct_with_proxy() {
763 let tp = Builder::new()
764 .timeout(Duration::from_millis(100))
765 .url("localhost:22")
766 .unwrap()
767 .auth("user", None)
768 .proxy_addr("127.0.0.1:9050")
769 .unwrap()
770 .build();
771 let _ = Client::with_transport(tp);
772
773 let _ = Client::http_proxy(
774 "localhost:22",
775 None,
776 None,
777 "127.0.0.1:9050",
778 Some(("user", "password")),
779 )
780 .unwrap();
781 }
782
783 #[cfg(all(not(feature = "proxy"), not(jsonrpc_fuzz)))]
786 #[test]
787 fn request_to_closed_socket() {
788 use serde_json::{Number, Value};
789 use std::net::{Shutdown, TcpListener};
790 use std::sync::mpsc;
791 use std::thread;
792
793 let (tx, rx) = mpsc::sync_channel(1);
794
795 thread::spawn(move || {
796 let server = TcpListener::bind("localhost:0").expect("Binding a Tcp Listener");
797 tx.send(server.local_addr().unwrap().port()).unwrap();
798 for (request_id, stream) in server.incoming().enumerate() {
799 let mut stream = stream.unwrap();
800
801 let buf_reader = BufReader::new(&mut stream);
802
803 let _http_request: Vec<_> = buf_reader
804 .lines()
805 .map(|result| result.unwrap())
806 .take_while(|line| !line.is_empty())
807 .collect();
808
809 let response = Response {
810 result: None,
811 error: None,
812 id: Value::Number(Number::from(request_id)),
813 jsonrpc: Some(String::from("2.0")),
814 };
815 let response_str = serde_json::to_string(&response).unwrap();
816
817 stream.write_all(b"HTTP/1.1 200\r\n").unwrap();
818 stream.write_all(b"Content-Length: ").unwrap();
819 stream.write_all(response_str.len().to_string().as_bytes()).unwrap();
820 stream.write_all(b"\r\n").unwrap();
821 stream.write_all(b"\r\n").unwrap();
822 stream.write_all(response_str.as_bytes()).unwrap();
823 stream.flush().unwrap();
824
825 stream.shutdown(Shutdown::Both).unwrap();
826 }
827 });
828
829 thread::sleep(Duration::from_secs(1));
831
832 let port = rx.recv().unwrap();
833 let client =
834 Client::simple_http(format!("localhost:{}", port).as_str(), None, None).unwrap();
835 let request = client.build_request("test_request", None);
836 let result = client.send_request(request).unwrap();
837 assert_eq!(result.id, Value::Number(Number::from(0)));
838 thread::sleep(Duration::from_secs(1));
839 let request = client.build_request("test_request2", None);
840 let result2 = client.send_request(request)
841 .expect("This second request should not be an Err like `Err(Transport(HttpResponseTooShort { actual: 0, needed: 12 }))`");
842 assert_eq!(result2.id, Value::Number(Number::from(1)));
843 }
844}