Skip to main content

diag_server/
diag_service.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! RPC service for diagnostics.
5
6use crate::grpc_result;
7use anyhow::Context;
8use azure_profiler_proto::AzureProfiler;
9use azure_profiler_proto::ProfileRequest;
10use diag_proto::ExecRequest;
11use diag_proto::ExecResponse;
12use diag_proto::FILE_LINE_MAX;
13use diag_proto::FileRequest;
14use diag_proto::KmsgRequest;
15use diag_proto::MemoryProfileTraceRequest;
16use diag_proto::NetworkPacketCaptureRequest;
17use diag_proto::NetworkPacketCaptureResponse;
18use diag_proto::OpenhclDiag;
19use diag_proto::StartRequest;
20use diag_proto::UnderhillDiag;
21use diag_proto::WaitRequest;
22use diag_proto::WaitResponse;
23use diag_proto::network_packet_capture_request::Operation;
24use futures::AsyncRead;
25use futures::AsyncReadExt;
26use futures::AsyncWrite;
27use futures::AsyncWriteExt;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::future::join_all;
31use futures::io::AllowStdIo;
32use futures_concurrency::stream::Merge;
33use inspect::InspectionBuilder;
34use inspect_proto::InspectRequest;
35use inspect_proto::InspectResponse2;
36use inspect_proto::InspectService;
37use inspect_proto::UpdateRequest;
38use inspect_proto::UpdateResponse2;
39use mesh::CancelContext;
40use mesh::rpc::FailableRpc;
41use mesh::rpc::RpcSend;
42use mesh_rpc::server::RpcReceiver;
43use net_packet_capture::OperationData;
44use net_packet_capture::PacketCaptureOperation;
45use net_packet_capture::PacketCaptureParams;
46use net_packet_capture::StartData;
47use pal::unix::process::Stdio;
48use pal_async::driver::Driver;
49use pal_async::interest::InterestSlot;
50use pal_async::interest::PollEvents;
51use pal_async::pipe::PolledPipe;
52use pal_async::socket::AsSockRef;
53use pal_async::socket::PollReady;
54use pal_async::socket::PollReadyExt;
55use pal_async::socket::PolledSocket;
56use pal_async::task::Spawn;
57use pal_async::task::Task;
58use parking_lot::Mutex;
59use socket2::Socket;
60use std::collections::HashMap;
61use std::fs::File;
62use std::future::poll_fn;
63use std::io;
64use std::io::Read;
65use std::os::unix::fs::FileTypeExt;
66use std::os::unix::prelude::*;
67use std::process::ExitStatus;
68use std::sync::Arc;
69
70/// A diagnostics request.
71#[derive(Debug, mesh::MeshPayload)]
72pub enum DiagRequest {
73    /// Start the VM, if it has not already been started.
74    Start(FailableRpc<StartParams, ()>),
75    /// Inspect the VM.
76    Inspect(inspect::Deferred),
77    /// Crash the VM
78    Crash(i32),
79    /// Restart the worker.
80    Restart(FailableRpc<(), ()>),
81    /// Pause VTL0
82    Pause(FailableRpc<(), ()>),
83    /// Resume VTL0
84    Resume(FailableRpc<(), ()>),
85    /// Save VTL2 state
86    Save(FailableRpc<(), Vec<u8>>),
87    /// Setup network trace
88    PacketCapture(FailableRpc<PacketCaptureParams<Socket>, PacketCaptureParams<Socket>>),
89    /// Profile VTL2
90    #[cfg(feature = "profiler")]
91    Profile(FailableRpc<profiler_worker::ProfilerRequest, ()>),
92    /// Get memory profile trace
93    #[cfg(feature = "mem-profile-tracing")]
94    MemoryProfileTrace(FailableRpc<i32, Vec<u8>>),
95}
96
97/// Additional parameters provided as part of a delayed start request.
98#[derive(Debug, mesh::MeshPayload)]
99pub struct StartParams {
100    /// Environment variables to set or remove.
101    pub env: Vec<(String, Option<String>)>,
102    /// Command line arguments to append.
103    pub args: Vec<String>,
104}
105
106pub(crate) struct DiagServiceHandler {
107    request_send: mesh::Sender<DiagRequest>,
108    children: Mutex<HashMap<i32, Task<ExitStatus>>>,
109    inspect_sensitivity_level: Option<inspect::SensitivityLevel>,
110    inner: Arc<crate::Inner>,
111}
112
113impl DiagServiceHandler {
114    pub fn new(request_send: mesh::Sender<DiagRequest>, inner: Arc<crate::Inner>) -> Self {
115        Self {
116            children: Default::default(),
117            request_send,
118            // On CVMs only allow inspecting nodes defined as safe.
119            inspect_sensitivity_level: if underhill_confidentiality::confidential_filtering_enabled(
120            ) {
121                Some(inspect::SensitivityLevel::Safe)
122            } else {
123                None
124            },
125            // TODO: use a remotable type for `Inner`, which is just used to get
126            // data connection sockets.
127            inner,
128        }
129    }
130
131    pub async fn process_requests(
132        self: &Arc<Self>,
133        driver: &(impl Driver + Spawn + Clone),
134        diag_recv: RpcReceiver<UnderhillDiag>,
135        diag2_recv: RpcReceiver<OpenhclDiag>,
136        inspect_recv: RpcReceiver<InspectService>,
137        profile_recv: RpcReceiver<AzureProfiler>,
138    ) -> anyhow::Result<()> {
139        enum Event {
140            Diag(UnderhillDiag),
141            Diag2(OpenhclDiag),
142            Inspect(InspectService),
143            Profile(AzureProfiler),
144        }
145        let mut s = (
146            diag_recv.map(|(ctx, req)| (ctx, Event::Diag(req))),
147            diag2_recv.map(|(ctx, req)| (ctx, Event::Diag2(req))),
148            inspect_recv.map(|(ctx, req)| (ctx, Event::Inspect(req))),
149            profile_recv.map(|(ctx, req)| (ctx, Event::Profile(req))),
150        )
151            .merge();
152
153        while let Some((ctx, req)) = s.next().await {
154            driver
155                .spawn("diag request", {
156                    let driver = driver.clone();
157                    let this = self.clone();
158                    async move {
159                        match req {
160                            Event::Diag(req) => this.handle_diag_request(&driver, req, ctx).await,
161                            Event::Diag2(req) => this.handle_diag2_request(&driver, req, ctx).await,
162                            Event::Inspect(req) => this.handle_inspect_request(req, ctx).await,
163                            Event::Profile(req) => this.handle_profile_request(req, ctx).await,
164                        }
165                    }
166                })
167                .detach();
168        }
169        Ok(())
170    }
171
172    async fn take_connection(&self, id: u64) -> anyhow::Result<PolledSocket<Socket>> {
173        self.inner.take_connection(id).await
174    }
175
176    async fn handle_inspect_request(&self, req: InspectService, mut ctx: CancelContext) {
177        match req {
178            InspectService::Inspect(request, response) => {
179                let inspect_response = self.handle_inspect(&request, ctx).await;
180                response.send(grpc_result(Ok(Ok(inspect_response))));
181            }
182            InspectService::Update(request, response) => {
183                response.send(grpc_result(
184                    ctx.until_cancelled(self.handle_update(&request)).await,
185                ));
186            }
187        }
188    }
189
190    async fn handle_profile_request(&self, req: AzureProfiler, mut ctx: CancelContext) {
191        match req {
192            AzureProfiler::Profile(request, response) => response.send(grpc_result(
193                ctx.until_cancelled(self.handle_profile(request)).await,
194            )),
195        }
196    }
197
198    async fn handle_diag_request(
199        &self,
200        driver: &(impl Driver + Spawn + Clone),
201        req: UnderhillDiag,
202        mut ctx: CancelContext,
203    ) {
204        match req {
205            UnderhillDiag::Exec(request, response) => response.send(grpc_result(
206                ctx.until_cancelled(self.handle_exec(driver, &request))
207                    .await,
208            )),
209            UnderhillDiag::Wait(request, response) => response.send(grpc_result(
210                ctx.until_cancelled(self.handle_wait(&request)).await,
211            )),
212            UnderhillDiag::Start(request, response) => {
213                response.send(grpc_result(
214                    ctx.until_cancelled(self.handle_start(request)).await,
215                ));
216            }
217            UnderhillDiag::Kmsg(request, response) => {
218                response.send(grpc_result(Ok(self.handle_kmsg(driver, &request).await)))
219            }
220            UnderhillDiag::Crash(request, response) => {
221                response.send(grpc_result(
222                    ctx.until_cancelled(self.handle_crash(request)).await,
223                ));
224            }
225            UnderhillDiag::Restart(_, response) => {
226                response.send(grpc_result(
227                    ctx.until_cancelled(self.handle_restart()).await,
228                ));
229            }
230            UnderhillDiag::ReadFile(request, response) => response.send(grpc_result(Ok(self
231                .handle_read_file(driver, &request)
232                .await))),
233            UnderhillDiag::Pause(_, response) => {
234                response.send(grpc_result(ctx.until_cancelled(self.handle_pause()).await))
235            }
236            UnderhillDiag::PacketCapture(request, response) => response.send(grpc_result(
237                ctx.until_cancelled(self.handle_packet_capture(&request))
238                    .await,
239            )),
240            UnderhillDiag::Resume(_, response) => {
241                response.send(grpc_result(ctx.until_cancelled(self.handle_resume()).await))
242            }
243            UnderhillDiag::DumpSavedState((), response) => response.send(grpc_result(
244                ctx.until_cancelled(self.handle_dump_saved_state()).await,
245            )),
246        }
247    }
248
249    async fn handle_diag2_request(
250        &self,
251        _driver: &(impl Driver + Spawn + Clone),
252        req: OpenhclDiag,
253        mut ctx: CancelContext,
254    ) {
255        match req {
256            OpenhclDiag::Ping((), response) => {
257                response.send(Ok(()));
258            }
259            OpenhclDiag::MemoryProfileTrace(request, response) => response.send(grpc_result(
260                ctx.until_cancelled(self.handle_memory_profile_trace(&request))
261                    .await,
262            )),
263        }
264    }
265
266    async fn handle_start(&self, request: StartRequest) -> anyhow::Result<()> {
267        let params = StartParams {
268            env: request
269                .env
270                .into_iter()
271                .map(|pair| (pair.name, pair.value))
272                .collect(),
273            args: request.args,
274        };
275        self.request_send
276            .call_failable(DiagRequest::Start, params)
277            .await?;
278        Ok(())
279    }
280
281    async fn handle_crash(&self, request: diag_proto::CrashRequest) -> anyhow::Result<()> {
282        self.request_send.send(DiagRequest::Crash(request.pid));
283
284        Ok(())
285    }
286
287    async fn handle_exec(
288        &self,
289        driver: &(impl Driver + Spawn + Clone),
290        request: &ExecRequest,
291    ) -> anyhow::Result<ExecResponse> {
292        tracing::info!(
293            command = %request.command,
294            stdin = request.stdin,
295            stdout = request.stdout,
296            stderr = request.stderr,
297            tty = request.tty,
298            "exec request"
299        );
300
301        let stdin = if request.stdin != 0 {
302            Some(
303                self.take_connection(request.stdin)
304                    .await
305                    .context("failed to get stdin conn")?,
306            )
307        } else {
308            None
309        };
310        let stdout = if request.stdout != 0 {
311            Some(
312                self.take_connection(request.stdout)
313                    .await
314                    .context("failed to get stdout conn")?,
315            )
316        } else {
317            None
318        };
319        let stderr = if request.stderr != 0 {
320            Some(
321                self.take_connection(request.stderr)
322                    .await
323                    .context("failed to get stderr conn")?,
324            )
325        } else {
326            None
327        };
328
329        let mut builder = pal::unix::process::Builder::new(&request.command);
330        builder.args(&request.args);
331        if request.clear_env {
332            builder.env_clear();
333        }
334        for diag_proto::EnvPair { name, value } in &request.env {
335            if let Some(value) = value {
336                builder.env(name, value);
337            } else {
338                builder.env_remove(name);
339            }
340        }
341
342        // HACK: A hack to fix segfault caused by glibc bug in L1 TDX VMM.
343        // Should be removed after glibc update or a clean CPUID virtualization solution.
344        // Please refer to https://github.com/microsoft/openvmm-deps/issues/21 for more information.
345        // xtask-fmt allow-target-arch cpu-intrinsic
346        #[cfg(target_arch = "x86_64")]
347        {
348            let result =
349                safe_intrinsics::cpuid(hvdef::HV_CPUID_FUNCTION_MS_HV_ISOLATION_CONFIGURATION, 0);
350            // Value 3 means TDX.
351            let tdx_isolated = (result.ebx & 0xF) == 3;
352            if tdx_isolated {
353                builder.env("GLIBC_TUNABLES", "glibc.cpu.x86_non_temporal_threshold=0x11a000:glibc.cpu.x86_rep_movsb_threshold=0x4000");
354            }
355        };
356
357        let mut stdin_relay = None;
358        let mut stdout_relay = None;
359        let mut stderr_relay = None;
360        let mut raw_stdout = None;
361        let mut raw_stderr = None;
362        let mut child = {
363            let (stdin_pipes, stdout_pipes, stderr_pipes);
364            let stdin_socket;
365            let stdout_socket;
366            let stderr_socket;
367            let pty;
368            if request.tty {
369                pty = term::open_pty().context("failed to create pty")?;
370
371                let primary = PolledPipe::new(driver, pty.0)
372                    .context("failed to create polled pty primary")?;
373
374                let secondary = &pty.1;
375
376                let (primary_read, primary_write) = primary.split();
377                if let Some(stdin) = stdin {
378                    stdin_relay = Some(driver.spawn("pty stdin relay", async move {
379                        relay(stdin, primary_write).await;
380                    }));
381                }
382                if let Some(stdout) = stdout {
383                    stdout_relay =
384                        Some(driver.spawn("pty stdout relay", relay(primary_read, stdout)));
385                }
386
387                builder
388                    .setsid(true)
389                    .controlling_terminal(secondary.as_fd())
390                    .stdin(Stdio::Fd(secondary.as_fd()))
391                    .stdout(Stdio::Fd(secondary.as_fd()))
392                    .stderr(Stdio::Fd(secondary.as_fd()));
393            } else if request.raw_socket_io {
394                if let Some(stdin) = stdin {
395                    stdin_socket = stdin.into_inner();
396                    builder.stdin(Stdio::Fd(stdin_socket.as_fd()));
397                }
398                if let Some(stdout) = stdout {
399                    stdout_socket = raw_stdout.insert(stdout.into_inner());
400                    builder.stdout(Stdio::Fd(stdout_socket.as_fd()));
401                    if request.combine_stderr {
402                        builder.stderr(Stdio::Fd(stdout_socket.as_fd()));
403                    }
404                }
405                if let Some(stderr) = stderr {
406                    stderr_socket = raw_stderr.insert(stderr.into_inner());
407                    builder.stderr(Stdio::Fd(stderr_socket.as_fd()));
408                }
409            } else {
410                if let Some(stdin) = stdin {
411                    stdin_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
412                    let pipe = PolledPipe::new(driver, stdin_pipes.1)
413                        .context("failed to create polled pipe")?;
414                    stdin_relay = Some(driver.spawn("stdin relay", async move {
415                        relay(stdin, pipe).await;
416                    }));
417                    builder.stdin(Stdio::Fd(stdin_pipes.0.as_fd()));
418                }
419                if let Some(stdout) = stdout {
420                    stdout_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
421                    let pipe = PolledPipe::new(driver, stdout_pipes.0)
422                        .context("failed to create polled pipe")?;
423                    stdout_relay = Some(driver.spawn("stdout relay", relay(pipe, stdout)));
424                    builder.stdout(Stdio::Fd(stdout_pipes.1.as_fd()));
425                    if request.combine_stderr {
426                        builder.stderr(Stdio::Fd(stdout_pipes.1.as_fd()));
427                    }
428                }
429                if let Some(stderr) = stderr {
430                    stderr_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
431                    let pipe = PolledPipe::new(driver, stderr_pipes.0)
432                        .context("failed to create polled pipe")?;
433                    stderr_relay = Some(driver.spawn("stderr relay", relay(pipe, stderr)));
434                    builder.stderr(Stdio::Fd(stderr_pipes.1.as_fd()));
435                }
436            }
437
438            builder
439                .spawn()
440                .with_context(|| format!("failed to launch {}", &request.command))?
441        };
442
443        let pid = child.id();
444
445        tracing::info!(pid, "spawned child");
446
447        let mut child_ready = driver
448            .new_dyn_fd_ready(child.as_fd().as_raw_fd())
449            .expect("failed creating child poll");
450
451        let status = driver.spawn("diag child wait", {
452            let driver = driver.clone();
453            async move {
454                poll_fn(|cx| child_ready.poll_fd_ready(cx, InterestSlot::Read, PollEvents::IN))
455                    .await;
456                let status = child.try_wait().unwrap().unwrap();
457                tracing::info!(pid, ?status, "child exited");
458
459                // The process is gone, so the stdin relay's job is done.
460                drop(stdin_relay);
461
462                // Shut down raw stdout and stderr to notify the host that there
463                // is no more data.
464                let finish_raw = |raw: Option<Socket>| {
465                    raw.and_then(|raw| {
466                        let _ = raw.as_sock_ref().shutdown(std::net::Shutdown::Write);
467                        PolledSocket::new(&driver, raw).ok()
468                    })
469                };
470                let raw_stdout = finish_raw(raw_stdout);
471                let raw_stderr = finish_raw(raw_stderr);
472
473                // Wait for the host to finish with the stdout and stderr
474                // sockets, but don't block the process exit notification.
475                driver
476                    .spawn("socket-wait", async move {
477                        let await_output_relay = async |task, raw| {
478                            let socket = if let Some(task) = task {
479                                Some(task.await)
480                            } else {
481                                raw
482                            };
483                            if let Some(socket) = socket {
484                                // Wait for the host to close the socket to ensure that all
485                                // the data is written.
486                                let _ = futures::io::copy(socket, &mut futures::io::sink()).await;
487                            }
488                        };
489
490                        await_output_relay(stdout_relay, raw_stdout).await;
491                        await_output_relay(stderr_relay, raw_stderr).await;
492                    })
493                    .detach();
494
495                status
496            }
497        });
498
499        self.children.lock().insert(pid, status);
500        Ok(ExecResponse { pid })
501    }
502
503    async fn handle_wait(&self, request: &WaitRequest) -> anyhow::Result<WaitResponse> {
504        tracing::debug!(pid = request.pid, "wait request");
505        let channel = self
506            .children
507            .lock()
508            .remove(&request.pid)
509            .context("pid not found")?;
510
511        let status = channel.await;
512        let exit_code = status.code().unwrap_or(255);
513
514        tracing::debug!(pid = request.pid, exit_code, "wait complete");
515
516        Ok(WaitResponse { exit_code })
517    }
518
519    async fn handle_inspect(
520        &self,
521        request: &InspectRequest,
522        mut ctx: CancelContext,
523    ) -> InspectResponse2 {
524        tracing::debug!(
525            path = request.path.as_str(),
526            depth = request.depth,
527            "inspect request"
528        );
529        let mut inspection = InspectionBuilder::new(&request.path)
530            .depth(Some(request.depth as usize))
531            .sensitivity(self.inspect_sensitivity_level)
532            .inspect(inspect::send(&self.request_send, DiagRequest::Inspect));
533
534        // Don't return early on cancel, as we want to return the partial
535        // inspection results.
536        let _ = ctx.until_cancelled(inspection.resolve()).await;
537
538        let result = inspection.results();
539        InspectResponse2 { result }
540    }
541
542    async fn handle_update(&self, request: &UpdateRequest) -> anyhow::Result<UpdateResponse2> {
543        tracing::debug!(
544            path = request.path.as_str(),
545            value = request.value.as_str(),
546            "update request"
547        );
548        let new_value = InspectionBuilder::new(&request.path)
549            .sensitivity(self.inspect_sensitivity_level)
550            .update(
551                &request.value,
552                inspect::send(&self.request_send, DiagRequest::Inspect),
553            )
554            .await?;
555        Ok(UpdateResponse2 { new_value })
556    }
557
558    async fn handle_kmsg(
559        &self,
560        driver: &(impl Driver + Spawn + Clone),
561        request: &KmsgRequest,
562    ) -> anyhow::Result<()> {
563        self.handle_read_file_request(driver, request.conn, request.follow, "/dev/kmsg")
564            .await
565    }
566
567    async fn handle_read_file(
568        &self,
569        driver: &(impl Driver + Spawn + Clone),
570        request: &FileRequest,
571    ) -> anyhow::Result<()> {
572        self.handle_read_file_request(driver, request.conn, request.follow, &request.file_path)
573            .await
574    }
575
576    async fn handle_packet_capture(
577        &self,
578        request: &NetworkPacketCaptureRequest,
579    ) -> anyhow::Result<NetworkPacketCaptureResponse> {
580        let operation = if request.operation == Operation::Query as i32 {
581            PacketCaptureOperation::Query
582        } else if request.operation == Operation::Start as i32 {
583            PacketCaptureOperation::Start
584        } else if request.operation == Operation::Stop as i32 {
585            PacketCaptureOperation::Stop
586        } else {
587            anyhow::bail!("unsupported request type {}", request.operation);
588        };
589
590        let op_data = match operation {
591            // Query the number of streams needed, starting with a value of 0.
592            PacketCaptureOperation::Query => Some(OperationData::OpQueryData(0)),
593            PacketCaptureOperation::Start => {
594                let Some(op_data) = &request.op_data else {
595                    anyhow::bail!("missing start operation parameters");
596                };
597
598                match op_data {
599                    diag_proto::network_packet_capture_request::OpData::StartData(start_data) => {
600                        let writers = join_all(start_data.conns.iter().map(async |c| {
601                            let conn = self.take_connection(*c).await?;
602                            Ok(conn.into_inner())
603                        }))
604                        .await
605                        .into_iter()
606                        .collect::<anyhow::Result<Vec<Socket>>>()?;
607                        Some(OperationData::OpStartData(StartData {
608                            writers,
609                            snaplen: start_data.snaplen,
610                        }))
611                    }
612                }
613            }
614            _ => None,
615        };
616
617        let params = PacketCaptureParams { operation, op_data };
618        let params = self
619            .request_send
620            .call_failable(DiagRequest::PacketCapture, params)
621            .await?;
622        let num_streams = match params.op_data {
623            Some(OperationData::OpQueryData(num_streams)) => num_streams,
624            _ => 0,
625        };
626        Ok(NetworkPacketCaptureResponse { num_streams })
627    }
628
629    async fn handle_profile(&self, request: ProfileRequest) -> anyhow::Result<()> {
630        let conn = self.take_connection(request.conn).await?;
631        #[cfg(feature = "profiler")]
632        {
633            let profiler_request = profiler_worker::ProfilerRequest {
634                profiler_args: request.profiler_args,
635                duration: request.duration,
636                conn: conn.into_inner(),
637            };
638
639            self.request_send
640                .call_failable(DiagRequest::Profile, profiler_request)
641                .await?;
642        }
643        #[cfg(not(feature = "profiler"))]
644        {
645            // Profiler feature disabled, drop the connection.
646            drop(conn);
647            tracing::error!("Profiler feature disabled");
648        }
649        Ok(())
650    }
651
652    async fn handle_read_file_request(
653        &self,
654        driver: &(impl Driver + Spawn + Clone),
655        conn: u64,
656        follow: bool,
657        file_path: &str,
658    ) -> anyhow::Result<()> {
659        let mut conn = self.take_connection(conn).await?;
660        let file = fs_err::File::open(file_path).context("failed to open file")?;
661
662        let file_meta = file.metadata()?;
663
664        if file_meta.file_type().is_char_device() {
665            let file =
666                PolledPipe::new(driver, file.into()).context("failed to create polled pipe")?;
667
668            driver
669                .spawn("read file relay", async move {
670                    if let Err(err) = relay_read_file(file, conn, follow).await {
671                        tracing::warn!(
672                            error = &*err as &dyn std::error::Error,
673                            "read file relay failed"
674                        );
675                    }
676                })
677                .detach();
678        } else if file_meta.file_type().is_file() {
679            driver
680                .spawn("read file relay", async move {
681                    // Since this is a file, and in Underhill files are backed
682                    // by RAM, allow blocking reads directly on this thread,
683                    // since the reads should be satisfied instantly.
684                    //
685                    // (If this becomes a problem, we can spawn a thread to do
686                    // this, or use io-uring.)
687                    if let Err(err) =
688                        futures::io::copy(AllowStdIo::new(File::from(file)), &mut conn).await
689                    {
690                        tracing::warn!(
691                            error = &err as &dyn std::error::Error,
692                            "read file relay failed"
693                        );
694                    }
695                })
696                .detach();
697        } else {
698            anyhow::bail!("cannot read directory");
699        }
700
701        Ok(())
702    }
703
704    async fn handle_restart(&self) -> anyhow::Result<()> {
705        self.request_send
706            .call_failable(DiagRequest::Restart, ())
707            .await?;
708        Ok(())
709    }
710
711    async fn handle_pause(&self) -> anyhow::Result<()> {
712        self.request_send
713            .call_failable(DiagRequest::Pause, ())
714            .await?;
715        Ok(())
716    }
717
718    async fn handle_resume(&self) -> anyhow::Result<()> {
719        self.request_send
720            .call_failable(DiagRequest::Resume, ())
721            .await?;
722        Ok(())
723    }
724
725    async fn handle_dump_saved_state(&self) -> anyhow::Result<diag_proto::DumpSavedStateResponse> {
726        let data = self
727            .request_send
728            .call_failable(DiagRequest::Save, ())
729            .await?;
730
731        Ok(diag_proto::DumpSavedStateResponse { data })
732    }
733
734    async fn handle_memory_profile_trace(
735        &self,
736        request: &MemoryProfileTraceRequest,
737    ) -> anyhow::Result<diag_proto::MemoryProfileTraceResponse> {
738        cfg_if::cfg_if! {
739            if #[cfg(feature = "mem-profile-tracing")]
740            {
741                let data = self
742                    .request_send
743                    .call_failable(DiagRequest::MemoryProfileTrace, request.pid)
744                    .await?;
745
746                Ok(diag_proto::MemoryProfileTraceResponse { data })
747            } else {
748                let _ = request;
749                anyhow::bail!(
750                    "Profiler tracing feature disabled: rebuild with the `mem-profile-tracing` feature enabled"
751                )
752            }
753        }
754    }
755}
756
757async fn relay<
758    R: 'static + AsyncRead + Unpin + Send,
759    W: 'static + AsyncWrite + PollReady + Unpin + Send,
760>(
761    mut read: R,
762    mut write: W,
763) -> W {
764    let mut buffer = [0; 1024];
765    let result: anyhow::Result<_> = async {
766        loop {
767            let n = futures::select! { // merge semantics
768                n = read.read(&mut buffer).fuse() => n.context("read failed")?,
769                _ = write.wait_ready(PollEvents::RDHUP).fuse() => {
770                    // RDHUP indicates the connection is closed or shut down.
771                    // Although generically this does not indicate that the
772                    // connection does not want to _read_ any more data, for our
773                    // use cases it does (either we are using a unidirectional
774                    // pipe/socket, or we are using a pty, which never returns
775                    // RDHUP but does return HUP, which is just as good).
776                    //
777                    // Stop this relay to propagate the close notification to
778                    // the other endpoint.
779                    break;
780                }
781            };
782            if n == 0 {
783                break;
784            }
785            write
786                .write_all(&buffer[..n])
787                .await
788                .context("write failed")?;
789        }
790        Ok(())
791    }
792    .await;
793    let _ = write.close().await;
794    if let Err(err) = result {
795        tracing::warn!(error = &*err as &dyn std::error::Error, "relay error");
796    }
797    write
798}
799
800async fn relay_read_file(
801    mut file: PolledPipe,
802    mut conn: PolledSocket<Socket>,
803    follow: bool,
804) -> anyhow::Result<()> {
805    let mut buffer = [0; FILE_LINE_MAX];
806    loop {
807        let n = if follow {
808            futures::select! { // race semantics
809                _ = conn.wait_ready(PollEvents::RDHUP).fuse() => break,
810                n = file.read(&mut buffer[..FILE_LINE_MAX - 1]).fuse() => n
811            }
812        } else {
813            // The caller just wants the current contents of file, so issue a
814            // nonblocking, non-async read, and handle EAGAIN below.
815            file.get().read(&mut buffer[..FILE_LINE_MAX - 1])
816        };
817        let n = match n {
818            Ok(0) => break,
819            Ok(count) => count,
820            Err(e) => {
821                match e.kind() {
822                    io::ErrorKind::BrokenPipe => {
823                        // The kmsg interface returns EPIPE if an entry has overwritten another in the ring.
824                        // Retry the read which has the kernel move the seek position to the next available record.
825                        continue;
826                    }
827                    io::ErrorKind::WouldBlock => {
828                        // There are no more messages.
829                        assert!(!follow);
830                        break;
831                    }
832                    _ => return Err(e).context("file read failed"),
833                }
834            }
835        };
836        assert!(
837            n < buffer.len(),
838            "the file returned a line bigger than its maximum"
839        );
840        // Add a null terminator.
841        buffer[n] = 0;
842        // Write the message followed by a null terminator.
843        conn.write_all(&buffer[..n + 1])
844            .await
845            .context("socket write failed")?;
846    }
847    Ok(())
848}