/*
 * FiWiTSF/tsf_sync_rt_starter.c
 *
 * (source-viewer header: 834 lines, 19 KiB, C)
 */

/*
* FiWiTSF — TSF sync: RT scheduling + mac80211 debugfs "tsf" read/write.
*
* Build: make (or see Makefile)
*
* Requires: CONFIG_MAC80211_DEBUGFS, debugfs mounted, root or CAP_SYS_NICE for RT.
*
* Threading: default is one RT thread (round-robin followers) — lowest contention
* on driver locks and simplest timing. Optional --parallel uses one sampler +
* one RT thread per follower (see -j).
*
* SIGINT/SIGTERM stop the loop cleanly. Use -u to print periodic per-follower
* error/step stats (TSF units are microseconds). Mean/variance use Welford's
* method; optional --stats-rms-warn flags high spread (RMS error in µs).
* Optional --kalman: 1D random-walk Kalman on measured error; same -k/-s caps
* for fair A/B vs raw error (stats lines still use raw measured error).
*/
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <getopt.h>
#include <inttypes.h>
#include <limits.h>
#include <math.h>
#include <pthread.h>
#include <sched.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/mman.h>
/* Nanoseconds per second, for timespec arithmetic. */
#define NSEC_PER_SEC 1000000000L
/*
* Default noise variances (µs²) for 1D random-walk + scalar measurement.
* Tune for your link; these are a reasonable starting point for ~10 ms ticks.
*
* KF_DEFAULT_Q_PER_TICK: process-noise variance added to the predicted
* variance on every kf1_update() call.
* KF_DEFAULT_R_MEAS: variance of one measurement; also seeds the posterior
* variance on a follower's first sample.
*/
#define KF_DEFAULT_Q_PER_TICK 50.0
#define KF_DEFAULT_R_MEAS 4000.0
/*
* Stop request: set to 1 by on_signal() (SIGINT/SIGTERM), cleared by
* setup_shutdown_signals(), and polled by the sync threads.
* NOTE(review): shared across threads; volatile sig_atomic_t is
* signal-safe but is not a C11 inter-thread synchronisation primitive.
*/
static volatile sig_atomic_t g_stop;
/*
* Per-follower scalar Kalman filter state, updated by kf1_update().
* A zero-filled (calloc'd) instance means "no measurement seen yet".
*/
struct kf1 {
double x; /* posterior mean, µs */
double P; /* posterior variance, µs² */
int inited; /* 0 until the first measurement seeds x and P */
};
/*
* Statistics for one follower over one stats window; filled by
* acc_add_sample() and printed/zeroed by stats_print_and_reset().
*/
struct follower_acc {
unsigned n; /* samples recorded in this window */
double mean; /* Welford running mean (µs) */
double m2; /* Welford sum of squared deviations */
int64_t max_abs_err; /* largest |measured error| in the window (µs) */
int64_t max_abs_step; /* largest |applied step| in the window (µs) */
};
/* Command-line configuration, filled in by parse_cli(). */
struct cli {
const char *master_path; /* -m: master debugfs tsf file */
char **follower_paths; /* -f: realloc'd array of argv pointers (free the array only) */
size_t n_followers; /* number of entries in follower_paths */
int cpu; /* -1 = no affinity */
long period_ns; /* control-loop period; -p is given in ms */
unsigned int rt_prio; /* SCHED_FIFO priority, 1-99 */
unsigned int kp_ppm; /* proportional gain in parts per million of the error */
uint64_t max_step_us; /* cap on |step| written per tick */
bool parallel; /* -j: one thread per follower + sampler */
unsigned int stats_interval_sec; /* 0 = off */
double stats_rms_warn_us; /* 0 = off; warn if RMS error exceeds this */
bool use_kalman; /* -K: filter the error before applying the gain */
};
/* Argument for sync_thread_single() in the default single-thread mode. */
struct sync_ctx {
const struct cli *cli;
struct follower_acc *stats; /* NULL or n_followers entries */
struct kf1 *kf; /* NULL or n_followers; per-follower filter state */
struct timespec last_stats_mono; /* CLOCK_MONOTONIC of last stats print; all-zero = not started */
};
/*
 * Print the synopsis, option table and an example invocation to stderr.
 * prog is substituted into the synopsis and the example.
 */
