aboutsummaryrefslogtreecommitdiff
path: root/cmd/moncheck/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/moncheck/main.go')
-rw-r--r--cmd/moncheck/main.go141
1 files changed, 141 insertions, 0 deletions
diff --git a/cmd/moncheck/main.go b/cmd/moncheck/main.go
new file mode 100644
index 0000000..ca31a02
--- /dev/null
+++ b/cmd/moncheck/main.go
@@ -0,0 +1,141 @@
+package main
+
+import (
+ "bytes"
+ "context"
+ "database/sql"
+ "encoding/json"
+ "flag"
+ "io/ioutil"
+ "log"
+ "os/exec"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/lib/pq"
+)
+
+var (
+ configPath = flag.String("config", "moncheck.conf", "path to the config file")
+)
+
+type (
+ Config struct {
+ DB string `json:"db"`
+ Wait string `json:"wait"`
+ }
+)
+
+func main() {
+ flag.Parse()
+
+ raw, err := ioutil.ReadFile(*configPath)
+ if err != nil {
+ log.Fatalf("could not read config: %s", err)
+ }
+ config := Config{}
+ if err := json.Unmarshal(raw, &config); err != nil {
+ log.Fatalf("could not parse config: %s", err)
+ }
+
+ waitDuration, err := time.ParseDuration(config.Wait)
+ if err != nil {
+ log.Fatalf("could not parse wait duration: %s", err)
+ }
+
+ db, err := sql.Open("postgres", config.DB)
+ if err != nil {
+ log.Fatalf("could not open database connection: %s", err)
+ }
+
+ for i := 0; i < 25; i++ {
+ go check(i, db, waitDuration)
+ }
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+ wg.Wait()
+}
+
+func check(thread int, db *sql.DB, waitDuration time.Duration) {
+ for {
+ tx, err := db.Begin()
+ if err != nil {
+ log.Printf("[%d] could not start transaction: %s", thread, err)
+ continue
+ }
+ rows, err := tx.Query("select check_id, check_id, cmdLine, states, next_time from active_checks where next_time < now() and enabled order by next_time for update skip locked limit 1;")
+ if err != nil {
+ log.Printf("[%d] could not start query: %s", thread, err)
+ tx.Rollback()
+ continue
+ }
+ var (
+ id int64
+ cmdLine []string
+ states []int64
+ notify bool
+ )
+ found := false
+ for rows.Next() {
+ if err := rows.Err(); err != nil {
+ log.Printf("could not fetch row: %s", err)
+ tx.Rollback()
+ break
+ }
+ if err := rows.Scan(&id, pq.Array(&cmdLine), pq.Array(&states), &notify); err != nil {
+ log.Printf("could not scan values: %s", err)
+ tx.Rollback()
+ break
+ }
+ found = true
+ }
+ if !found {
+ time.Sleep(waitDuration)
+ tx.Rollback()
+ continue
+ }
+ statePos := 5
+ if len(states) < 6 {
+ statePos = len(states)
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...)
+ output := bytes.NewBuffer([]byte{})
+ cmd.Stdout = output
+ cmd.Stderr = output
+ if err != nil && err == context.Canceled {
+ log.Printf("[%d] check took too long: %s", id, err)
+ // TODO which state to choose?
+ // TODO add notification handler
+ // TODO all this casting should be done better
+ states = append([]int64{1}, states[:statePos]...)
+ } else if err != nil {
+ cancel()
+ exitErr, ok := err.(*exec.ExitError)
+ if !ok {
+ log.Printf("[%d]error running check: %s", id, err)
+ states = append([]int64{1}, states[:statePos]...)
+ } else {
+ status, ok := exitErr.Sys().(syscall.WaitStatus)
+ if !ok {
+ } else {
+ states = append([]int64{int64(status.ExitStatus())}, states[:statePos]...)
+ }
+ }
+ } else {
+ cancel()
+ states = append([]int64{0}, states[:statePos]...)
+ }
+
+ if _, err := tx.Exec("update active_checks set next_time = now() + interval, states = $2 where check_id = $1", id, pq.Array(&states)); err != nil {
+ log.Printf("[%d] could not update row '%d': %s", thread, id, err)
+ }
+ if notify {
+ if _, err := tx.Exec("insert into notifications(check_id, states, output) values ($1, $2, $3);", id, pq.Array(&states), &output); err != nil {
+ log.Printf("[%d] could not create notification for '%d': %s", thread, id, err)
+ }
+ }
+ tx.Commit()
+ }
+}