/*
 * FiWiTSF — TSF sync: RT scheduling + mac80211 debugfs "tsf" read/write.
 *
 * Build: make (or see Makefile)
 *
 * Requires: CONFIG_MAC80211_DEBUGFS, debugfs mounted, root or CAP_SYS_NICE for RT.
 *
 * Threading: default is one RT thread (round-robin followers) — lowest contention
 * on driver locks and simplest timing. Optional --parallel uses one sampler +
 * one RT thread per follower (see -j).
 *
 * SIGINT/SIGTERM stop the loop cleanly. Use -u to print periodic per-follower
 * error/step stats (TSF units are microseconds). Mean/variance use Welford's
 * method; optional --stats-rms-warn flags high spread (RMS error in µs).
 * Optional --kalman: 1D random-walk Kalman on measured error; same -k/-s caps
 * for fair A/B vs raw error (stats lines still use raw measured error).
 */
#define _GNU_SOURCE
#include <errno.h>
#include <fcntl.h>
#include <getopt.h>
#include <inttypes.h>
#include <limits.h>
#include <math.h>
#include <pthread.h>
#include <sched.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <time.h>
#include <unistd.h>

#define NSEC_PER_SEC 1000000000L
#define NSEC_PER_MSEC 1000000L

/*
 * Default noise variances (µs²) for 1D random-walk + scalar measurement.
 * Tune for your link; these are a reasonable starting point for ~10 ms ticks.
 */
#define KF_DEFAULT_Q_PER_TICK 50.0
#define KF_DEFAULT_R_MEAS 4000.0

/* Set to 1 by the SIGINT/SIGTERM handler; polled by the control loops. */
static volatile sig_atomic_t g_stop;

/* Scalar Kalman filter state for one follower. */
struct kf1 {
	double x;	/* posterior mean, µs */
	double P;	/* posterior variance, µs² */
	int inited;	/* 0 until the first measurement seeds x and P */
};

/* Per-follower statistics accumulated over one stats window. */
struct follower_acc {
	unsigned n;
	double mean;	/* Welford running mean (µs) */
	double m2;	/* Welford sum of squared deviations */
	int64_t max_abs_err;
	int64_t max_abs_step;
};

/* Parsed command line. follower_paths is a realloc'd array of argv pointers. */
struct cli {
	const char *master_path;
	char **follower_paths;
	size_t n_followers;
	int cpu;			/* -1 = no affinity */
	long period_ns;
	unsigned int rt_prio;
	unsigned int kp_ppm;
	uint64_t max_step_us;
	bool parallel;
	unsigned int stats_interval_sec;	/* 0 = off */
	double stats_rms_warn_us;	/* 0 = off; warn if RMS error exceeds this */
	bool use_kalman;
};

/* State handed to the single-thread control loop. */
struct sync_ctx {
	const struct cli *cli;
	struct follower_acc *stats;	/* NULL or n_followers entries */
	struct kf1 *kf;			/* NULL or n_followers; per-follower filter state */
	struct timespec last_stats_mono;
};

/* Print the command-line help text to stderr. */
static void usage(const char *prog)
{
	fprintf(stderr,
		"Usage: %s -m PATH [-f PATH]... [options]\n"
		" (repeat -f for each follower; at least one required)\n"
		"\n"
		" -m, --master PATH debugfs tsf file for master radio\n"
		" -f, --follower PATH debugfs tsf file for a follower (repeat)\n"
		" -c, --cpu N pin all RT threads to CPU N (-1 = none)\n"
		" -p, --period-ms N control loop period in ms (default 10)\n"
		" -P, --rt-priority N SCHED_FIFO priority 1-99 (default 80)\n"
		" -k, --kp-ppm N gain: apply (error * N / 1e6) per tick (default 100000 = 10%%)\n"
		" -s, --max-step-us N max |TSF step| per tick in microseconds (default 200)\n"
		" -u, --stats-interval N print per-follower stats every N seconds (0=off, default 0)\n"
		" -G, --stats-rms-warn N warn if error RMS exceeds N µs in a stats window (0=off)\n"
		" -K, --kalman filter measured TSF error (1D random walk; same -k/-s)\n"
		" -j, --parallel one RT thread per follower + master sampler thread\n"
		" -h, --help this text\n"
		"\n"
		"Example:\n"
		" %s -m /sys/kernel/debug/ieee80211/phy0/netdev:wlan0/tsf \\\n"
		" -f /sys/kernel/debug/ieee80211/phy1/netdev:wlan1/tsf\n",
		prog, prog);
}

