From bfd2b5d324d7348eff22c4a991fa0f21e09360f5 Mon Sep 17 00:00:00 2001 From: Gibheer Date: Fri, 16 Nov 2018 10:39:21 +0100 Subject: initial commit --- cmd/moncheck/main.go | 141 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 cmd/moncheck/main.go (limited to 'cmd') diff --git a/cmd/moncheck/main.go b/cmd/moncheck/main.go new file mode 100644 index 0000000..ca31a02 --- /dev/null +++ b/cmd/moncheck/main.go @@ -0,0 +1,141 @@ +package main + +import ( + "bytes" + "context" + "database/sql" + "encoding/json" + "flag" + "io/ioutil" + "log" + "os/exec" + "sync" + "syscall" + "time" + + "github.com/lib/pq" +) + +var ( + configPath = flag.String("config", "moncheck.conf", "path to the config file") +) + +type ( + Config struct { + DB string `json:"db"` + Wait string `json:"wait"` + } +) + +func main() { + flag.Parse() + + raw, err := ioutil.ReadFile(*configPath) + if err != nil { + log.Fatalf("could not read config: %s", err) + } + config := Config{} + if err := json.Unmarshal(raw, &config); err != nil { + log.Fatalf("could not parse config: %s", err) + } + + waitDuration, err := time.ParseDuration(config.Wait) + if err != nil { + log.Fatalf("could not parse wait duration: %s", err) + } + + db, err := sql.Open("postgres", config.DB) + if err != nil { + log.Fatalf("could not open database connection: %s", err) + } + + for i := 0; i < 25; i++ { + go check(i, db, waitDuration) + } + wg := sync.WaitGroup{} + wg.Add(1) + wg.Wait() +} + +func check(thread int, db *sql.DB, waitDuration time.Duration) { + for { + tx, err := db.Begin() + if err != nil { + log.Printf("[%d] could not start transaction: %s", thread, err) + continue + } + rows, err := tx.Query("select check_id, check_id, cmdLine, states, next_time from active_checks where next_time < now() and enabled order by next_time for update skip locked limit 1;") + if err != nil { + log.Printf("[%d] could not start query: %s", thread, err) + tx.Rollback() + continue + } + var ( + id int64 + cmdLine []string + states []int64 + notify bool + ) + found := false + for rows.Next() { + if err := rows.Err(); err != nil { + log.Printf("could not fetch row: %s", err) + tx.Rollback() + break + } + if err := rows.Scan(&id, pq.Array(&cmdLine), pq.Array(&states), ¬ify); err != nil { + log.Printf("could not scan values: %s", err) + tx.Rollback() + break + } + found = true + } + if !found { + time.Sleep(waitDuration) + tx.Rollback() + continue + } + statePos := 5 + if len(states) < 6 { + statePos = len(states) + } + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...) + output := bytes.NewBuffer([]byte{}) + cmd.Stdout = output + cmd.Stderr = output + if err != nil && err == context.Canceled { + log.Printf("[%d] check took too long: %s", id, err) + // TODO which state to choose? + // TODO add notification handler + // TODO all this casting should be done better + states = append([]int64{1}, states[:statePos]...) + } else if err != nil { + cancel() + exitErr, ok := err.(*exec.ExitError) + if !ok { + log.Printf("[%d]error running check: %s", id, err) + states = append([]int64{1}, states[:statePos]...) + } else { + status, ok := exitErr.Sys().(syscall.WaitStatus) + if !ok { + } else { + states = append([]int64{int64(status.ExitStatus())}, states[:statePos]...) + } + } + } else { + cancel() + states = append([]int64{0}, states[:statePos]...) + } + + if _, err := tx.Exec("update active_checks set next_time = now() + interval, states = $2 where check_id = $1", id, pq.Array(&states)); err != nil { + log.Printf("[%d] could not update row '%d': %s", thread, id, err) + } + if notify { + if _, err := tx.Exec("insert into notifications(check_id, states, output) values ($1, $2, $3);", id, pq.Array(&states), &output); err != nil { + log.Printf("[%d] could not create notification for '%d': %s", thread, id, err) + } + } + tx.Commit() + } +} -- cgit v1.2.3-70-g09d2