static void usage(const char *prog)
{
	/* Synopsis. */
	fprintf(stderr,
		"Usage: %s -m PATH [-f PATH]... [options]\n"
		" (repeat -f for each follower; at least one required)\n"
		"\n",
		prog);
	/* Option table (no conversions; "%%" prints a literal percent sign). */
	fprintf(stderr,
		" -m, --master PATH debugfs tsf file for master radio\n"
		" -f, --follower PATH debugfs tsf file for a follower (repeat)\n"
		" -c, --cpu N pin all RT threads to CPU N (-1 = none)\n"
		" -p, --period-ms N control loop period in ms (default 10)\n"
		" -P, --rt-priority N SCHED_FIFO priority 1-99 (default 80)\n"
		" -k, --kp-ppm N gain: apply (error * N / 1e6) per tick (default 100000 = 10%%)\n"
		" -s, --max-step-us N max |TSF step| per tick in microseconds (default 200)\n"
		" -u, --stats-interval N print per-follower stats every N seconds (0=off, default 0)\n"
		" -G, --stats-rms-warn N warn if error RMS exceeds N µs in a stats window (0=off)\n"
		" -K, --kalman filter measured TSF error (1D random walk; same -k/-s)\n"
		" -j, --parallel one RT thread per follower + master sampler thread\n"
		" -h, --help this text\n"
		"\n");
	/* Worked example. */
	fprintf(stderr,
		"Example:\n"
		" %s -m /sys/kernel/debug/ieee80211/phy0/netdev:wlan0/tsf \\\n"
		" -f /sys/kernel/debug/ieee80211/phy1/netdev:wlan1/tsf\n",
		prog);
}
/*
 * Read a TSF value from a debugfs "tsf" file. The file content must be a
 * "0x"-prefixed hexadecimal number.
 *
 * Returns 0 and stores the value in *out on success. Returns -1 on any
 * open/read/parse failure, leaving *out untouched.
 */
static int read_tsf_hex(const char *path, uint64_t *out)
{
	char buf[64];
	unsigned long long val;
	ssize_t n;
	int fd;
	int ret = -1;

	fd = open(path, O_RDONLY);
	if (fd < 0)
		goto out;
	/* A signal handler may interrupt the read; just retry it. */
	do {
		n = read(fd, buf, sizeof(buf) - 1);
	} while (n < 0 && errno == EINTR);
	close(fd);
	if (n <= 0)
		goto out;
	buf[n] = '\0';
	/*
	 * Scan into a genuine unsigned long long. uint64_t is unsigned long on
	 * LP64, so writing through an (unsigned long long *) cast of out would
	 * break strict aliasing.
	 */
	if (sscanf(buf, "0x%llx", &val) != 1)
		goto out;
	*out = (uint64_t)val;
	ret = 0;
out:
	return ret;
}
/*
 * Write tsf to a debugfs "tsf" file as a decimal number followed by '\n'.
 * The file is opened write-only (it must already exist) and the whole
 * string must be accepted by a single write().
 * Returns 0 on success, -1 on formatting, open or short/failed write.
 */
static int write_tsf_decimal(const char *path, uint64_t tsf)
{
	char text[32];
	const int nbytes = snprintf(text, sizeof(text), "%llu\n",
				    (unsigned long long)tsf);
	bool complete;
	int fd;

	if (nbytes < 0 || (size_t)nbytes >= sizeof(text))
		return -1;

	fd = open(path, O_WRONLY);
	if (fd < 0)
		return -1;

	complete = write(fd, text, (size_t)nbytes) == (ssize_t)nbytes;
	close(fd);
	return complete ? 0 : -1;
}
/*
 * Add ns nanoseconds (may be negative) to *ts, then renormalise so that
 * 0 <= tv_nsec < NSEC_PER_SEC, carrying whole seconds into tv_sec.
 */
static void timespec_add_ns(struct timespec *ts, long ns)
{
	ts->tv_nsec += ns;
	/* Carry whole seconds (C division truncates toward zero)... */
	ts->tv_sec += ts->tv_nsec / NSEC_PER_SEC;
	ts->tv_nsec %= NSEC_PER_SEC;
	/* ...then borrow one second if the remainder came out negative. */
	if (ts->tv_nsec < 0) {
		ts->tv_nsec += NSEC_PER_SEC;
		ts->tv_sec--;
	}
}
/* Forward declaration: defined after parse_cli(), but run_parallel() uses it first. */
static void apply_rt_attributes(pthread_attr_t *attr, const struct cli *cli);
/*
 * SIGINT/SIGTERM handler. It only raises the stop flag, which is
 * async-signal-safe; the sync threads notice it and wind down.
 */
