about | summary | refs | log | tree | commit | diff
path: root/cmd/moncheck
diff options
context:
space:
mode:
author: Gibheer <gibheer+git@zero-knowledge.org> 2021-12-02 15:57:00 +0100
committer: Gibheer <gibheer+git@zero-knowledge.org> 2021-12-02 15:57:00 +0100
commit: b69cd1ea3e2894ebe5afd189c8719836b7ed7aa6 (patch)
tree: 103114be1903b62f700fb1d83bf4b8ca33fcd79f /cmd/moncheck
parent: 0639a504ebcc48030f8eefd80839738cfb89a583 (diff)
cmd/moncheck - switch to monzero.Checker
This switches the internals of the moncheck instance to use the monzero.Checker API. This way we can reuse the Checker completely and also see whether it works properly.
Diffstat (limited to 'cmd/moncheck')
-rw-r--r-- cmd/moncheck/main.go 100
1 files changed, 15 insertions, 85 deletions
diff --git a/cmd/moncheck/main.go b/cmd/moncheck/main.go
index 3bac52e..b987186 100644
--- a/cmd/moncheck/main.go
+++ b/cmd/moncheck/main.go
@@ -2,7 +2,6 @@ package main
import (
"bytes"
- "context"
"database/sql"
"database/sql/driver"
"encoding/json"
@@ -11,13 +10,12 @@ import (
"io/ioutil"
"log"
"os"
- "os/exec"
"strconv"
"strings"
"sync"
- "syscall"
"time"
+ "git.zero-knowledge.org/gibheer/monzero"
"github.com/lib/pq"
)
@@ -73,98 +71,30 @@ func main() {
log.Fatalf("could not resolve hostname: %s", err)
}
+ checker := monzero.NewChecker(monzero.CheckerConfig{
+ CheckerID: cfg.CheckerID,
+ DB: db,
+ Timeout: timeout,
+ HostIdentifier: hostname,
+ Executor: monzero.CheckExec,
+ })
+
for i := 0; i < config.Workers; i++ {
- go check(i, db, waitDuration, timeout, hostname, config.CheckerID)
+ go check(checker, waitDuration)
}
wg := sync.WaitGroup{}
wg.Add(1)
wg.Wait()
}
-func check(thread int, db *sql.DB, waitDuration, timeout time.Duration, hostname string, checker_id int) {
+func check(checker *monzero.Checker, waitDuration time.Duration) {
for {
- tx, err := db.Begin()
- if err != nil {
- log.Printf("[%d] could not start transaction: %s", thread, err)
- continue
- }
- row := tx.QueryRow(`select check_id, cmdLine, states, mapping_id
- from active_checks
- where next_time < now()
- and enabled
- and checker_id = $1
- order by next_time
- for update skip locked
- limit 1;`, checker_id)
- var (
- id int64
- cmdLine []string
- states States
- mapId int
- state int
- )
- err = row.Scan(&id, pq.Array(&cmdLine), &states, &mapId)
- if err != nil && err == sql.ErrNoRows {
- tx.Rollback()
- time.Sleep(waitDuration)
- continue
- } else if err != nil {
- log.Printf("could not scan values: %s", err)
- tx.Rollback()
- break
- }
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...)
- output := bytes.NewBuffer([]byte{})
- cmd.Stdout = output
- cmd.Stderr = output
- err = cmd.Run()
- if err != nil && ctx.Err() == context.DeadlineExceeded {
- cancel()
- state = 2
- fmt.Fprintf(output, "check took longer than %s", timeout)
- } else if err != nil && cmd.ProcessState == nil {
- log.Printf("[%d] error running check: %s", id, err)
- state = 3
- } else if err != nil {
- cancel()
- status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus)
- if !ok {
- log.Printf("[%d]error running check: %s", id, err)
- state = 2
- } else {
- state = status.ExitStatus()
+ if err := checker.Next(); err != nil {
+ if err != monzero.ErrNoCheck {
+ log.Printf("could not run check: %s", err)
}
- } else {
- cancel()
- state = 0
- }
- msg := output.String()
-
- if _, err := tx.Exec(`update active_checks ac
- set next_time = now() + intval, states = ARRAY[$2::int] || states[1:4],
- msg = $3,
- acknowledged = case when $4 then false else acknowledged end,
- state_since = case $2 when states[1] then state_since else now() end
- where check_id = $1`, id, &state, &msg, states.ToOK()); err != nil {
- log.Printf("[%d] could not update row '%d': %s", thread, id, err)
- tx.Rollback()
- continue
- }
- if _, err := tx.Exec(`insert into notifications(check_id, states, output, mapping_id, notifier_id, check_host)
- select $1, array_agg(ml.target), $2, $3, cn.notifier_id, $4
- from active_checks ac
- cross join lateral unnest(ac.states) s
- join checks_notify cn on ac.check_id = cn.check_id
- join mapping_level ml on ac.mapping_id = ml.mapping_id and s.s = ml.source
- where ac.check_id = $1
- and ac.acknowledged = false
- group by cn.notifier_id;`, &id, &msg, &mapId, &hostname); err != nil {
- log.Printf("[%d] could not create notification for '%d': %s", thread, id, err)
- tx.Rollback()
- continue
+ time.Sleep(waitDuration)
}
- tx.Commit()
}
}