Skip to content

Commit 9b5deb3

Browse files
import command pipe from ahrefskit (#48)
1 parent 7474359 commit 9b5deb3

2 files changed

Lines changed: 95 additions & 0 deletions

File tree

command_pipe.ml

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
open Printf
2+
open Devkit
3+
4+
module M = Map.Make (String)
5+
6+
let log = Log.from "command_pipe"
7+
8+
type t = {
9+
mutable status : [ `Stopped | `Running of Lwt_io.input Lwt_io.channel ];
10+
fname : string;
11+
mutable commands : (unit -> unit Lwt.t) M.t;
12+
}
13+
14+
let kill t =
15+
match t.status with
16+
| `Stopped -> Lwt.return_unit
17+
| `Running ic -> begin
18+
t.status <- `Stopped;
19+
let%lwt () =
20+
try%lwt Lwt_unix.unlink t.fname
21+
with exn ->
22+
log#warn ~exn "unlink %S" t.fname;
23+
Lwt.return_unit
24+
in
25+
Lwt_io.close ic
26+
end
27+
28+
let make () =
29+
let dir_name = "var" in
30+
let fname = Filename.concat dir_name (sprintf "control.%d.fifo" (Unix.getpid ())) in
31+
(try Unix.unlink fname with _ -> ());
32+
let t = { status = `Stopped; fname; commands = M.empty } in
33+
let rec loop () =
34+
match t.status with
35+
| `Stopped -> Lwt.return_unit
36+
| `Running ic ->
37+
let%lwt () =
38+
match%lwt Lwt_io.read_line_opt ic with
39+
| None ->
40+
(* We need to reopen the FIFO because the reader gets EOF when the writer closes it. *)
41+
let%lwt () = Lwt_io.close ic in
42+
(* We explicitly provide the [flags] because we don't want the default [O_NONBLOCK] *)
43+
let%lwt ic = Lwt_io.open_file ~flags:[ O_RDONLY ] ~mode:Lwt_io.input fname in
44+
t.status <- `Running ic;
45+
Lwt.return_unit
46+
| Some command ->
47+
match M.find_opt (String.trim command) t.commands with
48+
| None ->
49+
let commands = M.bindings t.commands |> List.map fst |> List.map (sprintf "%S") |> String.concat ", " in
50+
log#error "command not found: %S. Registered commands: %s" command commands;
51+
Lwt.return_unit
52+
| Some f ->
53+
try%lwt f ()
54+
with exn ->
55+
log#error ~exn "command error %S" command;
56+
Lwt.return_unit
57+
in
58+
loop ()
59+
in
60+
let run () =
61+
try%lwt
62+
let%lwt () =
63+
try%lwt Lwt_unix.mkdir dir_name 0o744 with
64+
| Unix.Unix_error (Unix.EEXIST, _, _) -> Lwt.return_unit
65+
| exn -> Lwt.reraise exn
66+
in
67+
let%lwt ic =
68+
let%lwt () = Lwt_unix.mkfifo fname 0o644 in
69+
Lwt_io.open_file ~flags:[ O_RDONLY; O_NONBLOCK ] ~mode:Lwt_io.input fname
70+
in
71+
t.status <- `Running ic;
72+
loop ()
73+
with exn ->
74+
log#error ~exn "run error";
75+
kill t
76+
in
77+
let bg_pool = Background_pool.create () in
78+
Background_pool.add ~at_exit:(fun () -> kill t) ~pick:(Daemon.wait_exit ()) bg_pool "command_pipe" (fun () -> run ());
79+
t
80+
81+
let add_command t name f = t.commands <- M.add name f t.commands

command_pipe.mli

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
(** Command pipe var/control.PID.fifo that reads commands one per line
2+
and runs the corresponding function. The FIFO is removed automatically
3+
on process exit *)
4+
5+
type t
6+
7+
(** Create FIFO and start processing commands. *)
8+
val make : unit -> t
9+
10+
(** Stop processing commands and close file. *)
11+
val kill : t -> unit Lwt.t
12+
13+
(** Define new command for the control FIFO. *)
14+
val add_command : t -> string -> (unit -> unit Lwt.t) -> unit

0 commit comments

Comments
 (0)