1use crate::grpc_result;
7use crate::new_pty;
8use anyhow::Context;
9use azure_profiler_proto::AzureProfiler;
10use azure_profiler_proto::ProfileRequest;
11use diag_proto::ExecRequest;
12use diag_proto::ExecResponse;
13use diag_proto::FILE_LINE_MAX;
14use diag_proto::FileRequest;
15use diag_proto::KmsgRequest;
16use diag_proto::NetworkPacketCaptureRequest;
17use diag_proto::NetworkPacketCaptureResponse;
18use diag_proto::OpenhclDiag;
19use diag_proto::StartRequest;
20use diag_proto::UnderhillDiag;
21use diag_proto::WaitRequest;
22use diag_proto::WaitResponse;
23use diag_proto::network_packet_capture_request::Operation;
24use futures::AsyncRead;
25use futures::AsyncReadExt;
26use futures::AsyncWrite;
27use futures::AsyncWriteExt;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::future::join_all;
31use futures::io::AllowStdIo;
32use futures_concurrency::stream::Merge;
33use inspect::InspectionBuilder;
34use inspect_proto::InspectRequest;
35use inspect_proto::InspectResponse2;
36use inspect_proto::InspectService;
37use inspect_proto::UpdateRequest;
38use inspect_proto::UpdateResponse2;
39use mesh::CancelContext;
40use mesh::rpc::FailableRpc;
41use mesh::rpc::RpcSend;
42use mesh_rpc::server::RpcReceiver;
43use net_packet_capture::OperationData;
44use net_packet_capture::PacketCaptureOperation;
45use net_packet_capture::PacketCaptureParams;
46use net_packet_capture::StartData;
47use pal::unix::process::Stdio;
48use pal_async::driver::Driver;
49use pal_async::interest::InterestSlot;
50use pal_async::interest::PollEvents;
51use pal_async::pipe::PolledPipe;
52use pal_async::socket::AsSockRef;
53use pal_async::socket::PollReady;
54use pal_async::socket::PollReadyExt;
55use pal_async::socket::PolledSocket;
56use pal_async::task::Spawn;
57use pal_async::task::Task;
58use parking_lot::Mutex;
59use socket2::Socket;
60use std::collections::HashMap;
61use std::fs::File;
62use std::future::poll_fn;
63use std::io;
64use std::io::Read;
65use std::os::unix::fs::FileTypeExt;
66use std::os::unix::prelude::*;
67use std::process::ExitStatus;
68use std::sync::Arc;
69
/// A request forwarded by the diagnostics service handler over its `mesh`
/// channel to whichever component owns the receiving end.
#[derive(Debug, mesh::MeshPayload)]
pub enum DiagRequest {
    /// Start request carrying the environment and argument lists from the
    /// incoming `StartRequest` (see [`StartParams`]). Completes with `()` or
    /// a failure.
    Start(FailableRpc<StartParams, ()>),
    /// A deferred inspection (or update) request that the receiver is
    /// expected to answer.
    Inspect(inspect::Deferred),
    /// Crash request. The payload is the `pid` field of the incoming
    /// `CrashRequest`.
    Crash(i32),
    /// Restart request. Completes with `()` or a failure.
    Restart(FailableRpc<(), ()>),
    /// Pause request. Completes with `()` or a failure.
    Pause(FailableRpc<(), ()>),
    /// Resume request. Completes with `()` or a failure.
    Resume(FailableRpc<(), ()>),
    /// Save request. On success the returned bytes are handed back to the
    /// client unchanged as the `data` of a `DumpSavedStateResponse`.
    Save(FailableRpc<(), Vec<u8>>),
    /// Packet capture operation. The receiver replies with another
    /// `PacketCaptureParams`, from which the handler reads the query result.
    PacketCapture(FailableRpc<PacketCaptureParams<Socket>, PacketCaptureParams<Socket>>),
    /// Profiling request. Only available when the `profiler` feature is
    /// enabled.
    #[cfg(feature = "profiler")]
    Profile(FailableRpc<profiler_worker::ProfilerRequest, ()>),
}
93
/// Parameters carried by [`DiagRequest::Start`].
#[derive(Debug, mesh::MeshPayload)]
pub struct StartParams {
    /// Environment variable `(name, value)` pairs copied from the incoming
    /// `StartRequest`.
    // NOTE(review): presumably a `None` value means "unset this variable" —
    // confirm against the receiver of `DiagRequest::Start`.
    pub env: Vec<(String, Option<String>)>,
    /// Argument list copied from the incoming `StartRequest`.
    pub args: Vec<String>,
}
102
/// Handles requests arriving on the diagnostics RPC services, either
/// servicing them locally (process execution, file relays) or forwarding
/// them as [`DiagRequest`]s.
pub(crate) struct DiagServiceHandler {
    /// Channel used to forward [`DiagRequest`]s.
    request_send: mesh::Sender<DiagRequest>,
    /// Child processes launched by `handle_exec`, keyed by pid. Each value
    /// is a task that resolves to the child's exit status; `handle_wait`
    /// removes the entry it waits on.
    children: Mutex<HashMap<i32, Task<ExitStatus>>>,
    /// Sensitivity level applied to inspect and update requests.
    /// `Some(Safe)` when confidential filtering is enabled, `None` otherwise.
    inspect_sensitivity_level: Option<inspect::SensitivityLevel>,
    /// Shared state used to look up data connections by id.
    inner: Arc<crate::Inner>,
}
109
110impl DiagServiceHandler {
111 pub fn new(request_send: mesh::Sender<DiagRequest>, inner: Arc<crate::Inner>) -> Self {
112 Self {
113 children: Default::default(),
114 request_send,
115 inspect_sensitivity_level: if underhill_confidentiality::confidential_filtering_enabled(
117 ) {
118 Some(inspect::SensitivityLevel::Safe)
119 } else {
120 None
121 },
122 inner,
125 }
126 }
127
128 pub async fn process_requests(
129 self: &Arc<Self>,
130 driver: &(impl Driver + Spawn + Clone),
131 diag_recv: RpcReceiver<UnderhillDiag>,
132 diag2_recv: RpcReceiver<OpenhclDiag>,
133 inspect_recv: RpcReceiver<InspectService>,
134 profile_recv: RpcReceiver<AzureProfiler>,
135 ) -> anyhow::Result<()> {
136 enum Event {
137 Diag(UnderhillDiag),
138 Diag2(OpenhclDiag),
139 Inspect(InspectService),
140 Profile(AzureProfiler),
141 }
142 let mut s = (
143 diag_recv.map(|(ctx, req)| (ctx, Event::Diag(req))),
144 diag2_recv.map(|(ctx, req)| (ctx, Event::Diag2(req))),
145 inspect_recv.map(|(ctx, req)| (ctx, Event::Inspect(req))),
146 profile_recv.map(|(ctx, req)| (ctx, Event::Profile(req))),
147 )
148 .merge();
149
150 while let Some((ctx, req)) = s.next().await {
151 driver
152 .spawn("diag request", {
153 let driver = driver.clone();
154 let this = self.clone();
155 async move {
156 match req {
157 Event::Diag(req) => this.handle_diag_request(&driver, req, ctx).await,
158 Event::Diag2(req) => this.handle_diag2_request(&driver, req, ctx).await,
159 Event::Inspect(req) => this.handle_inspect_request(req, ctx).await,
160 Event::Profile(req) => this.handle_profile_request(req, ctx).await,
161 }
162 }
163 })
164 .detach();
165 }
166 Ok(())
167 }
168
169 async fn take_connection(&self, id: u64) -> anyhow::Result<PolledSocket<Socket>> {
170 self.inner.take_connection(id).await
171 }
172
173 async fn handle_inspect_request(&self, req: InspectService, mut ctx: CancelContext) {
174 match req {
175 InspectService::Inspect(request, response) => {
176 let inspect_response = self.handle_inspect(&request, ctx).await;
177 response.send(grpc_result(Ok(Ok(inspect_response))));
178 }
179 InspectService::Update(request, response) => {
180 response.send(grpc_result(
181 ctx.until_cancelled(self.handle_update(&request)).await,
182 ));
183 }
184 }
185 }
186
187 async fn handle_profile_request(&self, req: AzureProfiler, mut ctx: CancelContext) {
188 match req {
189 AzureProfiler::Profile(request, response) => response.send(grpc_result(
190 ctx.until_cancelled(self.handle_profile(request)).await,
191 )),
192 }
193 }
194
195 async fn handle_diag_request(
196 &self,
197 driver: &(impl Driver + Spawn + Clone),
198 req: UnderhillDiag,
199 mut ctx: CancelContext,
200 ) {
201 match req {
202 UnderhillDiag::Exec(request, response) => response.send(grpc_result(
203 ctx.until_cancelled(self.handle_exec(driver, &request))
204 .await,
205 )),
206 UnderhillDiag::Wait(request, response) => response.send(grpc_result(
207 ctx.until_cancelled(self.handle_wait(&request)).await,
208 )),
209 UnderhillDiag::Start(request, response) => {
210 response.send(grpc_result(
211 ctx.until_cancelled(self.handle_start(request)).await,
212 ));
213 }
214 UnderhillDiag::Kmsg(request, response) => {
215 response.send(grpc_result(Ok(self.handle_kmsg(driver, &request).await)))
216 }
217 UnderhillDiag::Crash(request, response) => {
218 response.send(grpc_result(
219 ctx.until_cancelled(self.handle_crash(request)).await,
220 ));
221 }
222 UnderhillDiag::Restart(_, response) => {
223 response.send(grpc_result(
224 ctx.until_cancelled(self.handle_restart()).await,
225 ));
226 }
227 UnderhillDiag::ReadFile(request, response) => response.send(grpc_result(Ok(self
228 .handle_read_file(driver, &request)
229 .await))),
230 UnderhillDiag::Pause(_, response) => {
231 response.send(grpc_result(ctx.until_cancelled(self.handle_pause()).await))
232 }
233 UnderhillDiag::PacketCapture(request, response) => response.send(grpc_result(
234 ctx.until_cancelled(self.handle_packet_capture(&request))
235 .await,
236 )),
237 UnderhillDiag::Resume(_, response) => {
238 response.send(grpc_result(ctx.until_cancelled(self.handle_resume()).await))
239 }
240 UnderhillDiag::DumpSavedState((), response) => response.send(grpc_result(
241 ctx.until_cancelled(self.handle_dump_saved_state()).await,
242 )),
243 }
244 }
245
246 async fn handle_diag2_request(
247 &self,
248 _driver: &(impl Driver + Spawn + Clone),
249 req: OpenhclDiag,
250 _ctx: CancelContext,
251 ) {
252 match req {
253 OpenhclDiag::Ping((), response) => {
254 response.send(Ok(()));
255 }
256 }
257 }
258
259 async fn handle_start(&self, request: StartRequest) -> anyhow::Result<()> {
260 let params = StartParams {
261 env: request
262 .env
263 .into_iter()
264 .map(|pair| (pair.name, pair.value))
265 .collect(),
266 args: request.args,
267 };
268 self.request_send
269 .call_failable(DiagRequest::Start, params)
270 .await?;
271 Ok(())
272 }
273
274 async fn handle_crash(&self, request: diag_proto::CrashRequest) -> anyhow::Result<()> {
275 self.request_send.send(DiagRequest::Crash(request.pid));
276
277 Ok(())
278 }
279
/// Launches the command described by `request` and returns its pid.
///
/// The child's stdio is wired to the data connections named in the
/// request in one of three ways:
///
/// * `tty`: a pty is created; the child gets the secondary end for
///   stdin/stdout/stderr and relay tasks copy between the primary end and
///   the stdin/stdout connections.
/// * `raw_socket_io`: the child is handed the connection sockets directly.
/// * otherwise: pipes are created and relay tasks copy between the pipes
///   and the connections.
///
/// A task that resolves to the child's exit status is stored in
/// `self.children` under the pid, for `handle_wait` to pick up.
///
/// # Errors
///
/// Fails if a requested connection cannot be obtained, if a pty, pipe or
/// polled wrapper cannot be created, or if the process fails to launch.
///
/// # Panics
///
/// Panics if the poll object for the child cannot be created, and (inside
/// the spawned wait task) if `try_wait` fails or reports no exit status.
async fn handle_exec(
    &self,
    driver: &(impl Driver + Spawn + Clone),
    request: &ExecRequest,
) -> anyhow::Result<ExecResponse> {
    tracing::info!(
        command = %request.command,
        stdin = request.stdin,
        stdout = request.stdout,
        stderr = request.stderr,
        tty = request.tty,
        "exec request"
    );

    // A connection id of 0 means "no connection" for that stream.
    let stdin = if request.stdin != 0 {
        Some(
            self.take_connection(request.stdin)
                .await
                .context("failed to get stdin conn")?,
        )
    } else {
        None
    };
    let stdout = if request.stdout != 0 {
        Some(
            self.take_connection(request.stdout)
                .await
                .context("failed to get stdout conn")?,
        )
    } else {
        None
    };
    let stderr = if request.stderr != 0 {
        Some(
            self.take_connection(request.stderr)
                .await
                .context("failed to get stderr conn")?,
        )
    } else {
        None
    };

    let mut builder = pal::unix::process::Builder::new(&request.command);
    builder.args(&request.args);
    if request.clear_env {
        builder.env_clear();
    }
    // An entry with a value sets the variable; one without removes it.
    for diag_proto::EnvPair { name, value } in &request.env {
        if let Some(value) = value {
            builder.env(name, value);
        } else {
            builder.env_remove(name);
        }
    }

    // Query the hypervisor's isolation configuration CPUID leaf; this is
    // only compiled in and evaluated for x86_64.
    // NOTE(review): presumably a value of 3 in the low nibble of EBX means
    // TDX isolation — confirm against `hvdef`.
    let tdx_isolated = if cfg!(guest_arch = "x86_64") {
        #[cfg(target_arch = "x86_64")]
        {
            let result = safe_intrinsics::cpuid(
                hvdef::HV_CPUID_FUNCTION_MS_HV_ISOLATION_CONFIGURATION,
                0,
            );
            (result.ebx & 0xF) == 3
        }
        #[cfg(not(target_arch = "x86_64"))]
        {
            false
        }
    } else {
        false
    };
    if tdx_isolated {
        // NOTE(review): the reason for overriding these glibc tunables is
        // not visible in this file — confirm and document.
        builder.env("GLIBC_TUNABLES", "glibc.cpu.x86_non_temporal_threshold=0x11a000:glibc.cpu.x86_rep_movsb_threshold=0x4000");
    }

    let mut stdin_relay = None;
    let mut stdout_relay = None;
    let mut stderr_relay = None;
    let mut raw_stdout = None;
    let mut raw_stderr = None;
    let mut child = {
        // These are declared up front and initialized only in the branch
        // that needs them, so that the objects whose fds the builder
        // borrows stay alive until `spawn()` at the end of this block.
        let (stdin_pipes, stdout_pipes, stderr_pipes);
        let stdin_socket;
        let stdout_socket;
        let stderr_socket;
        let pty;
        if request.tty {
            pty = new_pty::new_pty().context("failed to create pty")?;

            let primary = PolledPipe::new(driver, pty.0)
                .context("failed to create polled pty primary")?;

            let secondary = &pty.1;

            // Relay between the connections and the primary end of the
            // pty. The stderr connection is not relayed in this mode.
            let (primary_read, primary_write) = primary.split();
            if let Some(stdin) = stdin {
                stdin_relay = Some(driver.spawn("pty stdin relay", async move {
                    relay(stdin, primary_write).await;
                }));
            }
            if let Some(stdout) = stdout {
                stdout_relay =
                    Some(driver.spawn("pty stdout relay", relay(primary_read, stdout)));
            }

            // The child runs in a new session with the secondary end as
            // its controlling terminal and as all three stdio streams.
            builder
                .setsid(true)
                .controlling_terminal(secondary.as_fd())
                .stdin(Stdio::Fd(secondary.as_fd()))
                .stdout(Stdio::Fd(secondary.as_fd()))
                .stderr(Stdio::Fd(secondary.as_fd()));
        } else if request.raw_socket_io {
            // Hand the connection sockets to the child directly. The
            // output sockets are also kept in `raw_stdout`/`raw_stderr` so
            // they can be shut down once the child has exited.
            if let Some(stdin) = stdin {
                stdin_socket = stdin.into_inner();
                builder.stdin(Stdio::Fd(stdin_socket.as_fd()));
            }
            if let Some(stdout) = stdout {
                stdout_socket = raw_stdout.insert(stdout.into_inner());
                builder.stdout(Stdio::Fd(stdout_socket.as_fd()));
                if request.combine_stderr {
                    builder.stderr(Stdio::Fd(stdout_socket.as_fd()));
                }
            }
            // An explicit stderr connection overrides `combine_stderr`.
            if let Some(stderr) = stderr {
                stderr_socket = raw_stderr.insert(stderr.into_inner());
                builder.stderr(Stdio::Fd(stderr_socket.as_fd()));
            }
        } else {
            // Pipe mode: the child gets one end of each pipe and a relay
            // task copies between the other end and the connection.
            if let Some(stdin) = stdin {
                stdin_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
                let pipe = PolledPipe::new(driver, stdin_pipes.1)
                    .context("failed to create polled pipe")?;
                stdin_relay = Some(driver.spawn("stdin relay", async move {
                    relay(stdin, pipe).await;
                }));
                builder.stdin(Stdio::Fd(stdin_pipes.0.as_fd()));
            }
            if let Some(stdout) = stdout {
                stdout_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
                let pipe = PolledPipe::new(driver, stdout_pipes.0)
                    .context("failed to create polled pipe")?;
                stdout_relay = Some(driver.spawn("stdout relay", relay(pipe, stdout)));
                builder.stdout(Stdio::Fd(stdout_pipes.1.as_fd()));
                if request.combine_stderr {
                    builder.stderr(Stdio::Fd(stdout_pipes.1.as_fd()));
                }
            }
            // An explicit stderr connection overrides `combine_stderr`.
            if let Some(stderr) = stderr {
                stderr_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
                let pipe = PolledPipe::new(driver, stderr_pipes.0)
                    .context("failed to create polled pipe")?;
                stderr_relay = Some(driver.spawn("stderr relay", relay(pipe, stderr)));
                builder.stderr(Stdio::Fd(stderr_pipes.1.as_fd()));
            }
        }

        builder
            .spawn()
            .with_context(|| format!("failed to launch {}", &request.command))?
    };

    let pid = child.id();

    tracing::info!(pid, "spawned child");

    // Poll the child's fd for readability to learn when it has exited.
    let mut child_ready = driver
        .new_dyn_fd_ready(child.as_fd().as_raw_fd())
        .expect("failed creating child poll");

    let status = driver.spawn("diag child wait", {
        let driver = driver.clone();
        async move {
            poll_fn(|cx| child_ready.poll_fd_ready(cx, InterestSlot::Read, PollEvents::IN))
                .await;
            let status = child.try_wait().unwrap().unwrap();
            tracing::info!(pid, ?status, "child exited");

            // The child is gone, so the stdin relay is no longer needed.
            // NOTE(review): presumably dropping the `Task` cancels the
            // relay — confirm against `pal_async`.
            drop(stdin_relay);

            // For raw sockets: shut down the write side and wrap the
            // socket so it can be read asynchronously below. A socket that
            // cannot be wrapped is discarded.
            let finish_raw = |raw: Option<Socket>| {
                raw.and_then(|raw| {
                    let _ = raw.as_sock_ref().shutdown(std::net::Shutdown::Write);
                    PolledSocket::new(&driver, raw).ok()
                })
            };
            let raw_stdout = finish_raw(raw_stdout);
            let raw_stderr = finish_raw(raw_stderr);

            // Finish the output side on a separate detached task so that
            // the exit status is reported without waiting for it.
            driver
                .spawn("socket-wait", async move {
                    // Wait for the relay (which returns its socket) if
                    // there is one, else use the raw socket; then read and
                    // discard from the socket until the copy ends.
                    let await_output_relay = async |task, raw| {
                        let socket = if let Some(task) = task {
                            Some(task.await)
                        } else {
                            raw
                        };
                        if let Some(socket) = socket {
                            let _ = futures::io::copy(socket, &mut futures::io::sink()).await;
                        }
                    };

                    await_output_relay(stdout_relay, raw_stdout).await;
                    await_output_relay(stderr_relay, raw_stderr).await;
                })
                .detach();

            status
        }
    });

    // Record the wait task so a later wait request can find it by pid.
    self.children.lock().insert(pid, status);
    Ok(ExecResponse { pid })
}
506
507 async fn handle_wait(&self, request: &WaitRequest) -> anyhow::Result<WaitResponse> {
508 tracing::debug!(pid = request.pid, "wait request");
509 let channel = self
510 .children
511 .lock()
512 .remove(&request.pid)
513 .context("pid not found")?;
514
515 let status = channel.await;
516 let exit_code = status.code().unwrap_or(255);
517
518 tracing::debug!(pid = request.pid, exit_code, "wait complete");
519
520 Ok(WaitResponse { exit_code })
521 }
522
523 async fn handle_inspect(
524 &self,
525 request: &InspectRequest,
526 mut ctx: CancelContext,
527 ) -> InspectResponse2 {
528 tracing::debug!(
529 path = request.path.as_str(),
530 depth = request.depth,
531 "inspect request"
532 );
533 let mut inspection = InspectionBuilder::new(&request.path)
534 .depth(Some(request.depth as usize))
535 .sensitivity(self.inspect_sensitivity_level)
536 .inspect(inspect::adhoc(|req| {
537 self.request_send.send(DiagRequest::Inspect(req.defer()));
538 }));
539
540 let _ = ctx.until_cancelled(inspection.resolve()).await;
543
544 let result = inspection.results();
545 InspectResponse2 { result }
546 }
547
548 async fn handle_update(&self, request: &UpdateRequest) -> anyhow::Result<UpdateResponse2> {
549 tracing::debug!(
550 path = request.path.as_str(),
551 value = request.value.as_str(),
552 "update request"
553 );
554 let new_value = InspectionBuilder::new(&request.path)
555 .sensitivity(self.inspect_sensitivity_level)
556 .update(
557 &request.value,
558 inspect::adhoc(|req| {
559 self.request_send.send(DiagRequest::Inspect(req.defer()));
560 }),
561 )
562 .await?;
563 Ok(UpdateResponse2 { new_value })
564 }
565
566 async fn handle_kmsg(
567 &self,
568 driver: &(impl Driver + Spawn + Clone),
569 request: &KmsgRequest,
570 ) -> anyhow::Result<()> {
571 self.handle_read_file_request(driver, request.conn, request.follow, "/dev/kmsg")
572 .await
573 }
574
575 async fn handle_read_file(
576 &self,
577 driver: &(impl Driver + Spawn + Clone),
578 request: &FileRequest,
579 ) -> anyhow::Result<()> {
580 self.handle_read_file_request(driver, request.conn, request.follow, &request.file_path)
581 .await
582 }
583
584 async fn handle_packet_capture(
585 &self,
586 request: &NetworkPacketCaptureRequest,
587 ) -> anyhow::Result<NetworkPacketCaptureResponse> {
588 let operation = if request.operation == Operation::Query as i32 {
589 PacketCaptureOperation::Query
590 } else if request.operation == Operation::Start as i32 {
591 PacketCaptureOperation::Start
592 } else if request.operation == Operation::Stop as i32 {
593 PacketCaptureOperation::Stop
594 } else {
595 anyhow::bail!("unsupported request type {}", request.operation);
596 };
597
598 let op_data = match operation {
599 PacketCaptureOperation::Query => Some(OperationData::OpQueryData(0)),
601 PacketCaptureOperation::Start => {
602 let Some(op_data) = &request.op_data else {
603 anyhow::bail!("missing start operation parameters");
604 };
605
606 match op_data {
607 diag_proto::network_packet_capture_request::OpData::StartData(start_data) => {
608 let writers = join_all(start_data.conns.iter().map(async |c| {
609 let conn = self.take_connection(*c).await?;
610 Ok(conn.into_inner())
611 }))
612 .await
613 .into_iter()
614 .collect::<anyhow::Result<Vec<Socket>>>()?;
615 Some(OperationData::OpStartData(StartData {
616 writers,
617 snaplen: start_data.snaplen,
618 }))
619 }
620 }
621 }
622 _ => None,
623 };
624
625 let params = PacketCaptureParams { operation, op_data };
626 let params = self
627 .request_send
628 .call_failable(DiagRequest::PacketCapture, params)
629 .await?;
630 let num_streams = match params.op_data {
631 Some(OperationData::OpQueryData(num_streams)) => num_streams,
632 _ => 0,
633 };
634 Ok(NetworkPacketCaptureResponse { num_streams })
635 }
636
637 async fn handle_profile(&self, request: ProfileRequest) -> anyhow::Result<()> {
638 let conn = self.take_connection(request.conn).await?;
639 #[cfg(feature = "profiler")]
640 {
641 let profiler_request = profiler_worker::ProfilerRequest {
642 profiler_args: request.profiler_args,
643 duration: request.duration,
644 conn: conn.into_inner(),
645 };
646
647 self.request_send
648 .call_failable(DiagRequest::Profile, profiler_request)
649 .await?;
650 }
651 #[cfg(not(feature = "profiler"))]
652 {
653 drop(conn);
655 tracing::error!("Profiler feature disabled");
656 }
657 Ok(())
658 }
659
660 async fn handle_read_file_request(
661 &self,
662 driver: &(impl Driver + Spawn + Clone),
663 conn: u64,
664 follow: bool,
665 file_path: &str,
666 ) -> anyhow::Result<()> {
667 let mut conn = self.take_connection(conn).await?;
668 let file = fs_err::File::open(file_path).context("failed to open file")?;
669
670 let file_meta = file.metadata()?;
671
672 if file_meta.file_type().is_char_device() {
673 let file =
674 PolledPipe::new(driver, file.into()).context("failed to create polled pipe")?;
675
676 driver
677 .spawn("read file relay", async move {
678 if let Err(err) = relay_read_file(file, conn, follow).await {
679 tracing::warn!(
680 error = &*err as &dyn std::error::Error,
681 "read file relay failed"
682 );
683 }
684 })
685 .detach();
686 } else if file_meta.file_type().is_file() {
687 driver
688 .spawn("read file relay", async move {
689 if let Err(err) =
696 futures::io::copy(AllowStdIo::new(File::from(file)), &mut conn).await
697 {
698 tracing::warn!(
699 error = &err as &dyn std::error::Error,
700 "read file relay failed"
701 );
702 }
703 })
704 .detach();
705 } else {
706 anyhow::bail!("cannot read directory");
707 }
708
709 Ok(())
710 }
711
712 async fn handle_restart(&self) -> anyhow::Result<()> {
713 self.request_send
714 .call_failable(DiagRequest::Restart, ())
715 .await?;
716 Ok(())
717 }
718
719 async fn handle_pause(&self) -> anyhow::Result<()> {
720 self.request_send
721 .call_failable(DiagRequest::Pause, ())
722 .await?;
723 Ok(())
724 }
725
726 async fn handle_resume(&self) -> anyhow::Result<()> {
727 self.request_send
728 .call_failable(DiagRequest::Resume, ())
729 .await?;
730 Ok(())
731 }
732
733 async fn handle_dump_saved_state(&self) -> anyhow::Result<diag_proto::DumpSavedStateResponse> {
734 let data = self
735 .request_send
736 .call_failable(DiagRequest::Save, ())
737 .await?;
738
739 Ok(diag_proto::DumpSavedStateResponse { data })
740 }
741}
742
743async fn relay<
744 R: 'static + AsyncRead + Unpin + Send,
745 W: 'static + AsyncWrite + PollReady + Unpin + Send,
746>(
747 mut read: R,
748 mut write: W,
749) -> W {
750 let mut buffer = [0; 1024];
751 let result: anyhow::Result<_> = async {
752 loop {
753 let n = futures::select! { n = read.read(&mut buffer).fuse() => n.context("read failed")?,
755 _ = write.wait_ready(PollEvents::RDHUP).fuse() => {
756 break;
766 }
767 };
768 if n == 0 {
769 break;
770 }
771 write
772 .write_all(&buffer[..n])
773 .await
774 .context("write failed")?;
775 }
776 Ok(())
777 }
778 .await;
779 let _ = write.close().await;
780 if let Err(err) = result {
781 tracing::warn!(error = &*err as &dyn std::error::Error, "relay error");
782 }
783 write
784}
785
786async fn relay_read_file(
787 mut file: PolledPipe,
788 mut conn: PolledSocket<Socket>,
789 follow: bool,
790) -> anyhow::Result<()> {
791 let mut buffer = [0; FILE_LINE_MAX];
792 loop {
793 let n = if follow {
794 futures::select! { _ = conn.wait_ready(PollEvents::RDHUP).fuse() => break,
796 n = file.read(&mut buffer[..FILE_LINE_MAX - 1]).fuse() => n
797 }
798 } else {
799 file.get().read(&mut buffer[..FILE_LINE_MAX - 1])
802 };
803 let n = match n {
804 Ok(0) => break,
805 Ok(count) => count,
806 Err(e) => {
807 match e.kind() {
808 io::ErrorKind::BrokenPipe => {
809 continue;
812 }
813 io::ErrorKind::WouldBlock => {
814 assert!(!follow);
816 break;
817 }
818 _ => return Err(e).context("file read failed"),
819 }
820 }
821 };
822 assert!(
823 n < buffer.len(),
824 "the file returned a line bigger than its maximum"
825 );
826 buffer[n] = 0;
828 conn.write_all(&buffer[..n + 1])
830 .await
831 .context("socket write failed")?;
832 }
833 Ok(())
834}