static void on_signal(int signo)
{
	g_stop = 1;
	(void)signo; /* same action for both signals */
}
static void setup_shutdown_signals(void)
{
struct sigaction sa;
g_stop = 0;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = on_signal;
sigemptyset(&sa.sa_mask);
if (sigaction(SIGINT, &sa, NULL) != 0 || sigaction(SIGTERM, &sa, NULL) != 0)
perror("sigaction");
}
/* Fill *set with exactly the shutdown signals: SIGINT and SIGTERM. */
static void fill_shutdown_sigset(sigset_t *set)
{
	sigemptyset(set);
	sigaddset(set, SIGINT);
	sigaddset(set, SIGTERM);
}

/*
 * Add SIGINT/SIGTERM to the calling thread's signal mask. Threads created
 * afterwards start with this mask (pthread_create semantics).
 */
static void block_sigint_term_in_this_thread(void)
{
	sigset_t shutdown_set;

	fill_shutdown_sigset(&shutdown_set);
	pthread_sigmask(SIG_BLOCK, &shutdown_set, NULL);
}

/* Remove SIGINT/SIGTERM from the calling thread's signal mask. */
static void unblock_sigint_term_in_this_thread(void)
{
	sigset_t shutdown_set;

	fill_shutdown_sigset(&shutdown_set);
	pthread_sigmask(SIG_UNBLOCK, &shutdown_set, NULL);
}
/*
 * Welford's online update: fold sample x into the running mean and the sum
 * of squared deviations (m2). This is numerically stable for windowed stats.
 */
static void welford_add(struct follower_acc *acc, double x)
{
	const double dev_old = x - acc->mean; /* deviation from previous mean */
	double dev_new;

	acc->n += 1;
	acc->mean += dev_old / (double)acc->n;
	dev_new = x - acc->mean; /* deviation from updated mean */
	acc->m2 += dev_old * dev_new;
}
/* Population variance (m2 / n) of the window; 0.0 when it holds no samples. */
static double follower_var_pop(const struct follower_acc *a)
{
	return a->n == 0 ? 0.0 : a->m2 / (double)a->n;
}
/*
 * |v| as int64_t, saturating at INT64_MAX. Negating INT64_MIN is undefined
 * behaviour, and the true magnitude does not fit anyway.
 */
static int64_t abs_sat_i64(int64_t v)
{
	if (v >= 0)
		return v;
	return v == INT64_MIN ? INT64_MAX : -v;
}

/*
 * Record one tick for a follower: err feeds the window mean/variance, and
 * |err| and |step| update the window maxima (all in µs). err can be as
 * large as a full int64_t difference, hence the saturating magnitude.
 *
 * mu, when non-NULL, is held for the whole update (parallel mode shares
 * the stats array between threads).
 */
static void acc_add_sample(struct follower_acc *acc, int64_t err, int64_t step,
			   pthread_mutex_t *mu)
{
	const int64_t ae = abs_sat_i64(err);
	const int64_t as = abs_sat_i64(step);

	if (mu)
		pthread_mutex_lock(mu);
	welford_add(acc, (double)err);
	if (ae > acc->max_abs_err)
		acc->max_abs_err = ae;
	if (as > acc->max_abs_step)
		acc->max_abs_step = as;
	if (mu)
		pthread_mutex_unlock(mu);
}
/*
 * Emit one "[stats]" line per follower to fp for the current window.
 * If rms_warn_us > 0 and a follower's RMS error exceeds it, also emit a
 * "[warn]" line to stderr. Each non-empty window is then zeroed so the next
 * interval starts fresh.
 */
static void stats_print_and_reset(FILE *fp, char **paths, size_t n,
				  struct follower_acc *acc, double rms_warn_us)
{
	size_t k;

