From b69cd1ea3e2894ebe5afd189c8719836b7ed7aa6 Mon Sep 17 00:00:00 2001 From: Gibheer Date: Thu, 2 Dec 2021 15:57:00 +0100 Subject: cmd/moncheck - switch to monzero.Checker This switches the internals of the moncheck instance to using the monzero.Checker API. This way we can reuse it completely and also see if it works properly. --- cmd/moncheck/main.go | 100 ++++++++------------------------------------------- 1 file changed, 15 insertions(+), 85 deletions(-) diff --git a/cmd/moncheck/main.go b/cmd/moncheck/main.go index 3bac52e..b987186 100644 --- a/cmd/moncheck/main.go +++ b/cmd/moncheck/main.go @@ -2,7 +2,6 @@ package main import ( "bytes" - "context" "database/sql" "database/sql/driver" "encoding/json" @@ -11,13 +10,12 @@ import ( "io/ioutil" "log" "os" - "os/exec" "strconv" "strings" "sync" - "syscall" "time" + "git.zero-knowledge.org/gibheer/monzero" "github.com/lib/pq" ) @@ -73,98 +71,30 @@ func main() { log.Fatalf("could not resolve hostname: %s", err) } + checker := monzero.NewChecker(monzero.CheckerConfig{ + CheckerID: cfg.CheckerID, + DB: db, + Timeout: timeout, + HostIdentifier: hostname, + Executor: monzero.CheckExec, + }) + for i := 0; i < config.Workers; i++ { - go check(i, db, waitDuration, timeout, hostname, config.CheckerID) + go check(checker, waitDuration) } wg := sync.WaitGroup{} wg.Add(1) wg.Wait() } -func check(thread int, db *sql.DB, waitDuration, timeout time.Duration, hostname string, checker_id int) { +func check(checker *monzero.Checker, waitDuration time.Duration) { for { - tx, err := db.Begin() - if err != nil { - log.Printf("[%d] could not start transaction: %s", thread, err) - continue - } - row := tx.QueryRow(`select check_id, cmdLine, states, mapping_id - from active_checks - where next_time < now() - and enabled - and checker_id = $1 - order by next_time - for update skip locked - limit 1;`, checker_id) - var ( - id int64 - cmdLine []string - states States - mapId int - state int - ) - err = row.Scan(&id, pq.Array(&cmdLine), &states, &mapId) - if err != nil && err == sql.ErrNoRows { - tx.Rollback() - time.Sleep(waitDuration) - continue - } else if err != nil { - log.Printf("could not scan values: %s", err) - tx.Rollback() - break - } - ctx, cancel := context.WithTimeout(context.Background(), timeout) - cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...) - output := bytes.NewBuffer([]byte{}) - cmd.Stdout = output - cmd.Stderr = output - err = cmd.Run() - if err != nil && ctx.Err() == context.DeadlineExceeded { - cancel() - state = 2 - fmt.Fprintf(output, "check took longer than %s", timeout) - } else if err != nil && cmd.ProcessState == nil { - log.Printf("[%d] error running check: %s", id, err) - state = 3 - } else if err != nil { - cancel() - status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus) - if !ok { - log.Printf("[%d]error running check: %s", id, err) - state = 2 - } else { - state = status.ExitStatus() + if err := checker.Next(); err != nil { + if err != monzero.ErrNoCheck { + log.Printf("could not run check: %s", err) } - } else { - cancel() - state = 0 - } - msg := output.String() - - if _, err := tx.Exec(`update active_checks ac - set next_time = now() + intval, states = ARRAY[$2::int] || states[1:4], - msg = $3, - acknowledged = case when $4 then false else acknowledged end, - state_since = case $2 when states[1] then state_since else now() end - where check_id = $1`, id, &state, &msg, states.ToOK()); err != nil { - log.Printf("[%d] could not update row '%d': %s", thread, id, err) - tx.Rollback() - continue - } - if _, err := tx.Exec(`insert into notifications(check_id, states, output, mapping_id, notifier_id, check_host) - select $1, array_agg(ml.target), $2, $3, cn.notifier_id, $4 - from active_checks ac - cross join lateral unnest(ac.states) s - join checks_notify cn on ac.check_id = cn.check_id - join mapping_level ml on ac.mapping_id = ml.mapping_id and s.s = ml.source - where ac.check_id = $1 - and ac.acknowledged = false - group by cn.notifier_id;`, &id, &msg, &mapId, &hostname); err != nil { - log.Printf("[%d] could not create notification for '%d': %s", thread, id, err) - tx.Rollback() - continue + time.Sleep(waitDuration) } - tx.Commit() } } -- cgit v1.2.3-70-g09d2