about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- cmd/moncheck/main.go 100
1 files changed, 15 insertions, 85 deletions
diff --git a/cmd/moncheck/main.go b/cmd/moncheck/main.go
index 3bac52e..b987186 100644
--- a/cmd/moncheck/main.go
+++ b/cmd/moncheck/main.go
@@ -2,7 +2,6 @@ package main
import (
"bytes"
- "context"
"database/sql"
"database/sql/driver"
"encoding/json"
@@ -11,13 +10,12 @@ import (
"io/ioutil"
"log"
"os"
- "os/exec"
"strconv"
"strings"
"sync"
- "syscall"
"time"
+ "git.zero-knowledge.org/gibheer/monzero"
"github.com/lib/pq"
)
@@ -73,98 +71,30 @@ func main() {
log.Fatalf("could not resolve hostname: %s", err)
}
+ checker := monzero.NewChecker(monzero.CheckerConfig{
+ CheckerID: cfg.CheckerID,
+ DB: db,
+ Timeout: timeout,
+ HostIdentifier: hostname,
+ Executor: monzero.CheckExec,
+ })
+
for i := 0; i < config.Workers; i++ {
- go check(i, db, waitDuration, timeout, hostname, config.CheckerID)
+ go check(checker, waitDuration)
}
wg := sync.WaitGroup{}
wg.Add(1)
wg.Wait()
}
-func check(thread int, db *sql.DB, waitDuration, timeout time.Duration, hostname string, checker_id int) {
+func check(checker *monzero.Checker, waitDuration time.Duration) {
for {
- tx, err := db.Begin()
- if err != nil {
- log.Printf("[%d] could not start transaction: %s", thread, err)
- continue
- }
- row := tx.QueryRow(`select check_id, cmdLine, states, mapping_id
- from active_checks
- where next_time < now()
- and enabled
- and checker_id = $1
- order by next_time
- for update skip locked
- limit 1;`, checker_id)
- var (
- id int64
- cmdLine []string
- states States
- mapId int
- state int
- )
- err = row.Scan(&id, pq.Array(&cmdLine), &states, &mapId)
- if err != nil && err == sql.ErrNoRows {
- tx.Rollback()
- time.Sleep(waitDuration)
- continue
- } else if err != nil {
- log.Printf("could not scan values: %s", err)
- tx.Rollback()
- break
- }
- ctx, cancel := context.WithTimeout(context.Background(), timeout)
- cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...)
- output := bytes.NewBuffer([]byte{})
- cmd.Stdout = output
- cmd.Stderr = output
- err = cmd.Run()
- if err != nil && ctx.Err() == context.DeadlineExceeded {
- cancel()
- state = 2
- fmt.Fprintf(output, "check took longer than %s", timeout)
- } else if err != nil && cmd.ProcessState == nil {
- log.Printf("[%d] error running check: %s", id, err)
- state = 3
- } else if err != nil {
- cancel()
- status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus)
- if !ok {
- log.Printf("[%d]error running check: %s", id, err)
- state = 2
- } else {
- state = status.ExitStatus()
+ if err := checker.Next(); err != nil {
+ if err != monzero.ErrNoCheck {
+ log.Printf("could not run check: %s", err)
}
- } else {
- cancel()
- state = 0
- }
- msg := output.String()
-
- if _, err := tx.Exec(`update active_checks ac
- set next_time = now() + intval, states = ARRAY[$2::int] || states[1:4],
- msg = $3,
- acknowledged = case when $4 then false else acknowledged end,
- state_since = case $2 when states[1] then state_since else now() end
- where check_id = $1`, id, &state, &msg, states.ToOK()); err != nil {
- log.Printf("[%d] could not update row '%d': %s", thread, id, err)
- tx.Rollback()
- continue
- }
- if _, err := tx.Exec(`insert into notifications(check_id, states, output, mapping_id, notifier_id, check_host)
- select $1, array_agg(ml.target), $2, $3, cn.notifier_id, $4
- from active_checks ac
- cross join lateral unnest(ac.states) s
- join checks_notify cn on ac.check_id = cn.check_id
- join mapping_level ml on ac.mapping_id = ml.mapping_id and s.s = ml.source
- where ac.check_id = $1
- and ac.acknowledged = false
- group by cn.notifier_id;`, &id, &msg, &mapId, &hostname); err != nil {
- log.Printf("[%d] could not create notification for '%d': %s", thread, id, err)
- tx.Rollback()
- continue
+ time.Sleep(waitDuration)
}
- tx.Commit()
}
}