	for (k = 0; k < n; k++) {
		struct follower_acc *win = &acc[k];
		double variance;
		double spread;

		if (win->n != 0) {
			variance = follower_var_pop(win);
			/* Defensive clamp before the square root. */
			if (variance < 0.0)
				variance = 0.0;
			spread = sqrt(variance);
			fprintf(fp,
				"[stats] %s n=%u mean=%.1f var=%.1f rms=%.1f max|err|=%"
				PRId64 " max|step|=%" PRId64 " (µs; var µs²)\n",
				paths[k], win->n, win->mean, variance, spread,
				win->max_abs_err, win->max_abs_step);
			if (rms_warn_us > 0.0 && spread > rms_warn_us)
				fprintf(stderr,
					"[warn] %s error RMS %.1f µs exceeds threshold %.1f µs "
					"(population var %.1f µs²)\n",
					paths[k], spread, rms_warn_us, variance);
			memset(win, 0, sizeof(*win));
		} else {
			/* No samples recorded in this window. */
			fprintf(fp, "[stats] %s samples=0\n", paths[k]);
		}
	}
}
/*
 * One step of a scalar random-walk Kalman filter.
 *   z: measured offset error (µs)
 *   q: process-noise variance added to the prior on every call
 *   r: measurement-noise variance
 * The first call seeds the state with x = z, P = r. Returns the posterior
 * mean, i.e. the filtered error estimate used for control.
 */
static double kf1_update(struct kf1 *k, double z, double q, double r)
{
	double prior_var;
	double gain;

	if (!k->inited) {
		k->x = z;
		k->P = r;
		k->inited = 1;
		return k->x;
	}

	prior_var = k->P + q;		/* predict */
	gain = prior_var / (prior_var + r);
	k->x += gain * (z - k->x);	/* correct toward the measurement */
	k->P = (1.0 - gain) * prior_var;
	return k->x;
}
/*
 * Periodic stats driver. It does nothing unless stats are enabled and acc is
 * non-NULL. The first call only records the CLOCK_MONOTONIC start time.
 * Later calls print and reset the window once at least stats_interval_sec
 * whole seconds have elapsed, holding mu (if non-NULL) while doing so.
 */
static void stats_maybe_print(const struct cli *cli, struct follower_acc *acc,
			      struct timespec *last_mono, pthread_mutex_t *mu)
{
	struct timespec now;
	unsigned long elapsed_sec;

	if (cli->stats_interval_sec == 0 || acc == NULL)
		return;
	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0)
		return;

	/* An all-zero timestamp means "not started yet". */
	if (last_mono->tv_sec == 0 && last_mono->tv_nsec == 0) {
		*last_mono = now;
		return;
	}

	elapsed_sec = (unsigned long)(now.tv_sec - last_mono->tv_sec);
	if (elapsed_sec >= cli->stats_interval_sec) {
		if (mu)
			pthread_mutex_lock(mu);
		stats_print_and_reset(stderr, cli->follower_paths,
				      cli->n_followers, acc,
				      cli->stats_rms_warn_us);
		if (mu)
			pthread_mutex_unlock(mu);
		*last_mono = now;
	}
}
/*
 * One proportional correction for a single follower.
 *
 * Steps:
 *  - Read the follower TSF and compute err = master - follower (µs).
 *  - With --kalman, smooth err through the follower's filter.
 *  - Turn the result into a step of err * kp_ppm / 1e6. The step is at
 *    least 1 µs toward the master and at most max_step_us in magnitude.
 *  - Write follower + step back.
 *
 * A successful tick is recorded in stats_all[idx] (raw err, applied step).
 * Read/write failures are reported on stderr and the tick is skipped.
 *
 * stats_mu: optional lock for stats_all (parallel mode); may be NULL.
 * kf_states: per-follower filter state, used only with --kalman; may be NULL.
 */
