Files
nadir-agent/internal/modules/services/logs.go
T
2026-06-24 17:29:45 +02:00

250 lines
8.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package services
import (
"context"
"encoding/json"
"slices"
"strconv"
"strings"
"time"
"nadir/internal/oscmd"
"github.com/danielgtaylor/huma/v2"
"github.com/danielgtaylor/huma/v2/sse"
)
// Limits for the service-log endpoints.
const (
	// defaultLogLines is the snapshot size used when the caller asks for zero
	// or a negative number of lines, and the amount of history tail replays
	// before it starts following a file.
	defaultLogLines = 100
	// maxLogLines caps the lines query parameter so a single request cannot
	// pull an unbounded amount of history.
	maxLogLines = 10000
	// maxLogStreams is how many log-follow connections may be open at once.
	maxLogStreams = 10
)

// logStreamSem limits concurrent log-follow connections to maxLogStreams.
// Each one forks a journalctl or tail subprocess, so too many would exhaust
// system resources. A stream takes a slot with a non-blocking send and frees
// it with a receive when the stream ends.
var logStreamSem = make(chan struct{}, maxLogStreams)
// LogEntry is one log record. For the journal source it is distilled from
// journalctl's JSON output; for the file source only Message carries data (the
// raw line, which usually has its own embedded timestamp), Time is empty and
// Priority is fixed at 6 (info).
type LogEntry struct {
	Time     string `json:"time" example:"2026-06-20T08:15:04Z" doc:"Record timestamp (RFC3339, UTC); empty for file lines"`
	Priority int    `json:"priority" example:"6" doc:"syslog priority 0 (emerg) .. 7 (debug); 6 for file lines"`
	Message  string `json:"message" example:"Started OpenSSH server daemon."`
}
// ErrorEvent is the payload of the SSE "error" event. The log stream sends one
// and then ends when it cannot follow logs: an invalid unit, a path that is not
// allowlisted, no free stream slot, or the follow command returning an error.
type ErrorEvent struct {
	Message string `json:"message"` // human-readable reason
}
// LogsInput carries the parameters of the snapshot request
// GET /api/services/{unit}/logs.
type LogsInput struct {
	// Unit is the service whose logs are read; the handler validates it.
	Unit string `path:"unit" example:"docker.service" doc:"Unit name as listed by GET /api/services; the trailing .service is optional"`
	// Source selects the journal (default) or an allowlisted file.
	Source string `query:"source" enum:"journal,file" default:"journal" doc:"Where to read logs from"`
	// Path names the file for source=file; it must be allowlisted for Unit.
	Path string `query:"path" example:"/var/log/nginx/error.log" doc:"Log file (file source only); must be allowlisted for this unit in config"`
	// Lines is the requested record count; the handler maps values <= 0 to
	// the default and caps large values.
	Lines int `query:"lines" default:"100" doc:"How many recent records to return (max 10000)"`
	// Since is handed to journalctl --since unchanged.
	Since string `query:"since" example:"-1h" doc:"journalctl time filter (journal source only)"`
	// Priority is handed to journalctl -p.
	Priority int `query:"priority" default:"7" minimum:"0" maximum:"7" doc:"Max syslog priority to include: 0 emerg .. 7 debug (journal source only). 7 = all."`
}
// LogsOutput is the response of the snapshot request; Body.Entries lists the
// returned records, oldest first.
type LogsOutput struct {
	Body struct {
		Entries []LogEntry `json:"entries" doc:"Log records, oldest first"`
	}
}
// LogStreamInput carries the parameters of the follow request
// GET /api/services/{unit}/logs/stream.
type LogStreamInput struct {
	// Unit is the service to follow; the handler validates it.
	Unit string `path:"unit" example:"docker.service" doc:"Unit name as listed by GET /api/services; the trailing .service is optional"`
	// Source selects the journal (default) or an allowlisted file.
	Source string `query:"source" enum:"journal,file" default:"journal" doc:"Where to stream logs from"`
	// Path names the file for source=file; it must be allowlisted for Unit.
	Path string `query:"path" example:"/var/log/nginx/error.log" doc:"Log file (file source only); must be allowlisted for this unit in config"`
	// Since is handed to journalctl --since unchanged.
	Since string `query:"since" example:"-1h" doc:"Backfill window (journal source only)"`
	// Priority is handed to journalctl -p.
	Priority int `query:"priority" default:"7" minimum:"0" maximum:"7" doc:"Max syslog priority to include: 0 emerg .. 7 debug (journal source only). 7 = all."`
}
// registerLogs adds the two service-log operations to api:
//
//   - GET /api/services/{unit}/logs returns a bounded snapshot of records.
//   - GET /api/services/{unit}/logs/stream follows new records as SSE events.
//
// Both read the systemd journal by default. With source=file they read a file
// instead, and the file must be listed for the unit in logFiles (checked by
// resolveLogPath). Both operations carry op("read") metadata.
func registerLogs(api huma.API, logFiles map[string][]string) {
	huma.Register(api, huma.Operation{
		OperationID: "services-logs",
		Method:      "GET",
		Path:        "/api/services/{unit}/logs",
		Summary:     "Get recent log records for a service",
		Description: "Returns a snapshot of the unit's logs from the journal " +
			"(default) or an allowlisted file (source=file&path=). Use /logs/stream " +
			"to follow new records live.",
		Tags:     []string{tagServices},
		Metadata: op("read"),
		// No 404: the journal is historical, so logs are returned even for units
		// that aren't currently loaded; an unknown unit just yields an empty list.
		Errors: []int{400, 401, 403, 500},
	}, func(ctx context.Context, in *LogsInput) (*LogsOutput, error) {
		if err := validateUnit(in.Unit); err != nil {
			return nil, err
		}
		count := strconv.Itoa(clampLines(in.Lines))

		if in.Source == "file" {
			path, err := resolveLogPath(logFiles, in.Unit, in.Path)
			if err != nil {
				return nil, err
			}
			raw, err := oscmd.RunLines("tail", "-n", count, "--", path)
			if err != nil {
				return nil, huma.Error500InternalServerError("tail failed", err)
			}
			return fileOutput(raw), nil
		}

		args := []string{
			"-u", journalUnit(in.Unit),
			"--no-pager",
			"-o", "json",
			"-p", strconv.Itoa(in.Priority),
			"-n", count,
		}
		if in.Since != "" {
			args = append(args, "--since", in.Since)
		}
		raw, err := oscmd.RunLines("journalctl", args...)
		if err != nil {
			return nil, huma.Error500InternalServerError("journalctl failed", err)
		}

		// Non-nil so that an empty result encodes as [] rather than null.
		entries := []LogEntry{}
		for _, line := range raw {
			entry, ok := parseJournalLine([]byte(line))
			if !ok {
				continue
			}
			entries = append(entries, entry)
		}
		out := &LogsOutput{}
		out.Body.Entries = entries
		return out, nil
	})

	// Streaming via huma's sse package keeps the route inside huma, so the RBAC
	// middleware still enforces op("read"); a raw mux handler would bypass it.
	sse.Register(api, huma.Operation{
		OperationID: "services-logs-stream",
		Method:      "GET",
		Path:        "/api/services/{unit}/logs/stream",
		Summary:     "Stream a service's logs (Server-Sent Events)",
		Description: "Follows the unit's journal (journalctl -f) or an allowlisted " +
			"file (source=file&path=, via tail -F) and emits a `log` event per " +
			"record. Stops when the client disconnects.",
		Tags:     []string{tagServices},
		Metadata: op("read"),
	}, map[string]any{
		"log":   LogEntry{},
		"error": ErrorEvent{},
	}, func(ctx context.Context, in *LogStreamInput, send sse.Sender) {
		if validateUnit(in.Unit) != nil {
			send.Data(ErrorEvent{Message: "invalid unit name"})
			return
		}

		// Non-blocking acquire: refuse instead of queueing when every slot is
		// taken. The slot is released when this handler returns.
		select {
		case logStreamSem <- struct{}{}:
			defer func() { <-logStreamSem }()
		default:
			send.Data(ErrorEvent{Message: "too many concurrent log streams, try again later"})
			return
		}

		fromFile := in.Source == "file"
		var name string
		var args []string
		if fromFile {
			path, err := resolveLogPath(logFiles, in.Unit, in.Path)
			if err != nil {
				send.Data(ErrorEvent{Message: err.Error()})
				return
			}
			name = "tail"
			args = []string{"-n", strconv.Itoa(defaultLogLines), "-F", "--", path}
		} else {
			name = "journalctl"
			args = []string{
				"-u", journalUnit(in.Unit),
				"--no-pager",
				"-o", "json",
				"-p", strconv.Itoa(in.Priority),
				"-f",
			}
			if in.Since != "" {
				args = append(args, "--since", in.Since)
			}
		}

		stream, err := oscmd.RunStream(ctx, name, args...)
		if err != nil {
			send.Data(ErrorEvent{Message: name + " failed: " + err.Error()})
			return
		}
		for line := range stream {
			// File lines are forwarded verbatim; journal lines must parse.
			entry := LogEntry{Priority: 6, Message: line}
			if !fromFile {
				parsed, ok := parseJournalLine([]byte(line))
				if !ok {
					continue
				}
				entry = parsed
			}
			if send.Data(entry) != nil {
				return // client gone; ctx cancel will kill the command
			}
		}
	})
}
// resolveLogPath returns path unchanged if an admin listed it under log_files
// for unit, so a caller can never make exec read an arbitrary file.
//
// Allowlist keys are compared after journalUnit normalization, so "nginx" and
// "nginx.service" name the same entry whichever form the config author or the
// caller used; this matches how the journal source treats unit names. Paths
// must match an allowlisted entry exactly.
//
// Errors: 400 when path is empty, 403 when no matching key lists path.
func resolveLogPath(logFiles map[string][]string, unit, path string) (string, error) {
	if path == "" {
		return "", huma.Error400BadRequest("source=file requires a path")
	}
	target := journalUnit(unit)
	for configured, allowed := range logFiles {
		if journalUnit(configured) != target {
			continue
		}
		if slices.Contains(allowed, path) {
			return path, nil
		}
	}
	return "", huma.Error403Forbidden("log file not allowlisted for unit " + unit + ": " + path)
}
// fileOutput wraps raw file lines as LogEntry records with the fixed info
// priority (6) and no timestamp. The returned Entries slice is never nil, so
// an empty file yields an empty list rather than a missing one.
func fileOutput(lines []string) *LogsOutput {
	entries := make([]LogEntry, len(lines))
	for i, line := range lines {
		entries[i] = LogEntry{Priority: 6, Message: line}
	}
	out := &LogsOutput{}
	out.Body.Entries = entries
	return out
}
// journalUnit returns unit with one trailing ".service" removed; any other
// suffix is left alone. The result is what gets passed to `journalctl -u` and
// what allowlist keys are compared on. journalctl treats a bare name as a
// .service, and on some setups only the bare form matches the recorded
// _SYSTEMD_UNIT, so the suffix is always stripped. This is the services
// module, so .service is the only suffix expected.
func journalUnit(unit string) string {
	bare, _ := strings.CutSuffix(unit, ".service")
	return bare
}
// clampLines turns a requested line count into the one actually used:
// non-positive requests get defaultLogLines, and anything above maxLogLines is
// capped to it.
func clampLines(n int) int {
	if n <= 0 {
		return defaultLogLines
	}
	if n > maxLogLines {
		return maxLogLines
	}
	return n
}
// parseJournalLine distills one `journalctl -o json` record into a LogEntry.
//
// It returns false for lines json.Unmarshal rejects (non-JSON text, or a JSON
// value that is not an object). Individual fields are decoded leniently:
//   - MESSAGE is returned as text whether journalctl wrote it as a string or
//     as an array of byte values, which it does for non-printable or non-UTF-8
//     content such as ANSI colour escapes.
//   - A missing or unparseable PRIORITY falls back to 6 (info).
//   - A missing or unparseable __REALTIME_TIMESTAMP leaves Time empty.
//
// None of these cases rejects the record.
func parseJournalLine(line []byte) (LogEntry, bool) {
	// RawMessage for every field: a single unexpected shape (e.g. a field that
	// occurs more than once and is therefore emitted as an array) must not make
	// Unmarshal fail and drop the whole record.
	var raw struct {
		Message  json.RawMessage `json:"MESSAGE"`
		Priority json.RawMessage `json:"PRIORITY"`
		TS       json.RawMessage `json:"__REALTIME_TIMESTAMP"`
	}
	if err := json.Unmarshal(line, &raw); err != nil {
		return LogEntry{}, false
	}
	// Records without a PRIORITY (it's often absent) default to info (6), not
	// the zero value 0 which is emerg - that would fake critical alerts.
	e := LogEntry{Priority: 6, Message: journalFieldString(raw.Message)}
	if p, err := strconv.Atoi(journalFieldString(raw.Priority)); err == nil {
		e.Priority = p
	}
	// __REALTIME_TIMESTAMP is microseconds since the Unix epoch.
	if us, err := strconv.ParseInt(journalFieldString(raw.TS), 10, 64); err == nil {
		e.Time = time.UnixMicro(us).UTC().Format(time.RFC3339)
	}
	return e, true
}

// journalFieldString decodes one field value from journalctl's JSON output.
//
// Accepted shapes:
//   - A JSON string is returned as-is.
//   - An array of byte values (0-255), journalctl's encoding for non-printable
//     or non-UTF-8 data, is returned as text with invalid UTF-8 replaced by
//     U+FFFD.
//
// Everything else yields "": an absent field, null, an array of strings, or
// an array containing out-of-range numbers.
func journalFieldString(raw json.RawMessage) string {
	if len(raw) == 0 {
		return ""
	}
	var s string
	if err := json.Unmarshal(raw, &s); err == nil {
		return s
	}
	var codes []int
	if err := json.Unmarshal(raw, &codes); err != nil {
		return ""
	}
	buf := make([]byte, len(codes))
	for i, c := range codes {
		if c < 0 || c > 255 {
			return ""
		}
		buf[i] = byte(c)
	}
	return strings.ToValidUTF8(string(buf), "\uFFFD")
}