/*
 * Read a TSF value formatted as "0x<hex>" from @path into @out.
 * Returns 0 on success, -1 if the file cannot be opened, read or parsed;
 * @out is only written on success.
 */
static int read_tsf_hex(const char *path, uint64_t *out)
{
	char buf[64];
	unsigned long long val;
	ssize_t n;
	int fd;
	int ret = -1;

	fd = open(path, O_RDONLY);
	if (fd < 0)
		goto out;
	n = read(fd, buf, sizeof(buf) - 1);
	close(fd);
	if (n <= 0)
		goto out;
	buf[n] = '\0';
	/* Parse into a real unsigned long long rather than through a cast pointer. */
	if (sscanf(buf, "0x%llx", &val) != 1)
		goto out;
	*out = (uint64_t)val;
	ret = 0;
out:
	return ret;
}

/*
 * Write @tsf to @path as a decimal number followed by a newline.
 * Returns 0 on success, -1 on formatting, open or short-write failure.
 */
static int write_tsf_decimal(const char *path, uint64_t tsf)
{
	char buf[32];
	int fd;
	int len;
	int ret = -1;

	len = snprintf(buf, sizeof(buf), "%llu\n", (unsigned long long)tsf);
	if (len < 0 || len >= (int)sizeof(buf))
		goto out;
	fd = open(path, O_WRONLY);
	if (fd < 0)
		goto out;
	if (write(fd, buf, (size_t)len) == len)
		ret = 0;
	close(fd);
out:
	return ret;
}

/* Add @ns (may be negative) to @ts and renormalise tv_nsec into [0, 1e9). */
static void timespec_add_ns(struct timespec *ts, long ns)
{
	ts->tv_nsec += ns;
	while (ts->tv_nsec >= NSEC_PER_SEC) {
		ts->tv_nsec -= NSEC_PER_SEC;
		ts->tv_sec++;
	}
	while (ts->tv_nsec < 0) {
		ts->tv_nsec += NSEC_PER_SEC;
		ts->tv_sec--;
	}
}

static void apply_rt_attributes(pthread_attr_t *attr, const struct cli *cli);

/* Signal handler: only sets the stop flag (async-signal-safe). */
static void on_signal(int signo)
{
	(void)signo;
	g_stop = 1;
}

/* Clear the stop flag and install on_signal for SIGINT and SIGTERM. */
static void setup_shutdown_signals(void)
{
	struct sigaction sa;

	g_stop = 0;
	memset(&sa, 0, sizeof(sa));
	sa.sa_handler = on_signal;
	sigemptyset(&sa.sa_mask);
	/* No SA_RESTART: a signal must interrupt clock_nanosleep() with EINTR. */
	if (sigaction(SIGINT, &sa, NULL) != 0 || sigaction(SIGTERM, &sa, NULL) != 0)
		perror("sigaction");
}

/* Apply @how (SIG_BLOCK / SIG_UNBLOCK) to SIGINT+SIGTERM for the calling thread. */
static void sigint_term_mask(int how)
{
	sigset_t set;

	sigemptyset(&set);
	sigaddset(&set, SIGINT);
	sigaddset(&set, SIGTERM);
	pthread_sigmask(how, &set, NULL);
}

/* Block SIGINT/SIGTERM in the caller; threads created afterwards inherit the mask. */
static void block_sigint_term_in_this_thread(void)
{
	sigint_term_mask(SIG_BLOCK);
}

/* Let the calling thread receive SIGINT/SIGTERM. */
static void unblock_sigint_term_in_this_thread(void)
{
	sigint_term_mask(SIG_UNBLOCK);
}

/* Welford's online mean / population variance (stable for windowed stats). */
static void welford_add(struct follower_acc *acc, double x)
{
	double delta;
	double delta2;

	acc->n++;
	delta = x - acc->mean;
	acc->mean += delta / (double)acc->n;
	delta2 = x - acc->mean;
	acc->m2 += delta * delta2;
}

