package main import ( "bufio" "context" "encoding/json" "fmt" "log/slog" "net" "net/http" "net/url" "os" "os/signal" "strconv" "strings" "syscall" "time" ) func main() { if maybeHandleGitAskpass() { return } slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelInfo, }))) cfg, err := LoadConfig() if err != nil { slog.Error("configuration error", "error", err) os.Exit(1) } slog.Info("traefik-dns-watcher starting", "traefik_url", cfg.TraefikURL, "zones", cfg.Zones, "repo_path", cfg.RepoPath, "dynamic_dir", cfg.DynamicDir, "git_https_token_enabled", cfg.GitAuthToken != "", "git_auth_username", cfg.GitAuthUsername, "reconcile_interval", cfg.ReconcileInterval, "debounce_delay", cfg.DebounceDelay, "cf_auto_ttl", cfg.CloudflareAutoTTL, ) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) defer stop() // triggerCh is a 1-buffered channel that acts as a coalescing "reconcile needed" signal. // Sending never blocks: if a trigger is already pending, the new one is silently dropped. triggerCh := make(chan struct{}, 1) maybeTrigger := func() { select { case triggerCh <- struct{}{}: default: // a trigger is already queued } } // Debounce goroutine: absorbs rapid bursts of triggers and fires a single reconcile // after the quiet period defined by cfg.DebounceDelay. go func() { timer := time.NewTimer(0) // Drain the initial zero tick so the timer starts in a stopped state. if !timer.Stop() { <-timer.C } for { select { case <-triggerCh: // Reset the timer on every incoming trigger — extends the quiet window. if !timer.Stop() { select { case <-timer.C: default: } } timer.Reset(cfg.DebounceDelay) case <-timer.C: go Reconcile(cfg) case <-ctx.Done(): timer.Stop() return } } }() // Periodic ticker: ensures DNS state is reconciled even when Docker events are silent. go func() { ticker := time.NewTicker(cfg.ReconcileInterval) defer ticker.Stop() for { select { case <-ticker.C: maybeTrigger() case <-ctx.Done(): return } } }() // Docker events watcher: subscribes to container lifecycle events and uses them // as low-latency triggers for reconciliation. go watchDockerEvents(ctx, maybeTrigger) // Perform an immediate reconcile at startup to bring DNS into sync. go Reconcile(cfg) <-ctx.Done() slog.Info("shutdown signal received") // Wait for any in-flight reconcile to finish before exiting. reconcileMu.Lock() reconcileMu.Unlock() //nolint:staticcheck slog.Info("traefik-dns-watcher stopped") } // maybeHandleGitAskpass serves username/password for git HTTPS auth in non-interactive mode. // This process mode is only enabled for git child processes that set TDW_GIT_ASKPASS=1. func maybeHandleGitAskpass() bool { enabled, _ := strconv.ParseBool(os.Getenv("TDW_GIT_ASKPASS")) if !enabled { return false } prompt := "" if len(os.Args) > 1 { prompt = strings.ToLower(os.Args[1]) } if strings.Contains(prompt, "username") { fmt.Fprint(os.Stdout, os.Getenv("GIT_AUTH_USERNAME")) return true } // For password/token prompts, return token by default. fmt.Fprint(os.Stdout, os.Getenv("GIT_AUTH_TOKEN")) return true } // watchDockerEvents connects to the Docker daemon and forwards container lifecycle // events to the trigger function. Reconnects with exponential backoff on failure. func watchDockerEvents(ctx context.Context, trigger func()) { backoff := 2 * time.Second const maxBackoff = 60 * time.Second for { if ctx.Err() != nil { return } err := runDockerEventLoop(ctx, trigger) if ctx.Err() != nil { return } slog.Warn("docker events stream ended — reconnecting", "error", err, "backoff", backoff) select { case <-time.After(backoff): case <-ctx.Done(): return } if backoff < maxBackoff { backoff *= 2 if backoff > maxBackoff { backoff = maxBackoff } } } } // dockerEvent is the minimal structure we need from the Docker events JSON stream. type dockerEvent struct { Type string `json:"Type"` Action string `json:"Action"` Actor struct { ID string `json:"ID"` } `json:"Actor"` } // runDockerEventLoop opens a single Docker events HTTP stream (raw, no SDK) and // forwards relevant container events to the trigger function. // Supports both Unix socket (unix:///var/run/docker.sock) and TCP (tcp://host:port) // via the standard DOCKER_HOST environment variable. func runDockerEventLoop(ctx context.Context, trigger func()) error { httpClient, baseURL, err := newDockerHTTPClient() if err != nil { return fmt.Errorf("build docker HTTP client: %w", err) } filterVal := `{"type":["container"]}` eventsURL := baseURL + "/events?filters=" + url.QueryEscape(filterVal) req, err := http.NewRequestWithContext(ctx, http.MethodGet, eventsURL, nil) if err != nil { return fmt.Errorf("build docker events request: %w", err) } resp, err := httpClient.Do(req) if err != nil { return fmt.Errorf("connect to docker events: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("docker events returned HTTP %d", resp.StatusCode) } slog.Info("connected to docker events stream") scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { line := scanner.Bytes() if len(line) == 0 { continue } var evt dockerEvent if err := json.Unmarshal(line, &evt); err != nil { slog.Debug("failed to parse docker event", "error", err) continue } if evt.Type != "container" { continue } switch evt.Action { case "start", "stop", "die", "destroy": actorID := evt.Actor.ID if len(actorID) > 12 { actorID = actorID[:12] } slog.Debug("docker event received", "action", evt.Action, "id", actorID) trigger() } } if err := scanner.Err(); err != nil { return fmt.Errorf("docker events stream error: %w", err) } return fmt.Errorf("docker events stream closed") } // newDockerHTTPClient builds an HTTP client and base URL for the Docker daemon. // Reads DOCKER_HOST from the environment (default: unix:///var/run/docker.sock). // For Unix sockets the HTTP transport dials the socket; the placeholder host "docker" // is used in the URL (standard practice for Unix socket HTTP clients). func newDockerHTTPClient() (*http.Client, string, error) { dockerHost := os.Getenv("DOCKER_HOST") if dockerHost == "" { dockerHost = "unix:///var/run/docker.sock" } switch { case strings.HasPrefix(dockerHost, "unix://"): socketPath := strings.TrimPrefix(dockerHost, "unix://") transport := &http.Transport{ DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { return (&net.Dialer{}).DialContext(ctx, "unix", socketPath) }, } return &http.Client{Transport: transport}, "http://docker", nil case strings.HasPrefix(dockerHost, "tcp://"): baseURL := strings.Replace(dockerHost, "tcp://", "http://", 1) return &http.Client{}, baseURL, nil default: return nil, "", fmt.Errorf("unsupported DOCKER_HOST scheme: %q (expected unix:// or tcp://)", dockerHost) } }