Fix issue with high CPU on listener after initiator disconnect

#6
This commit is contained in:
Aaron Heise 2023-02-18 07:14:15 -06:00
parent 8edb4020b1
commit d4339ba224

View File

@ -454,26 +454,28 @@ class CallbackSubprocess:
self._stdout_is_pipe = stdout_is_pipe
self._stderr_is_pipe = stderr_is_pipe
def _ensure_pipes_closed(self):
self._log.debug("Ensuring pipes are closed")
stdin = self._child_stdin
stdout = self._child_stdout
stderr = self._child_stderr
fds = list(filter(lambda x: x is not None, list({stdin, stdout, stderr})))
for fd in fds:
self._log.debug(f"Closing fd {fd}")
with contextlib.suppress(OSError):
os.close(self._child_stdin)
with contextlib.suppress(OSError):
tty_unset_reader_callbacks(fd)
self._child_stdin = None
self._child_stdout = None
self._child_stderr = None
def terminate(self, kill_delay: float = 1.0):
"""
Terminate child process if running
:param kill_delay: if after kill_delay seconds the child process has not exited, escalate to SIGHUP and SIGKILL
"""
same = self._child_stdout == self._child_stderr
with contextlib.suppress(OSError):
if not self._stdout_eof:
tty_unset_reader_callbacks(self._child_stdout)
os.close(self._child_stdout)
self._child_stdout = None
if not same:
with contextlib.suppress(OSError):
if not self._stderr_eof:
tty_unset_reader_callbacks(self._child_stderr)
os.close(self._child_stderr)
self._child_stdout = None
self._log.debug("terminate()")
if not self.running:
return
@ -491,10 +493,11 @@ class CallbackSubprocess:
self._loop.call_later(kill_delay, kill)
def wait():
self._log.debug("wait()")
with contextlib.suppress(OSError):
self._log.debug("wait()")
os.waitpid(self._pid, 0)
self._log.debug("wait() finish")
self._ensure_pipes_closed()
self._log.debug("wait() finish")
threading.Thread(target=wait, daemon=True).start()
@ -604,7 +607,10 @@ class CallbackSubprocess:
if self._return_code is not None and not process_exists(self._pid):
self._log.debug(f"polled return code {self._return_code}")
self._terminated_cb(self._return_code)
self._loop.call_later(CallbackSubprocess.PROCESS_POLL_TIME, poll)
if self.running:
self._loop.call_later(CallbackSubprocess.PROCESS_POLL_TIME, poll)
else:
self._ensure_pipes_closed()
except Exception as e:
if not hasattr(e, "errno") or e.errno != errno.ECHILD:
self._log.debug(f"Error in process poll: {e}")
@ -621,8 +627,6 @@ class CallbackSubprocess:
self._stdout_eof = True
tty_unset_reader_callbacks(self._child_stdout)
self._stdout_cb(bytearray())
with contextlib.suppress(OSError):
os.close(self._child_stdout)
def stderr():
try:
@ -634,8 +638,6 @@ class CallbackSubprocess:
self._stderr_eof = True
tty_unset_reader_callbacks(self._child_stderr)
self._stdout_cb(bytearray())
with contextlib.suppress(OSError):
os.close(self._child_stderr)
tty_add_reader_callback(self._child_stdout, stdout, self._loop)
if self._child_stderr != self._child_stdout: