-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathcommand_pipe.ml
More file actions
81 lines (75 loc) · 2.49 KB
/
command_pipe.ml
File metadata and controls
81 lines (75 loc) · 2.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
open Printf
open Devkit
(* String-keyed map; used in this file to store command handlers by name. *)
module M = Map.Make (String)
(* Module-level logger created with [Log.from] under the name "command_pipe"
   ([Log] is presumably provided by [Devkit] - confirm). *)
let log = Log.from "command_pipe"
(* State of one command pipe: a FIFO on disk plus the handlers that lines read
   from it are dispatched to. *)
type t = {
(* [`Stopped] when no FIFO channel is held; [`Running ic] carries the input
   channel currently open on the FIFO. [kill] resets it to [`Stopped]. *)
mutable status : [ `Stopped | `Running of Lwt_io.input Lwt_io.channel ];
(* Path of the FIFO file; [kill] unlinks it. *)
fname : string;
(* Handlers keyed by command name; updated by [add_command] and looked up
   for every line read from the FIFO. *)
mutable commands : (unit -> unit Lwt.t) M.t;
}
(* [kill t] stops the command pipe [t].

   When [t] is already [`Stopped] this does nothing. Otherwise the status is
   switched to [`Stopped] first, the FIFO file is unlinked (a failure there is
   only logged as a warning), and finally the input channel is closed. The
   returned promise is the one produced by closing the channel. *)
let kill t =
  match t.status with
  | `Stopped -> Lwt.return_unit
  | `Running channel ->
    (* Flip the state before any asynchronous step so a second [kill] issued
       while this one is pending falls into the [`Stopped] branch. *)
    t.status <- `Stopped;
    let remove_fifo () =
      try%lwt Lwt_unix.unlink t.fname
      with exn ->
        log#warn ~exn "unlink %S" t.fname;
        Lwt.return_unit
    in
    let%lwt () = remove_fifo () in
    Lwt_io.close channel
(* [make ()] builds a command pipe backed by the FIFO
   "var/control.<pid>.fifo" and hands its reader task to a freshly created
   [Background_pool], with [kill] registered as the [at_exit] action.

   The reader task creates the "var" directory if needed, creates and opens the
   FIFO, then reads it line by line. Each line is trimmed and looked up among
   the handlers registered through [add_command]: a match runs the handler
   (exceptions it raises are logged and swallowed), anything else is logged as
   an error together with the list of known commands.

   Any exception escaping the reader task itself is logged as "run error" and
   followed by [kill t].

   Returns the pipe handle [t]. *)
let make () =
  let dir_name = "var" in
  let fname = Filename.concat dir_name (sprintf "control.%d.fifo" (Unix.getpid ())) in
  (* Best-effort removal of a leftover FIFO with the same name. Only
     [Unix_error] (e.g. the file not existing) is ignored; any other exception
     is allowed to propagate instead of being silently dropped. *)
  (try Unix.unlink fname with Unix.Unix_error _ -> ());
  let t = { status = `Stopped; fname; commands = M.empty } in
  let rec loop () =
    match t.status with
    | `Stopped -> Lwt.return_unit
    | `Running ic ->
      let%lwt () =
        match%lwt Lwt_io.read_line_opt ic with
        | None ->
          (* We need to reopen the FIFO because the reader gets EOF when the writer closes it. *)
          let%lwt () = Lwt_io.close ic in
          (* We explicitly provide the [flags] because we don't want the default [O_NONBLOCK] *)
          let%lwt reopened = Lwt_io.open_file ~flags:[ O_RDONLY ] ~mode:Lwt_io.input fname in
          (* [kill] may have run while we were suspended in the close/open
             above. In that case do not overwrite [`Stopped] (which would
             restart the loop and leak the new channel): just close it. *)
          begin match t.status with
          | `Stopped -> Lwt_io.close reopened
          | `Running _ ->
            t.status <- `Running reopened;
            Lwt.return_unit
          end
        | Some command ->
          let name = String.trim command in
          begin match M.find_opt name t.commands with
          | None ->
            let commands =
              M.bindings t.commands
              |> List.map (fun (registered, _) -> sprintf "%S" registered)
              |> String.concat ", "
            in
            log#error "command not found: %S. Registered commands: %s" command commands;
            Lwt.return_unit
          | Some f ->
            (* A failing handler must not take the reader loop down. *)
            try%lwt f ()
            with exn ->
              log#error ~exn "command error %S" command;
              Lwt.return_unit
          end
      in
      loop ()
  in
  let run () =
    try%lwt
      let%lwt () =
        (* NOTE(review): mode 0o744 leaves group/other without the search (x)
           bit on the directory - confirm this is intended rather than 0o755. *)
        try%lwt Lwt_unix.mkdir dir_name 0o744 with
        | Unix.Unix_error (Unix.EEXIST, _, _) -> Lwt.return_unit
        | exn -> Lwt.reraise exn
      in
      let%lwt () = Lwt_unix.mkfifo fname 0o644 in
      (* The initial open is non-blocking, unlike the reopen in [loop];
         presumably so that start-up does not wait for a writer to appear -
         confirm. *)
      let%lwt ic = Lwt_io.open_file ~flags:[ O_RDONLY; O_NONBLOCK ] ~mode:Lwt_io.input fname in
      t.status <- `Running ic;
      loop ()
    with exn ->
      log#error ~exn "run error";
      kill t
  in
  let bg_pool = Background_pool.create () in
  Background_pool.add ~at_exit:(fun () -> kill t) ~pick:(Daemon.wait_exit ()) bg_pool "command_pipe" (fun () -> run ());
  t
(* [add_command t name f] registers [f] as the handler for command [name] on
   [t]. A handler already bound to [name] is replaced. Lines read from the
   FIFO are trimmed before lookup, so [name] should carry no surrounding
   whitespace. *)
let add_command t name f =
  let updated = M.add name f t.commands in
  t.commands <- updated