Files
traefik-dns-watcher/main.go

299 lines
7.9 KiB
Go

package main
import (
"bufio"
"context"
"encoding/json"
"fmt"
"log/slog"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
)
// main runs the watcher. It loads and validates configuration, installs the
// structured logger, and then feeds three trigger sources — Docker container
// events, a periodic ticker and a one-off startup run — into a single
// reconcile loop that debounces bursts of triggers. On SIGTERM/SIGINT it stops
// starting new reconciles and returns once the run in progress (if any) has
// finished.
//
// When the binary is invoked as git's askpass helper (see
// maybeHandleGitAskpass) it answers the credential prompt and exits instead.
func main() {
	if maybeHandleGitAskpass() {
		return
	}

	cfg, err := LoadConfig()
	if err != nil {
		fmt.Fprintln(os.Stderr, "configuration error:", err)
		os.Exit(1)
	}
	logLevel, err := parseLogLevel(cfg.LogLevel)
	if err != nil {
		fmt.Fprintln(os.Stderr, "configuration error:", err)
		os.Exit(1)
	}
	slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: logLevel,
	})))

	slog.Info("traefik-dns-watcher starting",
		"traefik_url", cfg.TraefikURL,
		"zones", cfg.Zones,
		"repo_path", cfg.RepoPath,
		"dynamic_dir", cfg.DynamicDir,
		"log_level", cfg.LogLevel,
		"git_https_token_enabled", cfg.GitAuthToken != "",
		"git_auth_username", cfg.GitAuthUsername,
		"reconcile_interval", cfg.ReconcileInterval,
		"debounce_delay", cfg.DebounceDelay,
		"cf_auto_ttl", cfg.CloudflareAutoTTL,
	)

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer stop()

	// triggerCh is a 1-buffered channel that acts as a coalescing "reconcile needed" signal.
	// Sending never blocks: if a trigger is already pending, the new one is silently dropped.
	triggerCh := make(chan struct{}, 1)
	maybeTrigger := func() {
		select {
		case triggerCh <- struct{}{}:
		default: // a trigger is already queued
		}
	}

	// Reconcile loop: the only goroutine main starts that calls Reconcile, so
	// runs never overlap. A trigger that arrives during a run stays buffered in
	// triggerCh and causes exactly one follow-up run after the debounce delay.
	// reconcileDone is closed once the loop has returned.
	reconcileDone := make(chan struct{})
	go func() {
		defer close(reconcileDone)
		// Perform an immediate reconcile at startup to bring DNS into sync.
		Reconcile(cfg)
		debounceTriggers(ctx, triggerCh, cfg.DebounceDelay, func() { Reconcile(cfg) })
	}()

	// Periodic ticker: ensures DNS state is reconciled even when Docker events are silent.
	go func() {
		ticker := time.NewTicker(cfg.ReconcileInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				maybeTrigger()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Docker events watcher: subscribes to container lifecycle events and uses them
	// as low-latency triggers for reconciliation.
	go watchDockerEvents(ctx, maybeTrigger)

	<-ctx.Done()
	slog.Info("shutdown signal received")

	// Wait for the in-flight reconcile (if any) to finish; the loop starts no
	// new run once ctx has been cancelled.
	<-reconcileDone
	// Final barrier on reconcileMu (declared elsewhere in the package) in case
	// Reconcile is also started from outside main.
	// NOTE(review): redundant if main is the only caller of Reconcile.
	reconcileMu.Lock()
	reconcileMu.Unlock() //nolint:staticcheck // intentional empty critical section used as a barrier
	slog.Info("traefik-dns-watcher stopped")
}

// debounceTriggers calls fire once delay has elapsed since the most recent
// value received on triggerCh; every new trigger restarts the quiet period.
// fire runs synchronously on the calling goroutine, so calls never overlap.
// It returns when ctx is cancelled, dropping any trigger that is still waiting
// out its quiet period, and never calls fire after cancellation is observed.
func debounceTriggers(ctx context.Context, triggerCh <-chan struct{}, delay time.Duration, fire func()) {
	var (
		timer *time.Timer
		// pending is timer.C while a trigger is waiting out its quiet period,
		// and nil otherwise (a nil channel is never ready in a select).
		pending <-chan time.Time
	)
	defer func() {
		if timer != nil {
			timer.Stop()
		}
	}()

	for {
		select {
		case <-triggerCh:
			if timer == nil {
				timer = time.NewTimer(delay)
			} else {
				// Stop and drain so Reset starts a fresh quiet period without a
				// stale tick left in the channel.
				if !timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				timer.Reset(delay)
			}
			pending = timer.C
		case <-pending:
			pending = nil
			// select chooses randomly among ready cases, so a tick can win
			// against ctx.Done; never start new work once shutdown has begun.
			if ctx.Err() != nil {
				return
			}
			fire()
		case <-ctx.Done():
			return
		}
	}
}
// parseLogLevel converts the LOG_LEVEL setting into a slog.Level. Matching is
// case-insensitive and accepts exactly debug, info, warn and error; any other
// value (including the empty string) is reported as a configuration error
// together with a zero level.
func parseLogLevel(v string) (slog.Level, error) {
	levels := map[string]slog.Level{
		"debug": slog.LevelDebug,
		"info":  slog.LevelInfo,
		"warn":  slog.LevelWarn,
		"error": slog.LevelError,
	}
	if lvl, known := levels[strings.ToLower(v)]; known {
		return lvl, nil
	}
	return 0, fmt.Errorf("LOG_LEVEL: invalid value %q (allowed: debug, info, warn, error)", v)
}
// maybeHandleGitAskpass lets this binary double as git's askpass helper for
// HTTPS authentication in non-interactive mode. It only acts when the
// TDW_GIT_ASKPASS environment variable parses as true (meant to be set only
// for git child processes); otherwise it returns false and the caller carries
// on with normal startup.
//
// When active, it looks at the prompt passed as the first argument. Prompts
// containing "username" (case-insensitive) are answered with
// GIT_AUTH_USERNAME. Every other prompt, including a missing one, is answered
// with GIT_AUTH_TOKEN. The answer is written to stdout without a trailing
// newline, and the function returns true.
func maybeHandleGitAskpass() bool {
	if on, _ := strconv.ParseBool(os.Getenv("TDW_GIT_ASKPASS")); !on {
		return false // unset or unparsable values count as disabled
	}

	answerFrom := "GIT_AUTH_TOKEN"
	if len(os.Args) >= 2 && strings.Contains(strings.ToLower(os.Args[1]), "username") {
		answerFrom = "GIT_AUTH_USERNAME"
	}
	fmt.Fprint(os.Stdout, os.Getenv(answerFrom))
	return true
}
// watchDockerEvents keeps a Docker events subscription alive until ctx is
// cancelled. Each attempt runs runDockerEventLoop, which forwards container
// lifecycle events to trigger.
//
// After a disconnect it waits with exponential backoff (2s, doubling up to
// 60s) before reconnecting. An attempt that stayed up for at least the
// maximum backoff is treated as a healthy stream and resets the backoff.
// That way, a long-lived connection that drops is retried quickly instead of
// inheriting a delay inflated by failures from long ago.
func watchDockerEvents(ctx context.Context, trigger func()) {
	const (
		initialBackoff = 2 * time.Second
		maxBackoff     = 60 * time.Second
	)
	backoff := initialBackoff

	for ctx.Err() == nil {
		startedAt := time.Now()
		err := runDockerEventLoop(ctx, trigger)
		if ctx.Err() != nil {
			return
		}
		if time.Since(startedAt) >= maxBackoff {
			backoff = initialBackoff
		}

		slog.Warn("docker events stream ended — reconnecting",
			"error", err, "backoff", backoff)

		wait := time.NewTimer(backoff)
		select {
		case <-wait.C:
		case <-ctx.Done():
			wait.Stop()
			return
		}
		backoff = min(backoff*2, maxBackoff)
	}
}
// dockerEventActor is the "Actor" object of a Docker event: the ID of the
// object the event is about plus its free-form string attributes.
type dockerEventActor struct {
	ID         string            `json:"ID"`
	Attributes map[string]string `json:"Attributes"`
}

// dockerEvent keeps only the fields this program reads from one message of
// the Docker events JSON stream; every other field is ignored on decode.
type dockerEvent struct {
	Type   string           `json:"Type"`
	Action string           `json:"Action"`
	Actor  dockerEventActor `json:"Actor"`
}
// runDockerEventLoop opens a single Docker events HTTP stream (raw, no SDK).
// For each container start/stop/die/destroy event it receives, it calls
// trigger. It supports both a Unix socket (unix:///var/run/docker.sock) and
// TCP (tcp://host:port) via the standard DOCKER_HOST environment variable.
//
// It always returns a non-nil error describing why the stream ended. That can
// be a failure to connect, a non-200 response, a read error, or the daemon
// closing the stream. Cancelling ctx aborts the request and ends the loop.
func runDockerEventLoop(ctx context.Context, trigger func()) error {
	httpClient, baseURL, err := newDockerHTTPClient()
	if err != nil {
		return fmt.Errorf("build docker HTTP client: %w", err)
	}

	// Ask the daemon to send only the container events acted on below.
	filterVal := `{"type":["container"],"event":["start","stop","die","destroy"]}`
	eventsURL := baseURL + "/events?filters=" + url.QueryEscape(filterVal)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, eventsURL, nil)
	if err != nil {
		return fmt.Errorf("build docker events request: %w", err)
	}
	resp, err := httpClient.Do(req)
	if err != nil {
		return fmt.Errorf("connect to docker events: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the Docker Engine API reports failures as a JSON object
		// with a "message" field. Surface it so the cause appears in the
		// reconnect log; otherwise report only the status code.
		var apiErr struct {
			Message string `json:"message"`
		}
		if json.NewDecoder(resp.Body).Decode(&apiErr) == nil && apiErr.Message != "" {
			return fmt.Errorf("docker events returned HTTP %d: %s", resp.StatusCode, apiErr.Message)
		}
		return fmt.Errorf("docker events returned HTTP %d", resp.StatusCode)
	}
	slog.Info("connected to docker events stream")

	scanner := bufio.NewScanner(resp.Body)
	// Events carry the actor's attributes, which may include every container
	// label. A single line can therefore exceed bufio.Scanner's 64 KiB default
	// token limit, which would abort the stream with ErrTooLong. Allow up to
	// 1 MiB per event.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}
		var evt dockerEvent
		if err := json.Unmarshal(line, &evt); err != nil {
			slog.Debug("failed to parse docker event", "error", err)
			continue
		}
		// The server-side filter should already guarantee this; re-check
		// defensively.
		if evt.Type != "container" {
			continue
		}
		// Docker may append extra details after ":" for some event kinds.
		// Only the base action token matters here.
		action, _, _ := strings.Cut(evt.Action, ":")
		action = strings.TrimSpace(action)

		switch action {
		case "start", "stop", "die", "destroy":
			actorID := evt.Actor.ID
			if len(actorID) > 12 {
				actorID = actorID[:12] // short ID, as shown by the docker CLI
			}
			slog.Debug("docker event received", "action", action, "id", actorID)
			trigger()
		}
	}
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("docker events stream error: %w", err)
	}
	return fmt.Errorf("docker events stream closed")
}
// newDockerHTTPClient builds an HTTP client and base URL for the Docker daemon.
// It reads DOCKER_HOST from the environment and defaults to
// unix:///var/run/docker.sock.
//
//   - unix://PATH: the transport dials the socket at PATH for every request,
//     and the placeholder host "docker" is used in the base URL (standard
//     practice for HTTP over a Unix socket).
//   - tcp://HOST:PORT: plain HTTP to HOST:PORT. Trailing slashes are dropped so
//     callers can append "/events" safely.
//
// An empty socket path or host is rejected, as is any other scheme.
func newDockerHTTPClient() (*http.Client, string, error) {
	dockerHost := os.Getenv("DOCKER_HOST")
	if dockerHost == "" {
		dockerHost = "unix:///var/run/docker.sock"
	}

	switch {
	case strings.HasPrefix(dockerHost, "unix://"):
		socketPath := strings.TrimPrefix(dockerHost, "unix://")
		if socketPath == "" {
			return nil, "", fmt.Errorf("DOCKER_HOST %q: missing unix socket path", dockerHost)
		}
		dialer := &net.Dialer{}
		transport := &http.Transport{
			// The network/address net/http derives from the placeholder host
			// are ignored: every connection goes to the socket.
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				return dialer.DialContext(ctx, "unix", socketPath)
			},
		}
		return &http.Client{Transport: transport}, "http://docker", nil

	case strings.HasPrefix(dockerHost, "tcp://"):
		hostPort := strings.TrimRight(strings.TrimPrefix(dockerHost, "tcp://"), "/")
		if hostPort == "" {
			return nil, "", fmt.Errorf("DOCKER_HOST %q: missing host", dockerHost)
		}
		return &http.Client{}, "http://" + hostPort, nil

	default:
		return nil, "", fmt.Errorf("unsupported DOCKER_HOST scheme: %q (expected unix:// or tcp://)", dockerHost)
	}
}