cmd/moncheck - switch to monzero.Checker

This switches the internals of the moncheck instance to using the
monzero.Checker API. This way we can reuse it completely and also see if
it works properly.
This commit is contained in:
Gibheer 2021-12-02 15:57:00 +01:00
parent 0639a504eb
commit b69cd1ea3e

View File

@ -2,7 +2,6 @@ package main
import (
"bytes"
"context"
"database/sql"
"database/sql/driver"
"encoding/json"
@ -11,13 +10,12 @@ import (
"io/ioutil"
"log"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"syscall"
"time"
"git.zero-knowledge.org/gibheer/monzero"
"github.com/lib/pq"
)
@ -73,98 +71,30 @@ func main() {
log.Fatalf("could not resolve hostname: %s", err)
}
checker := monzero.NewChecker(monzero.CheckerConfig{
CheckerID: cfg.CheckerID,
DB: db,
Timeout: timeout,
HostIdentifier: hostname,
Executor: monzero.CheckExec,
})
for i := 0; i < config.Workers; i++ {
go check(i, db, waitDuration, timeout, hostname, config.CheckerID)
go check(checker, waitDuration)
}
wg := sync.WaitGroup{}
wg.Add(1)
wg.Wait()
}
func check(thread int, db *sql.DB, waitDuration, timeout time.Duration, hostname string, checker_id int) {
func check(checker *monzero.Checker, waitDuration time.Duration) {
for {
tx, err := db.Begin()
if err != nil {
log.Printf("[%d] could not start transaction: %s", thread, err)
continue
}
row := tx.QueryRow(`select check_id, cmdLine, states, mapping_id
from active_checks
where next_time < now()
and enabled
and checker_id = $1
order by next_time
for update skip locked
limit 1;`, checker_id)
var (
id int64
cmdLine []string
states States
mapId int
state int
)
err = row.Scan(&id, pq.Array(&cmdLine), &states, &mapId)
if err != nil && err == sql.ErrNoRows {
tx.Rollback()
time.Sleep(waitDuration)
continue
} else if err != nil {
log.Printf("could not scan values: %s", err)
tx.Rollback()
break
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...)
output := bytes.NewBuffer([]byte{})
cmd.Stdout = output
cmd.Stderr = output
err = cmd.Run()
if err != nil && ctx.Err() == context.DeadlineExceeded {
cancel()
state = 2
fmt.Fprintf(output, "check took longer than %s", timeout)
} else if err != nil && cmd.ProcessState == nil {
log.Printf("[%d] error running check: %s", id, err)
state = 3
} else if err != nil {
cancel()
status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus)
if !ok {
log.Printf("[%d]error running check: %s", id, err)
state = 2
} else {
state = status.ExitStatus()
if err := checker.Next(); err != nil {
if err != monzero.ErrNoCheck {
log.Printf("could not run check: %s", err)
}
} else {
cancel()
state = 0
time.Sleep(waitDuration)
}
msg := output.String()
if _, err := tx.Exec(`update active_checks ac
set next_time = now() + intval, states = ARRAY[$2::int] || states[1:4],
msg = $3,
acknowledged = case when $4 then false else acknowledged end,
state_since = case $2 when states[1] then state_since else now() end
where check_id = $1`, id, &state, &msg, states.ToOK()); err != nil {
log.Printf("[%d] could not update row '%d': %s", thread, id, err)
tx.Rollback()
continue
}
if _, err := tx.Exec(`insert into notifications(check_id, states, output, mapping_id, notifier_id, check_host)
select $1, array_agg(ml.target), $2, $3, cn.notifier_id, $4
from active_checks ac
cross join lateral unnest(ac.states) s
join checks_notify cn on ac.check_id = cn.check_id
join mapping_level ml on ac.mapping_id = ml.mapping_id and s.s = ml.source
where ac.check_id = $1
and ac.acknowledged = false
group by cn.notifier_id;`, &id, &msg, &mapId, &hostname); err != nil {
log.Printf("[%d] could not create notification for '%d': %s", thread, id, err)
tx.Rollback()
continue
}
tx.Commit()
}
}