feat: init commit with main func
This commit is contained in:
237
main.go
Normal file
237
main.go
Normal file
@@ -0,0 +1,237 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
// main wires together the three reconcile triggers (startup, periodic ticker,
// Docker events) through a debounced, coalescing signal channel, then blocks
// until SIGTERM/SIGINT and waits for any in-flight reconcile before exiting.
func main() {
	// Text logs to stdout at Info level become the process-wide default logger.
	slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	})))

	// LoadConfig is defined elsewhere in this package; a config error is fatal.
	cfg, err := LoadConfig()
	if err != nil {
		slog.Error("configuration error", "error", err)
		os.Exit(1)
	}

	// Log the effective configuration once at startup for operability.
	slog.Info("traefik-dns-watcher starting",
		"traefik_url", cfg.TraefikURL,
		"zones", cfg.Zones,
		"repo_path", cfg.RepoPath,
		"dynamic_dir", cfg.DynamicDir,
		"reconcile_interval", cfg.ReconcileInterval,
		"debounce_delay", cfg.DebounceDelay,
	)

	// ctx is cancelled on SIGTERM/SIGINT; every goroutine below exits via ctx.Done().
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer stop()

	// triggerCh is a 1-buffered channel that acts as a coalescing "reconcile needed" signal.
	// Sending never blocks: if a trigger is already pending, the new one is silently dropped.
	triggerCh := make(chan struct{}, 1)
	maybeTrigger := func() {
		select {
		case triggerCh <- struct{}{}:
		default: // a trigger is already queued
		}
	}

	// Debounce goroutine: absorbs rapid bursts of triggers and fires a single reconcile
	// after the quiet period defined by cfg.DebounceDelay.
	go func() {
		timer := time.NewTimer(0)
		// Drain the initial zero tick so the timer starts in a stopped state.
		if !timer.Stop() {
			<-timer.C
		}
		for {
			select {
			case <-triggerCh:
				// Reset the timer on every incoming trigger — extends the quiet window.
				// Stop+drain before Reset is the documented-safe pattern for reusing
				// a timer whose channel may already hold an unconsumed tick.
				if !timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				timer.Reset(cfg.DebounceDelay)
			case <-timer.C:
				// Reconcile is defined elsewhere; presumably it serializes itself
				// via reconcileMu (see the shutdown wait below) — TODO confirm.
				go Reconcile(cfg)
			case <-ctx.Done():
				timer.Stop()
				return
			}
		}
	}()

	// Periodic ticker: ensures DNS state is reconciled even when Docker events are silent.
	go func() {
		ticker := time.NewTicker(cfg.ReconcileInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				maybeTrigger()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Docker events watcher: subscribes to container lifecycle events and uses them
	// as low-latency triggers for reconciliation.
	go watchDockerEvents(ctx, maybeTrigger)

	// Perform an immediate reconcile at startup to bring DNS into sync.
	go Reconcile(cfg)

	<-ctx.Done()
	slog.Info("shutdown signal received")

	// Wait for any in-flight reconcile to finish before exiting: acquiring the
	// package-level reconcileMu blocks until the current Reconcile (which is
	// assumed to hold it for its duration — verify in its definition) releases it.
	reconcileMu.Lock()
	reconcileMu.Unlock() //nolint:staticcheck
	slog.Info("traefik-dns-watcher stopped")
}
|
||||
|
||||
// watchDockerEvents connects to the Docker daemon and forwards container lifecycle
|
||||
// events to the trigger function. Reconnects with exponential backoff on failure.
|
||||
func watchDockerEvents(ctx context.Context, trigger func()) {
|
||||
backoff := 2 * time.Second
|
||||
const maxBackoff = 60 * time.Second
|
||||
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
err := runDockerEventLoop(ctx, trigger)
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
slog.Warn("docker events stream ended — reconnecting",
|
||||
"error", err, "backoff", backoff)
|
||||
select {
|
||||
case <-time.After(backoff):
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
if backoff < maxBackoff {
|
||||
backoff *= 2
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dockerEvent is the minimal structure we need from the Docker events JSON stream.
// Only the fields this program inspects are declared; all other keys in the
// event payload are ignored by json.Unmarshal.
type dockerEvent struct {
	// Type is the event category (this program only acts on "container").
	Type string `json:"Type"`
	// Action is the lifecycle action, e.g. "start", "stop", "die", "destroy".
	Action string `json:"Action"`
	// Actor identifies the object the event relates to.
	Actor struct {
		// ID is the full object ID (truncated to 12 chars for logging by the caller).
		ID string `json:"ID"`
	} `json:"Actor"`
}
|
||||
|
||||
// runDockerEventLoop opens a single Docker events HTTP stream (raw, no SDK) and
|
||||
// forwards relevant container events to the trigger function.
|
||||
// Supports both Unix socket (unix:///var/run/docker.sock) and TCP (tcp://host:port)
|
||||
// via the standard DOCKER_HOST environment variable.
|
||||
func runDockerEventLoop(ctx context.Context, trigger func()) error {
|
||||
httpClient, baseURL, err := newDockerHTTPClient()
|
||||
if err != nil {
|
||||
return fmt.Errorf("build docker HTTP client: %w", err)
|
||||
}
|
||||
|
||||
filterVal := `{"type":["container"]}`
|
||||
eventsURL := baseURL + "/events?filters=" + url.QueryEscape(filterVal)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, eventsURL, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("build docker events request: %w", err)
|
||||
}
|
||||
|
||||
resp, err := httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("connect to docker events: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("docker events returned HTTP %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
slog.Info("connected to docker events stream")
|
||||
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
var evt dockerEvent
|
||||
if err := json.Unmarshal(line, &evt); err != nil {
|
||||
slog.Debug("failed to parse docker event", "error", err)
|
||||
continue
|
||||
}
|
||||
if evt.Type != "container" {
|
||||
continue
|
||||
}
|
||||
switch evt.Action {
|
||||
case "start", "stop", "die", "destroy":
|
||||
actorID := evt.Actor.ID
|
||||
if len(actorID) > 12 {
|
||||
actorID = actorID[:12]
|
||||
}
|
||||
slog.Debug("docker event received", "action", evt.Action, "id", actorID)
|
||||
trigger()
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
return fmt.Errorf("docker events stream error: %w", err)
|
||||
}
|
||||
return fmt.Errorf("docker events stream closed")
|
||||
}
|
||||
|
||||
// newDockerHTTPClient builds an HTTP client and base URL for the Docker daemon.
|
||||
// Reads DOCKER_HOST from the environment (default: unix:///var/run/docker.sock).
|
||||
// For Unix sockets the HTTP transport dials the socket; the placeholder host "docker"
|
||||
// is used in the URL (standard practice for Unix socket HTTP clients).
|
||||
func newDockerHTTPClient() (*http.Client, string, error) {
|
||||
dockerHost := os.Getenv("DOCKER_HOST")
|
||||
if dockerHost == "" {
|
||||
dockerHost = "unix:///var/run/docker.sock"
|
||||
}
|
||||
|
||||
switch {
|
||||
case strings.HasPrefix(dockerHost, "unix://"):
|
||||
socketPath := strings.TrimPrefix(dockerHost, "unix://")
|
||||
transport := &http.Transport{
|
||||
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
|
||||
return (&net.Dialer{}).DialContext(ctx, "unix", socketPath)
|
||||
},
|
||||
}
|
||||
return &http.Client{Transport: transport}, "http://docker", nil
|
||||
|
||||
case strings.HasPrefix(dockerHost, "tcp://"):
|
||||
baseURL := strings.Replace(dockerHost, "tcp://", "http://", 1)
|
||||
return &http.Client{}, baseURL, nil
|
||||
|
||||
default:
|
||||
return nil, "", fmt.Errorf("unsupported DOCKER_HOST scheme: %q (expected unix:// or tcp://)", dockerHost)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user