/* Population variance of the window (m2 / n); 0 when the window is empty. */
static double follower_var_pop(const struct follower_acc *a)
{
	double v = 0.0;

	if (a->n != 0)
		v = a->m2 / (double)a->n;
	return v;
}

/*
 * Fold one (error, step) sample into @acc. When @mu is non-NULL it is held
 * around the update so the parallel-mode printer sees a consistent window.
 */
static void acc_add_sample(struct follower_acc *acc, int64_t err, int64_t step,
			   pthread_mutex_t *mu)
{
	int64_t ae = err >= 0 ? err : -err;
	int64_t as = step >= 0 ? step : -step;

	if (mu)
		pthread_mutex_lock(mu);
	welford_add(acc, (double)err);
	if (ae > acc->max_abs_err)
		acc->max_abs_err = ae;
	if (as > acc->max_abs_step)
		acc->max_abs_step = as;
	if (mu)
		pthread_mutex_unlock(mu);
}

/*
 * Print one "[stats]" line per follower to @fp and zero each accumulator.
 * The value labelled "rms" is sqrt(population variance), i.e. the spread of
 * the error about its window mean. Followers without samples get a
 * "samples=0" line and are left untouched.
 */
static void stats_print_and_reset(FILE *fp, char **paths, size_t n,
				  struct follower_acc *acc, double rms_warn_us)
{
	size_t i;

	for (i = 0; i < n; i++) {
		struct follower_acc *a = &acc[i];
		double var;
		double rms;

		if (a->n == 0) {
			fprintf(fp, "[stats] %s samples=0\n", paths[i]);
			continue;
		}
		var = follower_var_pop(a);
		if (var < 0.0)	/* guard against tiny negative rounding residue */
			var = 0.0;
		rms = sqrt(var);
		fprintf(fp,
			"[stats] %s n=%u mean=%.1f var=%.1f rms=%.1f max|err|=%" PRId64
			" max|step|=%" PRId64 " (µs; var µs²)\n",
			paths[i], a->n, a->mean, var, rms,
			a->max_abs_err, a->max_abs_step);
		if (rms_warn_us > 0.0 && rms > rms_warn_us) {
			fprintf(stderr,
				"[warn] %s error RMS %.1f µs exceeds threshold %.1f µs "
				"(population var %.1f µs²)\n",
				paths[i], rms, rms_warn_us, var);
		}
		memset(a, 0, sizeof(*a));
	}
}

/*
 * z = measured offset error (µs). Returns filtered estimate for control.
 * The first call seeds the state with x = z, P = r; later calls do one
 * predict (P += q) and one scalar update step.
 */
static double kf1_update(struct kf1 *k, double z, double q, double r)
{
	double p_pred;
	double gain;

	if (!k->inited) {
		k->x = z;
		k->P = r;
		k->inited = 1;
		return k->x;
	}
	p_pred = k->P + q;
	gain = p_pred / (p_pred + r);
	k->x += gain * (z - k->x);
	k->P = (1.0 - gain) * p_pred;
	return k->x;
}

/*
 * Print and reset the stats window once at least stats_interval_sec whole
 * seconds have passed since *last_mono. A zeroed *last_mono means "not
 * started": the first call only records the current time. No-op when stats
 * are disabled or @acc is NULL. @mu, when non-NULL, is held while printing.
 */
static void stats_maybe_print(const struct cli *cli, struct follower_acc *acc,
			      struct timespec *last_mono, pthread_mutex_t *mu)
{
	struct timespec now;

	if (!cli->stats_interval_sec || !acc)
		goto out;
	if (clock_gettime(CLOCK_MONOTONIC, &now) < 0)
		goto out;
	if (last_mono->tv_sec == 0 && last_mono->tv_nsec == 0) {
		*last_mono = now;
		goto out;
	}
	if ((unsigned long)(now.tv_sec - last_mono->tv_sec) < cli->stats_interval_sec)
		goto out;
	if (mu)
		pthread_mutex_lock(mu);
	stats_print_and_reset(stderr, cli->follower_paths, cli->n_followers, acc,
			      cli->stats_rms_warn_us);
	if (mu)
		pthread_mutex_unlock(mu);
	*last_mono = now;
out:
	return;
}

