1use crate::grpc_result;
7use anyhow::Context;
8use azure_profiler_proto::AzureProfiler;
9use azure_profiler_proto::ProfileRequest;
10use diag_proto::ExecRequest;
11use diag_proto::ExecResponse;
12use diag_proto::FILE_LINE_MAX;
13use diag_proto::FileRequest;
14use diag_proto::KmsgRequest;
15use diag_proto::MemoryProfileTraceRequest;
16use diag_proto::NetworkPacketCaptureRequest;
17use diag_proto::NetworkPacketCaptureResponse;
18use diag_proto::OpenhclDiag;
19use diag_proto::StartRequest;
20use diag_proto::UnderhillDiag;
21use diag_proto::WaitRequest;
22use diag_proto::WaitResponse;
23use diag_proto::network_packet_capture_request::Operation;
24use futures::AsyncRead;
25use futures::AsyncReadExt;
26use futures::AsyncWrite;
27use futures::AsyncWriteExt;
28use futures::FutureExt;
29use futures::StreamExt;
30use futures::future::join_all;
31use futures::io::AllowStdIo;
32use futures_concurrency::stream::Merge;
33use inspect::InspectionBuilder;
34use inspect_proto::InspectRequest;
35use inspect_proto::InspectResponse2;
36use inspect_proto::InspectService;
37use inspect_proto::UpdateRequest;
38use inspect_proto::UpdateResponse2;
39use mesh::CancelContext;
40use mesh::rpc::FailableRpc;
41use mesh::rpc::RpcSend;
42use mesh_rpc::server::RpcReceiver;
43use net_packet_capture::OperationData;
44use net_packet_capture::PacketCaptureOperation;
45use net_packet_capture::PacketCaptureParams;
46use net_packet_capture::StartData;
47use pal::unix::process::Stdio;
48use pal_async::driver::Driver;
49use pal_async::interest::InterestSlot;
50use pal_async::interest::PollEvents;
51use pal_async::pipe::PolledPipe;
52use pal_async::socket::AsSockRef;
53use pal_async::socket::PollReady;
54use pal_async::socket::PollReadyExt;
55use pal_async::socket::PolledSocket;
56use pal_async::task::Spawn;
57use pal_async::task::Task;
58use parking_lot::Mutex;
59use socket2::Socket;
60use std::collections::HashMap;
61use std::fs::File;
62use std::future::poll_fn;
63use std::io;
64use std::io::Read;
65use std::os::unix::fs::FileTypeExt;
66use std::os::unix::prelude::*;
67use std::process::ExitStatus;
68use std::sync::Arc;
69
70#[derive(Debug, mesh::MeshPayload)]
72pub enum DiagRequest {
73 Start(FailableRpc<StartParams, ()>),
75 Inspect(inspect::Deferred),
77 Crash(i32),
79 Restart(FailableRpc<(), ()>),
81 Pause(FailableRpc<(), ()>),
83 Resume(FailableRpc<(), ()>),
85 Save(FailableRpc<(), Vec<u8>>),
87 PacketCapture(FailableRpc<PacketCaptureParams<Socket>, PacketCaptureParams<Socket>>),
89 #[cfg(feature = "profiler")]
91 Profile(FailableRpc<profiler_worker::ProfilerRequest, ()>),
92 #[cfg(feature = "mem-profile-tracing")]
94 MemoryProfileTrace(FailableRpc<i32, Vec<u8>>),
95}
96
97#[derive(Debug, mesh::MeshPayload)]
99pub struct StartParams {
100 pub env: Vec<(String, Option<String>)>,
102 pub args: Vec<String>,
104}
105
106pub(crate) struct DiagServiceHandler {
107 request_send: mesh::Sender<DiagRequest>,
108 children: Mutex<HashMap<i32, Task<ExitStatus>>>,
109 inspect_sensitivity_level: Option<inspect::SensitivityLevel>,
110 inner: Arc<crate::Inner>,
111}
112
113impl DiagServiceHandler {
114 pub fn new(request_send: mesh::Sender<DiagRequest>, inner: Arc<crate::Inner>) -> Self {
115 Self {
116 children: Default::default(),
117 request_send,
118 inspect_sensitivity_level: if underhill_confidentiality::confidential_filtering_enabled(
120 ) {
121 Some(inspect::SensitivityLevel::Safe)
122 } else {
123 None
124 },
125 inner,
128 }
129 }
130
131 pub async fn process_requests(
132 self: &Arc<Self>,
133 driver: &(impl Driver + Spawn + Clone),
134 diag_recv: RpcReceiver<UnderhillDiag>,
135 diag2_recv: RpcReceiver<OpenhclDiag>,
136 inspect_recv: RpcReceiver<InspectService>,
137 profile_recv: RpcReceiver<AzureProfiler>,
138 ) -> anyhow::Result<()> {
139 enum Event {
140 Diag(UnderhillDiag),
141 Diag2(OpenhclDiag),
142 Inspect(InspectService),
143 Profile(AzureProfiler),
144 }
145 let mut s = (
146 diag_recv.map(|(ctx, req)| (ctx, Event::Diag(req))),
147 diag2_recv.map(|(ctx, req)| (ctx, Event::Diag2(req))),
148 inspect_recv.map(|(ctx, req)| (ctx, Event::Inspect(req))),
149 profile_recv.map(|(ctx, req)| (ctx, Event::Profile(req))),
150 )
151 .merge();
152
153 while let Some((ctx, req)) = s.next().await {
154 driver
155 .spawn("diag request", {
156 let driver = driver.clone();
157 let this = self.clone();
158 async move {
159 match req {
160 Event::Diag(req) => this.handle_diag_request(&driver, req, ctx).await,
161 Event::Diag2(req) => this.handle_diag2_request(&driver, req, ctx).await,
162 Event::Inspect(req) => this.handle_inspect_request(req, ctx).await,
163 Event::Profile(req) => this.handle_profile_request(req, ctx).await,
164 }
165 }
166 })
167 .detach();
168 }
169 Ok(())
170 }
171
172 async fn take_connection(&self, id: u64) -> anyhow::Result<PolledSocket<Socket>> {
173 self.inner.take_connection(id).await
174 }
175
176 async fn handle_inspect_request(&self, req: InspectService, mut ctx: CancelContext) {
177 match req {
178 InspectService::Inspect(request, response) => {
179 let inspect_response = self.handle_inspect(&request, ctx).await;
180 response.send(grpc_result(Ok(Ok(inspect_response))));
181 }
182 InspectService::Update(request, response) => {
183 response.send(grpc_result(
184 ctx.until_cancelled(self.handle_update(&request)).await,
185 ));
186 }
187 }
188 }
189
190 async fn handle_profile_request(&self, req: AzureProfiler, mut ctx: CancelContext) {
191 match req {
192 AzureProfiler::Profile(request, response) => response.send(grpc_result(
193 ctx.until_cancelled(self.handle_profile(request)).await,
194 )),
195 }
196 }
197
198 async fn handle_diag_request(
199 &self,
200 driver: &(impl Driver + Spawn + Clone),
201 req: UnderhillDiag,
202 mut ctx: CancelContext,
203 ) {
204 match req {
205 UnderhillDiag::Exec(request, response) => response.send(grpc_result(
206 ctx.until_cancelled(self.handle_exec(driver, &request))
207 .await,
208 )),
209 UnderhillDiag::Wait(request, response) => response.send(grpc_result(
210 ctx.until_cancelled(self.handle_wait(&request)).await,
211 )),
212 UnderhillDiag::Start(request, response) => {
213 response.send(grpc_result(
214 ctx.until_cancelled(self.handle_start(request)).await,
215 ));
216 }
217 UnderhillDiag::Kmsg(request, response) => {
218 response.send(grpc_result(Ok(self.handle_kmsg(driver, &request).await)))
219 }
220 UnderhillDiag::Crash(request, response) => {
221 response.send(grpc_result(
222 ctx.until_cancelled(self.handle_crash(request)).await,
223 ));
224 }
225 UnderhillDiag::Restart(_, response) => {
226 response.send(grpc_result(
227 ctx.until_cancelled(self.handle_restart()).await,
228 ));
229 }
230 UnderhillDiag::ReadFile(request, response) => response.send(grpc_result(Ok(self
231 .handle_read_file(driver, &request)
232 .await))),
233 UnderhillDiag::Pause(_, response) => {
234 response.send(grpc_result(ctx.until_cancelled(self.handle_pause()).await))
235 }
236 UnderhillDiag::PacketCapture(request, response) => response.send(grpc_result(
237 ctx.until_cancelled(self.handle_packet_capture(&request))
238 .await,
239 )),
240 UnderhillDiag::Resume(_, response) => {
241 response.send(grpc_result(ctx.until_cancelled(self.handle_resume()).await))
242 }
243 UnderhillDiag::DumpSavedState((), response) => response.send(grpc_result(
244 ctx.until_cancelled(self.handle_dump_saved_state()).await,
245 )),
246 }
247 }
248
249 async fn handle_diag2_request(
250 &self,
251 _driver: &(impl Driver + Spawn + Clone),
252 req: OpenhclDiag,
253 mut ctx: CancelContext,
254 ) {
255 match req {
256 OpenhclDiag::Ping((), response) => {
257 response.send(Ok(()));
258 }
259 OpenhclDiag::MemoryProfileTrace(request, response) => response.send(grpc_result(
260 ctx.until_cancelled(self.handle_memory_profile_trace(&request))
261 .await,
262 )),
263 }
264 }
265
266 async fn handle_start(&self, request: StartRequest) -> anyhow::Result<()> {
267 let params = StartParams {
268 env: request
269 .env
270 .into_iter()
271 .map(|pair| (pair.name, pair.value))
272 .collect(),
273 args: request.args,
274 };
275 self.request_send
276 .call_failable(DiagRequest::Start, params)
277 .await?;
278 Ok(())
279 }
280
281 async fn handle_crash(&self, request: diag_proto::CrashRequest) -> anyhow::Result<()> {
282 self.request_send.send(DiagRequest::Crash(request.pid));
283
284 Ok(())
285 }
286
287 async fn handle_exec(
288 &self,
289 driver: &(impl Driver + Spawn + Clone),
290 request: &ExecRequest,
291 ) -> anyhow::Result<ExecResponse> {
292 tracing::info!(
293 command = %request.command,
294 stdin = request.stdin,
295 stdout = request.stdout,
296 stderr = request.stderr,
297 tty = request.tty,
298 "exec request"
299 );
300
301 let stdin = if request.stdin != 0 {
302 Some(
303 self.take_connection(request.stdin)
304 .await
305 .context("failed to get stdin conn")?,
306 )
307 } else {
308 None
309 };
310 let stdout = if request.stdout != 0 {
311 Some(
312 self.take_connection(request.stdout)
313 .await
314 .context("failed to get stdout conn")?,
315 )
316 } else {
317 None
318 };
319 let stderr = if request.stderr != 0 {
320 Some(
321 self.take_connection(request.stderr)
322 .await
323 .context("failed to get stderr conn")?,
324 )
325 } else {
326 None
327 };
328
329 let mut builder = pal::unix::process::Builder::new(&request.command);
330 builder.args(&request.args);
331 if request.clear_env {
332 builder.env_clear();
333 }
334 for diag_proto::EnvPair { name, value } in &request.env {
335 if let Some(value) = value {
336 builder.env(name, value);
337 } else {
338 builder.env_remove(name);
339 }
340 }
341
342 #[cfg(target_arch = "x86_64")]
347 {
348 let result =
349 safe_intrinsics::cpuid(hvdef::HV_CPUID_FUNCTION_MS_HV_ISOLATION_CONFIGURATION, 0);
350 let tdx_isolated = (result.ebx & 0xF) == 3;
352 if tdx_isolated {
353 builder.env("GLIBC_TUNABLES", "glibc.cpu.x86_non_temporal_threshold=0x11a000:glibc.cpu.x86_rep_movsb_threshold=0x4000");
354 }
355 };
356
357 let mut stdin_relay = None;
358 let mut stdout_relay = None;
359 let mut stderr_relay = None;
360 let mut raw_stdout = None;
361 let mut raw_stderr = None;
362 let mut child = {
363 let (stdin_pipes, stdout_pipes, stderr_pipes);
364 let stdin_socket;
365 let stdout_socket;
366 let stderr_socket;
367 let pty;
368 if request.tty {
369 pty = term::open_pty().context("failed to create pty")?;
370
371 let primary = PolledPipe::new(driver, pty.0)
372 .context("failed to create polled pty primary")?;
373
374 let secondary = &pty.1;
375
376 let (primary_read, primary_write) = primary.split();
377 if let Some(stdin) = stdin {
378 stdin_relay = Some(driver.spawn("pty stdin relay", async move {
379 relay(stdin, primary_write).await;
380 }));
381 }
382 if let Some(stdout) = stdout {
383 stdout_relay =
384 Some(driver.spawn("pty stdout relay", relay(primary_read, stdout)));
385 }
386
387 builder
388 .setsid(true)
389 .controlling_terminal(secondary.as_fd())
390 .stdin(Stdio::Fd(secondary.as_fd()))
391 .stdout(Stdio::Fd(secondary.as_fd()))
392 .stderr(Stdio::Fd(secondary.as_fd()));
393 } else if request.raw_socket_io {
394 if let Some(stdin) = stdin {
395 stdin_socket = stdin.into_inner();
396 builder.stdin(Stdio::Fd(stdin_socket.as_fd()));
397 }
398 if let Some(stdout) = stdout {
399 stdout_socket = raw_stdout.insert(stdout.into_inner());
400 builder.stdout(Stdio::Fd(stdout_socket.as_fd()));
401 if request.combine_stderr {
402 builder.stderr(Stdio::Fd(stdout_socket.as_fd()));
403 }
404 }
405 if let Some(stderr) = stderr {
406 stderr_socket = raw_stderr.insert(stderr.into_inner());
407 builder.stderr(Stdio::Fd(stderr_socket.as_fd()));
408 }
409 } else {
410 if let Some(stdin) = stdin {
411 stdin_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
412 let pipe = PolledPipe::new(driver, stdin_pipes.1)
413 .context("failed to create polled pipe")?;
414 stdin_relay = Some(driver.spawn("stdin relay", async move {
415 relay(stdin, pipe).await;
416 }));
417 builder.stdin(Stdio::Fd(stdin_pipes.0.as_fd()));
418 }
419 if let Some(stdout) = stdout {
420 stdout_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
421 let pipe = PolledPipe::new(driver, stdout_pipes.0)
422 .context("failed to create polled pipe")?;
423 stdout_relay = Some(driver.spawn("stdout relay", relay(pipe, stdout)));
424 builder.stdout(Stdio::Fd(stdout_pipes.1.as_fd()));
425 if request.combine_stderr {
426 builder.stderr(Stdio::Fd(stdout_pipes.1.as_fd()));
427 }
428 }
429 if let Some(stderr) = stderr {
430 stderr_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
431 let pipe = PolledPipe::new(driver, stderr_pipes.0)
432 .context("failed to create polled pipe")?;
433 stderr_relay = Some(driver.spawn("stderr relay", relay(pipe, stderr)));
434 builder.stderr(Stdio::Fd(stderr_pipes.1.as_fd()));
435 }
436 }
437
438 builder
439 .spawn()
440 .with_context(|| format!("failed to launch {}", &request.command))?
441 };
442
443 let pid = child.id();
444
445 tracing::info!(pid, "spawned child");
446
447 let mut child_ready = driver
448 .new_dyn_fd_ready(child.as_fd().as_raw_fd())
449 .expect("failed creating child poll");
450
451 let status = driver.spawn("diag child wait", {
452 let driver = driver.clone();
453 async move {
454 poll_fn(|cx| child_ready.poll_fd_ready(cx, InterestSlot::Read, PollEvents::IN))
455 .await;
456 let status = child.try_wait().unwrap().unwrap();
457 tracing::info!(pid, ?status, "child exited");
458
459 drop(stdin_relay);
461
462 let finish_raw = |raw: Option<Socket>| {
465 raw.and_then(|raw| {
466 let _ = raw.as_sock_ref().shutdown(std::net::Shutdown::Write);
467 PolledSocket::new(&driver, raw).ok()
468 })
469 };
470 let raw_stdout = finish_raw(raw_stdout);
471 let raw_stderr = finish_raw(raw_stderr);
472
473 driver
476 .spawn("socket-wait", async move {
477 let await_output_relay = async |task, raw| {
478 let socket = if let Some(task) = task {
479 Some(task.await)
480 } else {
481 raw
482 };
483 if let Some(socket) = socket {
484 let _ = futures::io::copy(socket, &mut futures::io::sink()).await;
487 }
488 };
489
490 await_output_relay(stdout_relay, raw_stdout).await;
491 await_output_relay(stderr_relay, raw_stderr).await;
492 })
493 .detach();
494
495 status
496 }
497 });
498
499 self.children.lock().insert(pid, status);
500 Ok(ExecResponse { pid })
501 }
502
503 async fn handle_wait(&self, request: &WaitRequest) -> anyhow::Result<WaitResponse> {
504 tracing::debug!(pid = request.pid, "wait request");
505 let channel = self
506 .children
507 .lock()
508 .remove(&request.pid)
509 .context("pid not found")?;
510
511 let status = channel.await;
512 let exit_code = status.code().unwrap_or(255);
513
514 tracing::debug!(pid = request.pid, exit_code, "wait complete");
515
516 Ok(WaitResponse { exit_code })
517 }
518
519 async fn handle_inspect(
520 &self,
521 request: &InspectRequest,
522 mut ctx: CancelContext,
523 ) -> InspectResponse2 {
524 tracing::debug!(
525 path = request.path.as_str(),
526 depth = request.depth,
527 "inspect request"
528 );
529 let mut inspection = InspectionBuilder::new(&request.path)
530 .depth(Some(request.depth as usize))
531 .sensitivity(self.inspect_sensitivity_level)
532 .inspect(inspect::send(&self.request_send, DiagRequest::Inspect));
533
534 let _ = ctx.until_cancelled(inspection.resolve()).await;
537
538 let result = inspection.results();
539 InspectResponse2 { result }
540 }
541
542 async fn handle_update(&self, request: &UpdateRequest) -> anyhow::Result<UpdateResponse2> {
543 tracing::debug!(
544 path = request.path.as_str(),
545 value = request.value.as_str(),
546 "update request"
547 );
548 let new_value = InspectionBuilder::new(&request.path)
549 .sensitivity(self.inspect_sensitivity_level)
550 .update(
551 &request.value,
552 inspect::send(&self.request_send, DiagRequest::Inspect),
553 )
554 .await?;
555 Ok(UpdateResponse2 { new_value })
556 }
557
558 async fn handle_kmsg(
559 &self,
560 driver: &(impl Driver + Spawn + Clone),
561 request: &KmsgRequest,
562 ) -> anyhow::Result<()> {
563 self.handle_read_file_request(driver, request.conn, request.follow, "/dev/kmsg")
564 .await
565 }
566
567 async fn handle_read_file(
568 &self,
569 driver: &(impl Driver + Spawn + Clone),
570 request: &FileRequest,
571 ) -> anyhow::Result<()> {
572 self.handle_read_file_request(driver, request.conn, request.follow, &request.file_path)
573 .await
574 }
575
576 async fn handle_packet_capture(
577 &self,
578 request: &NetworkPacketCaptureRequest,
579 ) -> anyhow::Result<NetworkPacketCaptureResponse> {
580 let operation = if request.operation == Operation::Query as i32 {
581 PacketCaptureOperation::Query
582 } else if request.operation == Operation::Start as i32 {
583 PacketCaptureOperation::Start
584 } else if request.operation == Operation::Stop as i32 {
585 PacketCaptureOperation::Stop
586 } else {
587 anyhow::bail!("unsupported request type {}", request.operation);
588 };
589
590 let op_data = match operation {
591 PacketCaptureOperation::Query => Some(OperationData::OpQueryData(0)),
593 PacketCaptureOperation::Start => {
594 let Some(op_data) = &request.op_data else {
595 anyhow::bail!("missing start operation parameters");
596 };
597
598 match op_data {
599 diag_proto::network_packet_capture_request::OpData::StartData(start_data) => {
600 let writers = join_all(start_data.conns.iter().map(async |c| {
601 let conn = self.take_connection(*c).await?;
602 Ok(conn.into_inner())
603 }))
604 .await
605 .into_iter()
606 .collect::<anyhow::Result<Vec<Socket>>>()?;
607 Some(OperationData::OpStartData(StartData {
608 writers,
609 snaplen: start_data.snaplen,
610 }))
611 }
612 }
613 }
614 _ => None,
615 };
616
617 let params = PacketCaptureParams { operation, op_data };
618 let params = self
619 .request_send
620 .call_failable(DiagRequest::PacketCapture, params)
621 .await?;
622 let num_streams = match params.op_data {
623 Some(OperationData::OpQueryData(num_streams)) => num_streams,
624 _ => 0,
625 };
626 Ok(NetworkPacketCaptureResponse { num_streams })
627 }
628
629 async fn handle_profile(&self, request: ProfileRequest) -> anyhow::Result<()> {
630 let conn = self.take_connection(request.conn).await?;
631 #[cfg(feature = "profiler")]
632 {
633 let profiler_request = profiler_worker::ProfilerRequest {
634 profiler_args: request.profiler_args,
635 duration: request.duration,
636 conn: conn.into_inner(),
637 };
638
639 self.request_send
640 .call_failable(DiagRequest::Profile, profiler_request)
641 .await?;
642 }
643 #[cfg(not(feature = "profiler"))]
644 {
645 drop(conn);
647 tracing::error!("Profiler feature disabled");
648 }
649 Ok(())
650 }
651
652 async fn handle_read_file_request(
653 &self,
654 driver: &(impl Driver + Spawn + Clone),
655 conn: u64,
656 follow: bool,
657 file_path: &str,
658 ) -> anyhow::Result<()> {
659 let mut conn = self.take_connection(conn).await?;
660 let file = fs_err::File::open(file_path).context("failed to open file")?;
661
662 let file_meta = file.metadata()?;
663
664 if file_meta.file_type().is_char_device() {
665 let file =
666 PolledPipe::new(driver, file.into()).context("failed to create polled pipe")?;
667
668 driver
669 .spawn("read file relay", async move {
670 if let Err(err) = relay_read_file(file, conn, follow).await {
671 tracing::warn!(
672 error = &*err as &dyn std::error::Error,
673 "read file relay failed"
674 );
675 }
676 })
677 .detach();
678 } else if file_meta.file_type().is_file() {
679 driver
680 .spawn("read file relay", async move {
681 if let Err(err) =
688 futures::io::copy(AllowStdIo::new(File::from(file)), &mut conn).await
689 {
690 tracing::warn!(
691 error = &err as &dyn std::error::Error,
692 "read file relay failed"
693 );
694 }
695 })
696 .detach();
697 } else {
698 anyhow::bail!("cannot read directory");
699 }
700
701 Ok(())
702 }
703
704 async fn handle_restart(&self) -> anyhow::Result<()> {
705 self.request_send
706 .call_failable(DiagRequest::Restart, ())
707 .await?;
708 Ok(())
709 }
710
711 async fn handle_pause(&self) -> anyhow::Result<()> {
712 self.request_send
713 .call_failable(DiagRequest::Pause, ())
714 .await?;
715 Ok(())
716 }
717
718 async fn handle_resume(&self) -> anyhow::Result<()> {
719 self.request_send
720 .call_failable(DiagRequest::Resume, ())
721 .await?;
722 Ok(())
723 }
724
725 async fn handle_dump_saved_state(&self) -> anyhow::Result<diag_proto::DumpSavedStateResponse> {
726 let data = self
727 .request_send
728 .call_failable(DiagRequest::Save, ())
729 .await?;
730
731 Ok(diag_proto::DumpSavedStateResponse { data })
732 }
733
734 async fn handle_memory_profile_trace(
735 &self,
736 request: &MemoryProfileTraceRequest,
737 ) -> anyhow::Result<diag_proto::MemoryProfileTraceResponse> {
738 cfg_if::cfg_if! {
739 if #[cfg(feature = "mem-profile-tracing")]
740 {
741 let data = self
742 .request_send
743 .call_failable(DiagRequest::MemoryProfileTrace, request.pid)
744 .await?;
745
746 Ok(diag_proto::MemoryProfileTraceResponse { data })
747 } else {
748 let _ = request;
749 anyhow::bail!(
750 "Profiler tracing feature disabled: rebuild with the `mem-profile-tracing` feature enabled"
751 )
752 }
753 }
754 }
755}
756
757async fn relay<
758 R: 'static + AsyncRead + Unpin + Send,
759 W: 'static + AsyncWrite + PollReady + Unpin + Send,
760>(
761 mut read: R,
762 mut write: W,
763) -> W {
764 let mut buffer = [0; 1024];
765 let result: anyhow::Result<_> = async {
766 loop {
767 let n = futures::select! { n = read.read(&mut buffer).fuse() => n.context("read failed")?,
769 _ = write.wait_ready(PollEvents::RDHUP).fuse() => {
770 break;
780 }
781 };
782 if n == 0 {
783 break;
784 }
785 write
786 .write_all(&buffer[..n])
787 .await
788 .context("write failed")?;
789 }
790 Ok(())
791 }
792 .await;
793 let _ = write.close().await;
794 if let Err(err) = result {
795 tracing::warn!(error = &*err as &dyn std::error::Error, "relay error");
796 }
797 write
798}
799
800async fn relay_read_file(
801 mut file: PolledPipe,
802 mut conn: PolledSocket<Socket>,
803 follow: bool,
804) -> anyhow::Result<()> {
805 let mut buffer = [0; FILE_LINE_MAX];
806 loop {
807 let n = if follow {
808 futures::select! { _ = conn.wait_ready(PollEvents::RDHUP).fuse() => break,
810 n = file.read(&mut buffer[..FILE_LINE_MAX - 1]).fuse() => n
811 }
812 } else {
813 file.get().read(&mut buffer[..FILE_LINE_MAX - 1])
816 };
817 let n = match n {
818 Ok(0) => break,
819 Ok(count) => count,
820 Err(e) => {
821 match e.kind() {
822 io::ErrorKind::BrokenPipe => {
823 continue;
826 }
827 io::ErrorKind::WouldBlock => {
828 assert!(!follow);
830 break;
831 }
832 _ => return Err(e).context("file read failed"),
833 }
834 }
835 };
836 assert!(
837 n < buffer.len(),
838 "the file returned a line bigger than its maximum"
839 );
840 buffer[n] = 0;
842 conn.write_all(&buffer[..n + 1])
844 .await
845 .context("socket write failed")?;
846 }
847 Ok(())
848}