1use super::Blob;
12use anyhow::Context as _;
13use async_trait::async_trait;
14use http::uri::Scheme;
15use http_body_util::BodyExt;
16use http_body_util::Empty;
17use hyper::Request;
18use hyper::StatusCode;
19use hyper::Uri;
20use hyper_tls::HttpsConnector;
21use hyper_util::client::legacy::Client;
22use hyper_util::client::legacy::connect::HttpConnector;
23use hyper_util::rt::TokioExecutor;
24use inspect::Inspect;
25use once_cell::sync::OnceCell;
26use std::fmt::Debug;
27use std::io;
28
/// A blob whose contents are fetched over HTTP or HTTPS.
///
/// The blob's length is captured once, when the value is constructed; each
/// read then issues its own ranged request against the stored URI.
#[derive(Debug, Inspect)]
pub struct HttpBlob {
    /// Client used for every request. Request bodies are always empty.
    #[inspect(skip)]
    client: Client<HttpsConnector<HttpConnector>, Empty<&'static [u8]>>,
    /// HTTP version of the response that reported the blob's length.
    #[inspect(debug)]
    version: http::Version,
    /// URI that reads are sent to: the one left after following any redirects
    /// during construction.
    #[inspect(display)]
    uri: Uri,
    /// Blob length, parsed from the `Content-Length` header at construction.
    len: u64,
    /// Handle to the tokio runtime on which requests are spawned.
    #[inspect(skip)]
    tokio_handle: tokio::runtime::Handle,
}
42
/// Tokio runtime shared by every `HttpBlob`, created on the first call to
/// `HttpBlob::new`.
///
/// Requests are spawned onto this runtime and their join handles awaited,
/// instead of being polled directly by the caller's executor.
// NOTE(review): presumably because callers are not guaranteed to be running
// inside a tokio context — confirm.
static TOKIO_RUNTIME: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
44
45impl HttpBlob {
46 pub async fn new(url: &str) -> anyhow::Result<Self> {
48 let mut uri: Uri = url.parse()?;
49
50 let connector = HttpsConnector::new();
51 let builder = Client::builder(TokioExecutor::new());
52 let client = builder.build(connector);
53
54 let handle = TOKIO_RUNTIME
55 .get_or_try_init(tokio::runtime::Runtime::new)
56 .context("failed to initialize tokio")?
57 .handle()
58 .clone();
59
60 let mut redirect_count = 0;
61 let response = loop {
62 if redirect_count > 5 {
63 anyhow::bail!("too many redirects");
64 }
65
66 let response = handle
67 .spawn(
68 client.request(
69 Request::builder()
70 .uri(&uri)
71 .method("HEAD")
72 .body(Empty::new())
73 .unwrap(),
74 ),
75 )
76 .await
77 .unwrap()
78 .context("failed to query blob size")?;
79
80 let next_uri: Uri = match response.status() {
81 StatusCode::OK => break response,
82 StatusCode::MOVED_PERMANENTLY
83 | StatusCode::FOUND
84 | StatusCode::TEMPORARY_REDIRECT
85 | StatusCode::PERMANENT_REDIRECT => response
86 .headers()
87 .get("Location")
88 .context("missing redirect URL")?
89 .to_str()
90 .context("couldn't parse redirect URL")?
91 .parse()
92 .context("couldn't parse redirect URL")?,
93 status => {
94 anyhow::bail!("failed to query blob size: {status}");
95 }
96 };
97
98 if uri.scheme() == Some(&Scheme::HTTPS) && next_uri.scheme() != Some(&Scheme::HTTPS) {
99 anyhow::bail!("https redirected to http");
100 }
101
102 uri = next_uri;
103 redirect_count += 1;
104 };
105
106 let len = response
107 .headers()
108 .get("Content-Length")
109 .context("missing blob length")?
110 .to_str()
111 .context("couldn't parse blob length")?
112 .parse()
113 .context("couldn't parse blob length")?;
114
115 let version = response.version();
116
117 Ok(Self {
118 client,
119 version,
120 uri,
121 len,
122 tokio_handle: handle,
123 })
124 }
125}
126
127#[async_trait]
128impl Blob for HttpBlob {
129 async fn read(&self, mut buf: &mut [u8], offset: u64) -> io::Result<()> {
130 let mut response = self
131 .tokio_handle
132 .spawn(
133 self.client.request(
134 Request::builder()
135 .uri(&self.uri)
136 .header(
137 hyper::header::RANGE,
138 format!("bytes={}-{}", offset, offset + buf.len() as u64 - 1,),
139 )
140 .body(Empty::new())
141 .unwrap(),
142 ),
143 )
144 .await
145 .unwrap()
146 .map_err(io::Error::other)?;
147
148 if !response.status().is_success() {
149 return Err(io::Error::other(response.status().to_string()));
150 }
151
152 while let Some(frame) = response.body_mut().frame().await {
153 let frame = frame.map_err(io::Error::other)?;
154 if let Some(data) = frame.data_ref() {
155 let len = data.len();
156 if len > buf.len() {
157 return Err(io::Error::other("server did not respect range query"));
158 }
159 let (this, rest) = buf.split_at_mut(len);
160 this.copy_from_slice(data);
161 buf = rest;
162 }
163 }
164
165 if !buf.is_empty() {
166 return Err(io::ErrorKind::UnexpectedEof.into());
167 }
168
169 Ok(())
170 }
171
172 fn len(&self) -> u64 {
173 self.len
174 }
175}