/*
 * Return err * kp_ppm / 1e6 truncated toward zero, without overflowing the
 * intermediate product. The error is split into whole and fractional
 * millionths so each partial product fits in int64_t; results that cannot
 * be represented saturate at +/-INT64_MAX (the caller clamps to the step
 * cap right afterwards).
 */
static int64_t scale_by_ppm(int64_t err, unsigned int kp_ppm)
{
	const int64_t unit = 1000000;
	const int64_t kp = (int64_t)kp_ppm;
	const int64_t whole = err / unit;
	const int64_t frac = err % unit;

	if (kp == 0)
		return 0;
	if (whole >= INT64_MAX / kp)
		return INT64_MAX;
	if (whole <= -(INT64_MAX / kp))
		return -INT64_MAX;
	return whole * kp + (frac * kp) / unit;
}

/*
 * One proportional correction for follower @idx: read its TSF, compute the
 * error against @master (optionally Kalman-filtered), scale by kp_ppm,
 * clamp to +/-max_step_us, and write slave + step back. A non-zero control
 * error always produces a step of at least 1 µs. Stats record the raw
 * measured error and the step actually written. Failures are reported on
 * stderr and the tick is skipped for this follower.
 */
static void apply_correction(const struct cli *cli, uint64_t master,
			     const char *slave_path, size_t idx,
			     struct follower_acc *stats_all,
			     pthread_mutex_t *stats_mu, struct kf1 *kf_states)
{
	uint64_t slave;
	uint64_t new_tsf;
	int64_t err;
	int64_t err_ctl;
	int64_t step;
	int64_t cap;

	if (read_tsf_hex(slave_path, &slave) < 0) {
		fprintf(stderr, "read follower TSF failed: %s\n", slave_path);
		goto out;
	}

	err = (int64_t)(master - slave);
	if (cli->use_kalman && kf_states)
		err_ctl = (int64_t)llround(kf1_update(&kf_states[idx], (double)err,
						      KF_DEFAULT_Q_PER_TICK,
						      KF_DEFAULT_R_MEAS));
	else
		err_ctl = err;

	step = scale_by_ppm(err_ctl, cli->kp_ppm);

	/* A cap above INT64_MAX would turn negative when cast; clamp it. */
	if (cli->max_step_us > (uint64_t)INT64_MAX)
		cap = INT64_MAX;
	else
		cap = (int64_t)cli->max_step_us;
	if (step > cap)
		step = cap;
	else if (step < -cap)
		step = -cap;

	/* Integer scaling rounds small errors to 0; still nudge by 1 µs. */
	if (step == 0 && err_ctl != 0)
		step = (err_ctl > 0) ? 1 : -1;

	new_tsf = slave + (uint64_t)step;
	if (write_tsf_decimal(slave_path, new_tsf) < 0) {
		fprintf(stderr, "set follower TSF failed: %s\n", slave_path);
		goto out;
	}
	if (stats_all)
		acc_add_sample(&stats_all[idx], err, step, stats_mu);
out:
	return;
}

/* ---------- single RT thread (default) ---------- */

/*
 * Control loop for single-thread mode: each tick reads the master TSF once,
 * corrects every follower in order, optionally prints stats, then sleeps
 * until the next absolute CLOCK_MONOTONIC deadline. Exits when g_stop is set.
 * @arg is a struct sync_ctx *. Always returns NULL.
 */
