299 lines
7.9 KiB
Go
299 lines
7.9 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"os/signal"
|
|
"strconv"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
)
|
|
|
|
func main() {
|
|
if maybeHandleGitAskpass() {
|
|
return
|
|
}
|
|
|
|
cfg, err := LoadConfig()
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, "configuration error:", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
logLevel, err := parseLogLevel(cfg.LogLevel)
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, "configuration error:", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
|
Level: logLevel,
|
|
})))
|
|
|
|
slog.Info("traefik-dns-watcher starting",
|
|
"traefik_url", cfg.TraefikURL,
|
|
"zones", cfg.Zones,
|
|
"repo_path", cfg.RepoPath,
|
|
"dynamic_dir", cfg.DynamicDir,
|
|
"log_level", cfg.LogLevel,
|
|
"git_https_token_enabled", cfg.GitAuthToken != "",
|
|
"git_auth_username", cfg.GitAuthUsername,
|
|
"reconcile_interval", cfg.ReconcileInterval,
|
|
"debounce_delay", cfg.DebounceDelay,
|
|
"cf_auto_ttl", cfg.CloudflareAutoTTL,
|
|
)
|
|
|
|
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
|
|
defer stop()
|
|
|
|
// triggerCh is a 1-buffered channel that acts as a coalescing "reconcile needed" signal.
|
|
// Sending never blocks: if a trigger is already pending, the new one is silently dropped.
|
|
triggerCh := make(chan struct{}, 1)
|
|
maybeTrigger := func() {
|
|
select {
|
|
case triggerCh <- struct{}{}:
|
|
default: // a trigger is already queued
|
|
}
|
|
}
|
|
|
|
// Debounce goroutine: absorbs rapid bursts of triggers and fires a single reconcile
|
|
// after the quiet period defined by cfg.DebounceDelay.
|
|
go func() {
|
|
timer := time.NewTimer(0)
|
|
// Drain the initial zero tick so the timer starts in a stopped state.
|
|
if !timer.Stop() {
|
|
<-timer.C
|
|
}
|
|
for {
|
|
select {
|
|
case <-triggerCh:
|
|
// Reset the timer on every incoming trigger — extends the quiet window.
|
|
if !timer.Stop() {
|
|
select {
|
|
case <-timer.C:
|
|
default:
|
|
}
|
|
}
|
|
timer.Reset(cfg.DebounceDelay)
|
|
case <-timer.C:
|
|
go Reconcile(cfg)
|
|
case <-ctx.Done():
|
|
timer.Stop()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Periodic ticker: ensures DNS state is reconciled even when Docker events are silent.
|
|
go func() {
|
|
ticker := time.NewTicker(cfg.ReconcileInterval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
maybeTrigger()
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Docker events watcher: subscribes to container lifecycle events and uses them
|
|
// as low-latency triggers for reconciliation.
|
|
go watchDockerEvents(ctx, maybeTrigger)
|
|
|
|
// Perform an immediate reconcile at startup to bring DNS into sync.
|
|
go Reconcile(cfg)
|
|
|
|
<-ctx.Done()
|
|
slog.Info("shutdown signal received")
|
|
|
|
// Wait for any in-flight reconcile to finish before exiting.
|
|
reconcileMu.Lock()
|
|
reconcileMu.Unlock() //nolint:staticcheck
|
|
slog.Info("traefik-dns-watcher stopped")
|
|
}
|
|
|
|
// parseLogLevel converts the LOG_LEVEL setting into the matching slog.Level.
//
// Matching is case-insensitive and ignores surrounding whitespace, so values
// such as "INFO" or " warn\n" (common when the variable comes from an env
// file) are accepted. Any other value, including the empty string, yields an
// error that quotes the value exactly as it was supplied.
func parseLogLevel(v string) (slog.Level, error) {
	switch strings.ToLower(strings.TrimSpace(v)) {
	case "debug":
		return slog.LevelDebug, nil
	case "info":
		return slog.LevelInfo, nil
	case "warn":
		return slog.LevelWarn, nil
	case "error":
		return slog.LevelError, nil
	default:
		return 0, fmt.Errorf("LOG_LEVEL: invalid value %q (allowed: debug, info, warn, error)", v)
	}
}
|
|
|
|
// maybeHandleGitAskpass serves username/password for git HTTPS auth in non-interactive mode.
// This process mode is only enabled for git child processes that set TDW_GIT_ASKPASS=1.
//
// It returns false (and writes nothing) when askpass mode is not enabled, so
// the caller continues with normal startup. When enabled, it writes the
// requested credential to stdout and returns true. The prompt is taken from
// the first command-line argument. A username prompt is answered with
// GIT_AUTH_USERNAME; every other prompt is answered with GIT_AUTH_TOKEN.
func maybeHandleGitAskpass() bool {
	// ParseBool fails for an unset or malformed value and leaves enabled
	// false, which is exactly the "not in askpass mode" outcome, so the error
	// is deliberately ignored.
	enabled, _ := strconv.ParseBool(os.Getenv("TDW_GIT_ASKPASS"))
	if !enabled {
		return false
	}

	prompt := ""
	if len(os.Args) > 1 {
		prompt = strings.ToLower(os.Args[1])
	}

	// Git prompts look like "Username for 'https://host': ". Only the text in
	// front of the quoted URL says what is being asked for; the URL itself may
	// contain the word "username" (host name or user part) and must not
	// influence the decision.
	question, _, _ := strings.Cut(prompt, "'")
	if strings.Contains(question, "username") {
		fmt.Fprint(os.Stdout, os.Getenv("GIT_AUTH_USERNAME"))
		return true
	}

	// For password/token prompts, return token by default.
	fmt.Fprint(os.Stdout, os.Getenv("GIT_AUTH_TOKEN"))
	return true
}
|
|
|
|
// watchDockerEvents connects to the Docker daemon and forwards container lifecycle
|
|
// events to the trigger function. Reconnects with exponential backoff on failure.
|
|
func watchDockerEvents(ctx context.Context, trigger func()) {
|
|
backoff := 2 * time.Second
|
|
const maxBackoff = 60 * time.Second
|
|
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
err := runDockerEventLoop(ctx, trigger)
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
slog.Warn("docker events stream ended — reconnecting",
|
|
"error", err, "backoff", backoff)
|
|
select {
|
|
case <-time.After(backoff):
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
if backoff < maxBackoff {
|
|
backoff *= 2
|
|
if backoff > maxBackoff {
|
|
backoff = maxBackoff
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// dockerEvent holds the few fields this program decodes from one message of
// the Docker events JSON stream; all other fields of the message are ignored.
type dockerEvent struct {
	// Type is the kind of object the event refers to, e.g. "container".
	Type string `json:"Type"`
	// Action is the event name, e.g. "start" or "die".
	Action string `json:"Action"`
	// Actor identifies the object that emitted the event.
	Actor struct {
		// ID is the object's identifier.
		ID string `json:"ID"`
		// Attributes carries the string key/value pairs attached to the actor.
		Attributes map[string]string `json:"Attributes"`
	} `json:"Actor"`
}
|
|
|
|
// runDockerEventLoop opens a single Docker events HTTP stream (raw, no SDK) and
|
|
// forwards relevant container events to the trigger function.
|
|
// Supports both Unix socket (unix:///var/run/docker.sock) and TCP (tcp://host:port)
|
|
// via the standard DOCKER_HOST environment variable.
|
|
func runDockerEventLoop(ctx context.Context, trigger func()) error {
|
|
httpClient, baseURL, err := newDockerHTTPClient()
|
|
if err != nil {
|
|
return fmt.Errorf("build docker HTTP client: %w", err)
|
|
}
|
|
|
|
filterVal := `{"type":["container"],"event":["start","stop","die","destroy"]}`
|
|
eventsURL := baseURL + "/events?filters=" + url.QueryEscape(filterVal)
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, eventsURL, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("build docker events request: %w", err)
|
|
}
|
|
|
|
resp, err := httpClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("connect to docker events: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("docker events returned HTTP %d", resp.StatusCode)
|
|
}
|
|
|
|
slog.Info("connected to docker events stream")
|
|
|
|
scanner := bufio.NewScanner(resp.Body)
|
|
for scanner.Scan() {
|
|
line := scanner.Bytes()
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
var evt dockerEvent
|
|
if err := json.Unmarshal(line, &evt); err != nil {
|
|
slog.Debug("failed to parse docker event", "error", err)
|
|
continue
|
|
}
|
|
if evt.Type != "container" {
|
|
continue
|
|
}
|
|
// Docker may append extra details after ":" for some event kinds.
|
|
// We only care about the base action token.
|
|
action := evt.Action
|
|
if idx := strings.Index(action, ":"); idx >= 0 {
|
|
action = strings.TrimSpace(action[:idx])
|
|
}
|
|
|
|
switch action {
|
|
case "start", "stop", "die", "destroy":
|
|
actorID := evt.Actor.ID
|
|
if len(actorID) > 12 {
|
|
actorID = actorID[:12]
|
|
}
|
|
slog.Debug("docker event received", "action", action, "id", actorID)
|
|
trigger()
|
|
}
|
|
}
|
|
|
|
if err := scanner.Err(); err != nil {
|
|
return fmt.Errorf("docker events stream error: %w", err)
|
|
}
|
|
return fmt.Errorf("docker events stream closed")
|
|
}
|
|
|
|
// newDockerHTTPClient builds an HTTP client and base URL for the Docker daemon.
// Reads DOCKER_HOST from the environment (default: unix:///var/run/docker.sock).
// For Unix sockets the HTTP transport dials the socket; the placeholder host "docker"
// is used in the URL (standard practice for Unix socket HTTP clients).
//
// For tcp:// addresses the scheme is replaced with http:// and trailing
// slashes are removed, so callers can append a path such as "/events" without
// producing a double slash. An address with nothing after the scheme, or with
// any scheme other than unix:// and tcp://, yields an error.
func newDockerHTTPClient() (*http.Client, string, error) {
	dockerHost := os.Getenv("DOCKER_HOST")
	if dockerHost == "" {
		dockerHost = "unix:///var/run/docker.sock"
	}

	switch {
	case strings.HasPrefix(dockerHost, "unix://"):
		socketPath := strings.TrimPrefix(dockerHost, "unix://")
		if socketPath == "" {
			return nil, "", fmt.Errorf("invalid DOCKER_HOST %q: missing unix socket path", dockerHost)
		}
		transport := &http.Transport{
			// Network and address derived from the URL are ignored: every
			// connection goes to the configured socket.
			DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
				var dialer net.Dialer
				return dialer.DialContext(ctx, "unix", socketPath)
			},
		}
		return &http.Client{Transport: transport}, "http://docker", nil

	case strings.HasPrefix(dockerHost, "tcp://"):
		address := strings.TrimRight(strings.TrimPrefix(dockerHost, "tcp://"), "/")
		if address == "" {
			return nil, "", fmt.Errorf("invalid DOCKER_HOST %q: missing host", dockerHost)
		}
		return &http.Client{}, "http://" + address, nil

	default:
		return nil, "", fmt.Errorf("unsupported DOCKER_HOST scheme: %q (expected unix:// or tcp://)", dockerHost)
	}
}
|