1use crate::grpc_result;
7use crate::new_pty;
8use anyhow::Context;
9use azure_profiler_proto::AzureProfiler;
10use azure_profiler_proto::ProfileRequest;
11use diag_proto::ExecRequest;
12use diag_proto::ExecResponse;
13use diag_proto::FILE_LINE_MAX;
14use diag_proto::FileRequest;
15use diag_proto::KmsgRequest;
16use diag_proto::MemoryProfileTraceRequest;
17use diag_proto::NetworkPacketCaptureRequest;
18use diag_proto::NetworkPacketCaptureResponse;
19use diag_proto::OpenhclDiag;
20use diag_proto::StartRequest;
21use diag_proto::UnderhillDiag;
22use diag_proto::WaitRequest;
23use diag_proto::WaitResponse;
24use diag_proto::network_packet_capture_request::Operation;
25use futures::AsyncRead;
26use futures::AsyncReadExt;
27use futures::AsyncWrite;
28use futures::AsyncWriteExt;
29use futures::FutureExt;
30use futures::StreamExt;
31use futures::future::join_all;
32use futures::io::AllowStdIo;
33use futures_concurrency::stream::Merge;
34use inspect::InspectionBuilder;
35use inspect_proto::InspectRequest;
36use inspect_proto::InspectResponse2;
37use inspect_proto::InspectService;
38use inspect_proto::UpdateRequest;
39use inspect_proto::UpdateResponse2;
40use mesh::CancelContext;
41use mesh::rpc::FailableRpc;
42use mesh::rpc::RpcSend;
43use mesh_rpc::server::RpcReceiver;
44use net_packet_capture::OperationData;
45use net_packet_capture::PacketCaptureOperation;
46use net_packet_capture::PacketCaptureParams;
47use net_packet_capture::StartData;
48use pal::unix::process::Stdio;
49use pal_async::driver::Driver;
50use pal_async::interest::InterestSlot;
51use pal_async::interest::PollEvents;
52use pal_async::pipe::PolledPipe;
53use pal_async::socket::AsSockRef;
54use pal_async::socket::PollReady;
55use pal_async::socket::PollReadyExt;
56use pal_async::socket::PolledSocket;
57use pal_async::task::Spawn;
58use pal_async::task::Task;
59use parking_lot::Mutex;
60use socket2::Socket;
61use std::collections::HashMap;
62use std::fs::File;
63use std::future::poll_fn;
64use std::io;
65use std::io::Read;
66use std::os::unix::fs::FileTypeExt;
67use std::os::unix::prelude::*;
68use std::process::ExitStatus;
69use std::sync::Arc;
70
/// Requests that the diagnostics service forwards over its `mesh::Sender`
/// to whichever component owns the receiving end.
///
/// NOTE(review): the variant order is presumably significant for the
/// `MeshPayload` encoding -- confirm before reordering.
#[derive(Debug, mesh::MeshPayload)]
pub enum DiagRequest {
    /// Sent by the `Start` RPC handler with the environment and arguments
    /// copied from the wire request.
    Start(FailableRpc<StartParams, ()>),
    /// A deferred inspect/update request, produced via `inspect::send`.
    Inspect(inspect::Deferred),
    /// Sent by the `Crash` RPC handler; carries the `pid` field of the
    /// wire request.
    Crash(i32),
    /// Sent by the `Restart` RPC handler.
    Restart(FailableRpc<(), ()>),
    /// Sent by the `Pause` RPC handler.
    Pause(FailableRpc<(), ()>),
    /// Sent by the `Resume` RPC handler.
    Resume(FailableRpc<(), ()>),
    /// Sent by the `DumpSavedState` RPC handler; the returned bytes become
    /// the `data` field of the response.
    Save(FailableRpc<(), Vec<u8>>),
    /// Sent by the packet capture RPC handler. The returned parameters are
    /// inspected for `OperationData::OpQueryData` to report a stream count.
    PacketCapture(FailableRpc<PacketCaptureParams<Socket>, PacketCaptureParams<Socket>>),
    /// Sent by the profiler RPC handler (only with the `profiler` feature).
    #[cfg(feature = "profiler")]
    Profile(FailableRpc<profiler_worker::ProfilerRequest, ()>),
    /// Sent by the memory profile trace RPC handler (only with the
    /// `mem-profile-tracing` feature); carries the request's `pid` and
    /// returns the bytes placed in the response's `data` field.
    #[cfg(feature = "mem-profile-tracing")]
    MemoryProfileTrace(FailableRpc<i32, Vec<u8>>),
}
97
/// Parameters carried by [`DiagRequest::Start`].
#[derive(Debug, mesh::MeshPayload)]
pub struct StartParams {
    /// `(name, value)` environment pairs copied from the wire request.
    ///
    /// NOTE(review): a `None` value presumably means "unset this variable",
    /// as it does for exec requests -- confirm against the receiver.
    pub env: Vec<(String, Option<String>)>,
    /// Arguments copied verbatim from the wire request.
    pub args: Vec<String>,
}
106
/// Handles the diagnostics, inspect, and profiler RPC services.
pub(crate) struct DiagServiceHandler {
    /// Channel used to forward [`DiagRequest`]s to their consumer.
    request_send: mesh::Sender<DiagRequest>,
    /// Children launched by exec requests, keyed by pid. Each task resolves
    /// to the child's exit status and is removed by the matching wait
    /// request.
    children: Mutex<HashMap<i32, Task<ExitStatus>>>,
    /// Sensitivity level applied to inspect and update requests; set to
    /// `Safe` when confidential filtering is enabled, `None` otherwise.
    inspect_sensitivity_level: Option<inspect::SensitivityLevel>,
    /// Shared state from which data connections are taken by id.
    inner: Arc<crate::Inner>,
}
113
114impl DiagServiceHandler {
115 pub fn new(request_send: mesh::Sender<DiagRequest>, inner: Arc<crate::Inner>) -> Self {
116 Self {
117 children: Default::default(),
118 request_send,
119 inspect_sensitivity_level: if underhill_confidentiality::confidential_filtering_enabled(
121 ) {
122 Some(inspect::SensitivityLevel::Safe)
123 } else {
124 None
125 },
126 inner,
129 }
130 }
131
132 pub async fn process_requests(
133 self: &Arc<Self>,
134 driver: &(impl Driver + Spawn + Clone),
135 diag_recv: RpcReceiver<UnderhillDiag>,
136 diag2_recv: RpcReceiver<OpenhclDiag>,
137 inspect_recv: RpcReceiver<InspectService>,
138 profile_recv: RpcReceiver<AzureProfiler>,
139 ) -> anyhow::Result<()> {
140 enum Event {
141 Diag(UnderhillDiag),
142 Diag2(OpenhclDiag),
143 Inspect(InspectService),
144 Profile(AzureProfiler),
145 }
146 let mut s = (
147 diag_recv.map(|(ctx, req)| (ctx, Event::Diag(req))),
148 diag2_recv.map(|(ctx, req)| (ctx, Event::Diag2(req))),
149 inspect_recv.map(|(ctx, req)| (ctx, Event::Inspect(req))),
150 profile_recv.map(|(ctx, req)| (ctx, Event::Profile(req))),
151 )
152 .merge();
153
154 while let Some((ctx, req)) = s.next().await {
155 driver
156 .spawn("diag request", {
157 let driver = driver.clone();
158 let this = self.clone();
159 async move {
160 match req {
161 Event::Diag(req) => this.handle_diag_request(&driver, req, ctx).await,
162 Event::Diag2(req) => this.handle_diag2_request(&driver, req, ctx).await,
163 Event::Inspect(req) => this.handle_inspect_request(req, ctx).await,
164 Event::Profile(req) => this.handle_profile_request(req, ctx).await,
165 }
166 }
167 })
168 .detach();
169 }
170 Ok(())
171 }
172
173 async fn take_connection(&self, id: u64) -> anyhow::Result<PolledSocket<Socket>> {
174 self.inner.take_connection(id).await
175 }
176
177 async fn handle_inspect_request(&self, req: InspectService, mut ctx: CancelContext) {
178 match req {
179 InspectService::Inspect(request, response) => {
180 let inspect_response = self.handle_inspect(&request, ctx).await;
181 response.send(grpc_result(Ok(Ok(inspect_response))));
182 }
183 InspectService::Update(request, response) => {
184 response.send(grpc_result(
185 ctx.until_cancelled(self.handle_update(&request)).await,
186 ));
187 }
188 }
189 }
190
191 async fn handle_profile_request(&self, req: AzureProfiler, mut ctx: CancelContext) {
192 match req {
193 AzureProfiler::Profile(request, response) => response.send(grpc_result(
194 ctx.until_cancelled(self.handle_profile(request)).await,
195 )),
196 }
197 }
198
199 async fn handle_diag_request(
200 &self,
201 driver: &(impl Driver + Spawn + Clone),
202 req: UnderhillDiag,
203 mut ctx: CancelContext,
204 ) {
205 match req {
206 UnderhillDiag::Exec(request, response) => response.send(grpc_result(
207 ctx.until_cancelled(self.handle_exec(driver, &request))
208 .await,
209 )),
210 UnderhillDiag::Wait(request, response) => response.send(grpc_result(
211 ctx.until_cancelled(self.handle_wait(&request)).await,
212 )),
213 UnderhillDiag::Start(request, response) => {
214 response.send(grpc_result(
215 ctx.until_cancelled(self.handle_start(request)).await,
216 ));
217 }
218 UnderhillDiag::Kmsg(request, response) => {
219 response.send(grpc_result(Ok(self.handle_kmsg(driver, &request).await)))
220 }
221 UnderhillDiag::Crash(request, response) => {
222 response.send(grpc_result(
223 ctx.until_cancelled(self.handle_crash(request)).await,
224 ));
225 }
226 UnderhillDiag::Restart(_, response) => {
227 response.send(grpc_result(
228 ctx.until_cancelled(self.handle_restart()).await,
229 ));
230 }
231 UnderhillDiag::ReadFile(request, response) => response.send(grpc_result(Ok(self
232 .handle_read_file(driver, &request)
233 .await))),
234 UnderhillDiag::Pause(_, response) => {
235 response.send(grpc_result(ctx.until_cancelled(self.handle_pause()).await))
236 }
237 UnderhillDiag::PacketCapture(request, response) => response.send(grpc_result(
238 ctx.until_cancelled(self.handle_packet_capture(&request))
239 .await,
240 )),
241 UnderhillDiag::Resume(_, response) => {
242 response.send(grpc_result(ctx.until_cancelled(self.handle_resume()).await))
243 }
244 UnderhillDiag::DumpSavedState((), response) => response.send(grpc_result(
245 ctx.until_cancelled(self.handle_dump_saved_state()).await,
246 )),
247 }
248 }
249
250 async fn handle_diag2_request(
251 &self,
252 _driver: &(impl Driver + Spawn + Clone),
253 req: OpenhclDiag,
254 mut ctx: CancelContext,
255 ) {
256 match req {
257 OpenhclDiag::Ping((), response) => {
258 response.send(Ok(()));
259 }
260 OpenhclDiag::MemoryProfileTrace(request, response) => response.send(grpc_result(
261 ctx.until_cancelled(self.handle_memory_profile_trace(&request))
262 .await,
263 )),
264 }
265 }
266
267 async fn handle_start(&self, request: StartRequest) -> anyhow::Result<()> {
268 let params = StartParams {
269 env: request
270 .env
271 .into_iter()
272 .map(|pair| (pair.name, pair.value))
273 .collect(),
274 args: request.args,
275 };
276 self.request_send
277 .call_failable(DiagRequest::Start, params)
278 .await?;
279 Ok(())
280 }
281
282 async fn handle_crash(&self, request: diag_proto::CrashRequest) -> anyhow::Result<()> {
283 self.request_send.send(DiagRequest::Crash(request.pid));
284
285 Ok(())
286 }
287
288 async fn handle_exec(
289 &self,
290 driver: &(impl Driver + Spawn + Clone),
291 request: &ExecRequest,
292 ) -> anyhow::Result<ExecResponse> {
293 tracing::info!(
294 command = %request.command,
295 stdin = request.stdin,
296 stdout = request.stdout,
297 stderr = request.stderr,
298 tty = request.tty,
299 "exec request"
300 );
301
302 let stdin = if request.stdin != 0 {
303 Some(
304 self.take_connection(request.stdin)
305 .await
306 .context("failed to get stdin conn")?,
307 )
308 } else {
309 None
310 };
311 let stdout = if request.stdout != 0 {
312 Some(
313 self.take_connection(request.stdout)
314 .await
315 .context("failed to get stdout conn")?,
316 )
317 } else {
318 None
319 };
320 let stderr = if request.stderr != 0 {
321 Some(
322 self.take_connection(request.stderr)
323 .await
324 .context("failed to get stderr conn")?,
325 )
326 } else {
327 None
328 };
329
330 let mut builder = pal::unix::process::Builder::new(&request.command);
331 builder.args(&request.args);
332 if request.clear_env {
333 builder.env_clear();
334 }
335 for diag_proto::EnvPair { name, value } in &request.env {
336 if let Some(value) = value {
337 builder.env(name, value);
338 } else {
339 builder.env_remove(name);
340 }
341 }
342
343 #[cfg(target_arch = "x86_64")]
348 {
349 let result =
350 safe_intrinsics::cpuid(hvdef::HV_CPUID_FUNCTION_MS_HV_ISOLATION_CONFIGURATION, 0);
351 let tdx_isolated = (result.ebx & 0xF) == 3;
353 if tdx_isolated {
354 builder.env("GLIBC_TUNABLES", "glibc.cpu.x86_non_temporal_threshold=0x11a000:glibc.cpu.x86_rep_movsb_threshold=0x4000");
355 }
356 };
357
358 let mut stdin_relay = None;
359 let mut stdout_relay = None;
360 let mut stderr_relay = None;
361 let mut raw_stdout = None;
362 let mut raw_stderr = None;
363 let mut child = {
364 let (stdin_pipes, stdout_pipes, stderr_pipes);
365 let stdin_socket;
366 let stdout_socket;
367 let stderr_socket;
368 let pty;
369 if request.tty {
370 pty = new_pty::new_pty().context("failed to create pty")?;
371
372 let primary = PolledPipe::new(driver, pty.0)
373 .context("failed to create polled pty primary")?;
374
375 let secondary = &pty.1;
376
377 let (primary_read, primary_write) = primary.split();
378 if let Some(stdin) = stdin {
379 stdin_relay = Some(driver.spawn("pty stdin relay", async move {
380 relay(stdin, primary_write).await;
381 }));
382 }
383 if let Some(stdout) = stdout {
384 stdout_relay =
385 Some(driver.spawn("pty stdout relay", relay(primary_read, stdout)));
386 }
387
388 builder
389 .setsid(true)
390 .controlling_terminal(secondary.as_fd())
391 .stdin(Stdio::Fd(secondary.as_fd()))
392 .stdout(Stdio::Fd(secondary.as_fd()))
393 .stderr(Stdio::Fd(secondary.as_fd()));
394 } else if request.raw_socket_io {
395 if let Some(stdin) = stdin {
396 stdin_socket = stdin.into_inner();
397 builder.stdin(Stdio::Fd(stdin_socket.as_fd()));
398 }
399 if let Some(stdout) = stdout {
400 stdout_socket = raw_stdout.insert(stdout.into_inner());
401 builder.stdout(Stdio::Fd(stdout_socket.as_fd()));
402 if request.combine_stderr {
403 builder.stderr(Stdio::Fd(stdout_socket.as_fd()));
404 }
405 }
406 if let Some(stderr) = stderr {
407 stderr_socket = raw_stderr.insert(stderr.into_inner());
408 builder.stderr(Stdio::Fd(stderr_socket.as_fd()));
409 }
410 } else {
411 if let Some(stdin) = stdin {
412 stdin_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
413 let pipe = PolledPipe::new(driver, stdin_pipes.1)
414 .context("failed to create polled pipe")?;
415 stdin_relay = Some(driver.spawn("stdin relay", async move {
416 relay(stdin, pipe).await;
417 }));
418 builder.stdin(Stdio::Fd(stdin_pipes.0.as_fd()));
419 }
420 if let Some(stdout) = stdout {
421 stdout_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
422 let pipe = PolledPipe::new(driver, stdout_pipes.0)
423 .context("failed to create polled pipe")?;
424 stdout_relay = Some(driver.spawn("stdout relay", relay(pipe, stdout)));
425 builder.stdout(Stdio::Fd(stdout_pipes.1.as_fd()));
426 if request.combine_stderr {
427 builder.stderr(Stdio::Fd(stdout_pipes.1.as_fd()));
428 }
429 }
430 if let Some(stderr) = stderr {
431 stderr_pipes = pal::unix::pipe::pair().context("failed to create pipe pair")?;
432 let pipe = PolledPipe::new(driver, stderr_pipes.0)
433 .context("failed to create polled pipe")?;
434 stderr_relay = Some(driver.spawn("stderr relay", relay(pipe, stderr)));
435 builder.stderr(Stdio::Fd(stderr_pipes.1.as_fd()));
436 }
437 }
438
439 builder
440 .spawn()
441 .with_context(|| format!("failed to launch {}", &request.command))?
442 };
443
444 let pid = child.id();
445
446 tracing::info!(pid, "spawned child");
447
448 let mut child_ready = driver
449 .new_dyn_fd_ready(child.as_fd().as_raw_fd())
450 .expect("failed creating child poll");
451
452 let status = driver.spawn("diag child wait", {
453 let driver = driver.clone();
454 async move {
455 poll_fn(|cx| child_ready.poll_fd_ready(cx, InterestSlot::Read, PollEvents::IN))
456 .await;
457 let status = child.try_wait().unwrap().unwrap();
458 tracing::info!(pid, ?status, "child exited");
459
460 drop(stdin_relay);
462
463 let finish_raw = |raw: Option<Socket>| {
466 raw.and_then(|raw| {
467 let _ = raw.as_sock_ref().shutdown(std::net::Shutdown::Write);
468 PolledSocket::new(&driver, raw).ok()
469 })
470 };
471 let raw_stdout = finish_raw(raw_stdout);
472 let raw_stderr = finish_raw(raw_stderr);
473
474 driver
477 .spawn("socket-wait", async move {
478 let await_output_relay = async |task, raw| {
479 let socket = if let Some(task) = task {
480 Some(task.await)
481 } else {
482 raw
483 };
484 if let Some(socket) = socket {
485 let _ = futures::io::copy(socket, &mut futures::io::sink()).await;
488 }
489 };
490
491 await_output_relay(stdout_relay, raw_stdout).await;
492 await_output_relay(stderr_relay, raw_stderr).await;
493 })
494 .detach();
495
496 status
497 }
498 });
499
500 self.children.lock().insert(pid, status);
501 Ok(ExecResponse { pid })
502 }
503
504 async fn handle_wait(&self, request: &WaitRequest) -> anyhow::Result<WaitResponse> {
505 tracing::debug!(pid = request.pid, "wait request");
506 let channel = self
507 .children
508 .lock()
509 .remove(&request.pid)
510 .context("pid not found")?;
511
512 let status = channel.await;
513 let exit_code = status.code().unwrap_or(255);
514
515 tracing::debug!(pid = request.pid, exit_code, "wait complete");
516
517 Ok(WaitResponse { exit_code })
518 }
519
520 async fn handle_inspect(
521 &self,
522 request: &InspectRequest,
523 mut ctx: CancelContext,
524 ) -> InspectResponse2 {
525 tracing::debug!(
526 path = request.path.as_str(),
527 depth = request.depth,
528 "inspect request"
529 );
530 let mut inspection = InspectionBuilder::new(&request.path)
531 .depth(Some(request.depth as usize))
532 .sensitivity(self.inspect_sensitivity_level)
533 .inspect(inspect::send(&self.request_send, DiagRequest::Inspect));
534
535 let _ = ctx.until_cancelled(inspection.resolve()).await;
538
539 let result = inspection.results();
540 InspectResponse2 { result }
541 }
542
543 async fn handle_update(&self, request: &UpdateRequest) -> anyhow::Result<UpdateResponse2> {
544 tracing::debug!(
545 path = request.path.as_str(),
546 value = request.value.as_str(),
547 "update request"
548 );
549 let new_value = InspectionBuilder::new(&request.path)
550 .sensitivity(self.inspect_sensitivity_level)
551 .update(
552 &request.value,
553 inspect::send(&self.request_send, DiagRequest::Inspect),
554 )
555 .await?;
556 Ok(UpdateResponse2 { new_value })
557 }
558
559 async fn handle_kmsg(
560 &self,
561 driver: &(impl Driver + Spawn + Clone),
562 request: &KmsgRequest,
563 ) -> anyhow::Result<()> {
564 self.handle_read_file_request(driver, request.conn, request.follow, "/dev/kmsg")
565 .await
566 }
567
568 async fn handle_read_file(
569 &self,
570 driver: &(impl Driver + Spawn + Clone),
571 request: &FileRequest,
572 ) -> anyhow::Result<()> {
573 self.handle_read_file_request(driver, request.conn, request.follow, &request.file_path)
574 .await
575 }
576
577 async fn handle_packet_capture(
578 &self,
579 request: &NetworkPacketCaptureRequest,
580 ) -> anyhow::Result<NetworkPacketCaptureResponse> {
581 let operation = if request.operation == Operation::Query as i32 {
582 PacketCaptureOperation::Query
583 } else if request.operation == Operation::Start as i32 {
584 PacketCaptureOperation::Start
585 } else if request.operation == Operation::Stop as i32 {
586 PacketCaptureOperation::Stop
587 } else {
588 anyhow::bail!("unsupported request type {}", request.operation);
589 };
590
591 let op_data = match operation {
592 PacketCaptureOperation::Query => Some(OperationData::OpQueryData(0)),
594 PacketCaptureOperation::Start => {
595 let Some(op_data) = &request.op_data else {
596 anyhow::bail!("missing start operation parameters");
597 };
598
599 match op_data {
600 diag_proto::network_packet_capture_request::OpData::StartData(start_data) => {
601 let writers = join_all(start_data.conns.iter().map(async |c| {
602 let conn = self.take_connection(*c).await?;
603 Ok(conn.into_inner())
604 }))
605 .await
606 .into_iter()
607 .collect::<anyhow::Result<Vec<Socket>>>()?;
608 Some(OperationData::OpStartData(StartData {
609 writers,
610 snaplen: start_data.snaplen,
611 }))
612 }
613 }
614 }
615 _ => None,
616 };
617
618 let params = PacketCaptureParams { operation, op_data };
619 let params = self
620 .request_send
621 .call_failable(DiagRequest::PacketCapture, params)
622 .await?;
623 let num_streams = match params.op_data {
624 Some(OperationData::OpQueryData(num_streams)) => num_streams,
625 _ => 0,
626 };
627 Ok(NetworkPacketCaptureResponse { num_streams })
628 }
629
630 async fn handle_profile(&self, request: ProfileRequest) -> anyhow::Result<()> {
631 let conn = self.take_connection(request.conn).await?;
632 #[cfg(feature = "profiler")]
633 {
634 let profiler_request = profiler_worker::ProfilerRequest {
635 profiler_args: request.profiler_args,
636 duration: request.duration,
637 conn: conn.into_inner(),
638 };
639
640 self.request_send
641 .call_failable(DiagRequest::Profile, profiler_request)
642 .await?;
643 }
644 #[cfg(not(feature = "profiler"))]
645 {
646 drop(conn);
648 tracing::error!("Profiler feature disabled");
649 }
650 Ok(())
651 }
652
653 async fn handle_read_file_request(
654 &self,
655 driver: &(impl Driver + Spawn + Clone),
656 conn: u64,
657 follow: bool,
658 file_path: &str,
659 ) -> anyhow::Result<()> {
660 let mut conn = self.take_connection(conn).await?;
661 let file = fs_err::File::open(file_path).context("failed to open file")?;
662
663 let file_meta = file.metadata()?;
664
665 if file_meta.file_type().is_char_device() {
666 let file =
667 PolledPipe::new(driver, file.into()).context("failed to create polled pipe")?;
668
669 driver
670 .spawn("read file relay", async move {
671 if let Err(err) = relay_read_file(file, conn, follow).await {
672 tracing::warn!(
673 error = &*err as &dyn std::error::Error,
674 "read file relay failed"
675 );
676 }
677 })
678 .detach();
679 } else if file_meta.file_type().is_file() {
680 driver
681 .spawn("read file relay", async move {
682 if let Err(err) =
689 futures::io::copy(AllowStdIo::new(File::from(file)), &mut conn).await
690 {
691 tracing::warn!(
692 error = &err as &dyn std::error::Error,
693 "read file relay failed"
694 );
695 }
696 })
697 .detach();
698 } else {
699 anyhow::bail!("cannot read directory");
700 }
701
702 Ok(())
703 }
704
705 async fn handle_restart(&self) -> anyhow::Result<()> {
706 self.request_send
707 .call_failable(DiagRequest::Restart, ())
708 .await?;
709 Ok(())
710 }
711
712 async fn handle_pause(&self) -> anyhow::Result<()> {
713 self.request_send
714 .call_failable(DiagRequest::Pause, ())
715 .await?;
716 Ok(())
717 }
718
719 async fn handle_resume(&self) -> anyhow::Result<()> {
720 self.request_send
721 .call_failable(DiagRequest::Resume, ())
722 .await?;
723 Ok(())
724 }
725
726 async fn handle_dump_saved_state(&self) -> anyhow::Result<diag_proto::DumpSavedStateResponse> {
727 let data = self
728 .request_send
729 .call_failable(DiagRequest::Save, ())
730 .await?;
731
732 Ok(diag_proto::DumpSavedStateResponse { data })
733 }
734
735 async fn handle_memory_profile_trace(
736 &self,
737 request: &MemoryProfileTraceRequest,
738 ) -> anyhow::Result<diag_proto::MemoryProfileTraceResponse> {
739 cfg_if::cfg_if! {
740 if #[cfg(feature = "mem-profile-tracing")]
741 {
742 let data = self
743 .request_send
744 .call_failable(DiagRequest::MemoryProfileTrace, request.pid)
745 .await?;
746
747 Ok(diag_proto::MemoryProfileTraceResponse { data })
748 } else {
749 let _ = request;
750 anyhow::bail!(
751 "Profiler tracing feature disabled: rebuild with the `mem-profile-tracing` feature enabled"
752 )
753 }
754 }
755 }
756}
757
758async fn relay<
759 R: 'static + AsyncRead + Unpin + Send,
760 W: 'static + AsyncWrite + PollReady + Unpin + Send,
761>(
762 mut read: R,
763 mut write: W,
764) -> W {
765 let mut buffer = [0; 1024];
766 let result: anyhow::Result<_> = async {
767 loop {
768 let n = futures::select! { n = read.read(&mut buffer).fuse() => n.context("read failed")?,
770 _ = write.wait_ready(PollEvents::RDHUP).fuse() => {
771 break;
781 }
782 };
783 if n == 0 {
784 break;
785 }
786 write
787 .write_all(&buffer[..n])
788 .await
789 .context("write failed")?;
790 }
791 Ok(())
792 }
793 .await;
794 let _ = write.close().await;
795 if let Err(err) = result {
796 tracing::warn!(error = &*err as &dyn std::error::Error, "relay error");
797 }
798 write
799}
800
801async fn relay_read_file(
802 mut file: PolledPipe,
803 mut conn: PolledSocket<Socket>,
804 follow: bool,
805) -> anyhow::Result<()> {
806 let mut buffer = [0; FILE_LINE_MAX];
807 loop {
808 let n = if follow {
809 futures::select! { _ = conn.wait_ready(PollEvents::RDHUP).fuse() => break,
811 n = file.read(&mut buffer[..FILE_LINE_MAX - 1]).fuse() => n
812 }
813 } else {
814 file.get().read(&mut buffer[..FILE_LINE_MAX - 1])
817 };
818 let n = match n {
819 Ok(0) => break,
820 Ok(count) => count,
821 Err(e) => {
822 match e.kind() {
823 io::ErrorKind::BrokenPipe => {
824 continue;
827 }
828 io::ErrorKind::WouldBlock => {
829 assert!(!follow);
831 break;
832 }
833 _ => return Err(e).context("file read failed"),
834 }
835 }
836 };
837 assert!(
838 n < buffer.len(),
839 "the file returned a line bigger than its maximum"
840 );
841 buffer[n] = 0;
843 conn.write_all(&buffer[..n + 1])
845 .await
846 .context("socket write failed")?;
847 }
848 Ok(())
849}