disk_blob/blob/
http.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! HTTP blob implementation based on [`hyper`], [`tokio`], and
5//! [`hyper_tls`].
6//!
7//! In the future, it may be better to use `pal_async` instead. This will require a
8//! new, unreleased version of `hyper`, and a bunch of infrastructure to support
9//! initiating TCP connections the way `hyper` expects.
10
11use super::Blob;
12use anyhow::Context as _;
13use async_trait::async_trait;
14use http::uri::Scheme;
15use http_body_util::BodyExt;
16use http_body_util::Empty;
17use hyper::Request;
18use hyper::StatusCode;
19use hyper::Uri;
20use hyper_tls::HttpsConnector;
21use hyper_util::client::legacy::Client;
22use hyper_util::client::legacy::connect::HttpConnector;
23use hyper_util::rt::TokioExecutor;
24use inspect::Inspect;
25use once_cell::sync::OnceCell;
26use std::fmt::Debug;
27use std::io;
28
29/// A blob backed by an HTTP/HTTPS connection.
30#[derive(Debug, Inspect)]
31pub struct HttpBlob {
32    #[inspect(skip)]
33    client: Client<HttpsConnector<HttpConnector>, Empty<&'static [u8]>>,
34    #[inspect(debug)]
35    version: http::Version,
36    #[inspect(display)]
37    uri: Uri,
38    len: u64,
39    #[inspect(skip)]
40    tokio_handle: tokio::runtime::Handle,
41}
42
43static TOKIO_RUNTIME: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
44
45impl HttpBlob {
46    /// Connects to `url` and returns an object to access it as a blob.
47    pub async fn new(url: &str) -> anyhow::Result<Self> {
48        let mut uri: Uri = url.parse()?;
49
50        let connector = HttpsConnector::new();
51        let builder = Client::builder(TokioExecutor::new());
52        let client = builder.build(connector);
53
54        let handle = TOKIO_RUNTIME
55            .get_or_try_init(tokio::runtime::Runtime::new)
56            .context("failed to initialize tokio")?
57            .handle()
58            .clone();
59
60        let mut redirect_count = 0;
61        let response = loop {
62            if redirect_count > 5 {
63                anyhow::bail!("too many redirects");
64            }
65
66            let response = handle
67                .spawn(
68                    client.request(
69                        Request::builder()
70                            .uri(&uri)
71                            .method("HEAD")
72                            .body(Empty::new())
73                            .unwrap(),
74                    ),
75                )
76                .await
77                .unwrap()
78                .context("failed to query blob size")?;
79
80            let next_uri: Uri = match response.status() {
81                StatusCode::OK => break response,
82                StatusCode::MOVED_PERMANENTLY
83                | StatusCode::FOUND
84                | StatusCode::TEMPORARY_REDIRECT
85                | StatusCode::PERMANENT_REDIRECT => response
86                    .headers()
87                    .get("Location")
88                    .context("missing redirect URL")?
89                    .to_str()
90                    .context("couldn't parse redirect URL")?
91                    .parse()
92                    .context("couldn't parse redirect URL")?,
93                status => {
94                    anyhow::bail!("failed to query blob size: {status}");
95                }
96            };
97
98            if uri.scheme() == Some(&Scheme::HTTPS) && next_uri.scheme() != Some(&Scheme::HTTPS) {
99                anyhow::bail!("https redirected to http");
100            }
101
102            uri = next_uri;
103            redirect_count += 1;
104        };
105
106        let len = response
107            .headers()
108            .get("Content-Length")
109            .context("missing blob length")?
110            .to_str()
111            .context("couldn't parse blob length")?
112            .parse()
113            .context("couldn't parse blob length")?;
114
115        let version = response.version();
116
117        Ok(Self {
118            client,
119            version,
120            uri,
121            len,
122            tokio_handle: handle,
123        })
124    }
125}
126
127#[async_trait]
128impl Blob for HttpBlob {
129    async fn read(&self, mut buf: &mut [u8], offset: u64) -> io::Result<()> {
130        let mut response = self
131            .tokio_handle
132            .spawn(
133                self.client.request(
134                    Request::builder()
135                        .uri(&self.uri)
136                        .header(
137                            hyper::header::RANGE,
138                            format!("bytes={}-{}", offset, offset + buf.len() as u64 - 1,),
139                        )
140                        .body(Empty::new())
141                        .unwrap(),
142                ),
143            )
144            .await
145            .unwrap()
146            .map_err(io::Error::other)?;
147
148        if !response.status().is_success() {
149            return Err(io::Error::other(response.status().to_string()));
150        }
151
152        while let Some(frame) = response.body_mut().frame().await {
153            let frame = frame.map_err(io::Error::other)?;
154            if let Some(data) = frame.data_ref() {
155                let len = data.len();
156                if len > buf.len() {
157                    return Err(io::Error::other("server did not respect range query"));
158                }
159                let (this, rest) = buf.split_at_mut(len);
160                this.copy_from_slice(data);
161                buf = rest;
162            }
163        }
164
165        if !buf.is_empty() {
166            return Err(io::ErrorKind::UnexpectedEof.into());
167        }
168
169        Ok(())
170    }
171
172    fn len(&self) -> u64 {
173        self.len
174    }
175}