static void *sync_thread_single(void *arg)
{
	struct sync_ctx *sctx = arg;
	const struct cli *cli = sctx->cli;
	struct timespec next;
	size_t i;

	unblock_sigint_term_in_this_thread();

	if (clock_gettime(CLOCK_MONOTONIC, &next) < 0) {
		perror("clock_gettime");
		return NULL;
	}

	while (!g_stop) {
		uint64_t master;

		if (read_tsf_hex(cli->master_path, &master) < 0) {
			fprintf(stderr, "read master TSF failed: %s\n",
				cli->master_path);
		} else if (!g_stop) {
			for (i = 0; i < cli->n_followers; i++)
				apply_correction(cli, master,
						 cli->follower_paths[i], i,
						 sctx->stats, NULL, sctx->kf);
		}
		stats_maybe_print(cli, sctx->stats, &sctx->last_stats_mono, NULL);
		if (g_stop)
			break;

		timespec_add_ns(&next, cli->period_ns);
		/* Resume the same absolute deadline after unrelated interruptions. */
		while (clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next,
				       NULL) == EINTR) {
			if (g_stop)
				break;
		}
	}
	return NULL;
}

/* ---------- parallel: sampler + one thread per follower ---------- */

/*
 * State shared by the sampler and the follower threads.
 *
 * master_snap, master_valid and stop are written only by the sampler before
 * it enters bar_after_master, and read by followers only between the two
 * barriers, so the barriers are the only synchronisation those fields need.
 */
struct parallel_shared {
	const struct cli *cli;
	uint64_t master_snap;
	bool master_valid;	/* false: master read failed, skip this tick */
	bool stop;		/* true: this is the final tick, all threads exit */
	pthread_barrier_t bar_after_master;
	pthread_barrier_t bar_after_followers;
	struct follower_acc *stats;
	pthread_mutex_t stats_mu;
	bool stats_mu_inited;
	struct timespec last_stats_mono;
	struct kf1 *kf;
};

struct parallel_ctx;

/* Per-follower thread argument. */
struct follower_slot {
	const char *path;
	struct parallel_ctx *owner;
	size_t index;
};

struct parallel_ctx {
	const struct cli *cli;
	struct parallel_shared *ps;
	struct follower_slot followers[];
};

/*
 * Sampler thread for --parallel. Per tick: publish the stop decision and a
 * master TSF snapshot, release the followers through bar_after_master, wait
 * for them on bar_after_followers, optionally print stats, then sleep until
 * the next absolute deadline.
 *
 * This is the only thread that receives SIGINT/SIGTERM, so it alone decides
 * when to stop and tells the followers via ps->stop; every thread therefore
 * leaves on the same tick and nobody is left waiting on a barrier.
 * @arg is a struct parallel_shared *. Always returns NULL.
 */
static void *parallel_sampler(void *arg)
{
	struct parallel_shared *ps = arg;
	const struct cli *cli = ps->cli;
	pthread_mutex_t *mu = ps->stats_mu_inited ? &ps->stats_mu : NULL;
	struct timespec next;
	bool stop = false;

	unblock_sigint_term_in_this_thread();

	if (clock_gettime(CLOCK_MONOTONIC, &next) < 0) {
		perror("clock_gettime");
		stop = true;	/* still run one tick so followers get released */
	}

	for (;;) {
		uint64_t snap;

		if (g_stop)
			stop = true;

		ps->stop = stop;
		ps->master_valid = false;
		if (!stop) {
			if (read_tsf_hex(cli->master_path, &snap) < 0) {
				fprintf(stderr, "read master TSF failed: %s\n",
					cli->master_path);
			} else {
				ps->master_snap = snap;
				ps->master_valid = true;
			}
		}

		pthread_barrier_wait(&ps->bar_after_master);
		pthread_barrier_wait(&ps->bar_after_followers);

		stats_maybe_print(cli, ps->stats, &ps->last_stats_mono, mu);
		if (stop)
			break;

		timespec_add_ns(&next, cli->period_ns);
		/* On a stop signal, wake early and loop once more to publish it. */
		while (clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next,
				       NULL) == EINTR) {
			if (g_stop)
				break;
		}
	}
	return NULL;
}

/*
 * Follower thread for --parallel. Waits for the sampler's snapshot, applies
 * one correction when the snapshot is valid, then meets the sampler on the
 * second barrier. It does not sleep itself: the sampler paces every tick.
 * @arg is a struct follower_slot *. Always returns NULL.
 */
