250 lines
8.8 KiB
Go
250 lines
8.8 KiB
Go
package services
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"slices"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"nadir/internal/oscmd"
|
||
|
||
"github.com/danielgtaylor/huma/v2"
|
||
"github.com/danielgtaylor/huma/v2/sse"
|
||
)
|
||
|
||
// defaultLogLines is used when a snapshot request asks for zero or fewer
// lines (see clampLines). It is also the backfill size for file streams.
const defaultLogLines = 100

// maxLogLines is the upper bound clampLines applies to a snapshot request's
// line count.
const maxLogLines = 10000
|
||
|
||
var (
	// logStreamSem is a counting semaphore that caps concurrent log-follow
	// connections at 10.
	//
	// Every follower forks its own journalctl or tail subprocess, so an
	// unbounded number of them would exhaust system resources. A stream takes
	// a slot by sending into the channel and frees it by receiving.
	logStreamSem = make(chan struct{}, 10)
)
|
||
|
||
// LogEntry is a single log record as returned by the logs endpoints.
//
// Journal records are distilled from journalctl's JSON output (see
// parseJournalLine). File records carry only the raw line in Message, and
// that line usually embeds its own timestamp.
type LogEntry struct {
	// Time is the record timestamp as RFC3339 in UTC; empty for file lines.
	Time string `json:"time" example:"2026-06-20T08:15:04Z" doc:"Record timestamp (RFC3339, UTC); empty for file lines"`
	// Priority is the syslog level, 0 (emerg) through 7 (debug); file lines get 6.
	Priority int `json:"priority" example:"6" doc:"syslog priority 0 (emerg) – 7 (debug); 6 for file lines"`
	// Message is the record text.
	Message string `json:"message" example:"Started OpenSSH server daemon."`
}
|
||
|
||
// ErrorEvent is the payload of the SSE "error" event.
//
// The log stream uses it to tell the client why the stream was refused or
// failed: an invalid unit, a disallowed file, too many streams, or a command
// failure.
type ErrorEvent struct {
	// Message is a human-readable description of the problem.
	Message string `json:"message"`
}
|
||
|
||
// LogsInput carries the path and query parameters of
// GET /api/services/{unit}/logs.
type LogsInput struct {
	// Unit is the service whose logs are requested; ".service" is optional.
	Unit string `path:"unit" example:"docker.service" doc:"Unit name as listed by GET /api/services; the trailing .service is optional"`
	// Source picks the journal (default) or an allowlisted file.
	Source string `query:"source" enum:"journal,file" default:"journal" doc:"Where to read logs from"`
	// Path names the log file to read when Source is "file".
	Path string `query:"path" example:"/var/log/nginx/error.log" doc:"Log file (file source only); must be allowlisted for this unit in config"`
	// Lines is the requested record count; the handler clamps it with clampLines.
	Lines int `query:"lines" default:"100" doc:"How many recent records to return (max 10000)"`
	// Since is forwarded to journalctl --since; unused for files.
	Since string `query:"since" example:"-1h" doc:"journalctl time filter (journal source only)"`
	// Priority is forwarded to journalctl -p as the maximum level; unused for files.
	Priority int `query:"priority" default:"7" minimum:"0" maximum:"7" doc:"Max syslog priority to include: 0 emerg .. 7 debug (journal source only). 7 = all."`
}
|
||
|
||
type LogsOutput struct {
|
||
Body struct {
|
||
Entries []LogEntry `json:"entries" doc:"Log records, oldest first"`
|
||
}
|
||
}
|
||
|
||
// LogStreamInput carries the path and query parameters of
// GET /api/services/{unit}/logs/stream.
type LogStreamInput struct {
	// Unit is the service to follow; ".service" is optional.
	Unit string `path:"unit" example:"docker.service" doc:"Unit name as listed by GET /api/services; the trailing .service is optional"`
	// Source picks the journal (default) or an allowlisted file.
	Source string `query:"source" enum:"journal,file" default:"journal" doc:"Where to stream logs from"`
	// Path names the log file to follow when Source is "file".
	Path string `query:"path" example:"/var/log/nginx/error.log" doc:"Log file (file source only); must be allowlisted for this unit in config"`
	// Since is forwarded to journalctl --since to backfill; unused for files.
	Since string `query:"since" example:"-1h" doc:"Backfill window (journal source only)"`
	// Priority is forwarded to journalctl -p as the maximum level; unused for files.
	Priority int `query:"priority" default:"7" minimum:"0" maximum:"7" doc:"Max syslog priority to include: 0 emerg .. 7 debug (journal source only). 7 = all."`
}
|
||
|
||
// registerLogs registers the service-log routes on api:
//
//   - GET /api/services/{unit}/logs (services-logs) returns a bounded
//     snapshot of recent records.
//   - GET /api/services/{unit}/logs/stream (services-logs-stream) follows new
//     records as Server-Sent Events.
//
// Both routes read the unit's journal through journalctl by default. With
// source=file they run tail instead, but only on a path that logFiles
// allowlists for the unit (see resolveLogPath). Both carry op("read") metadata.
func registerLogs(api huma.API, logFiles map[string][]string) {
	huma.Register(api, huma.Operation{
		OperationID: "services-logs",
		Method:      "GET",
		Path:        "/api/services/{unit}/logs",
		Summary:     "Get recent log records for a service",
		Description: "Returns a snapshot of the unit's logs from the journal " +
			"(default) or an allowlisted file (source=file&path=). Use /logs/stream " +
			"to follow new records live.",
		Tags:     []string{tagServices},
		Metadata: op("read"),
		// No 404: the journal is historical, so logs are returned even for units
		// that aren't currently loaded; an unknown unit just yields an empty list.
		Errors: []int{400, 401, 403, 500},
	}, func(ctx context.Context, in *LogsInput) (*LogsOutput, error) {
		if err := validateUnit(in.Unit); err != nil {
			return nil, err
		}
		// Out-of-range line counts are clamped rather than rejected.
		count := strconv.Itoa(clampLines(in.Lines))

		if in.Source == "file" {
			path, err := resolveLogPath(logFiles, in.Unit, in.Path)
			if err != nil {
				return nil, err
			}
			lines, err := oscmd.RunLines("tail", "-n", count, "--", path)
			if err != nil {
				return nil, huma.Error500InternalServerError("tail failed", err)
			}
			return fileOutput(lines), nil
		}

		args := []string{"-u", journalUnit(in.Unit), "--no-pager", "-o", "json", "-p", strconv.Itoa(in.Priority), "-n", count}
		if in.Since != "" {
			args = append(args, "--since", in.Since)
		}
		lines, err := oscmd.RunLines("journalctl", args...)
		if err != nil {
			return nil, huma.Error500InternalServerError("journalctl failed", err)
		}

		out := &LogsOutput{}
		// Pre-sized like fileOutput, and non-nil even when empty so the list
		// serializes as [] rather than null. Lines parseJournalLine rejects are
		// skipped.
		out.Body.Entries = make([]LogEntry, 0, len(lines))
		for _, l := range lines {
			if e, ok := parseJournalLine([]byte(l)); ok {
				out.Body.Entries = append(out.Body.Entries, e)
			}
		}
		return out, nil
	})

	// Streaming via huma's sse package keeps the route inside huma, so the RBAC
	// middleware still enforces op("read") - a raw mux handler would bypass it.
	sse.Register(api, huma.Operation{
		OperationID: "services-logs-stream",
		Method:      "GET",
		Path:        "/api/services/{unit}/logs/stream",
		Summary:     "Stream a service's logs (Server-Sent Events)",
		Description: "Follows the unit's journal (journalctl -f) or an allowlisted " +
			"file (source=file&path=, via tail -F) and emits a `log` event per " +
			"record. Stops when the client disconnects; if the underlying " +
			"command exits on its own, an `error` event is sent first.",
		Tags:     []string{tagServices},
		Metadata: op("read"),
	}, map[string]any{
		"log":   LogEntry{},
		"error": ErrorEvent{},
	}, func(ctx context.Context, in *LogStreamInput, send sse.Sender) {
		// The SSE handler has no error return, so every failure is reported to
		// the client as an "error" event before returning.
		if err := validateUnit(in.Unit); err != nil {
			send.Data(ErrorEvent{Message: "invalid unit name"})
			return
		}

		// Non-blocking acquire: if every slot is taken, refuse immediately
		// instead of queueing the client.
		select {
		case logStreamSem <- struct{}{}:
		default:
			send.Data(ErrorEvent{Message: "too many concurrent log streams, try again later"})
			return
		}
		defer func() { <-logStreamSem }()

		var cmd string
		var args []string
		if in.Source == "file" {
			path, err := resolveLogPath(logFiles, in.Unit, in.Path)
			if err != nil {
				send.Data(ErrorEvent{Message: err.Error()})
				return
			}
			// Backfill the last defaultLogLines lines, then follow (tail -F).
			cmd, args = "tail", []string{"-n", strconv.Itoa(defaultLogLines), "-F", "--", path}
		} else {
			cmd = "journalctl"
			args = []string{"-u", journalUnit(in.Unit), "--no-pager", "-o", "json", "-p", strconv.Itoa(in.Priority), "-f"}
			if in.Since != "" {
				args = append(args, "--since", in.Since)
			}
		}

		lines, err := oscmd.RunStream(ctx, cmd, args...)
		if err != nil {
			send.Data(ErrorEvent{Message: cmd + " failed: " + err.Error()})
			return
		}
		for l := range lines {
			e, ok := LogEntry{Priority: 6, Message: l}, true
			if in.Source != "file" {
				e, ok = parseJournalLine([]byte(l))
			}
			if !ok {
				continue // unparseable journal line
			}
			if send.Data(e) != nil {
				return // client gone; ctx cancel will kill the command
			}
		}

		// Follow-mode tail/journalctl only stop when killed. Reaching this point
		// with a live request context therefore means the command exited on its
		// own (presumably e.g. journalctl rejecting a malformed --since). Tell
		// the client instead of ending the stream silently.
		// NOTE(review): assumes RunStream closes the channel once the
		// subprocess exits - confirm in oscmd.
		if ctx.Err() == nil {
			send.Data(ErrorEvent{Message: cmd + " exited unexpectedly"})
		}
	})
}
|
||
|
||
// resolveLogPath accepts path only if an administrator listed it under
// log_files for unit in config (logFiles). The file-source endpoints
// therefore never hand a caller-chosen file to exec.
//
// Config keys and unit are compared after journalUnit normalization, so
// "nginx" and "nginx.service" name the same entry whichever form the caller
// and the config author used. The path itself must equal an allowlisted entry
// exactly.
//
// It returns a 400 error when path is empty and a 403 error when path is not
// allowlisted for the unit. On success it returns path unchanged.
func resolveLogPath(logFiles map[string][]string, unit, path string) (string, error) {
	if path == "" {
		return "", huma.Error400BadRequest("source=file requires a path")
	}

	target := journalUnit(unit)
	for name, files := range logFiles {
		if journalUnit(name) != target {
			continue
		}
		if slices.Contains(files, path) {
			return path, nil
		}
	}
	return "", huma.Error403Forbidden("log file not allowlisted for unit " + unit + ": " + path)
}
|
||
|
||
func fileOutput(lines []string) *LogsOutput {
|
||
out := &LogsOutput{}
|
||
out.Body.Entries = make([]LogEntry, 0, len(lines))
|
||
for _, l := range lines {
|
||
out.Body.Entries = append(out.Body.Entries, LogEntry{Priority: 6, Message: l})
|
||
}
|
||
return out
|
||
}
|
||
|
||
// journalUnit prepares a unit name for `journalctl -u` by dropping one
// trailing ".service", if present; any other name is returned unchanged.
//
// journalctl treats a bare name as a .service, and on some setups only the
// bare form matches the recorded _SYSTEMD_UNIT, so the suffix is always
// removed. This is the services module, so ".service" is the only suffix
// handled.
func journalUnit(unit string) string {
	if base, found := strings.CutSuffix(unit, ".service"); found {
		return base
	}
	return unit
}
|
||
|
||
func clampLines(n int) int {
|
||
switch {
|
||
case n <= 0:
|
||
return defaultLogLines
|
||
case n > maxLogLines:
|
||
return maxLogLines
|
||
default:
|
||
return n
|
||
}
|
||
}
|
||
|
||
// parseJournalLine distills one record of `journalctl -o json` output into a
// LogEntry. It returns false only when json.Unmarshal rejects line; callers
// skip such lines.
//
// MESSAGE is decoded from whichever JSON shape it arrives in:
//   - a string is used verbatim;
//   - an array of byte values is converted back to text, with invalid UTF-8
//     replaced by U+FFFD. journalctl uses this encoding for text containing
//     non-printable or non-UTF-8 bytes, such as ANSI colour codes;
//   - anything else (null, or an array that is not all bytes) yields an
//     empty Message rather than dropping the record.
//
// A missing or non-numeric PRIORITY leaves the info default (6). A missing or
// unparseable __REALTIME_TIMESTAMP leaves Time empty.
func parseJournalLine(line []byte) (LogEntry, bool) {
	var raw struct {
		Message  any    `json:"MESSAGE"`
		Priority string `json:"PRIORITY"`
		TS       string `json:"__REALTIME_TIMESTAMP"`
	}
	if err := json.Unmarshal(line, &raw); err != nil {
		return LogEntry{}, false
	}

	// Records without a PRIORITY (it's often absent) default to info (6), not
	// the zero value 0 which is emerg - that would fake critical alerts.
	e := LogEntry{Priority: 6}

	switch msg := raw.Message.(type) {
	case string:
		e.Message = msg
	case []any:
		// encoding/json decodes JSON numbers into any as float64; accept the
		// array only if every element is an integer byte value.
		buf := make([]byte, 0, len(msg))
		for _, v := range msg {
			f, isNum := v.(float64)
			if !isNum || f < 0 || f > 255 || float64(byte(f)) != f {
				buf = nil // not a byte array (e.g. a multi-valued field)
				break
			}
			buf = append(buf, byte(f))
		}
		e.Message = strings.ToValidUTF8(string(buf), "\uFFFD")
	}

	if p, err := strconv.Atoi(raw.Priority); err == nil {
		e.Priority = p
	}
	// __REALTIME_TIMESTAMP is interpreted as microseconds since the Unix epoch.
	if us, err := strconv.ParseInt(raw.TS, 10, 64); err == nil {
		e.Time = time.UnixMicro(us).UTC().Format(time.RFC3339)
	}
	return e, true
}
|