142 lines
3.4 KiB
Go
142 lines
3.4 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"flag"
|
|
"io/ioutil"
|
|
"log"
|
|
"os/exec"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/lib/pq"
|
|
)
|
|
|
|
// configPath holds the value of the -config command-line flag: the location
// of the JSON configuration file read at startup (default "moncheck.conf").
var configPath = flag.String("config", "moncheck.conf", "path to the config file")
|
|
|
|
// Config mirrors the layout of the JSON configuration file.
type Config struct {
	// DB is the data source name handed to the "postgres" database driver.
	DB string `json:"db"`
	// Wait is a duration string (time.ParseDuration syntax) controlling how
	// long an idle worker pauses between polls.
	Wait string `json:"wait"`
}
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
raw, err := ioutil.ReadFile(*configPath)
|
|
if err != nil {
|
|
log.Fatalf("could not read config: %s", err)
|
|
}
|
|
config := Config{}
|
|
if err := json.Unmarshal(raw, &config); err != nil {
|
|
log.Fatalf("could not parse config: %s", err)
|
|
}
|
|
|
|
waitDuration, err := time.ParseDuration(config.Wait)
|
|
if err != nil {
|
|
log.Fatalf("could not parse wait duration: %s", err)
|
|
}
|
|
|
|
db, err := sql.Open("postgres", config.DB)
|
|
if err != nil {
|
|
log.Fatalf("could not open database connection: %s", err)
|
|
}
|
|
|
|
for i := 0; i < 25; i++ {
|
|
go check(i, db, waitDuration)
|
|
}
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
wg.Wait()
|
|
}
|
|
|
|
func check(thread int, db *sql.DB, waitDuration time.Duration) {
|
|
for {
|
|
tx, err := db.Begin()
|
|
if err != nil {
|
|
log.Printf("[%d] could not start transaction: %s", thread, err)
|
|
continue
|
|
}
|
|
rows, err := tx.Query("select check_id, check_id, cmdLine, states, next_time from active_checks where next_time < now() and enabled order by next_time for update skip locked limit 1;")
|
|
if err != nil {
|
|
log.Printf("[%d] could not start query: %s", thread, err)
|
|
tx.Rollback()
|
|
continue
|
|
}
|
|
var (
|
|
id int64
|
|
cmdLine []string
|
|
states []int64
|
|
notify bool
|
|
)
|
|
found := false
|
|
for rows.Next() {
|
|
if err := rows.Err(); err != nil {
|
|
log.Printf("could not fetch row: %s", err)
|
|
tx.Rollback()
|
|
break
|
|
}
|
|
if err := rows.Scan(&id, pq.Array(&cmdLine), pq.Array(&states), ¬ify); err != nil {
|
|
log.Printf("could not scan values: %s", err)
|
|
tx.Rollback()
|
|
break
|
|
}
|
|
found = true
|
|
}
|
|
if !found {
|
|
time.Sleep(waitDuration)
|
|
tx.Rollback()
|
|
continue
|
|
}
|
|
statePos := 5
|
|
if len(states) < 6 {
|
|
statePos = len(states)
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
|
cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...)
|
|
output := bytes.NewBuffer([]byte{})
|
|
cmd.Stdout = output
|
|
cmd.Stderr = output
|
|
if err != nil && err == context.Canceled {
|
|
log.Printf("[%d] check took too long: %s", id, err)
|
|
// TODO which state to choose?
|
|
// TODO add notification handler
|
|
// TODO all this casting should be done better
|
|
states = append([]int64{1}, states[:statePos]...)
|
|
} else if err != nil {
|
|
cancel()
|
|
exitErr, ok := err.(*exec.ExitError)
|
|
if !ok {
|
|
log.Printf("[%d]error running check: %s", id, err)
|
|
states = append([]int64{1}, states[:statePos]...)
|
|
} else {
|
|
status, ok := exitErr.Sys().(syscall.WaitStatus)
|
|
if !ok {
|
|
} else {
|
|
states = append([]int64{int64(status.ExitStatus())}, states[:statePos]...)
|
|
}
|
|
}
|
|
} else {
|
|
cancel()
|
|
states = append([]int64{0}, states[:statePos]...)
|
|
}
|
|
|
|
if _, err := tx.Exec("update active_checks set next_time = now() + interval, states = $2 where check_id = $1", id, pq.Array(&states)); err != nil {
|
|
log.Printf("[%d] could not update row '%d': %s", thread, id, err)
|
|
}
|
|
if notify {
|
|
if _, err := tx.Exec("insert into notifications(check_id, states, output) values ($1, $2, $3);", id, pq.Array(&states), &output); err != nil {
|
|
log.Printf("[%d] could not create notification for '%d': %s", thread, id, err)
|
|
}
|
|
}
|
|
tx.Commit()
|
|
}
|
|
}
|