static void *parallel_follower_work(void *arg)
{
	struct follower_slot *fs = arg;
	struct parallel_ctx *pc = fs->owner;
	struct parallel_shared *ps = pc->ps;
	const struct cli *cli = pc->cli;
	pthread_mutex_t *mu = ps->stats_mu_inited ? &ps->stats_mu : NULL;

	for (;;) {
		bool stop;

		pthread_barrier_wait(&ps->bar_after_master);
		/* Copy now: the sampler may rewrite ps->stop after the next barrier. */
		stop = ps->stop;
		if (!stop && ps->master_valid)
			apply_correction(cli, ps->master_snap, fs->path,
					 fs->index, ps->stats, mu, ps->kf);
		pthread_barrier_wait(&ps->bar_after_followers);
		if (stop)
			break;
	}
	return NULL;
}

/*
 * Run --parallel mode: allocate shared state, start one RT thread per
 * follower plus the sampler, and wait for all of them to finish.
 * Returns 0 on success, 1 on setup failure. If a thread cannot be created
 * after others are already parked on a barrier, the process exits.
 */
static int run_parallel(const struct cli *cli)
{
	struct parallel_shared ps;
	struct parallel_ctx *pc = NULL;
	pthread_t sampler;
	pthread_t *workers = NULL;
	pthread_attr_t attr;
	size_t n = cli->n_followers;
	size_t i;
	int rc;
	int ret = 0;

	memset(&ps, 0, sizeof(ps));
	ps.cli = cli;

	/* pthread_* functions return an error number; they do not set errno. */
	rc = pthread_barrier_init(&ps.bar_after_master, NULL, (unsigned)(n + 1));
	if (rc != 0) {
		fprintf(stderr, "pthread_barrier_init: %s\n", strerror(rc));
		ret = 1;
		goto out;
	}
	rc = pthread_barrier_init(&ps.bar_after_followers, NULL, (unsigned)(n + 1));
	if (rc != 0) {
		fprintf(stderr, "pthread_barrier_init: %s\n", strerror(rc));
		ret = 1;
		goto out_bar_master;
	}

	if (cli->stats_interval_sec) {
		ps.stats = calloc(n, sizeof(*ps.stats));
		if (!ps.stats) {
			perror("calloc stats");
			ret = 1;
			goto out_stats;
		}
		rc = pthread_mutex_init(&ps.stats_mu, NULL);
		if (rc != 0) {
			fprintf(stderr, "pthread_mutex_init: %s\n", strerror(rc));
			ret = 1;
			goto out_stats;
		}
		ps.stats_mu_inited = true;
	}

	if (cli->use_kalman) {
		ps.kf = calloc(n, sizeof(*ps.kf));
		if (!ps.kf) {
			perror("calloc kf");
			ret = 1;
			goto out_stats;
		}
	}

	pc = calloc(1, sizeof(*pc) + n * sizeof(struct follower_slot));
	if (!pc) {
		perror("calloc parallel ctx");
		ret = 1;
		goto out_stats;
	}
	pc->cli = cli;
	pc->ps = &ps;
	for (i = 0; i < n; i++) {
		pc->followers[i].path = cli->follower_paths[i];
		pc->followers[i].owner = pc;
		pc->followers[i].index = i;
	}

	workers = calloc(n, sizeof(*workers));
	if (!workers) {
		perror("calloc workers");
		ret = 1;
		goto out_pc;
	}

	pthread_attr_init(&attr);
	apply_rt_attributes(&attr, cli);

	/* Start all followers first so they block on bar_after_master; then sampler. */
	for (i = 0; i < n; i++) {
		rc = pthread_create(&workers[i], &attr, parallel_follower_work,
				    &pc->followers[i]);
		if (rc != 0) {
			fprintf(stderr, "pthread_create follower: %s\n",
				strerror(rc));
			pthread_attr_destroy(&attr);
			/* Earlier workers can never pass the barrier: give up. */
			fprintf(stderr,
				"aborting: workers blocked on barrier (kill process)\n");
			exit(1);
		}
	}
	rc = pthread_create(&sampler, &attr, parallel_sampler, &ps);
	if (rc != 0) {
		fprintf(stderr, "pthread_create sampler: %s\n", strerror(rc));
		pthread_attr_destroy(&attr);
		fprintf(stderr,
			"aborting: workers blocked on barrier (kill process)\n");
		exit(1);
	}
	pthread_attr_destroy(&attr);

	pthread_join(sampler, NULL);
	for (i = 0; i < n; i++)
		pthread_join(workers[i], NULL);

	free(workers);
out_pc:
	free(pc);
out_stats:
	if (ps.stats_mu_inited)
		pthread_mutex_destroy(&ps.stats_mu);
	free(ps.stats);
	free(ps.kf);
	pthread_barrier_destroy(&ps.bar_after_followers);
out_bar_master:
	pthread_barrier_destroy(&ps.bar_after_master);
out:
	return ret;
}

