diag_server/
diag_service.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! RPC service for diagnostics.
5
6use crate::grpc_result;
7use crate::new_pty;
8use anyhow::Context;
9use azure_profiler_proto::AzureProfiler;
10use azure_profiler_proto::ProfileRequest;
11use diag_proto::ExecRequest;
12use diag_proto::ExecResponse;
13use diag_proto::FILE_LINE_MAX;
14use diag_proto::FileRequest;
15use diag_proto::KmsgRequest;
16use diag_proto::MemoryProfileTraceRequest;
17use diag_proto::NetworkPacketCaptureRequest;
18use diag_proto::NetworkPacketCaptureResponse;
19use diag_proto::OpenhclDiag;
20use diag_proto::StartRequest;
21use diag_proto::UnderhillDiag;
22use diag_proto::WaitRequest;
23use diag_proto::WaitResponse;
24use diag_proto::network_packet_capture_request::Operation;
25use futures::AsyncRead;
26use futures::AsyncReadExt;
27use futures::AsyncWrite;
28use futures::AsyncWriteExt;
29use futures::FutureExt;
30use futures::StreamExt;
31use futures::future::join_all;
32use futures::io::AllowStdIo;
33use futures_concurrency::stream::Merge;
34use inspect::InspectionBuilder;
35use inspect_proto::InspectRequest;
36use inspect_proto::InspectResponse2;
37use inspect_proto::InspectService;
38use inspect_proto::UpdateRequest;
39use inspect_proto::UpdateResponse2;
40use mesh::CancelContext;
41use mesh::rpc::FailableRpc;
42use mesh::rpc::RpcSend;
43use mesh_rpc::server::RpcReceiver;
44use net_packet_capture::OperationData;
45use net_packet_capture::PacketCaptureOperation;
46use net_packet_capture::PacketCaptureParams;
47use net_packet_capture::StartData;
48use pal::unix::process::Stdio;
49use pal_async::driver::Driver;
50use pal_async::interest::InterestSlot;
51use pal_async::interest::PollEvents;
52use pal_async::pipe::PolledPipe;
53use pal_async::socket::AsSockRef;
54use pal_async::socket::PollReady;
55use pal_async::socket::PollReadyExt;
56use pal_async::socket::PolledSocket;
57use pal_async::task::Spawn;
58use pal_async::task::Task;
59use parking_lot::Mutex;
60use socket2::Socket;
61use std::collections::HashMap;
62use std::fs::File;
63use std::future::poll_fn;
64use std::io;
65use std::io::Read;
66use std::os::unix::fs::FileTypeExt;
67use std::os::unix::prelude::*;
68use std::process::ExitStatus;
69use std::sync::Arc;
70
/// A diagnostics request.
#[derive(Debug, mesh::MeshPayload)]
pub enum DiagRequest {
    /// Start the VM, if it has not already been started.
    Start(FailableRpc<StartParams, ()>),
    /// Inspect the VM.
    Inspect(inspect::Deferred),
    /// Crash the VM. The payload is the pid from the crash request.
    Crash(i32),
    /// Restart the worker.
    Restart(FailableRpc<(), ()>),
    /// Pause VTL0.
    Pause(FailableRpc<(), ()>),
    /// Resume VTL0.
    Resume(FailableRpc<(), ()>),
    /// Save VTL2 state, returning the serialized saved-state bytes.
    Save(FailableRpc<(), Vec<u8>>),
    /// Set up a network packet capture (query/start/stop).
    PacketCapture(FailableRpc<PacketCaptureParams<Socket>, PacketCaptureParams<Socket>>),
    /// Profile VTL2.
    #[cfg(feature = "profiler")]
    Profile(FailableRpc<profiler_worker::ProfilerRequest, ()>),
    /// Get a memory profile trace for the given pid.
    #[cfg(feature = "mem-profile-tracing")]
    MemoryProfileTrace(FailableRpc<i32, Vec<u8>>),
}
97
/// Additional parameters provided as part of a delayed start request.
#[derive(Debug, mesh::MeshPayload)]
pub struct StartParams {
    /// Environment variables to set or remove. A pair with a `Some` value
    /// sets the variable; a pair with `None` removes it.
    pub env: Vec<(String, Option<String>)>,
    /// Command line arguments to append.
    pub args: Vec<String>,
}
106
/// Handler for incoming diagnostics RPCs. Forwards most requests to the
/// worker via `request_send` and manages locally-spawned child processes.
pub(crate) struct DiagServiceHandler {
    /// Channel used to forward [`DiagRequest`]s to the worker.
    request_send: mesh::Sender<DiagRequest>,
    /// Tasks waiting on spawned child processes, keyed by pid; entries are
    /// removed when the client waits on the pid.
    children: Mutex<HashMap<i32, Task<ExitStatus>>>,
    /// When `Some`, passed to `InspectionBuilder::sensitivity` to limit
    /// which inspect nodes are visible (used on confidential VMs).
    inspect_sensitivity_level: Option<inspect::SensitivityLevel>,
    /// Shared state used to retrieve data connection sockets.
    inner: Arc<crate::Inner>,
}
113
114impl DiagServiceHandler {
    /// Creates a new handler that forwards diagnostics requests over
    /// `request_send` and resolves data connections via `inner`.
    pub fn new(request_send: mesh::Sender<DiagRequest>, inner: Arc<crate::Inner>) -> Self {
        Self {
            children: Default::default(),
            request_send,
            // On CVMs only allow inspecting nodes defined as safe.
            inspect_sensitivity_level: if underhill_confidentiality::confidential_filtering_enabled(
            ) {
                Some(inspect::SensitivityLevel::Safe)
            } else {
                None
            },
            // TODO: use a remotable type for `Inner`, which is just used to get
            // data connection sockets.
            inner,
        }
    }
131
    /// Services RPCs from all four diagnostics channels until every channel
    /// has closed.
    ///
    /// Each incoming request is handled on its own detached task, so requests
    /// are processed concurrently and the dispatch loop is never blocked by a
    /// single handler.
    pub async fn process_requests(
        self: &Arc<Self>,
        driver: &(impl Driver + Spawn + Clone),
        diag_recv: RpcReceiver<UnderhillDiag>,
        diag2_recv: RpcReceiver<OpenhclDiag>,
        inspect_recv: RpcReceiver<InspectService>,
        profile_recv: RpcReceiver<AzureProfiler>,
    ) -> anyhow::Result<()> {
        // Tag each channel's requests so they can be dispatched from a single
        // merged stream.
        enum Event {
            Diag(UnderhillDiag),
            Diag2(OpenhclDiag),
            Inspect(InspectService),
            Profile(AzureProfiler),
        }
        let mut s = (
            diag_recv.map(|(ctx, req)| (ctx, Event::Diag(req))),
            diag2_recv.map(|(ctx, req)| (ctx, Event::Diag2(req))),
            inspect_recv.map(|(ctx, req)| (ctx, Event::Inspect(req))),
            profile_recv.map(|(ctx, req)| (ctx, Event::Profile(req))),
        )
            .merge();

        while let Some((ctx, req)) = s.next().await {
            driver
                .spawn("diag request", {
                    let driver = driver.clone();
                    let this = self.clone();
                    async move {
                        match req {
                            Event::Diag(req) => this.handle_diag_request(&driver, req, ctx).await,
                            Event::Diag2(req) => this.handle_diag2_request(&driver, req, ctx).await,
                            Event::Inspect(req) => this.handle_inspect_request(req, ctx).await,
                            Event::Profile(req) => this.handle_profile_request(req, ctx).await,
                        }
                    }
                })
                .detach();
        }
        Ok(())
    }
172
    /// Takes ownership of the data connection registered under `id`
    /// (delegates to `crate::Inner`).
    async fn take_connection(&self, id: u64) -> anyhow::Result<PolledSocket<Socket>> {
        self.inner.take_connection(id).await
    }
176
177    async fn handle_inspect_request(&self, req: InspectService, mut ctx: CancelContext) {
178        match req {
179            InspectService::Inspect(request, response) => {
180                let inspect_response = self.handle_inspect(&request, ctx).await;
181                response.send(grpc_result(Ok(Ok(inspect_response))));
182            }
183            InspectService::Update(request, response) => {
184                response.send(grpc_result(
185                    ctx.until_cancelled(self.handle_update(&request)).await,
186                ));
187            }
188        }
189    }
190
191    async fn handle_profile_request(&self, req: AzureProfiler, mut ctx: CancelContext) {
192        match req {
193            AzureProfiler::Profile(request, response) => response.send(grpc_result(
194                ctx.until_cancelled(self.handle_profile(request)).await,
195            )),
196        }
197    }
198
    /// Dispatches a single `UnderhillDiag` RPC to its handler, wrapping the
    /// outcome with `grpc_result` before replying.
    ///
    /// Most handlers are raced against cancellation via `until_cancelled`.
    /// `Kmsg` and `ReadFile` only spawn detached relay tasks and so are not
    /// run under the cancel context.
    async fn handle_diag_request(
        &self,
        driver: &(impl Driver + Spawn + Clone),
        req: UnderhillDiag,
        mut ctx: CancelContext,
    ) {
        match req {
            UnderhillDiag::Exec(request, response) => response.send(grpc_result(
                ctx.until_cancelled(self.handle_exec(driver, &request))
                    .await,
            )),
            UnderhillDiag::Wait(request, response) => response.send(grpc_result(
                ctx.until_cancelled(self.handle_wait(&request)).await,
            )),
            UnderhillDiag::Start(request, response) => {
                response.send(grpc_result(
                    ctx.until_cancelled(self.handle_start(request)).await,
                ));
            }
            UnderhillDiag::Kmsg(request, response) => {
                response.send(grpc_result(Ok(self.handle_kmsg(driver, &request).await)))
            }
            UnderhillDiag::Crash(request, response) => {
                response.send(grpc_result(
                    ctx.until_cancelled(self.handle_crash(request)).await,
                ));
            }
            UnderhillDiag::Restart(_, response) => {
                response.send(grpc_result(
                    ctx.until_cancelled(self.handle_restart()).await,
                ));
            }
            UnderhillDiag::ReadFile(request, response) => response.send(grpc_result(Ok(self
                .handle_read_file(driver, &request)
                .await))),
            UnderhillDiag::Pause(_, response) => {
                response.send(grpc_result(ctx.until_cancelled(self.handle_pause()).await))
            }
            UnderhillDiag::PacketCapture(request, response) => response.send(grpc_result(
                ctx.until_cancelled(self.handle_packet_capture(&request))
                    .await,
            )),
            UnderhillDiag::Resume(_, response) => {
                response.send(grpc_result(ctx.until_cancelled(self.handle_resume()).await))
            }
            UnderhillDiag::DumpSavedState((), response) => response.send(grpc_result(
                ctx.until_cancelled(self.handle_dump_saved_state()).await,
            )),
        }
    }
249
250    async fn handle_diag2_request(
251        &self,
252        _driver: &(impl Driver + Spawn + Clone),
253        req: OpenhclDiag,
254        mut ctx: CancelContext,
255    ) {
256        match req {
257            OpenhclDiag::Ping((), response) => {
258                response.send(Ok(()));
259            }
260            OpenhclDiag::MemoryProfileTrace(request, response) => response.send(grpc_result(
261                ctx.until_cancelled(self.handle_memory_profile_trace(&request))
262                    .await,
263            )),
264        }
265    }
266
267    async fn handle_start(&self, request: StartRequest) -> anyhow::Result<()> {
268        let params = StartParams {
269            env: request
270                .env
271                .into_iter()
272                .map(|pair| (pair.name, pair.value))
273                .collect(),
274            args: request.args,
275        };
276        self.request_send
277            .call_failable(DiagRequest::Start, params)
278            .await?;
279        Ok(())
280    }
281
282    async fn handle_crash(&self, request: diag_proto::CrashRequest) -> anyhow::Result<()> {
283        self.request_send.send(DiagRequest::Crash(request.pid));
284
285        Ok(())
286    }
287
    /// Spawns a child process described by `request`, wiring its stdio to
    /// previously registered data connections, and registers a task that
    /// waits for the child to exit (reaped later via `handle_wait`).
    ///
    /// Returns the pid of the spawned child.
    async fn handle_exec(
        &self,
        driver: &(impl Driver + Spawn + Clone),
        request: &ExecRequest,
    ) -> anyhow::Result<ExecResponse> {
        tracing::info!(
            command = %request.command,
            stdin = request.stdin,
            stdout = request.stdout,
            stderr = request.stderr,
            tty = request.tty,
            "exec request"
        );

        // A connection id of 0 means the corresponding stdio stream was not
        // requested.
        let stdin = if request.stdin != 0 {
            Some(
                self.take_connection(request.stdin)
                    .await
                    .context("failed to get stdin conn")?,
            )
        } else {
            None
        };
        let stdout = if request.stdout != 0 {
            Some(
                self.take_connection(request.stdout)
                    .await
                    .context("failed to get stdout conn")?,
            )
        } else {
            None
        };
        let stderr = if request.stderr != 0 {
            Some(
                self.take_connection(request.stderr)
                    .await
                    .context("failed to get stderr conn")?,
            )
        } else {
            None
        };

        let mut builder = pal::unix::process::Builder::new(&request.command);
        builder.args(&request.args);
        if request.clear_env {
            builder.env_clear();
        }
        // A pair with a value sets the variable; one without removes it.
        for diag_proto::EnvPair { name, value } in &request.env {
            if let Some(value) = value {
                builder.env(name, value);
            } else {
                builder.env_remove(name);
            }
        }

        // HACK: A hack to fix segfault caused by glibc bug in L1 TDX VMM.
        // Should be removed after glibc update or a clean CPUID virtualization solution.
        // Please refer to https://github.com/microsoft/openvmm-deps/issues/21 for more information.
        // xtask-fmt allow-target-arch cpu-intrinsic
        #[cfg(target_arch = "x86_64")]
        {
            let result =
                safe_intrinsics::cpuid(hvdef::HV_CPUID_FUNCTION_MS_HV_ISOLATION_CONFIGURATION, 0);
            // Value 3 means TDX.
            let tdx_isolated = (result.ebx & 0xF) == 3;
            if tdx_isolated {
                builder.env("GLIBC_TUNABLES", "glibc.cpu.x86_non_temporal_threshold=0x11a000:glibc.cpu.x86_rep_movsb_threshold=0x4000");
            }
        };

        let mut stdin_relay = None;
        let mut stdout_relay = None;
        let mut stderr_relay = None;
        let mut raw_stdout = None;
        let mut raw_stderr = None;
        let mut child = {
            // These bindings are declared in this outer scope (rather than
            // inside the branches) so the underlying fds stay open until
            // `spawn()` runs below.
            let (stdin_pipes, stdout_pipes, stderr_pipes);
            let stdin_socket;
            let stdout_socket;
            let stderr_socket;
            let pty;
            if request.tty {
                // TTY mode: relay between the data connections and the pty
                // primary; the child gets the secondary as its controlling
                // terminal and all three stdio fds.
                pty = new_pty::new_pty().context("failed to create pty")?;

                let primary = PolledPipe::new(driver, pty.0)
                    .context("failed to create polled pty primary")?;

                let secondary = &pty.1;

                let (primary_read, primary_write) = primary.split();
                if let Some(stdin) = stdin {
                    stdin_relay = Some(driver.spawn("pty stdin relay", async move {
                        relay(stdin, primary_write).await;
                    }));
                }
                if let Some(stdout) = stdout {
                    stdout_relay =
                        Some(driver.spawn("pty stdout relay", relay(primary_read, stdout)));
                }

                builder
                    .setsid(true)
                    .controlling_terminal(secondary.as_fd())
                    .stdin(Stdio::Fd(secondary.as_fd()))
                    .stdout(Stdio::Fd(secondary.as_fd()))
                    .stderr(Stdio::Fd(secondary.as_fd()));
            } else if request.raw_socket_io {
                // Raw mode: hand the sockets to the child directly, with no
                // relay tasks. stdout/stderr are kept in `raw_stdout`/
                // `raw_stderr` so they can be shut down after the child
                // exits.
                if let Some(stdin) = stdin {
                    stdin_socket = stdin.into_inner();
                    builder.stdin(Stdio::Fd(stdin_socket.as_fd()));
                }
                if let Some(stdout) = stdout {
                    stdout_socket = raw_stdout.insert(stdout.into_inner());
                    builder.stdout(Stdio::Fd(stdout_socket.as_fd()));
                    if request.combine_stderr {
                        builder.stderr(Stdio::Fd(stdout_socket.as_fd()));
                    }
                }
                if let Some(stderr) = stderr {
                    stderr_socket = raw_stderr.insert(stderr.into_inner());
                    builder.stderr(Stdio::Fd(stderr_socket.as_fd()));
                }
            } else {
                // Pipe mode: give the child pipes and spawn relay tasks
                // between the pipes and the data connections.
                if let Some(stdin) = stdin {
                    stdin_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
                    let pipe = PolledPipe::new(driver, stdin_pipes.1)
                        .context("failed to create polled pipe")?;
                    stdin_relay = Some(driver.spawn("stdin relay", async move {
                        relay(stdin, pipe).await;
                    }));
                    builder.stdin(Stdio::Fd(stdin_pipes.0.as_fd()));
                }
                if let Some(stdout) = stdout {
                    stdout_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
                    let pipe = PolledPipe::new(driver, stdout_pipes.0)
                        .context("failed to create polled pipe")?;
                    stdout_relay = Some(driver.spawn("stdout relay", relay(pipe, stdout)));
                    builder.stdout(Stdio::Fd(stdout_pipes.1.as_fd()));
                    if request.combine_stderr {
                        builder.stderr(Stdio::Fd(stdout_pipes.1.as_fd()));
                    }
                }
                if let Some(stderr) = stderr {
                    stderr_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
                    let pipe = PolledPipe::new(driver, stderr_pipes.0)
                        .context("failed to create polled pipe")?;
                    stderr_relay = Some(driver.spawn("stderr relay", relay(pipe, stderr)));
                    builder.stderr(Stdio::Fd(stderr_pipes.1.as_fd()));
                }
            }

            builder
                .spawn()
                .with_context(|| format!("failed to launch {}", &request.command))?
        };

        let pid = child.id();

        tracing::info!(pid, "spawned child");

        // Poll the child's fd for readability, which signals that the child
        // has exited (so `try_wait` below is expected to succeed).
        let mut child_ready = driver
            .new_dyn_fd_ready(child.as_fd().as_raw_fd())
            .expect("failed creating child poll");

        let status = driver.spawn("diag child wait", {
            let driver = driver.clone();
            async move {
                poll_fn(|cx| child_ready.poll_fd_ready(cx, InterestSlot::Read, PollEvents::IN))
                    .await;
                let status = child.try_wait().unwrap().unwrap();
                tracing::info!(pid, ?status, "child exited");

                // The process is gone, so the stdin relay's job is done.
                drop(stdin_relay);

                // Shut down raw stdout and stderr to notify the host that there
                // is no more data.
                let finish_raw = |raw: Option<Socket>| {
                    raw.and_then(|raw| {
                        let _ = raw.as_sock_ref().shutdown(std::net::Shutdown::Write);
                        PolledSocket::new(&driver, raw).ok()
                    })
                };
                let raw_stdout = finish_raw(raw_stdout);
                let raw_stderr = finish_raw(raw_stderr);

                // Wait for the host to finish with the stdout and stderr
                // sockets, but don't block the process exit notification.
                driver
                    .spawn("socket-wait", async move {
                        let await_output_relay = async |task, raw| {
                            let socket = if let Some(task) = task {
                                Some(task.await)
                            } else {
                                raw
                            };
                            if let Some(socket) = socket {
                                // Wait for the host to close the socket to ensure that all
                                // the data is written.
                                let _ = futures::io::copy(socket, &mut futures::io::sink()).await;
                            }
                        };

                        await_output_relay(stdout_relay, raw_stdout).await;
                        await_output_relay(stderr_relay, raw_stderr).await;
                    })
                    .detach();

                status
            }
        });

        self.children.lock().insert(pid, status);
        Ok(ExecResponse { pid })
    }
503
504    async fn handle_wait(&self, request: &WaitRequest) -> anyhow::Result<WaitResponse> {
505        tracing::debug!(pid = request.pid, "wait request");
506        let channel = self
507            .children
508            .lock()
509            .remove(&request.pid)
510            .context("pid not found")?;
511
512        let status = channel.await;
513        let exit_code = status.code().unwrap_or(255);
514
515        tracing::debug!(pid = request.pid, exit_code, "wait complete");
516
517        Ok(WaitResponse { exit_code })
518    }
519
520    async fn handle_inspect(
521        &self,
522        request: &InspectRequest,
523        mut ctx: CancelContext,
524    ) -> InspectResponse2 {
525        tracing::debug!(
526            path = request.path.as_str(),
527            depth = request.depth,
528            "inspect request"
529        );
530        let mut inspection = InspectionBuilder::new(&request.path)
531            .depth(Some(request.depth as usize))
532            .sensitivity(self.inspect_sensitivity_level)
533            .inspect(inspect::send(&self.request_send, DiagRequest::Inspect));
534
535        // Don't return early on cancel, as we want to return the partial
536        // inspection results.
537        let _ = ctx.until_cancelled(inspection.resolve()).await;
538
539        let result = inspection.results();
540        InspectResponse2 { result }
541    }
542
543    async fn handle_update(&self, request: &UpdateRequest) -> anyhow::Result<UpdateResponse2> {
544        tracing::debug!(
545            path = request.path.as_str(),
546            value = request.value.as_str(),
547            "update request"
548        );
549        let new_value = InspectionBuilder::new(&request.path)
550            .sensitivity(self.inspect_sensitivity_level)
551            .update(
552                &request.value,
553                inspect::send(&self.request_send, DiagRequest::Inspect),
554            )
555            .await?;
556        Ok(UpdateResponse2 { new_value })
557    }
558
559    async fn handle_kmsg(
560        &self,
561        driver: &(impl Driver + Spawn + Clone),
562        request: &KmsgRequest,
563    ) -> anyhow::Result<()> {
564        self.handle_read_file_request(driver, request.conn, request.follow, "/dev/kmsg")
565            .await
566    }
567
568    async fn handle_read_file(
569        &self,
570        driver: &(impl Driver + Spawn + Clone),
571        request: &FileRequest,
572    ) -> anyhow::Result<()> {
573        self.handle_read_file_request(driver, request.conn, request.follow, &request.file_path)
574            .await
575    }
576
    /// Handles a packet-capture RPC: decodes the requested operation
    /// (query/start/stop), collects the capture stream sockets for a start
    /// request, and forwards the operation to the worker.
    async fn handle_packet_capture(
        &self,
        request: &NetworkPacketCaptureRequest,
    ) -> anyhow::Result<NetworkPacketCaptureResponse> {
        // `operation` arrives as a raw protobuf i32; map it to the internal
        // enum and reject unknown values.
        let operation = if request.operation == Operation::Query as i32 {
            PacketCaptureOperation::Query
        } else if request.operation == Operation::Start as i32 {
            PacketCaptureOperation::Start
        } else if request.operation == Operation::Stop as i32 {
            PacketCaptureOperation::Stop
        } else {
            anyhow::bail!("unsupported request type {}", request.operation);
        };

        let op_data = match operation {
            // Query the number of streams needed, starting with a value of 0.
            PacketCaptureOperation::Query => Some(OperationData::OpQueryData(0)),
            PacketCaptureOperation::Start => {
                let Some(op_data) = &request.op_data else {
                    anyhow::bail!("missing start operation parameters");
                };

                match op_data {
                    diag_proto::network_packet_capture_request::OpData::StartData(start_data) => {
                        // Resolve each connection id to its socket; fail the
                        // whole request if any connection cannot be taken.
                        let writers = join_all(start_data.conns.iter().map(async |c| {
                            let conn = self.take_connection(*c).await?;
                            Ok(conn.into_inner())
                        }))
                        .await
                        .into_iter()
                        .collect::<anyhow::Result<Vec<Socket>>>()?;
                        Some(OperationData::OpStartData(StartData {
                            writers,
                            snaplen: start_data.snaplen,
                        }))
                    }
                }
            }
            _ => None,
        };

        let params = PacketCaptureParams { operation, op_data };
        let params = self
            .request_send
            .call_failable(DiagRequest::PacketCapture, params)
            .await?;
        // Only a query reports a stream count; other operations return 0.
        let num_streams = match params.op_data {
            Some(OperationData::OpQueryData(num_streams)) => num_streams,
            _ => 0,
        };
        Ok(NetworkPacketCaptureResponse { num_streams })
    }
629
    /// Handles a VTL2 profiling request.
    ///
    /// The data connection is taken unconditionally (consuming its id) so
    /// behavior is uniform whether or not the `profiler` feature is compiled
    /// in; when it is not, the connection is simply dropped.
    async fn handle_profile(&self, request: ProfileRequest) -> anyhow::Result<()> {
        let conn = self.take_connection(request.conn).await?;
        #[cfg(feature = "profiler")]
        {
            let profiler_request = profiler_worker::ProfilerRequest {
                profiler_args: request.profiler_args,
                duration: request.duration,
                conn: conn.into_inner(),
            };

            self.request_send
                .call_failable(DiagRequest::Profile, profiler_request)
                .await?;
        }
        #[cfg(not(feature = "profiler"))]
        {
            // Profiler feature disabled, drop the connection.
            drop(conn);
            tracing::error!("Profiler feature disabled");
        }
        Ok(())
    }
652
    /// Opens `file_path` and spawns a detached task that streams its contents
    /// to the data connection identified by `conn`.
    ///
    /// Character devices (e.g. /dev/kmsg) are relayed record-by-record via
    /// `relay_read_file`, honoring `follow`. Regular files are copied with
    /// blocking reads on the task's thread (see the comment below). Any other
    /// file type (e.g. a directory) is rejected.
    async fn handle_read_file_request(
        &self,
        driver: &(impl Driver + Spawn + Clone),
        conn: u64,
        follow: bool,
        file_path: &str,
    ) -> anyhow::Result<()> {
        let mut conn = self.take_connection(conn).await?;
        let file = fs_err::File::open(file_path).context("failed to open file")?;

        let file_meta = file.metadata()?;

        if file_meta.file_type().is_char_device() {
            let file =
                PolledPipe::new(driver, file.into()).context("failed to create polled pipe")?;

            driver
                .spawn("read file relay", async move {
                    if let Err(err) = relay_read_file(file, conn, follow).await {
                        tracing::warn!(
                            error = &*err as &dyn std::error::Error,
                            "read file relay failed"
                        );
                    }
                })
                .detach();
        } else if file_meta.file_type().is_file() {
            driver
                .spawn("read file relay", async move {
                    // Since this is a file, and in Underhill files are backed
                    // by RAM, allow blocking reads directly on this thread,
                    // since the reads should be satisfied instantly.
                    //
                    // (If this becomes a problem, we can spawn a thread to do
                    // this, or use io-uring.)
                    if let Err(err) =
                        futures::io::copy(AllowStdIo::new(File::from(file)), &mut conn).await
                    {
                        tracing::warn!(
                            error = &err as &dyn std::error::Error,
                            "read file relay failed"
                        );
                    }
                })
                .detach();
        } else {
            anyhow::bail!("cannot read directory");
        }

        Ok(())
    }
704
705    async fn handle_restart(&self) -> anyhow::Result<()> {
706        self.request_send
707            .call_failable(DiagRequest::Restart, ())
708            .await?;
709        Ok(())
710    }
711
712    async fn handle_pause(&self) -> anyhow::Result<()> {
713        self.request_send
714            .call_failable(DiagRequest::Pause, ())
715            .await?;
716        Ok(())
717    }
718
719    async fn handle_resume(&self) -> anyhow::Result<()> {
720        self.request_send
721            .call_failable(DiagRequest::Resume, ())
722            .await?;
723        Ok(())
724    }
725
726    async fn handle_dump_saved_state(&self) -> anyhow::Result<diag_proto::DumpSavedStateResponse> {
727        let data = self
728            .request_send
729            .call_failable(DiagRequest::Save, ())
730            .await?;
731
732        Ok(diag_proto::DumpSavedStateResponse { data })
733    }
734
    /// Fetches a memory profile trace for the pid in `request`.
    ///
    /// Fails with an explanatory error when the `mem-profile-tracing` feature
    /// is compiled out.
    async fn handle_memory_profile_trace(
        &self,
        request: &MemoryProfileTraceRequest,
    ) -> anyhow::Result<diag_proto::MemoryProfileTraceResponse> {
        cfg_if::cfg_if! {
            if #[cfg(feature = "mem-profile-tracing")]
            {
                let data = self
                    .request_send
                    .call_failable(DiagRequest::MemoryProfileTrace, request.pid)
                    .await?;

                Ok(diag_proto::MemoryProfileTraceResponse { data })
            } else {
                let _ = request;
                anyhow::bail!(
                    "Profiler tracing feature disabled: rebuild with the `mem-profile-tracing` feature enabled"
                )
            }
        }
    }
756}
757
/// Relays data from `read` to `write` until `read` reaches EOF, `write`'s
/// peer hangs up, or an I/O error occurs, then closes `write` and returns it
/// so the caller can keep using the underlying connection.
///
/// Errors are logged rather than propagated, since relays run on detached
/// tasks with no one to report to.
async fn relay<
    R: 'static + AsyncRead + Unpin + Send,
    W: 'static + AsyncWrite + PollReady + Unpin + Send,