static void apply_correction(const struct cli *cli, uint64_t master,
			     const char *slave_path, size_t idx,
			     struct follower_acc *stats_all,
			     pthread_mutex_t *stats_mu, struct kf1 *kf_states)
{
	uint64_t slave;
	uint64_t err_mag;
	int64_t err;
	int64_t err_ctl;
	int64_t step;
	int64_t cap;

	if (read_tsf_hex(slave_path, &slave) < 0) {
		fprintf(stderr, "read follower TSF failed: %s\n", slave_path);
		goto out;
	}
	/* Modular difference read as signed: positive = follower behind master. */
	err = (int64_t)(master - slave);
	if (cli->use_kalman && kf_states)
		err_ctl = (int64_t)llround(kf1_update(&kf_states[idx],
						      (double)err,
						      KF_DEFAULT_Q_PER_TICK,
						      KF_DEFAULT_R_MEAS));
	else
		err_ctl = err;

	/* A max_step_us above INT64_MAX must not wrap into a negative cap. */
	cap = cli->max_step_us > (uint64_t)INT64_MAX ?
		      INT64_MAX : (int64_t)cli->max_step_us;

	/*
	 * err_ctl * kp_ppm can overflow int64_t (undefined behaviour) for huge
	 * errors or gains. In that case the exact step is astronomically large,
	 * so saturate at the cap in the direction of the error.
	 */
	err_mag = err_ctl < 0 ? 0 - (uint64_t)err_ctl : (uint64_t)err_ctl;
	if (cli->kp_ppm != 0 && err_mag > (uint64_t)INT64_MAX / cli->kp_ppm)
		step = err_ctl < 0 ? -cap : cap;
	else
		step = (err_ctl * (int64_t)cli->kp_ppm) / 1000000;

	/*
	 * Small errors truncate to a zero step, so nudge by 1 µs to keep
	 * converging. The nudge comes before the clamp so the cap
	 * (including -s 0) is never exceeded.
	 */
	if (step == 0 && err_ctl != 0)
		step = (err_ctl > 0) ? 1 : -1;
	if (step > cap)
		step = cap;
	else if (step < -cap)
		step = -cap;

	/*
	 * Skip the write for a zero step. Writing back the value read above would
	 * pull the follower backwards by the read->write latency.
	 */
	if (step != 0 &&
	    write_tsf_decimal(slave_path, slave + (uint64_t)step) < 0) {
		fprintf(stderr, "set follower TSF failed: %s\n", slave_path);
		goto out;
	}
	if (stats_all)
		acc_add_sample(&stats_all[idx], err, step, stats_mu);
out:
	return;
}
/* ---------- single RT thread (default) ---------- */
/*
 * Default mode: one RT thread that, on an absolute CLOCK_MONOTONIC schedule
 * of period_ns, samples the master TSF and corrects every follower in turn.
 *
 * main() blocks SIGINT/SIGTERM before creating this thread, and this thread
 * unblocks them. A shutdown signal is therefore delivered here, where it
 * interrupts the sleep so the loop can exit promptly.
 */
static void *sync_thread_single(void *arg)
{
	struct sync_ctx *const ctx = arg;
	const struct cli *const cfg = ctx->cli;
	struct timespec deadline;

	unblock_sigint_term_in_this_thread();
	if (clock_gettime(CLOCK_MONOTONIC, &deadline) < 0) {
		perror("clock_gettime");
		return NULL;
	}

	while (!g_stop) {
		uint64_t master;
		size_t f;

		if (read_tsf_hex(cfg->master_path, &master) < 0) {
			fprintf(stderr, "read master TSF failed: %s\n",
				cfg->master_path);
		} else if (!g_stop) {
			for (f = 0; f < cfg->n_followers; f++)
				apply_correction(cfg, master,
						 cfg->follower_paths[f], f,
						 ctx->stats, NULL, ctx->kf);
		}

		stats_maybe_print(cfg, ctx->stats, &ctx->last_stats_mono, NULL);
		if (g_stop)
			break;

		timespec_add_ns(&deadline, cfg->period_ns);
		while (clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME,
				       &deadline, NULL) == EINTR) {
			if (g_stop)
				return NULL;
		}
	}
	return NULL;
}
/* ---------- parallel: sampler + one thread per follower ---------- */
struct parallel_shared {
const struct cli *cli;
uint64_t master_snap;
pthread_barrier_t bar_after_master;
pthread_barrier_t bar_after_followers;
struct follower_acc *stats;
pthread_mutex_t stats_mu;
bool stats_mu_inited;
struct timespec last_stats_mono;
struct kf1 *kf;
};
struct parallel_ctx;
struct follower_slot {
const char *path;
struct parallel_ctx *owner;
size_t index;
};
struct parallel_ctx {
const struct cli *cli;
struct parallel_shared *ps;
struct follower_slot followers[];
};
static void *parallel_sampler(void *arg)
{
struct parallel_shared *ps = arg;
const struct cli *cli = ps->cli;
struct timespec rel;
unblock_sigint_term_in_this_thread();
rel.tv_sec = cli->period_ns / NSEC_PER_SEC;
rel.tv_nsec = cli->period_ns % NSEC_PER_SEC;
for (;;) {
if (!g_stop) {
if (read_tsf_hex(cli->master_path, &ps->master_snap) < 0) {
fprintf(stderr, "read master TSF failed: %s\n",
cli->master_path);
ps->master_snap = 0;
}
}
pthread_barrier_wait(&ps->bar_after_master);
pthread_barrier_wait(&ps->bar_after_followers);
stats_maybe_print(cli, ps->stats, &ps->last_stats_mono,
ps->stats_mu_inited ? &ps->stats_mu : NULL);
if (g_stop)
break;
while (clock_nanosleep(CLOCK_MONOTONIC, 0, &rel, NULL) == EINTR) {
if (g_stop)
goto out;
}
}
out:
return NULL;
}
static void *parallel_follower_work(void *arg)
{
struct follower_slot *fs = arg;
struct parallel_ctx *pc = fs->owner;
struct parallel_shared *ps = pc->ps;
const struct cli *cli = pc->cli;
struct timespec rel;
rel.tv_sec = cli->period_ns / NSEC_PER_SEC;
rel.tv_nsec = cli->period_ns % NSEC_PER_SEC;
for (;;) {
pthread_barrier_wait(&ps->bar_after_master);
if (!g_stop)
apply_correction(cli, ps->master_snap, fs->path, fs->index,
ps->stats,
ps->stats_mu_inited ? &ps->stats_mu
: NULL,
ps->kf);
pthread_barrier_wait(&ps->bar_after_followers);
if (g_stop)
break;
while (clock_nanosleep(CLOCK_MONOTONIC, 0, &rel, NULL) == EINTR) {
if (g_stop)
goto out;
}
}
out:
return NULL;
}
/*
 * --parallel mode: start one RT worker per follower plus an RT sampler,
 * wait until they exit (SIGINT/SIGTERM), then release everything.
 *
 * Returns 0 after a clean shutdown and 1 on setup failure. If thread creation
 * fails after at least one worker is already waiting on a barrier, those
 * workers cannot be released, so the process exits with status 1.
 *
 * pthread_* functions return an error number instead of setting errno, so
 * their failures are reported with strerror(rc), not perror().
 */