/*
 * Parse argv into @out (zeroed first, then defaults applied).
 * Returns 1 when the program should run, 0 when help was printed, and -1 on
 * error. out->follower_paths may be allocated on any return; the caller
 * frees it.
 */
static int parse_cli(int argc, char **argv, struct cli *out)
{
	static const struct option long_opts[] = {
		{ "master", required_argument, NULL, 'm' },
		{ "follower", required_argument, NULL, 'f' },
		{ "cpu", required_argument, NULL, 'c' },
		{ "period-ms", required_argument, NULL, 'p' },
		{ "rt-priority", required_argument, NULL, 'P' },
		{ "kp-ppm", required_argument, NULL, 'k' },
		{ "max-step-us", required_argument, NULL, 's' },
		{ "stats-interval", required_argument, NULL, 'u' },
		{ "stats-rms-warn", required_argument, NULL, 'G' },
		{ "kalman", no_argument, NULL, 'K' },
		{ "parallel", no_argument, NULL, 'j' },
		{ "help", no_argument, NULL, 'h' },
		{ NULL, 0, NULL, 0 },
	};
	int ret = 1;
	int c;
	long ms;
	char **p;

	memset(out, 0, sizeof(*out));
	out->cpu = -1;
	out->period_ns = 10L * NSEC_PER_MSEC;
	out->rt_prio = 80;
	out->kp_ppm = 100000;
	out->max_step_us = 200;

	for (;;) {
		c = getopt_long(argc, argv, "m:f:c:p:P:k:s:u:G:Kjh", long_opts, NULL);
		if (c == -1)
			break;
		switch (c) {
		case 'm':
			out->master_path = optarg;
			break;
		case 'f':
			p = realloc(out->follower_paths,
				    (out->n_followers + 1) * sizeof(*p));
			if (!p) {
				perror("realloc");
				ret = -1;
				goto out;
			}
			out->follower_paths = p;
			out->follower_paths[out->n_followers++] = optarg;
			break;
		case 'c':
			out->cpu = atoi(optarg);
			/* Negative values mean "no affinity"; too-large ones cannot be set. */
			if (out->cpu >= CPU_SETSIZE) {
				fprintf(stderr, "cpu must be less than %d\n",
					CPU_SETSIZE);
				ret = -1;
				goto out;
			}
			break;
		case 'p':
			ms = atol(optarg);
			if (ms <= 0) {
				fprintf(stderr, "period-ms must be positive\n");
				ret = -1;
				goto out;
			}
			/* Reject values whose nanosecond form would overflow long. */
			if (ms > LONG_MAX / NSEC_PER_MSEC) {
				fprintf(stderr, "period-ms too large\n");
				ret = -1;
				goto out;
			}
			out->period_ns = ms * NSEC_PER_MSEC;
			break;
		case 'P':
			out->rt_prio = (unsigned int)atoi(optarg);
			if (out->rt_prio < 1 || out->rt_prio > 99) {
				fprintf(stderr, "rt-priority must be 1-99\n");
				ret = -1;
				goto out;
			}
			break;
		case 'k':
			out->kp_ppm = (unsigned int)strtoul(optarg, NULL, 10);
			break;
		case 's':
			out->max_step_us = strtoull(optarg, NULL, 10);
			break;
		case 'u':
			out->stats_interval_sec =
				(unsigned int)strtoul(optarg, NULL, 10);
			break;
		case 'G':
			out->stats_rms_warn_us = strtod(optarg, NULL);
			if (out->stats_rms_warn_us < 0.0) {
				fprintf(stderr, "stats-rms-warn must be >= 0\n");
				ret = -1;
				goto out;
			}
			break;
		case 'K':
			out->use_kalman = true;
			break;
		case 'j':
			out->parallel = true;
			break;
		case 'h':
			usage(argv[0]);
			ret = 0;
			goto out;
		default:
			usage(argv[0]);
			ret = -1;
			goto out;
		}
	}

	if (!out->master_path || out->n_followers == 0) {
		fprintf(stderr, "error: need -m and at least one -f\n");
		usage(argv[0]);
		ret = -1;
		goto out;
	}
	if (optind != argc) {
		fprintf(stderr, "error: unexpected arguments\n");
		usage(argv[0]);
		ret = -1;
		goto out;
	}
out:
	return ret;
}

