diag_server/
diag_service.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT License.
3
4//! RPC service for diagnostics.
5
6use crate::grpc_result;
7use crate::new_pty;
8use anyhow::Context;
9use azure_profiler_proto::AzureProfiler;
10use azure_profiler_proto::ProfileRequest;
11use diag_proto::ExecRequest;
12use diag_proto::ExecResponse;
13use diag_proto::FILE_LINE_MAX;
14use diag_proto::FileRequest;
15use diag_proto::KmsgRequest;
16use diag_proto::NetworkPacketCaptureRequest;
17use diag_proto::NetworkPacketCaptureResponse;
18use diag_proto::OpenhclDiag;
19use diag_proto::StartRequest;
20use diag_proto::UnderhillDiag;
21use diag_proto::WaitRequest;
22use diag_proto::WaitResponse;
23use diag_proto::network_packet_capture_request::Operation;
24use futures::AsyncRead;
25use futures::AsyncReadExt;
26use futures::AsyncWrite;
27use futures::AsyncWriteExt;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::future::join_all;
31use futures::io::AllowStdIo;
32use futures_concurrency::stream::Merge;
33use inspect::InspectionBuilder;
34use inspect_proto::InspectRequest;
35use inspect_proto::InspectResponse2;
36use inspect_proto::InspectService;
37use inspect_proto::UpdateRequest;
38use inspect_proto::UpdateResponse2;
39use mesh::CancelContext;
40use mesh::rpc::FailableRpc;
41use mesh::rpc::RpcSend;
42use mesh_rpc::server::RpcReceiver;
43use net_packet_capture::OperationData;
44use net_packet_capture::PacketCaptureOperation;
45use net_packet_capture::PacketCaptureParams;
46use net_packet_capture::StartData;
47use pal::unix::process::Stdio;
48use pal_async::driver::Driver;
49use pal_async::interest::InterestSlot;
50use pal_async::interest::PollEvents;
51use pal_async::pipe::PolledPipe;
52use pal_async::socket::AsSockRef;
53use pal_async::socket::PollReady;
54use pal_async::socket::PollReadyExt;
55use pal_async::socket::PolledSocket;
56use pal_async::task::Spawn;
57use pal_async::task::Task;
58use parking_lot::Mutex;
59use socket2::Socket;
60use std::collections::HashMap;
61use std::fs::File;
62use std::future::poll_fn;
63use std::io;
64use std::io::Read;
65use std::os::unix::fs::FileTypeExt;
66use std::os::unix::prelude::*;
67use std::process::ExitStatus;
68use std::sync::Arc;
69
/// A diagnostics request.
///
/// Sent by [`DiagServiceHandler`] to the VM worker, which services the
/// request and completes the RPC (where one is carried).
#[derive(Debug, mesh::MeshPayload)]
pub enum DiagRequest {
    /// Start the VM, if it has not already been started.
    Start(FailableRpc<StartParams, ()>),
    /// Inspect the VM.
    Inspect(inspect::Deferred),
    /// Crash the VM. Carries the pid supplied by the crash request.
    Crash(i32),
    /// Restart the worker.
    Restart(FailableRpc<(), ()>),
    /// Pause VTL0.
    Pause(FailableRpc<(), ()>),
    /// Resume VTL0.
    Resume(FailableRpc<(), ()>),
    /// Save VTL2 state, returning the serialized payload.
    Save(FailableRpc<(), Vec<u8>>),
    /// Set up a network packet capture (query, start, or stop).
    PacketCapture(FailableRpc<PacketCaptureParams<Socket>, PacketCaptureParams<Socket>>),
    /// Profile VTL2.
    #[cfg(feature = "profiler")]
    Profile(FailableRpc<profiler_worker::ProfilerRequest, ()>),
}
93
/// Additional parameters provided as part of a delayed start request.
#[derive(Debug, mesh::MeshPayload)]
pub struct StartParams {
    /// Environment variables to set (`Some(value)`) or remove (`None`).
    pub env: Vec<(String, Option<String>)>,
    /// Command line arguments to append.
    pub args: Vec<String>,
}
102
/// Services diagnostics RPCs, forwarding work to the VM worker over
/// `request_send`.
pub(crate) struct DiagServiceHandler {
    // Channel for forwarding diagnostics work to the VM worker.
    request_send: mesh::Sender<DiagRequest>,
    // Wait tasks for children spawned via `Exec`, keyed by pid. Each task
    // resolves to the child's exit status; `Wait` removes and awaits it.
    children: Mutex<HashMap<i32, Task<ExitStatus>>>,
    // When `Some`, inspect/update requests are filtered to this sensitivity
    // level (used on CVMs to restrict output to nodes marked safe).
    inspect_sensitivity_level: Option<inspect::SensitivityLevel>,
    // Source of host data-connection sockets, looked up by connection id.
    inner: Arc<crate::Inner>,
}
109
110impl DiagServiceHandler {
111    pub fn new(request_send: mesh::Sender<DiagRequest>, inner: Arc<crate::Inner>) -> Self {
112        Self {
113            children: Default::default(),
114            request_send,
115            // On CVMs only allow inspecting nodes defined as safe.
116            inspect_sensitivity_level: if underhill_confidentiality::confidential_filtering_enabled(
117            ) {
118                Some(inspect::SensitivityLevel::Safe)
119            } else {
120                None
121            },
122            // TODO: use a remotable type for `Inner`, which is just used to get
123            // data connection sockets.
124            inner,
125        }
126    }
127
    /// Services diagnostics RPCs from all four receivers until every channel
    /// has closed.
    ///
    /// Each incoming request is handled on its own spawned, detached task so
    /// that a slow request cannot stall the others.
    pub async fn process_requests(
        self: &Arc<Self>,
        driver: &(impl Driver + Spawn + Clone),
        diag_recv: RpcReceiver<UnderhillDiag>,
        diag2_recv: RpcReceiver<OpenhclDiag>,
        inspect_recv: RpcReceiver<InspectService>,
        profile_recv: RpcReceiver<AzureProfiler>,
    ) -> anyhow::Result<()> {
        // Tag each receiver's items so a single merged stream can dispatch
        // them by origin.
        enum Event {
            Diag(UnderhillDiag),
            Diag2(OpenhclDiag),
            Inspect(InspectService),
            Profile(AzureProfiler),
        }
        let mut s = (
            diag_recv.map(|(ctx, req)| (ctx, Event::Diag(req))),
            diag2_recv.map(|(ctx, req)| (ctx, Event::Diag2(req))),
            inspect_recv.map(|(ctx, req)| (ctx, Event::Inspect(req))),
            profile_recv.map(|(ctx, req)| (ctx, Event::Profile(req))),
        )
            .merge();

        while let Some((ctx, req)) = s.next().await {
            driver
                .spawn("diag request", {
                    let driver = driver.clone();
                    let this = self.clone();
                    async move {
                        match req {
                            Event::Diag(req) => this.handle_diag_request(&driver, req, ctx).await,
                            Event::Diag2(req) => this.handle_diag2_request(&driver, req, ctx).await,
                            Event::Inspect(req) => this.handle_inspect_request(req, ctx).await,
                            Event::Profile(req) => this.handle_profile_request(req, ctx).await,
                        }
                    }
                })
                // Detached: each handler sends its own RPC response, so there
                // is nothing to join on here.
                .detach();
        }
        Ok(())
    }
168
    /// Redeems data-connection id `id` for its socket, removing it from the
    /// set of pending connections.
    async fn take_connection(&self, id: u64) -> anyhow::Result<PolledSocket<Socket>> {
        self.inner.take_connection(id).await
    }
172
173    async fn handle_inspect_request(&self, req: InspectService, mut ctx: CancelContext) {
174        match req {
175            InspectService::Inspect(request, response) => {
176                let inspect_response = self.handle_inspect(&request, ctx).await;
177                response.send(grpc_result(Ok(Ok(inspect_response))));
178            }
179            InspectService::Update(request, response) => {
180                response.send(grpc_result(
181                    ctx.until_cancelled(self.handle_update(&request)).await,
182                ));
183            }
184        }
185    }
186
187    async fn handle_profile_request(&self, req: AzureProfiler, mut ctx: CancelContext) {
188        match req {
189            AzureProfiler::Profile(request, response) => response.send(grpc_result(
190                ctx.until_cancelled(self.handle_profile(request)).await,
191            )),
192        }
193    }
194
    /// Dispatches a single `UnderhillDiag` RPC to its handler and sends the
    /// response.
    ///
    /// Most handlers are wrapped in `ctx.until_cancelled` so they stop when
    /// the request's cancel context fires. `Kmsg` and `ReadFile` are not:
    /// they only spawn a background relay, which terminates on its own when
    /// the data connection closes.
    async fn handle_diag_request(
        &self,
        driver: &(impl Driver + Spawn + Clone),
        req: UnderhillDiag,
        mut ctx: CancelContext,
    ) {
        match req {
            UnderhillDiag::Exec(request, response) => response.send(grpc_result(
                ctx.until_cancelled(self.handle_exec(driver, &request))
                    .await,
            )),
            UnderhillDiag::Wait(request, response) => response.send(grpc_result(
                ctx.until_cancelled(self.handle_wait(&request)).await,
            )),
            UnderhillDiag::Start(request, response) => {
                response.send(grpc_result(
                    ctx.until_cancelled(self.handle_start(request)).await,
                ));
            }
            UnderhillDiag::Kmsg(request, response) => {
                response.send(grpc_result(Ok(self.handle_kmsg(driver, &request).await)))
            }
            UnderhillDiag::Crash(request, response) => {
                response.send(grpc_result(
                    ctx.until_cancelled(self.handle_crash(request)).await,
                ));
            }
            UnderhillDiag::Restart(_, response) => {
                response.send(grpc_result(
                    ctx.until_cancelled(self.handle_restart()).await,
                ));
            }
            UnderhillDiag::ReadFile(request, response) => response.send(grpc_result(Ok(self
                .handle_read_file(driver, &request)
                .await))),
            UnderhillDiag::Pause(_, response) => {
                response.send(grpc_result(ctx.until_cancelled(self.handle_pause()).await))
            }
            UnderhillDiag::PacketCapture(request, response) => response.send(grpc_result(
                ctx.until_cancelled(self.handle_packet_capture(&request))
                    .await,
            )),
            UnderhillDiag::Resume(_, response) => {
                response.send(grpc_result(ctx.until_cancelled(self.handle_resume()).await))
            }
            UnderhillDiag::DumpSavedState((), response) => response.send(grpc_result(
                ctx.until_cancelled(self.handle_dump_saved_state()).await,
            )),
        }
    }
245
    /// Dispatches a single `OpenhclDiag` RPC.
    ///
    /// Currently the only request is `Ping`, which just acknowledges that the
    /// diagnostics server is alive.
    async fn handle_diag2_request(
        &self,
        _driver: &(impl Driver + Spawn + Clone),
        req: OpenhclDiag,
        _ctx: CancelContext,
    ) {
        match req {
            OpenhclDiag::Ping((), response) => {
                response.send(Ok(()));
            }
        }
    }
258
259    async fn handle_start(&self, request: StartRequest) -> anyhow::Result<()> {
260        let params = StartParams {
261            env: request
262                .env
263                .into_iter()
264                .map(|pair| (pair.name, pair.value))
265                .collect(),
266            args: request.args,
267        };
268        self.request_send
269            .call_failable(DiagRequest::Start, params)
270            .await?;
271        Ok(())
272    }
273
274    async fn handle_crash(&self, request: diag_proto::CrashRequest) -> anyhow::Result<()> {
275        self.request_send.send(DiagRequest::Crash(request.pid));
276
277        Ok(())
278    }
279
    /// Spawns a child process for an `Exec` RPC, wiring its stdio to host
    /// data connections, and registers a wait task under the child's pid
    /// (later consumed by [`Self::handle_wait`]).
    ///
    /// Three stdio modes are supported, per the request:
    /// - `tty`: all three streams share a new pty; data is relayed between
    ///   the pty primary and the host connections.
    /// - `raw_socket_io`: the connection sockets are passed directly as the
    ///   child's stdio fds.
    /// - otherwise: pipes are created for the child and relay tasks copy
    ///   between the pipes and the host connections.
    async fn handle_exec(
        &self,
        driver: &(impl Driver + Spawn + Clone),
        request: &ExecRequest,
    ) -> anyhow::Result<ExecResponse> {
        tracing::info!(
            command = %request.command,
            stdin = request.stdin,
            stdout = request.stdout,
            stderr = request.stderr,
            tty = request.tty,
            "exec request"
        );

        // A connection id of 0 means the stream was not requested.
        let stdin = if request.stdin != 0 {
            Some(
                self.take_connection(request.stdin)
                    .await
                    .context("failed to get stdin conn")?,
            )
        } else {
            None
        };
        let stdout = if request.stdout != 0 {
            Some(
                self.take_connection(request.stdout)
                    .await
                    .context("failed to get stdout conn")?,
            )
        } else {
            None
        };
        let stderr = if request.stderr != 0 {
            Some(
                self.take_connection(request.stderr)
                    .await
                    .context("failed to get stderr conn")?,
            )
        } else {
            None
        };

        let mut builder = pal::unix::process::Builder::new(&request.command);
        builder.args(&request.args);
        if request.clear_env {
            builder.env_clear();
        }
        // `None` value means "remove this variable".
        for diag_proto::EnvPair { name, value } in &request.env {
            if let Some(value) = value {
                builder.env(name, value);
            } else {
                builder.env_remove(name);
            }
        }

        // HACK: A hack to fix segfault caused by glibc bug in L1 TDX VMM.
        // Should be removed after glibc update or a clean CPUID virtualization solution.
        // Please refer to https://github.com/microsoft/HvLite/issues/872 for more information.
        let tdx_isolated = if cfg!(guest_arch = "x86_64") {
            // xtask-fmt allow-target-arch cpu-intrinsic
            #[cfg(target_arch = "x86_64")]
            {
                let result = safe_intrinsics::cpuid(
                    hvdef::HV_CPUID_FUNCTION_MS_HV_ISOLATION_CONFIGURATION,
                    0,
                );
                // Value 3 means TDX.
                (result.ebx & 0xF) == 3
            }
            // xtask-fmt allow-target-arch cpu-intrinsic
            #[cfg(not(target_arch = "x86_64"))]
            {
                false
            }
        } else {
            false
        };
        if tdx_isolated {
            builder.env("GLIBC_TUNABLES", "glibc.cpu.x86_non_temporal_threshold=0x11a000:glibc.cpu.x86_rep_movsb_threshold=0x4000");
        }

        let mut stdin_relay = None;
        let mut stdout_relay = None;
        let mut stderr_relay = None;
        let mut raw_stdout = None;
        let mut raw_stderr = None;
        let mut child = {
            // These outer declarations keep the pipe/socket/pty fds alive
            // until after `builder.spawn()`, since the builder borrows them.
            let (stdin_pipes, stdout_pipes, stderr_pipes);
            let stdin_socket;
            let stdout_socket;
            let stderr_socket;
            let pty;
            if request.tty {
                pty = new_pty::new_pty().context("failed to create pty")?;

                let primary = PolledPipe::new(driver, pty.0)
                    .context("failed to create polled pty primary")?;

                let secondary = &pty.1;

                // One relay per direction: host stdin -> pty, pty -> host
                // stdout. Stderr shares the pty, so no separate relay.
                let (primary_read, primary_write) = primary.split();
                if let Some(stdin) = stdin {
                    stdin_relay = Some(driver.spawn("pty stdin relay", async move {
                        relay(stdin, primary_write).await;
                    }));
                }
                if let Some(stdout) = stdout {
                    stdout_relay =
                        Some(driver.spawn("pty stdout relay", relay(primary_read, stdout)));
                }

                builder
                    .setsid(true)
                    .controlling_terminal(secondary.as_fd())
                    .stdin(Stdio::Fd(secondary.as_fd()))
                    .stdout(Stdio::Fd(secondary.as_fd()))
                    .stderr(Stdio::Fd(secondary.as_fd()));
            } else if request.raw_socket_io {
                // Hand the sockets to the child directly; the output sockets
                // are kept in raw_stdout/raw_stderr so they can be shut down
                // once the child exits.
                if let Some(stdin) = stdin {
                    stdin_socket = stdin.into_inner();
                    builder.stdin(Stdio::Fd(stdin_socket.as_fd()));
                }
                if let Some(stdout) = stdout {
                    stdout_socket = raw_stdout.insert(stdout.into_inner());
                    builder.stdout(Stdio::Fd(stdout_socket.as_fd()));
                    if request.combine_stderr {
                        builder.stderr(Stdio::Fd(stdout_socket.as_fd()));
                    }
                }
                if let Some(stderr) = stderr {
                    stderr_socket = raw_stderr.insert(stderr.into_inner());
                    builder.stderr(Stdio::Fd(stderr_socket.as_fd()));
                }
            } else {
                // Pipe mode: one pipe pair per requested stream, with a relay
                // task copying between the pipe and the host connection.
                if let Some(stdin) = stdin {
                    stdin_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
                    let pipe = PolledPipe::new(driver, stdin_pipes.1)
                        .context("failed to create polled pipe")?;
                    stdin_relay = Some(driver.spawn("stdin relay", async move {
                        relay(stdin, pipe).await;
                    }));
                    builder.stdin(Stdio::Fd(stdin_pipes.0.as_fd()));
                }
                if let Some(stdout) = stdout {
                    stdout_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
                    let pipe = PolledPipe::new(driver, stdout_pipes.0)
                        .context("failed to create polled pipe")?;
                    stdout_relay = Some(driver.spawn("stdout relay", relay(pipe, stdout)));
                    builder.stdout(Stdio::Fd(stdout_pipes.1.as_fd()));
                    if request.combine_stderr {
                        builder.stderr(Stdio::Fd(stdout_pipes.1.as_fd()));
                    }
                }
                if let Some(stderr) = stderr {
                    stderr_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
                    let pipe = PolledPipe::new(driver, stderr_pipes.0)
                        .context("failed to create polled pipe")?;
                    stderr_relay = Some(driver.spawn("stderr relay", relay(pipe, stderr)));
                    builder.stderr(Stdio::Fd(stderr_pipes.1.as_fd()));
                }
            }

            builder
                .spawn()
                .with_context(|| format!("failed to launch {}", &request.command))?
        };

        let pid = child.id();

        tracing::info!(pid, "spawned child");

        // Poll the child's fd for readiness to learn when it exits.
        let mut child_ready = driver
            .new_dyn_fd_ready(child.as_fd().as_raw_fd())
            .expect("failed creating child poll");

        let status = driver.spawn("diag child wait", {
            let driver = driver.clone();
            async move {
                poll_fn(|cx| child_ready.poll_fd_ready(cx, InterestSlot::Read, PollEvents::IN))
                    .await;
                // The fd reported ready, so the child should have exited and
                // try_wait should return a status without blocking.
                let status = child.try_wait().unwrap().unwrap();
                tracing::info!(pid, ?status, "child exited");

                // The process is gone, so the stdin relay's job is done.
                drop(stdin_relay);

                // Shut down raw stdout and stderr to notify the host that there
                // is no more data.
                let finish_raw = |raw: Option<Socket>| {
                    raw.and_then(|raw| {
                        let _ = raw.as_sock_ref().shutdown(std::net::Shutdown::Write);
                        PolledSocket::new(&driver, raw).ok()
                    })
                };
                let raw_stdout = finish_raw(raw_stdout);
                let raw_stderr = finish_raw(raw_stderr);

                // Wait for the host to finish with the stdout and stderr
                // sockets, but don't block the process exit notification.
                driver
                    .spawn("socket-wait", async move {
                        let await_output_relay = async |task, raw| {
                            // A relay task returns the socket when it finishes;
                            // in raw mode there is no task, just the socket.
                            let socket = if let Some(task) = task {
                                Some(task.await)
                            } else {
                                raw
                            };
                            if let Some(socket) = socket {
                                // Wait for the host to close the socket to ensure that all
                                // the data is written.
                                let _ = futures::io::copy(socket, &mut futures::io::sink()).await;
                            }
                        };

                        await_output_relay(stdout_relay, raw_stdout).await;
                        await_output_relay(stderr_relay, raw_stderr).await;
                    })
                    .detach();

                status
            }
        });

        // Register the wait task so a later `Wait` RPC can retrieve the
        // exit status by pid.
        self.children.lock().insert(pid, status);
        Ok(ExecResponse { pid })
    }
506
507    async fn handle_wait(&self, request: &WaitRequest) -> anyhow::Result<WaitResponse> {
508        tracing::debug!(pid = request.pid, "wait request");
509        let channel = self
510            .children
511            .lock()
512            .remove(&request.pid)
513            .context("pid not found")?;
514
515        let status = channel.await;
516        let exit_code = status.code().unwrap_or(255);
517
518        tracing::debug!(pid = request.pid, exit_code, "wait complete");
519
520        Ok(WaitResponse { exit_code })
521    }
522
    /// Resolves an inspect query against the worker's inspect tree, applying
    /// the configured sensitivity filter.
    ///
    /// Cancellation is not treated as an error: whatever nodes resolved
    /// before the cancel are returned as partial results.
    async fn handle_inspect(
        &self,
        request: &InspectRequest,
        mut ctx: CancelContext,
    ) -> InspectResponse2 {
        tracing::debug!(
            path = request.path.as_str(),
            depth = request.depth,
            "inspect request"
        );
        let mut inspection = InspectionBuilder::new(&request.path)
            .depth(Some(request.depth as usize))
            .sensitivity(self.inspect_sensitivity_level)
            .inspect(inspect::adhoc(|req| {
                // Defer resolution to the worker, which owns the actual state.
                self.request_send.send(DiagRequest::Inspect(req.defer()));
            }));

        // Don't return early on cancel, as we want to return the partial
        // inspection results.
        let _ = ctx.until_cancelled(inspection.resolve()).await;

        let result = inspection.results();
        InspectResponse2 { result }
    }
547
548    async fn handle_update(&self, request: &UpdateRequest) -> anyhow::Result<UpdateResponse2> {
549        tracing::debug!(
550            path = request.path.as_str(),
551            value = request.value.as_str(),
552            "update request"
553        );
554        let new_value = InspectionBuilder::new(&request.path)
555            .sensitivity(self.inspect_sensitivity_level)
556            .update(
557                &request.value,
558                inspect::adhoc(|req| {
559                    self.request_send.send(DiagRequest::Inspect(req.defer()));
560                }),
561            )
562            .await?;
563        Ok(UpdateResponse2 { new_value })
564    }
565
    /// Streams the kernel log (`/dev/kmsg`) to the data connection named in
    /// the request, optionally following for new messages.
    async fn handle_kmsg(
        &self,
        driver: &(impl Driver + Spawn + Clone),
        request: &KmsgRequest,
    ) -> anyhow::Result<()> {
        self.handle_read_file_request(driver, request.conn, request.follow, "/dev/kmsg")
            .await
    }
574
    /// Streams an arbitrary file's contents to the data connection named in
    /// the request, optionally following for new data.
    async fn handle_read_file(
        &self,
        driver: &(impl Driver + Spawn + Clone),
        request: &FileRequest,
    ) -> anyhow::Result<()> {
        self.handle_read_file_request(driver, request.conn, request.follow, &request.file_path)
            .await
    }
583
    /// Configures or queries network packet capture.
    ///
    /// For `Start`, the request's connection ids are redeemed for sockets
    /// that become the capture writers. For `Query`, the worker's reply
    /// reports how many capture streams are needed.
    async fn handle_packet_capture(
        &self,
        request: &NetworkPacketCaptureRequest,
    ) -> anyhow::Result<NetworkPacketCaptureResponse> {
        // The operation arrives as a raw i32 enum value; map it to the typed
        // operation, rejecting anything unknown.
        let operation = if request.operation == Operation::Query as i32 {
            PacketCaptureOperation::Query
        } else if request.operation == Operation::Start as i32 {
            PacketCaptureOperation::Start
        } else if request.operation == Operation::Stop as i32 {
            PacketCaptureOperation::Stop
        } else {
            anyhow::bail!("unsupported request type {}", request.operation);
        };

        let op_data = match operation {
            // Query the number of streams needed, starting with a value of 0.
            PacketCaptureOperation::Query => Some(OperationData::OpQueryData(0)),
            PacketCaptureOperation::Start => {
                let Some(op_data) = &request.op_data else {
                    anyhow::bail!("missing start operation parameters");
                };

                match op_data {
                    diag_proto::network_packet_capture_request::OpData::StartData(start_data) => {
                        // Redeem every connection id for its socket; fail the
                        // whole request if any lookup fails.
                        let writers = join_all(start_data.conns.iter().map(async |c| {
                            let conn = self.take_connection(*c).await?;
                            Ok(conn.into_inner())
                        }))
                        .await
                        .into_iter()
                        .collect::<anyhow::Result<Vec<Socket>>>()?;
                        Some(OperationData::OpStartData(StartData {
                            writers,
                            snaplen: start_data.snaplen,
                        }))
                    }
                }
            }
            _ => None,
        };

        let params = PacketCaptureParams { operation, op_data };
        let params = self
            .request_send
            .call_failable(DiagRequest::PacketCapture, params)
            .await?;
        // Only a query reply carries a stream count; everything else reports 0.
        let num_streams = match params.op_data {
            Some(OperationData::OpQueryData(num_streams)) => num_streams,
            _ => 0,
        };
        Ok(NetworkPacketCaptureResponse { num_streams })
    }
636
    /// Runs a VTL2 profiling session over the request's data connection.
    ///
    /// The connection is taken unconditionally so that it is consumed (and
    /// therefore closed) even when the profiler feature is compiled out.
    async fn handle_profile(&self, request: ProfileRequest) -> anyhow::Result<()> {
        let conn = self.take_connection(request.conn).await?;
        #[cfg(feature = "profiler")]
        {
            let profiler_request = profiler_worker::ProfilerRequest {
                profiler_args: request.profiler_args,
                duration: request.duration,
                conn: conn.into_inner(),
            };

            self.request_send
                .call_failable(DiagRequest::Profile, profiler_request)
                .await?;
        }
        #[cfg(not(feature = "profiler"))]
        {
            // Profiler feature disabled, drop the connection.
            drop(conn);
            tracing::error!("Profiler feature disabled");
        }
        Ok(())
    }
659
    /// Opens `file_path` and streams its contents to data connection `conn`.
    ///
    /// Character devices (such as `/dev/kmsg`) are relayed record-by-record
    /// via [`relay_read_file`]; regular files are copied with blocking reads.
    /// Directories are rejected. Returns once the relay task is spawned, not
    /// when the copy completes.
    async fn handle_read_file_request(
        &self,
        driver: &(impl Driver + Spawn + Clone),
        conn: u64,
        follow: bool,
        file_path: &str,
    ) -> anyhow::Result<()> {
        let mut conn = self.take_connection(conn).await?;
        let file = fs_err::File::open(file_path).context("failed to open file")?;

        let file_meta = file.metadata()?;

        if file_meta.file_type().is_char_device() {
            let file =
                PolledPipe::new(driver, file.into()).context("failed to create polled pipe")?;

            driver
                .spawn("read file relay", async move {
                    if let Err(err) = relay_read_file(file, conn, follow).await {
                        tracing::warn!(
                            error = &*err as &dyn std::error::Error,
                            "read file relay failed"
                        );
                    }
                })
                .detach();
        } else if file_meta.file_type().is_file() {
            driver
                .spawn("read file relay", async move {
                    // Since this is a file, and in Underhill files are backed
                    // by RAM, allow blocking reads directly on this thread,
                    // since the reads should be satisfied instantly.
                    //
                    // (If this becomes a problem, we can spawn a thread to do
                    // this, or use io-uring.)
                    if let Err(err) =
                        futures::io::copy(AllowStdIo::new(File::from(file)), &mut conn).await
                    {
                        tracing::warn!(
                            error = &err as &dyn std::error::Error,
                            "read file relay failed"
                        );
                    }
                })
                .detach();
        } else {
            anyhow::bail!("cannot read directory");
        }

        Ok(())
    }
711
712    async fn handle_restart(&self) -> anyhow::Result<()> {
713        self.request_send
714            .call_failable(DiagRequest::Restart, ())
715            .await?;
716        Ok(())
717    }
718
719    async fn handle_pause(&self) -> anyhow::Result<()> {
720        self.request_send
721            .call_failable(DiagRequest::Pause, ())
722            .await?;
723        Ok(())
724    }
725
726    async fn handle_resume(&self) -> anyhow::Result<()> {
727        self.request_send
728            .call_failable(DiagRequest::Resume, ())
729            .await?;
730        Ok(())
731    }
732
733    async fn handle_dump_saved_state(&self) -> anyhow::Result<diag_proto::DumpSavedStateResponse> {
734        let data = self
735            .request_send
736            .call_failable(DiagRequest::Save, ())
737            .await?;
738
739        Ok(diag_proto::DumpSavedStateResponse { data })
740    }
741}
742
/// Copies bytes from `read` to `write` until EOF, a write-side hangup, or an
/// error, then closes `write` and returns it so the caller can keep using the
/// underlying object (e.g. to wait for the peer to finish reading).
///
/// Errors are logged, not returned.
async fn relay<
    R: 'static + AsyncRead + Unpin + Send,
    W: 'static + AsyncWrite + PollReady + Unpin + Send,
