diff --git a/src/async-process.c b/src/async-process.c index ad58157..771bbe0 100644 --- a/src/async-process.c +++ b/src/async-process.c @@ -1,133 +1,374 @@ #include "async-process.h" -static const char* open_pty(int *out_fd) -{ - int fd = posix_openpt(O_RDWR | O_CLOEXEC | O_NOCTTY); - if (fd < 0) return NULL; - if (grantpt(fd) == -1 || unlockpt(fd) == -1) return NULL; - fcntl(fd, F_SETFD, FD_CLOEXEC); - const char *name = ptsname(fd); - if (name == NULL) { - close(fd); - return NULL; - } - *out_fd = fd; - return name; +int init_str(struct str *str) { + str->buf = malloc(sizeof(char) * 256); + if (str->buf == NULL) + return -1; + + str->len = 0; + str->cap = 256; + + return 0; } -static struct process* allocate_process(int fd, const char *pts_name, int pid) -{ - struct process *process = malloc(sizeof(struct process)); - if (process == NULL) +void del_str(struct str *str) { + free(str->buf); +} + +static struct process* allocate_process(int fd_io, + int fd_er, + const char *pts_io_name, + const char *pts_er_name, + int pid) { + int stdout_ret = -1, stderr_ret = -1, both_ret = -1; + char *io_str = NULL, *er_str = NULL; + size_t io_strlen = strlen(pts_io_name) + 1; + size_t er_strlen = strlen(pts_er_name) + 1; + + struct process *process = malloc(sizeof(struct process)); + if (process == NULL) + return NULL; + + stdout_ret = init_str(&process->stdout); + if (stdout_ret == -1) + goto FAILED_MALLOC; + + stderr_ret = init_str(&process->stderr); + if (stderr_ret == -1) + goto FAILED_MALLOC; + + both_ret = init_str(&process->both); + if (both_ret == -1) + goto FAILED_MALLOC; + + io_str = malloc(io_strlen * sizeof(char)); + if (io_str == NULL) + goto FAILED_MALLOC; + + er_str = malloc(er_strlen * sizeof(char)); + if (er_str == NULL) + goto FAILED_MALLOC; + + memcpy(io_str, pts_io_name, io_strlen); + memcpy(er_str, pts_er_name, er_strlen); + + process->pts_io_name = io_str; + process->pts_er_name = er_str; + process->fd_io = fd_io; + process->fd_er = fd_er; + process->pid = pid; + + return process; + +FAILED_MALLOC: + if (process != NULL) free(process); + if (stdout_ret != -1) del_str(&process->stdout); + if (stderr_ret != -1) del_str(&process->stderr); + if (both_ret != -1) del_str(&process->both); + if (io_str != NULL) free(io_str); + if (er_str != NULL) free(er_str); return NULL; - process->fd = fd; - process->pty_name = malloc(strlen(pts_name) + 1); - process->pid = pid; - strcpy(process->pty_name, pts_name); - return process; +} + +void delete_process(struct process *process) { + kill(process->pid, 9); + close(process->fd_io); + close(process->fd_er); + del_str(&process->stdout); + del_str(&process->stderr); + del_str(&process->both); + free(process->pts_io_name); + free(process->pts_er_name); + free(process); } void my_exit(int status) { - // exitを使うとatexitで動作に影響を与えられる、これが原因でプロセスを終了できなくなる事があるので使うのを避ける - // 例えばSDL2はat_exitを使っているせいか、lemのSDL2 frontendでasync_processが動作しなくなっていた - _exit(status); + // exitを使うとatexitで動作に影響を与えられる、これが原因でプロセスを終了できなくなる事があるので使うのを避ける + // 例えばSDL2はat_exitを使っているせいか、lemのSDL2 frontendでasync_processが動作しなくなっていた + _exit(status); } -struct process* create_process(char *const command[], bool nonblock, const char *path) -{ - int pty_master; - const char *pts_name = open_pty(&pty_master); - if (pts_name == NULL) - return NULL; +// opens a PTY and assigns master and slave file descriptors to fdm and fds +// respectively. Name will be malloced and it is the callers responsibility +// to free name. On success, return 0. On fail, returns -1. All references +// will also be either initialized or set -1/NULL appropriately. +// nonblock will set nonblock mode on the master PTY FD if nonblock == true +int open_pty(int *fdm, int *fds, char **name, bool nonblock) { + *fdm = -1; + *fds = -1; + *name = NULL; + + // gets a PTY, and initializes the attached slave PTS. grantpt and unlockpt + // are required before opening the slave device. + *fdm = posix_openpt(O_RDWR | O_NOCTTY); + if (*fdm == -1 || grantpt(*fdm) == -1 || unlockpt(*fdm) == -1) + goto FAILED_SETUP; - if (nonblock) - fcntl(pty_master, F_SETFL, O_NONBLOCK); - - int pipefd[2]; - - if (pipe(pipefd) == -1) return NULL; - - pid_t pid = fork(); - - if (pid == 0) { - close(pipefd[0]); - pid = fork(); - if (pid == 0) { - close(pipefd[1]); - setsid(); - int pty_slave = open(pts_name, O_RDWR | O_NOCTTY); - close(pty_master); - - // Set raw mode - struct termios tty; - tcgetattr(pty_slave, &tty); - cfmakeraw(&tty); - tcsetattr(pty_slave, TCSANOW, &tty); - - dup2(pty_slave, STDIN_FILENO); - dup2(pty_slave, STDOUT_FILENO); - dup2(pty_slave, STDERR_FILENO); - close(pty_slave); - if (path != NULL) chdir(path); - execvp(command[0], command); - int error_status = errno; - if (error_status == ENOENT) { + // ptsname returns a string that must be copied, as it is overwritten + // on subsequent calls. + const char *tmp = ptsname(*fdm); + if (tmp == NULL) + goto FAILED_SETUP; + + size_t tmp_len = strlen(tmp) + 1; + *name = malloc(tmp_len * sizeof(char)); + if (*name == NULL) + goto FAILED_SETUP; + + memcpy(*name, tmp, tmp_len); + + *fds = open(*name, O_RDWR | O_NOCTTY); + if (*fds == -1) + goto FAILED_SETUP; + + // ensure both slave and master close after program finishes + fcntl(*fdm, F_SETFD, FD_CLOEXEC); + fcntl(*fds, F_SETFD, FD_CLOEXEC); + + // set master as non-blocking (for get_process_output functions) + if (nonblock) { + fcntl(*fdm, F_SETFL, O_NONBLOCK); + } + + // Set raw mode + struct termios tty; + tcgetattr(*fds, &tty); + cfmakeraw(&tty); + tcsetattr(*fds, TCSANOW, &tty); + + return 0; + +FAILED_SETUP: + if (*fdm != -1) close(*fdm); + if (*fds != -1) close(*fds); + if (*name != NULL) free(*name); + + *fdm = -1; + *fds = -1; + *name = NULL; + return -1; +} + +struct process* create_process(char *const command[], const char *path, bool nonblock) { + // Unix PTYs are bi-directional communication streams. Typically, a terminal will + // combine stdout and stderr and display them in the same output. We want to + // keep the outputs separate at this level so master_pty_er is created just to carry + // the stderr stream. + // + // There is a potential bug here, if the process tries to set terminal attributes (like + // with stty), these updates won't be propogated across both terminals. + + int master_pty_io, slave_pts_io, master_pty_er, slave_pts_er; + char *pts_io_name, *pts_er_name; + int ret; + + ret = open_pty(&master_pty_io, &slave_pts_io, &pts_io_name, nonblock); + if (ret == -1) + goto FAILED_SETUP; + + ret = open_pty(&master_pty_er, &slave_pts_er, &pts_er_name, nonblock); + if (ret == -1) + goto FAILED_SETUP; + + // START CHILD PROCESS AND RETURN ITS PID + pid_t pid = fork(); + + if (pid == -1) { + goto FAILED_SETUP; + } else if (pid != 0) { + close(slave_pts_io); + close(slave_pts_er); + // parent process, return process structure. + struct process *p = allocate_process(master_pty_io, master_pty_er, + pts_io_name, pts_er_name, pid); + + // allocate_process copies the strings it is passed, open_pty mallocs strings + // so we need to free them here before we exit. + free(pts_io_name); + free(pts_er_name); + return p; + } + + // VVV CHILD PROCESS VVV + setsid(); + + // we don't need these in the child process. + free(pts_io_name); + free(pts_er_name); + close(master_pty_io); + close(master_pty_er); + + dup2(slave_pts_io, STDIN_FILENO); + dup2(slave_pts_io, STDOUT_FILENO); + dup2(slave_pts_er, STDERR_FILENO); + + close(slave_pts_io); + close(slave_pts_er); + + if (path != NULL) chdir(path); + + // run command, the current fork process will switch to + // the command. + execvp(command[0], command); + + // if execution reaches here, there was a problem starting + // the program. execvp does not return on success. + int error_status = errno; + if (error_status == ENOENT) { char str[128]; sprintf(str, "%s: command not found", command[0]); write(STDIN_FILENO, str, strlen(str)); - } else { + } else { char *str = strerror(error_status); write(STDIN_FILENO, str, strlen(str)); - } - my_exit(error_status); - } else { - char buf[12]; - sprintf(buf, "%d", pid); - write(pipefd[1], buf, strlen(buf)+1); - close(pipefd[1]); - my_exit(0); } - } else { - close(pipefd[1]); - if (waitpid(pid, NULL, 0) == -1) - return NULL; - char buf[12]; - read(pipefd[0], buf, sizeof(buf)); - close(pipefd[0]); - return allocate_process(pty_master, pts_name, atoi(buf)); - } + my_exit(error_status); + + // ERROR HANDLING +FAILED_SETUP: + // we can assume at this point that any FD that is not -1 needs closed. + if (master_pty_io != -1) close(master_pty_io); + if (master_pty_er != -1) close(master_pty_er); + if (slave_pts_io != -1) close(slave_pts_io); + if (slave_pts_er != -1) close(slave_pts_er); + + // we can assume that any name pointer that is not NULL needs free. + if (pts_io_name != NULL) free(pts_io_name); + if (pts_er_name != NULL) free(pts_er_name); - return NULL; + return NULL; } -void delete_process(struct process *process) -{ - kill(process->pid, 9); - close(process->fd); - free(process->pty_name); - free(process); +int process_pid(struct process *process) { + return process->pid; } -int process_pid(struct process *process) -{ - return process->pid; +// reads all data available in fd (should be non-blocking) into str, +// returns number of bytes read on success, -1 on error. +int str_read_fd(struct str *str, int fd) { + int total_read = 0; + while (true) { + // resize buffer if it is too small (include space for null terminator) + if (str->cap - str->len <= 1) { + char *new_ptr = realloc(str->buf, 2*str->cap); + if (new_ptr == NULL) + return -1; + str->buf = new_ptr; + str->cap *= 2; + } + + // read as much data from fd as possible. (read doesn't add '\0') + int n = read(fd, str->buf + str->len, str->cap - str->len); + + if (total_read == 0 && n == -1) { + return -1; // an error occured on first read + } else if (n <= 0) { + str->buf[str->len] = '\0'; // cap is always len+1 + return total_read; + } + + total_read += n; + str->len += n; + } + + return -1; // control flow shouldn't reach here. } -void process_send_input(struct process *process, const char *string) -{ - write(process->fd, string, strlen(string)); +const char* _process_receive_fd(struct str *s, int fd, size_t *bytes) { + int n = str_read_fd(s, fd); + + if (n == -1 || s->len == 0) + return NULL; + + if (bytes != NULL) + *bytes = s->len; // length of str, not including '\0' + return s->buf; } -const char* process_receive_output(struct process *process) -{ - int n = read(process->fd, process->buffer, sizeof(process->buffer)-1); - if (n == -1) - return NULL; - process->buffer[n] = '\0'; - return process->buffer; +const char* process_receive_stdout(struct process *p, size_t *bytes) { + const char *r = _process_receive_fd(&p->stdout, p->fd_io, bytes); + p->stdout.len = 0; + return r; +} + +const char* process_receive_stderr(struct process *p, size_t *bytes) { + const char *r = _process_receive_fd(&p->stderr, p->fd_er, bytes); + p->stderr.len = 0; + return r; } +const char* process_receive_output(struct process *p, size_t *bytes) { + // these lengths include null terminators + size_t stdout_len = 0; + size_t stderr_len = 0; + + _process_receive_fd(&p->stdout, p->fd_io, &stdout_len); + _process_receive_fd(&p->stderr, p->fd_er, &stderr_len); + + if (p->both.cap < (stdout_len + stderr_len + 1)) { + char *new_ptr = realloc(p->both.buf, stdout_len + stderr_len); + if (new_ptr == NULL) + return NULL; + p->both.buf = new_ptr; + p->both.cap = stdout_len + stderr_len; + } + + memcpy(p->both.buf, p->stdout.buf, stdout_len); + memcpy(p->both.buf + stdout_len, p->stderr.buf, stderr_len); + p->both.buf[stdout_len + stderr_len] = '\0'; + + if (bytes != NULL) + *bytes = stdout_len + stderr_len; + + p->both.len = 0; + p->stdout.len = 0; + p->stderr.len = 0; + + return p->both.buf; +} + +ssize_t _process_write(struct process *process, const char *buf, size_t n, bool readp) { + ssize_t bytes_written = 0; + + while (bytes_written < n) { + ssize_t sent = write(process->fd_io, buf + bytes_written, n - bytes_written); + + if (readp) { + _process_receive_fd(&process->stdout, process->fd_io, NULL); + _process_receive_fd(&process->stderr, process->fd_er, NULL); + } + + if (bytes_written != 0 && sent == -1) { + return bytes_written; + } else if (sent == -1) { + return -1; + } + + bytes_written += sent; + } + + return bytes_written; +} + +ssize_t process_write(struct process *process, const char *buf, size_t n) { + return _process_write(process, buf, n, true); +} + +ssize_t process_write_string(struct process *process, const char *string) { + return _process_write(process, string, strlen(string), true); +} + +ssize_t process_write_noread(struct process *process, const char *buf, size_t n) { + return _process_write(process, buf, n, false); +} + +ssize_t process_write_string_noread(struct process *process, const char *string) { + return _process_write(process, string, strlen(string), false); +} + + + int process_alive_p(struct process *process) { - return kill(process->pid, 0) == 0; + return kill(process->pid, 0) == 0; } diff --git a/src/async-process.h b/src/async-process.h index dd2b26f..8ca5974 100644 --- a/src/async-process.h +++ b/src/async-process.h @@ -18,18 +18,86 @@ #include #include +struct str { + char* buf; + size_t len; + size_t cap; +}; + +int init_str(struct str *str); +void del_str(struct str *str); +int str_read_fd(struct str *str, int fd); + struct process { - char buffer[1024*4]; - int fd; - char *pty_name; - pid_t pid; + struct str stdout; + struct str stderr; + struct str both; + + int fd_io; + int fd_er; + char *pts_io_name; + char *pts_er_name; + pid_t pid; }; -struct process* create_process(char *const command[], bool nonblock, const char *path); +struct process* create_process(char *const command[], const char *path, bool nonblock); void delete_process(struct process *process); int process_pid(struct process *process); -void process_send_input(struct process *process, const char *string); -const char* process_receive_output(struct process *process); + +/** Sends n bytes to process. + +returns the number of bytes written, or -1 indicating an error occurred. An +error will typically occur when the operating system cannot send all n bytes +because the PTY buffer is full. The process will have to read the buffer in +to make space for more data to be written. + +These functions read from the process STDOUT and STDERR buffers to keep +the process from being blocked. The results are buffered and will be returned +on the next call to a process_receive function. If these functions are used +in a separate thread from the process_receive functions, a race condition may +occur when this function is reading from STDOUT/STDERR (possibly realloc'ing) +while `process-receive_*` is reading from the same buffer. +*/ +ssize_t process_write(struct process *process, const char* buf, size_t n); +ssize_t process_write_string(struct process *process, const char *string); + +/** Sends n bytes to process. + +returns the number of bytes written, or -1 indicating an error occurred. An +error will typically occur when the operating system cannot send all n bytes +because the PTY buffer is full. The process will have to read the buffer in +to make space for more data to be written. + +Doesn't read devices STDOUT/STDERR file descriptors. If `process_receive* +functions are not called regularly, the internal PTY buffers may fill and +prevent the attached process from continuing to run. `process_write` and +`process_write_string` prevent this from happening, but their usage requires +other considerations. +*/ +ssize_t process_write_noread(struct process *process, const char* buf, size_t n); +ssize_t process_write_string_noread(struct process *process, const char *string); + +/** Return Process STDOUT. +Returns pointer to a buffer containing data returned by process STDOUT buffer. +this buffer will be overwritten by subsequent calls to this function; if +this output is meant to be kept, it should be copied out. +*/ +const char* process_receive_stdout(struct process *process, size_t *bytes); + +/** Return Process STDERR. +Returns pointer to a buffer containing data returned by process STDERR buffer. +this buffer will be overwritten by subsequent calls to this function; if +this output is meant to be kept, it should be copied out. +*/ +const char* process_receive_stderr(struct process *process, size_t *bytes); + +/** Receive Process STDOUT and STDERR (one after another). +Returns pointer to a buffer containing data returned by process STDERR and +STDOUT buffer. this buffer will be overwritten by subsequent calls to this + function; if this output is meant to be kept, it should be copied out. +*/ +const char* process_receive_output(struct process *process, size_t *bytes); + int process_alive_p(struct process *process); #endif diff --git a/src/async-process.lisp b/src/async-process.lisp index a88b3c6..03d6b53 100644 --- a/src/async-process.lisp +++ b/src/async-process.lisp @@ -5,7 +5,8 @@ :process-send-input :process-receive-output :process-alive-p - :create-process)) + :create-process + :cffi-test)) (in-package :async-process) (eval-when (:compile-toplevel :load-toplevel :execute) @@ -48,7 +49,7 @@ (:unix "libasyncprocess.so") (:windows "libasyncprocess.dll")) -(cffi:use-foreign-library async-process) +; (cffi:use-foreign-library #P"/home/ethan/Documents/async-process/.libs/libasyncprocess.so") (defclass process () ((process :reader process-process :initarg :process) @@ -56,8 +57,8 @@ (cffi:defcfun ("create_process" %create-process) :pointer (command :pointer) - (nonblock :boolean) - (path :string)) + (path :string) + (noblock :bool)) (cffi:defcfun ("delete_process" %delete-process) :void (process :pointer)) @@ -65,16 +66,32 @@ (cffi:defcfun ("process_pid" %process-pid) :int (process :pointer)) -(cffi:defcfun ("process_send_input" %process-send-input) :void +(cffi:defcfun ("process_write" %process-write) :ssize + (process :pointer) + (string :string) + (n :size)) + +(cffi:defcfun ("process_write_string" %process-write-string) :ssize (process :pointer) (string :string)) +(cffi:defcfun ("process_receive_stdout" %process-receive-stdout) :string + (process :pointer) + (bytes :pointer)) + +(cffi:defcfun ("process_receive_stderr" %process-receive-stderr) :string + (process :pointer) + (bytes :pointer)) + (cffi:defcfun ("process_receive_output" %process-receive-output) :pointer - (process :pointer)) + (process :pointer) + (bytes :pointer)) (cffi:defcfun ("process_alive_p" %process-alive-p) :boolean (process :pointer)) +(cffi:defcfun "cffi_test" :string) + (defun create-process (command &key nonblock (encode cffi:*default-foreign-encoding*) directory) (when (and directory (not (uiop:directory-exists-p directory))) (error "Directory ~S does not exist" directory)) @@ -85,9 +102,11 @@ :for c :in command :do (setf (cffi:mem-aref argv :string i) c)) (setf (cffi:mem-aref argv :string length) (cffi:null-pointer)) - (let ((p (%create-process argv nonblock (if directory - (namestring directory) - (cffi:null-pointer))))) + (let ((p (%create-process argv + (if directory + (namestring directory) + (cffi:null-pointer)) + nonblock))) (if (cffi:null-pointer-p p) (error "create-process failed: ~S" command) (make-instance 'process :process p :encode encode)))))) @@ -100,25 +119,27 @@ (defun process-send-input (process string) (let ((cffi:*default-foreign-encoding* (process-encode process))) - (%process-send-input (process-process process) string))) - -(defun pointer-to-string (pointer) - (unless (cffi:null-pointer-p pointer) - (let* ((bytes (loop :for i :from 0 - :for code := (cffi:mem-aref pointer :unsigned-char i) - :until (zerop code) - :collect code)) - (octets (make-array (length bytes) - :element-type '(unsigned-byte 8) - :initial-contents bytes))) - (handler-case (babel:octets-to-string octets) - (error () - ;; Fallback when an error occurs with UTF-8 encoding - (map 'string #'code-char octets)))))) - -(defun process-receive-output (process) - (let ((cffi:*default-foreign-encoding* (process-encode process))) - (pointer-to-string (%process-receive-output (process-process process))))) + (%process-write-string (process-process process) string))) + +(defun process-receive-output (process &optional (source :both)) + "`source` can be either `:stdout`, `:stderr`, or `:both`. It specifies the stream +to read from." + (declare (optimize (debug 3))) + (flet ((call-cfun (read-func) + "helper function to call one of the three cffi functions for receiving output." + (cffi:with-foreign-pointer (bytes 8) + (let ((cffi:*default-foreign-encoding* (process-encode process)) + (output (funcall read-func + (process-process process) + bytes))) + (cffi:foreign-string-to-lisp + output + :count (cffi:mem-ref bytes :size)))))) + + (case source + (:stdout (call-cfun '%process-receive-stdout)) + (:stderr (call-cfun '%process-receive-stderr)) + (:both (call-cfun '%process-receive-output))))) (defun process-alive-p (process) (%process-alive-p (process-process process))) diff --git a/src/test.c b/src/test.c new file mode 100644 index 0000000..37e6ecb --- /dev/null +++ b/src/test.c @@ -0,0 +1,58 @@ +#include "async-process.h" + +int main() { + char *cmd[] = {"tee", "ima-cool-file", NULL}; + struct process *p = create_process(cmd, NULL); + + struct str s; + init_str(&s); + + #define TEST_INPUT_SIZE 500000 + char test_input[TEST_INPUT_SIZE]; + + for (size_t i = 0; i < TEST_INPUT_SIZE; i+=10) { + memcpy(test_input+i, "123456789\n", 10); + } + + size_t n = 0; + while (n != TEST_INPUT_SIZE) { + ssize_t bytes = process_write(p, test_input+n, TEST_INPUT_SIZE-n); + if (bytes > 0) { + printf("wrote %d/%d bytes...\n", n, TEST_INPUT_SIZE); + n += bytes; + } else { + printf("%s: %s\n", strerrorname_np(errno), strerror(errno)); + } + + } + + printf("I just attempted to write %d.\nI wrote %d bytes.\n", TEST_INPUT_SIZE, n); + + while (true) { + int n = str_read_fd(&s, STDIN_FILENO); + if (n > 0) { + process_write(p, s.buf, n); + s.len = 0; + if (strcmp(s.buf, "exit\n") == 0) + break; + } + + const char *out = NULL; + const char *err = NULL; + + out = process_receive_stdout(p); + err = process_receive_stderr(p); + + if (out != NULL) { + printf("%s", out); + } + + if (err != NULL) { + printf("\033[31m%s\033[0m", err); + } + } + + delete_process(p); + del_str(&s); + return 0; +} diff --git a/src/test.lisp b/src/test.lisp new file mode 100644 index 0000000..859f36b --- /dev/null +++ b/src/test.lisp @@ -0,0 +1,31 @@ +asdf:*central-registry* +(ql:quickload "alexandria") +(ql:quickload "babel") + +(setf asdf:*central-registry* (list #P"/home/ethan/Documents/async-process/src/")) +(asdf:load-asd #P"/home/ethan/Documents/async-process/src/async-process.asd") + +(asdf:load-system "async-process") + +(defvar *proc* nil) +(setf *proc* (async-process:create-process '("tee" "/home/ethan/test.log") + :nonblock t)) + +(format t "~&~a" + (with-output-to-string (s) + (async-process:process-send-input *proc* (format nil "ima bot~%")) + (sleep 0.1) + (format s "~A" (async-process:process-receive-output *proc*)))) + +(async-process:process-send-input *proc* (format nil "bop~%")) +(format t (async-process:process-receive-output *proc* :both)) + +(format t "~&~S" (async-process:process-receive-output *proc* :both)) + +(defun cffi-null-string-test () + (format t "~&~S" + (cffi:with-pointer-to-vector-data + (p (make-array 10 + :element-type '(unsigned-byte 8) + :initial-element 0)) + (cffi:foreign-string-to-lisp p)))) \ No newline at end of file