/*
 * Configure @attr for an explicitly scheduled SCHED_FIFO thread at
 * cli->rt_prio, pinned to cli->cpu when that is >= 0. Return values of the
 * attribute setters are not checked here; an unusable combination shows up
 * as a pthread_create() failure in the caller.
 */
static void apply_rt_attributes(pthread_attr_t *attr, const struct cli *cli)
{
	struct sched_param sp;
	cpu_set_t cpuset;

	memset(&sp, 0, sizeof(sp));
	sp.sched_priority = (int)cli->rt_prio;

	pthread_attr_setinheritsched(attr, PTHREAD_EXPLICIT_SCHED);
	pthread_attr_setschedpolicy(attr, SCHED_FIFO);
	pthread_attr_setschedparam(attr, &sp);

	if (cli->cpu >= 0) {
		CPU_ZERO(&cpuset);
		CPU_SET((unsigned)cli->cpu, &cpuset);
		pthread_attr_setaffinity_np(attr, sizeof(cpuset), &cpuset);
	}
}

/*
 * Run the default single-thread mode: allocate optional stats / Kalman
 * state, start the RT control thread and wait for it to finish.
 * Returns 0 on success, 1 on allocation or thread-creation failure.
 */
static int run_single(const struct cli *cli)
{
	pthread_t th;
	pthread_attr_t attr;
	struct sync_ctx sctx = { .cli = cli };
	struct follower_acc *stats = NULL;
	struct kf1 *kf = NULL;
	int rc;
	int ret = 0;

	if (cli->stats_interval_sec) {
		stats = calloc(cli->n_followers, sizeof(*stats));
		if (!stats) {
			perror("calloc stats");
			ret = 1;
			goto out;
		}
	}
	if (cli->use_kalman) {
		kf = calloc(cli->n_followers, sizeof(*kf));
		if (!kf) {
			perror("calloc kf");
			ret = 1;
			goto out;
		}
	}
	sctx.stats = stats;
	sctx.kf = kf;
	memset(&sctx.last_stats_mono, 0, sizeof(sctx.last_stats_mono));

	pthread_attr_init(&attr);
	apply_rt_attributes(&attr, cli);
	/* pthread_create returns the error number instead of setting errno. */
	rc = pthread_create(&th, &attr, sync_thread_single, &sctx);
	if (rc != 0) {
		fprintf(stderr, "pthread_create: %s\n", strerror(rc));
		ret = 1;
	} else {
		pthread_join(th, NULL);
	}
	pthread_attr_destroy(&attr);
out:
	free(kf);
	free(stats);
	return ret;
}

/*
 * Entry point: parse options, try to lock memory, install the stop handler,
 * block SIGINT/SIGTERM in this thread (worker threads that want them unblock
 * them), then run the selected mode. Exit status is 0 on success or help,
 * 1 on error.
 */
int main(int argc, char **argv)
{
	struct cli cli;
	int pr;
	int ret = 0;

	pr = parse_cli(argc, argv, &cli);
	if (pr < 0) {
		ret = 1;
		goto out;
	}
	if (pr == 0) {
		ret = 0;
		goto out;
	}

	if (mlockall(MCL_CURRENT | MCL_FUTURE) < 0)
		perror("mlockall (ignore if unprivileged)");

	setup_shutdown_signals();
	block_sigint_term_in_this_thread();

	if (cli.parallel)
		ret = run_parallel(&cli);
	else
		ret = run_single(&cli);
out:
	free(cli.follower_paths);
	return ret;
}