static int run_parallel(const struct cli *cli)
{
	struct parallel_shared ps;
	struct parallel_ctx *pc = NULL;
	pthread_t sampler;
	pthread_t *workers = NULL;
	pthread_attr_t attr;
	size_t i;
	size_t n = cli->n_followers;
	int rc;
	int ret = 0;

	memset(&ps, 0, sizeof(ps));
	ps.cli = cli;
	/* n followers + 1 sampler meet at each barrier every tick. */
	rc = pthread_barrier_init(&ps.bar_after_master, NULL, (unsigned)(n + 1));
	if (rc != 0) {
		fprintf(stderr, "pthread_barrier_init: %s\n", strerror(rc));
		ret = 1;
		goto out;
	}
	rc = pthread_barrier_init(&ps.bar_after_followers, NULL, (unsigned)(n + 1));
	if (rc != 0) {
		fprintf(stderr, "pthread_barrier_init: %s\n", strerror(rc));
		ret = 1;
		goto out_bar_master;
	}
	if (cli->stats_interval_sec) {
		ps.stats = calloc(n, sizeof(*ps.stats));
		if (!ps.stats) {
			perror("calloc stats");
			ret = 1;
			goto out_bar;
		}
		rc = pthread_mutex_init(&ps.stats_mu, NULL);
		if (rc != 0) {
			fprintf(stderr, "pthread_mutex_init: %s\n", strerror(rc));
			ret = 1;
			goto out_stats;
		}
		ps.stats_mu_inited = true;
	}
	if (cli->use_kalman) {
		ps.kf = calloc(n, sizeof(*ps.kf));
		if (!ps.kf) {
			perror("calloc kf");
			ret = 1;
			goto out_stats;
		}
	}
	pc = calloc(1, sizeof(*pc) + n * sizeof(pc->followers[0]));
	if (!pc) {
		perror("calloc parallel ctx");
		ret = 1;
		goto out_stats;
	}
	pc->cli = cli;
	pc->ps = &ps;
	for (i = 0; i < n; i++) {
		pc->followers[i].path = cli->follower_paths[i];
		pc->followers[i].owner = pc;
		pc->followers[i].index = i;
	}
	workers = calloc(n, sizeof(*workers));
	if (!workers) {
		perror("calloc workers");
		ret = 1;
		goto out_pc;
	}
	rc = pthread_attr_init(&attr);
	if (rc != 0) {
		fprintf(stderr, "pthread_attr_init: %s\n", strerror(rc));
		ret = 1;
		goto out_workers;
	}
	apply_rt_attributes(&attr, cli);
	/* Start all followers first so they block on bar_after_master; then sampler. */
	for (i = 0; i < n; i++) {
		rc = pthread_create(&workers[i], &attr, parallel_follower_work,
				    &pc->followers[i]);
		if (rc != 0) {
			fprintf(stderr, "pthread_create follower: %s\n",
				strerror(rc));
			if (rc == EPERM)
				fprintf(stderr,
					"hint: SCHED_FIFO threads need root or CAP_SYS_NICE\n");
			pthread_attr_destroy(&attr);
			if (i == 0) {
				/* Nothing is waiting on a barrier yet: unwind cleanly. */
				ret = 1;
				goto out_workers;
			}
			fprintf(stderr,
				"aborting: workers blocked on barrier (kill process)\n");
			exit(1);
		}
	}
	rc = pthread_create(&sampler, &attr, parallel_sampler, &ps);
	pthread_attr_destroy(&attr);
	if (rc != 0) {
		fprintf(stderr, "pthread_create sampler: %s\n", strerror(rc));
		fprintf(stderr,
			"aborting: workers blocked on barrier (kill process)\n");
		exit(1);
	}
	pthread_join(sampler, NULL);
	for (i = 0; i < n; i++)
		pthread_join(workers[i], NULL);
out_workers:
	free(workers);
out_pc:
	free(pc);
out_stats:
	if (ps.stats_mu_inited)
		pthread_mutex_destroy(&ps.stats_mu);
	free(ps.stats);
	free(ps.kf);
out_bar:
	pthread_barrier_destroy(&ps.bar_after_followers);
out_bar_master:
	pthread_barrier_destroy(&ps.bar_after_master);
out:
	return ret;
}
static int parse_cli(int argc, char **argv, struct cli *out)
{
static const struct option long_opts[] = {
{ "master", required_argument, NULL, 'm' },
{ "follower", required_argument, NULL, 'f' },
{ "cpu", required_argument, NULL, 'c' },
{ "period-ms", required_argument, NULL, 'p' },
{ "rt-priority", required_argument, NULL, 'P' },
{ "kp-ppm", required_argument, NULL, 'k' },
{ "max-step-us", required_argument, NULL, 's' },
{ "stats-interval", required_argument, NULL, 'u' },
{ "stats-rms-warn", required_argument, NULL, 'G' },
{ "kalman", no_argument, NULL, 'K' },
{ "parallel", no_argument, NULL, 'j' },
{ "help", no_argument, NULL, 'h' },
{ NULL, 0, NULL, 0 },
};
int ret = 1;
int c;
char **p;
memset(out, 0, sizeof(*out));
out->cpu = -1;
out->period_ns = 10L * 1000 * 1000;
out->rt_prio = 80;
out->kp_ppm = 100000;
out->max_step_us = 200;
for (;;) {
c = getopt_long(argc, argv, "m:f:c:p:P:k:s:u:G:Kjh", long_opts,
NULL);
if (c == -1)
break;
switch (c) {
case 'm':
out->master_path = optarg;
break;
case 'f':
p = realloc(out->follower_paths,
(out->n_followers + 1) * sizeof(*p));
if (!p) {
perror("realloc");
ret = -1;
goto out;
}
out->follower_paths = p;
out->follower_paths[out->n_followers++] = optarg;
break;
case 'c':
out->cpu = atoi(optarg);
break;
case 'p':
out->period_ns = atol(optarg) * 1000 * 1000;
if (out->period_ns <= 0) {
fprintf(stderr, "period-ms must be positive\n");
ret = -1;
goto out;
}
break;
case 'P':
out->rt_prio = (unsigned int)atoi(optarg);
if (out->rt_prio < 1 || out->rt_prio > 99) {
fprintf(stderr, "rt-priority must be 1-99\n");
ret = -1;
goto out;
}
break;
case 'k':
out->kp_ppm = (unsigned int)strtoul(optarg, NULL, 10);
break;
case 's':
out->max_step_us = strtoull(optarg, NULL, 10);
break;
case 'u':
out->stats_interval_sec = (unsigned int)strtoul(optarg, NULL, 10);
break;
case 'G':
out->stats_rms_warn_us = strtod(optarg, NULL);
if (out->stats_rms_warn_us < 0.0) {
fprintf(stderr, "stats-rms-warn must be >= 0\n");
ret = -1;
goto out;
}
break;
case 'K':
out->use_kalman = true;
break;
case 'j':
out->parallel = true;
break;
case 'h':
usage(argv[0]);
ret = 0;
goto out;
default:
usage(argv[0]);
ret = -1;
goto out;
}
}
if (ret != 1)
goto out;
if (!out->master_path || out->n_followers == 0) {
fprintf(stderr, "error: need -m and at least one -f\n");
usage(argv[0]);
ret = -1;
goto out;
}
if (optind != argc) {
fprintf(stderr, "error: unexpected arguments\n");
usage(argv[0]);
ret = -1;
goto out;
}
out:
return ret;
}
/*
 * Configure attr for an RT thread:
 *   - explicit (not inherited) scheduling;
 *   - SCHED_FIFO at cli->rt_prio;
 *   - when cli->cpu >= 0, affinity to that single CPU.
 * The setters' return codes are not checked here.
 */
