UmberHubManager/fiwi/usb_probe.py

87 lines
2.4 KiB
Python

"""Local USB listing (``lsusb``) — sync and :mod:`asyncio` variants."""
from __future__ import annotations
import asyncio
import shutil
import subprocess
def lsusb_lines():
    """Return the stdout of a local ``lsusb`` run, split into lines.

    An empty list is returned when ``lsusb`` is not found on ``PATH``, when
    launching it raises :class:`OSError`, when it does not finish within
    15 seconds, when it exits with a non-zero status, or when it prints
    nothing to stdout.
    """
    executable = shutil.which("lsusb")
    if not executable:
        return []
    try:
        completed = subprocess.run(
            [executable],
            capture_output=True,
            text=True,
            timeout=15,
        )
    except (OSError, subprocess.TimeoutExpired):
        return []
    # Only a clean exit with some output counts as a usable listing.
    usable = completed.returncode == 0 and bool(completed.stdout)
    return completed.stdout.splitlines() if usable else []
async def alsusb_lines() -> list[str]:
    """Async :func:`lsusb_lines` for use under ``asyncio.gather`` with SSH probes.

    Runs ``lsusb`` as an asyncio subprocess with stdin and stderr routed to
    ``DEVNULL`` and waits up to 15 seconds for it to finish.

    Returns:
        The decoded stdout split into lines (undecodable bytes are replaced),
        or an empty list when ``lsusb`` is not on ``PATH``, cannot be
        launched, times out, exits with a non-zero status, or prints nothing.
    """
    lsusb_bin = shutil.which("lsusb")
    if not lsusb_bin:
        return []
    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            lsusb_bin,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )
        out_b, _ = await asyncio.wait_for(proc.communicate(), timeout=15)
    except (OSError, asyncio.TimeoutError):
        # ``asyncio.wait_for`` signals a timeout with ``asyncio.TimeoutError``;
        # ``asyncio.TimeoutExpired`` does not exist and naming it here would
        # raise AttributeError as soon as any exception reached this handler.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                # The child exited between the returncode check and kill().
                pass
            # Reap the child so it does not linger as a zombie.
            await proc.wait()
        return []
    if proc.returncode != 0 or not out_b:
        return []
    return out_b.decode(errors="replace").splitlines()
def lsusb_new_devices(before_lines, after_lines):
    """Return the entries of *after_lines* that were absent from *before_lines*.

    Lines containing ``24ff:`` (compared case-insensitively; described by the
    original author as the Acroname hub vendor) are left out. The order of
    *after_lines* is kept, and repeated new lines are returned as often as
    they occur.
    """
    already_seen = set(before_lines)
    return [
        line
        for line in after_lines
        if line not in already_seen and "24ff:" not in line.lower()
    ]
def lsusb_acroname_lines():
    """Return the ``lsusb`` output lines that look like Acroname devices.

    A line is kept when, compared case-insensitively, it contains ``24ff:``
    or `` acroname``. ``lsusb`` is run with a 5-second timeout; an empty list
    is returned when it is not on ``PATH``, cannot be launched, times out,
    exits with a non-zero status, or prints nothing.
    """

    def _mentions_acroname(line):
        lowered = line.lower()
        return "24ff:" in lowered or " acroname" in lowered

    try:
        executable = shutil.which("lsusb")
        if not executable:
            return []
        completed = subprocess.run(
            [executable], capture_output=True, text=True, timeout=5
        )
    except (OSError, subprocess.TimeoutExpired):
        return []
    if completed.returncode != 0 or not completed.stdout:
        return []
    return list(filter(_mentions_acroname, completed.stdout.splitlines()))
async def alsusb_acroname_lines() -> list[str]:
    """Async variant: filter :func:`alsusb_lines` output down to Acroname lines.

    A line is kept when, compared case-insensitively, it contains ``24ff:``
    or `` acroname``.
    """
    matches = []
    for line in await alsusb_lines():
        lowered = line.lower()
        if "24ff:" in lowered or " acroname" in lowered:
            matches.append(line)
    return matches