>(
    mut read: R,
    mut write: W,
) -> W {
    let mut buffer = [0; 1024];
    let result: anyhow::Result<_> = async {
        loop {
            let n = futures::select! { // merge semantics
                n = read.read(&mut buffer).fuse() => n.context("read failed")?,
                _ = write.wait_ready(PollEvents::RDHUP).fuse() => {
                    // RDHUP indicates the connection is closed or shut down.
                    // Although generically this does not indicate that the
                    // connection does not want to _read_ any more data, for our
                    // use cases it does (either we are using a unidirectional
                    // pipe/socket, or we are using a pty, which never returns
                    // RDHUP but does return HUP, which is just as good).
                    //
                    // Stop this relay to propagate the close notification to
                    // the other endpoint.
                    break;
                }
            };
            // A zero-byte read means EOF on `read`.
            if n == 0 {
                break;
            }
            write
                .write_all(&buffer[..n])
                .await
                .context("write failed")?;
        }
        Ok(())
    }
    .await;
    // Always close the writer, even on error, so the other endpoint observes
    // the end of the stream.
    let _ = write.close().await;
    if let Err(err) = result {
        tracing::warn!(error = &*err as &dyn std::error::Error, "relay error");
    }
    write
}
800
/// Relays records from `file` (a polled character device such as /dev/kmsg)
/// to `conn`, sending each record followed by a NUL terminator.
///
/// Each read is limited to `FILE_LINE_MAX - 1` bytes, leaving room for the
/// terminator. When `follow` is set, the relay waits for new data and stops
/// when the peer hangs up; otherwise it drains the currently available data
/// with nonblocking reads and stops at EAGAIN.
async fn relay_read_file(
    mut file: PolledPipe,
    mut conn: PolledSocket<Socket>,
    follow: bool,
) -> anyhow::Result<()> {
    let mut buffer = [0; FILE_LINE_MAX];
    loop {
        let n = if follow {
            futures::select! { // race semantics
                _ = conn.wait_ready(PollEvents::RDHUP).fuse() => break,
                n = file.read(&mut buffer[..FILE_LINE_MAX - 1]).fuse() => n
            }
        } else {
            // The caller just wants the current contents of file, so issue a
            // nonblocking, non-async read, and handle EAGAIN below.
            file.get().read(&mut buffer[..FILE_LINE_MAX - 1])
        };
        let n = match n {
            // EOF: no more records.
            Ok(0) => break,
            Ok(count) => count,
            Err(e) => {
                match e.kind() {
                    io::ErrorKind::BrokenPipe => {
                        // The kmsg interface returns EPIPE if an entry has overwritten another in the ring.
                        // Retry the read which has the kernel move the seek position to the next available record.
                        continue;
                    }
                    io::ErrorKind::WouldBlock => {
                        // There are no more messages.
                        assert!(!follow);
                        break;
                    }
                    _ => return Err(e).context("file read failed"),
                }
            }
        };
        assert!(
            n < buffer.len(),
            "the file returned a line bigger than its maximum"
        );
        // Add a null terminator.
        buffer[n] = 0;
        // Write the message followed by a null terminator.
        conn.write_all(&buffer[..n + 1])
            .await
            .context("socket write failed")?;
    }
    Ok(())
}