>(
    mut read: R,
    mut write: W,
) -> W {
    let mut buffer = [0; 1024];
    let result: anyhow::Result<_> = async {
        loop {
            let n = futures::select! { // merge semantics
                n = read.read(&mut buffer).fuse() => n.context("read failed")?,
                _ = write.wait_ready(PollEvents::RDHUP).fuse() => {
                    // RDHUP indicates the connection is closed or shut down.
                    // Although generically this does not indicate that the
                    // connection does not want to _read_ any more data, for our
                    // use cases it does (either we are using a unidirectional
                    // pipe/socket, or we are using a pty, which never returns
                    // RDHUP but does return HUP, which is just as good).
                    //
                    // Stop this relay to propagate the close notification to
                    // the other endpoint.
                    break;
                }
            };
            // Zero-length read means EOF on the source.
            if n == 0 {
                break;
            }
            write
                .write_all(&buffer[..n])
                .await
                .context("write failed")?;
        }
        Ok(())
    }
    .await;
    // Always close the writer, even on error, so the peer observes EOF.
    let _ = write.close().await;
    if let Err(err) = result {
        tracing::warn!(error = &*err as &dyn std::error::Error, "relay error");
    }
    write
}
785
/// Relays the contents of `file` to `conn`, writing each read chunk followed
/// by a null terminator.
///
/// With `follow`, reads continue (asynchronously) until the peer hangs up the
/// connection; without it, reading is nonblocking and stops at the first
/// `EAGAIN` once the currently-available data is exhausted.
async fn relay_read_file(
    mut file: PolledPipe,
    mut conn: PolledSocket<Socket>,
    follow: bool,
) -> anyhow::Result<()> {
    // One byte of the buffer is reserved for the null terminator, so reads
    // are capped at FILE_LINE_MAX - 1 bytes.
    let mut buffer = [0; FILE_LINE_MAX];
    loop {
        let n = if follow {
            futures::select! { // race semantics
                _ = conn.wait_ready(PollEvents::RDHUP).fuse() => break,
                n = file.read(&mut buffer[..FILE_LINE_MAX - 1]).fuse() => n
            }
        } else {
            // The caller just wants the current contents of file, so issue a
            // nonblocking, non-async read, and handle EAGAIN below.
            file.get().read(&mut buffer[..FILE_LINE_MAX - 1])
        };
        let n = match n {
            Ok(0) => break,
            Ok(count) => count,
            Err(e) => {
                match e.kind() {
                    io::ErrorKind::BrokenPipe => {
                        // The kmsg interface returns EPIPE if an entry has overwritten another in the ring.
                        // Retry the read which has the kernel move the seek position to the next available record.
                        continue;
                    }
                    io::ErrorKind::WouldBlock => {
                        // There are no more messages.
                        assert!(!follow);
                        break;
                    }
                    _ => return Err(e).context("file read failed"),
                }
            }
        };
        // Reads are capped at FILE_LINE_MAX - 1, so this cannot fire.
        assert!(
            n < buffer.len(),
            "the file returned a line bigger than its maximum"
        );
        // Add a null terminator.
        buffer[n] = 0;
        // Write the message followed by a null terminator.
        conn.write_all(&buffer[..n + 1])
            .await
            .context("socket write failed")?;
    }
    Ok(())
}