66 lines
1.8 KiB
Python
Executable File
66 lines
1.8 KiB
Python
Executable File
import time
|
|
from dataclasses import dataclass
|
|
|
|
|
|
@dataclass
class PollResult:
    """
    Outcome of one ``poll_until`` run.

    Fields are declared in the order value, timed_out, elapsed_time,
    iterations, which is also the positional order of the generated
    constructor.

    Attributes:
        value: The last value returned by the polled function.
        timed_out: True if timeout was reached before success.
        elapsed_time: Total time spent polling in seconds.
        iterations: Number of times the function was called.
    """

    # Payload from the most recent call of the polled function.
    value: object
    # Whether polling stopped because the time budget ran out.
    timed_out: bool
    # Seconds spent polling.
    elapsed_time: float
    # How many times the polled function was invoked.
    iterations: int
|
|
|
|
|
|
def poll_until(func, timeout: float = 5.0, interval: float = 0.1) -> "PollResult":
    """
    Repeatedly call a function until it succeeds or timeout is reached.

    Elapsed time is measured with ``time.monotonic()`` so that adjustments
    to the system clock can neither cut polling short nor extend it.

    Args:
        func: A zero-argument callable that returns (success: bool, value: T).
            When success is truthy, polling stops and value is returned.
        timeout: Maximum time to poll in seconds. The deadline is checked
            before every call to func, so a timeout of zero or less returns
            immediately without calling func at all.
        interval: Time to sleep between poll attempts in seconds. A sleep
            is shortened to the time remaining before the deadline, so the
            sleep itself does not run past ``timeout``.

    Returns:
        PollResult containing:
            - value: The last value returned by func (None if func was
              never called)
            - timed_out: True if timeout was reached before success
            - elapsed_time: Total time spent polling
            - iterations: Number of times func was called
    """
    start_time = time.monotonic()
    iterations = 0
    last_value = None

    while True:
        elapsed = time.monotonic() - start_time
        if elapsed >= timeout:
            return PollResult(
                value=last_value,
                timed_out=True,
                elapsed_time=elapsed,
                iterations=iterations,
            )

        iterations += 1
        success, last_value = func()

        if success:
            return PollResult(
                value=last_value,
                timed_out=False,
                elapsed_time=time.monotonic() - start_time,
                iterations=iterations,
            )

        # Sleep no longer than the time left; when nothing is left, skip the
        # sleep and let the deadline check at the top of the loop return.
        remaining = timeout - (time.monotonic() - start_time)
        if remaining > 0:
            time.sleep(min(interval, remaining))
|
|
|