static void apply_rt_attributes(pthread_attr_t *attr, const struct cli *cli)
{
	struct sched_param param;
	cpu_set_t only_cpu;

	pthread_attr_setinheritsched(attr, PTHREAD_EXPLICIT_SCHED);
	pthread_attr_setschedpolicy(attr, SCHED_FIFO);
	param.sched_priority = cli->rt_prio;
	pthread_attr_setschedparam(attr, &param);

	if (cli->cpu < 0)
		return; /* -1 = leave affinity alone */

	CPU_ZERO(&only_cpu);
	CPU_SET((unsigned)cli->cpu, &only_cpu);
	pthread_attr_setaffinity_np(attr, sizeof(only_cpu), &only_cpu);
}
/*
 * Default mode: allocate the optional stats and Kalman arrays, run
 * sync_thread_single() on one RT thread, and wait for it to finish.
 * Returns 0 after a clean shutdown and 1 if setup or thread creation fails.
 *
 * pthread_* functions return an error number instead of setting errno, so
 * failures are reported with strerror(rc) rather than perror().
 */
static int run_single(const struct cli *cli)
{
	pthread_t th;
	pthread_attr_t attr;
	struct sync_ctx sctx = { .cli = cli }; /* last_stats_mono zeroed = "not started" */
	struct follower_acc *stats = NULL;
	struct kf1 *kf = NULL;
	int rc;
	int ret = 0;

	if (cli->stats_interval_sec) {
		stats = calloc(cli->n_followers, sizeof(*stats));
		if (!stats) {
			perror("calloc stats");
			ret = 1;
			goto out;
		}
	}
	if (cli->use_kalman) {
		kf = calloc(cli->n_followers, sizeof(*kf));
		if (!kf) {
			perror("calloc kf");
			ret = 1;
			goto out;
		}
	}
	sctx.stats = stats;
	sctx.kf = kf;
	rc = pthread_attr_init(&attr);
	if (rc != 0) {
		fprintf(stderr, "pthread_attr_init: %s\n", strerror(rc));
		ret = 1;
		goto out;
	}
	apply_rt_attributes(&attr, cli);
	rc = pthread_create(&th, &attr, sync_thread_single, &sctx);
	if (rc != 0) {
		fprintf(stderr, "pthread_create: %s\n", strerror(rc));
		if (rc == EPERM)
			fprintf(stderr,
				"hint: SCHED_FIFO threads need root or CAP_SYS_NICE\n");
		ret = 1;
	} else {
		pthread_join(th, NULL);
	}
	pthread_attr_destroy(&attr);
out:
	free(kf);
	free(stats);
	return ret;
}
/*
 * Entry point. Steps:
 *   - parse options;
 *   - lock memory (best effort);
 *   - install the shutdown handlers and block them in this thread;
 *   - run the selected threading mode.
 * Exit status is 0 on success or --help, and 1 on a usage error or
 * run failure.
 */
int main(int argc, char **argv)
{
	struct cli cli;
	int parsed;
	int status;

	parsed = parse_cli(argc, argv, &cli);
	if (parsed > 0) {
		if (mlockall(MCL_CURRENT | MCL_FUTURE) < 0)
			perror("mlockall (ignore if unprivileged)");
		setup_shutdown_signals();
		/*
		 * Threads created from here inherit the blocked mask; only the
		 * ones that explicitly unblock SIGINT/SIGTERM receive them.
		 */
		block_sigint_term_in_this_thread();
		status = cli.parallel ? run_parallel(&cli) : run_single(&cli);
	} else {
		/* 0 = --help printed, negative = parse error. */
		status = parsed < 0 ? 1 : 0;
	}

	free(cli.follower_paths);
	return status;
}