aboutsummaryrefslogblamecommitdiff
path: root/cmd/moncheck/main.go
blob: ca31a02d97001e8f5d935535e26176acfa772ebd (plain) (tree)












































































































































                                                                                                                                                                                                        
package main

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"flag"
	"io/ioutil"
	"log"
	"os/exec"
	"sync"
	"syscall"
	"time"

	"github.com/lib/pq"
)

// configPath is the value of the -config command line flag: the location of
// the JSON configuration file read by main. It defaults to "moncheck.conf".
var configPath = flag.String("config", "moncheck.conf", "path to the config file")

// Config mirrors the JSON configuration file loaded by main.
type Config struct {
	// DB is passed to sql.Open as the data source name for the "postgres"
	// driver.
	DB string `json:"db"`
	// Wait is a duration string parsed with time.ParseDuration; the result is
	// handed to every check worker as its wait duration.
	Wait string `json:"wait"`
}

func main() {
	flag.Parse()

	raw, err := ioutil.ReadFile(*configPath)
	if err != nil {
		log.Fatalf("could not read config: %s", err)
	}
	config := Config{}
	if err := json.Unmarshal(raw, &config); err != nil {
		log.Fatalf("could not parse config: %s", err)
	}

	waitDuration, err := time.ParseDuration(config.Wait)
	if err != nil {
		log.Fatalf("could not parse wait duration: %s", err)
	}

	db, err := sql.Open("postgres", config.DB)
	if err != nil {
		log.Fatalf("could not open database connection: %s", err)
	}

	for i := 0; i < 25; i++ {
		go check(i, db, waitDuration)
	}
	wg := sync.WaitGroup{}
	wg.Add(1)
	wg.Wait()
}

// check is the body of one worker goroutine. It loops forever: every
// iteration tries to pick one due check from active_checks, run it and store
// the result. When nothing was processed (no due check, or any error along
// the way) the worker sleeps for waitDuration before trying again, so a
// broken database connection does not turn into a busy loop.
//
// thread is only used to tag log lines. db is the shared database handle.
func check(thread int, db *sql.DB, waitDuration time.Duration) {
	for {
		if !runNextCheck(thread, db) {
			time.Sleep(waitDuration)
		}
	}
}

// runNextCheck processes at most one due check inside a single transaction.
// It reports true only when a check was selected, executed and its result
// committed. On every other path the transaction is rolled back before the
// function returns, so no transaction stays open while the caller sleeps.
func runNextCheck(thread int, db *sql.DB) bool {
	tx, err := db.Begin()
	if err != nil {
		log.Printf("[%d] could not start transaction: %s", thread, err)
		return false
	}
	// Covers every early return. After a successful Commit this returns
	// sql.ErrTxDone, which is deliberately ignored.
	defer tx.Rollback()

	// NOTE(review): this statement selects five columns while Scan below
	// receives four destinations; database/sql rejects such a Scan. The
	// intended list is presumably check_id, cmdLine, states, notify — confirm
	// against the schema and correct the query.
	rows, err := tx.Query("select check_id, check_id, cmdLine, states, next_time from active_checks where next_time < now() and enabled order by next_time for update skip locked limit 1;")
	if err != nil {
		log.Printf("[%d] could not start query: %s", thread, err)
		return false
	}
	// Close is idempotent; this matters on the scan-error return, where the
	// result set has not been drained yet.
	defer rows.Close()

	var (
		id      int64
		cmdLine []string
		states  []int64
		notify  bool
		found   bool
	)
	for rows.Next() {
		if err := rows.Scan(&id, pq.Array(&cmdLine), pq.Array(&states), &notify); err != nil {
			log.Printf("[%d] could not scan values: %s", thread, err)
			return false
		}
		found = true
	}
	// Iteration errors are only reported once Next has returned false.
	if err := rows.Err(); err != nil {
		log.Printf("[%d] could not fetch row: %s", thread, err)
		return false
	}
	if !found {
		return false
	}

	state, output := runCommand(thread, id, cmdLine)
	states = pushState(states, state)

	// NOTE(review): "now() + interval" looks incomplete (no interval value or
	// column is named) — confirm against the schema.
	if _, err := tx.Exec("update active_checks set next_time = now() + interval, states = $2 where check_id = $1", id, pq.Array(&states)); err != nil {
		log.Printf("[%d] could not update row '%d': %s", thread, id, err)
		return false
	}
	if notify {
		// The output is sent as a string; assumes notifications.output is a
		// textual column — TODO confirm.
		if _, err := tx.Exec("insert into notifications(check_id, states, output) values ($1, $2, $3);", id, pq.Array(&states), output); err != nil {
			log.Printf("[%d] could not create notification for '%d': %s", thread, id, err)
			return false
		}
	}
	if err := tx.Commit(); err != nil {
		log.Printf("[%d] could not commit result for '%d': %s", thread, id, err)
		return false
	}
	return true
}

// runCommand executes cmdLine with a one second timeout and returns the state
// to record together with the combined stdout/stderr output. The state is 0
// when the command succeeded, the process exit status when it exited with an
// error, and 1 for every other failure (empty command line, timeout, command
// could not be started, exit status unavailable).
func runCommand(thread int, id int64, cmdLine []string) (int64, string) {
	// TODO which state to choose for failures that are not an exit status?
	const failureState = 1

	if len(cmdLine) == 0 {
		log.Printf("[%d] check '%d' has an empty command line", thread, id)
		return failureState, ""
	}

	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	// Stdout and Stderr share one buffer so the output keeps its order.
	output := &bytes.Buffer{}
	cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...)
	cmd.Stdout = output
	cmd.Stderr = output

	err := cmd.Run()
	if err == nil {
		return 0, output.String()
	}
	// An expired timeout context reports DeadlineExceeded; the error returned
	// by Run in that case describes the killed process, not the context.
	if ctx.Err() == context.DeadlineExceeded {
		log.Printf("[%d] check '%d' took too long: %s", thread, id, err)
		return failureState, output.String()
	}
	exitErr, ok := err.(*exec.ExitError)
	if !ok {
		log.Printf("[%d] error running check '%d': %s", thread, id, err)
		return failureState, output.String()
	}
	status, ok := exitErr.Sys().(syscall.WaitStatus)
	if !ok {
		log.Printf("[%d] could not determine exit status of check '%d': %s", thread, id, err)
		return failureState, output.String()
	}
	return int64(status.ExitStatus()), output.String()
}

// pushState returns a new slice with state in front of at most the five most
// recent entries of states, so the history never grows beyond six entries.
func pushState(states []int64, state int64) []int64 {
	keep := len(states)
	if keep > 5 {
		keep = 5
	}
	history := make([]int64, 0, keep+1)
	history = append(history, state)
	return append(history, states[:keep]...)
}