2018-11-16 10:39:21 +01:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"database/sql"
|
2018-11-18 21:38:04 +01:00
|
|
|
"database/sql/driver"
|
2018-11-16 10:39:21 +01:00
|
|
|
"encoding/json"
|
|
|
|
"flag"
|
2018-11-18 21:38:04 +01:00
|
|
|
"fmt"
|
2018-11-16 10:39:21 +01:00
|
|
|
"io/ioutil"
|
|
|
|
"log"
|
2018-12-10 09:34:00 +01:00
|
|
|
"os"
|
2018-11-16 10:39:21 +01:00
|
|
|
"os/exec"
|
2018-11-18 21:38:04 +01:00
|
|
|
"strconv"
|
|
|
|
"strings"
|
2018-11-16 10:39:21 +01:00
|
|
|
"sync"
|
|
|
|
"syscall"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/lib/pq"
|
|
|
|
)
|
|
|
|
|
|
|
|
// configPath holds the value of the -config command-line flag: the location
// of the JSON configuration file that main loads (default "moncheck.conf").
var configPath = flag.String("config", "moncheck.conf", "path to the config file")
|
|
|
|
|
|
|
|
// Config mirrors the JSON configuration file read by main.
//
// DB is the connection string handed to sql.Open with the "postgres" driver.
// Timeout and Wait are time.ParseDuration strings (main defaults both to
// "30s"): Timeout bounds a single check command run, Wait is how long an idle
// worker sleeps before looking for due checks again. Path lists directories
// that main joins with ":" and installs as PATH for the check commands.
type Config struct {
	DB      string   `json:"db"`
	Timeout string   `json:"timeout"`
	Wait    string   `json:"wait"`
	Path    []string `json:"path"`
}

// States is the recent state history of one check, newest entry first. It is
// exchanged with the database as an array literal such as "{2,0,0}" through
// its Value and Scan methods.
type States []int
|
|
|
|
|
|
|
|
func main() {
|
|
|
|
flag.Parse()
|
|
|
|
|
|
|
|
raw, err := ioutil.ReadFile(*configPath)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("could not read config: %s", err)
|
|
|
|
}
|
2018-11-29 10:45:33 +01:00
|
|
|
config := Config{Timeout: "30s", Wait: "30s"}
|
2018-11-16 10:39:21 +01:00
|
|
|
if err := json.Unmarshal(raw, &config); err != nil {
|
|
|
|
log.Fatalf("could not parse config: %s", err)
|
|
|
|
}
|
|
|
|
|
2018-12-10 09:34:00 +01:00
|
|
|
if err := os.Setenv("PATH", strings.Join(config.Path, ":")); err != nil {
|
|
|
|
log.Fatalf("could not set PATH: %s", err)
|
|
|
|
}
|
|
|
|
|
2018-11-16 10:39:21 +01:00
|
|
|
waitDuration, err := time.ParseDuration(config.Wait)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("could not parse wait duration: %s", err)
|
|
|
|
}
|
2018-11-29 10:45:33 +01:00
|
|
|
timeout, err := time.ParseDuration(config.Timeout)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("could not parse timeout: %s", err)
|
|
|
|
}
|
2018-11-16 10:39:21 +01:00
|
|
|
|
|
|
|
db, err := sql.Open("postgres", config.DB)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("could not open database connection: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
for i := 0; i < 25; i++ {
|
2018-11-29 10:45:33 +01:00
|
|
|
go check(i, db, waitDuration, timeout)
|
2018-11-16 10:39:21 +01:00
|
|
|
}
|
|
|
|
wg := sync.WaitGroup{}
|
|
|
|
wg.Add(1)
|
|
|
|
wg.Wait()
|
|
|
|
}
|
|
|
|
|
2018-11-29 10:45:33 +01:00
|
|
|
func check(thread int, db *sql.DB, waitDuration, timeout time.Duration) {
|
2018-11-16 10:39:21 +01:00
|
|
|
for {
|
|
|
|
tx, err := db.Begin()
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("[%d] could not start transaction: %s", thread, err)
|
|
|
|
continue
|
|
|
|
}
|
2018-12-11 20:21:14 +01:00
|
|
|
rows, err := tx.Query(`select check_id, cmdLine, states, mapping_id
|
2018-12-11 13:02:23 +01:00
|
|
|
from active_checks
|
|
|
|
where next_time < now()
|
|
|
|
and enabled
|
|
|
|
order by next_time
|
|
|
|
for update skip locked
|
|
|
|
limit 1;`)
|
2018-11-16 10:39:21 +01:00
|
|
|
if err != nil {
|
|
|
|
log.Printf("[%d] could not start query: %s", thread, err)
|
|
|
|
tx.Rollback()
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
var (
|
|
|
|
id int64
|
|
|
|
cmdLine []string
|
2018-11-18 21:38:04 +01:00
|
|
|
states States
|
2018-12-11 13:02:23 +01:00
|
|
|
mapId int
|
|
|
|
state int
|
2018-11-16 10:39:21 +01:00
|
|
|
)
|
|
|
|
found := false
|
|
|
|
for rows.Next() {
|
|
|
|
if err := rows.Err(); err != nil {
|
|
|
|
log.Printf("could not fetch row: %s", err)
|
|
|
|
tx.Rollback()
|
|
|
|
break
|
|
|
|
}
|
2018-12-11 20:21:14 +01:00
|
|
|
err := rows.Scan(&id, pq.Array(&cmdLine), &states, &mapId)
|
2018-12-11 13:02:23 +01:00
|
|
|
if err != nil {
|
2018-11-16 10:39:21 +01:00
|
|
|
log.Printf("could not scan values: %s", err)
|
|
|
|
tx.Rollback()
|
|
|
|
break
|
|
|
|
}
|
|
|
|
found = true
|
|
|
|
}
|
|
|
|
if !found {
|
|
|
|
time.Sleep(waitDuration)
|
|
|
|
tx.Rollback()
|
|
|
|
continue
|
|
|
|
}
|
2018-11-29 10:45:33 +01:00
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
2018-11-16 10:39:21 +01:00
|
|
|
cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...)
|
|
|
|
output := bytes.NewBuffer([]byte{})
|
|
|
|
cmd.Stdout = output
|
|
|
|
cmd.Stderr = output
|
2018-11-18 21:38:04 +01:00
|
|
|
err = cmd.Run()
|
|
|
|
if err != nil && ctx.Err() == context.DeadlineExceeded {
|
|
|
|
cancel()
|
2018-12-11 13:02:23 +01:00
|
|
|
state = 2
|
2018-11-29 10:45:33 +01:00
|
|
|
fmt.Fprintf(output, "check took longer than %s", timeout)
|
2018-12-10 09:34:00 +01:00
|
|
|
} else if err != nil && cmd.ProcessState == nil {
|
|
|
|
log.Printf("[%d] error running check: %s", id, err)
|
2018-12-11 13:02:23 +01:00
|
|
|
state = 3
|
2018-11-16 10:39:21 +01:00
|
|
|
} else if err != nil {
|
|
|
|
cancel()
|
2018-11-18 21:38:04 +01:00
|
|
|
status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus)
|
2018-11-16 10:39:21 +01:00
|
|
|
if !ok {
|
|
|
|
log.Printf("[%d]error running check: %s", id, err)
|
2018-12-11 13:02:23 +01:00
|
|
|
state = 2
|
2018-11-16 10:39:21 +01:00
|
|
|
} else {
|
2018-12-11 13:02:23 +01:00
|
|
|
state = status.ExitStatus()
|
2018-11-16 10:39:21 +01:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
cancel()
|
2018-12-11 13:02:23 +01:00
|
|
|
state = 0
|
2018-11-16 10:39:21 +01:00
|
|
|
}
|
2018-12-11 13:02:23 +01:00
|
|
|
|
2018-12-12 14:05:46 +01:00
|
|
|
mappedState := state
|
2018-12-11 13:02:23 +01:00
|
|
|
err = db.QueryRow(`select target
|
|
|
|
from mapping_level
|
2018-12-12 14:05:46 +01:00
|
|
|
where mapping_id = $1 and source = $2`, mapId, state).Scan(&mappedState)
|
2018-12-11 13:02:23 +01:00
|
|
|
if err != nil {
|
2018-12-12 14:05:46 +01:00
|
|
|
log.Printf("[%d] could not fetch error mapping for check '%d' and source code '%d': %s", thread, id, state, err)
|
2018-12-11 13:02:23 +01:00
|
|
|
tx.Rollback()
|
|
|
|
continue
|
|
|
|
}
|
2018-12-12 14:05:46 +01:00
|
|
|
states.Add(mappedState)
|
2018-11-18 21:38:04 +01:00
|
|
|
msg := output.String()
|
2018-11-16 10:39:21 +01:00
|
|
|
|
2018-12-11 13:02:23 +01:00
|
|
|
if _, err := tx.Exec(`update active_checks ac
|
2018-11-20 21:47:35 +01:00
|
|
|
set next_time = now() + intval, states = $2, msg = $3, acknowledged = case when $4 then false else acknowledged end
|
|
|
|
where check_id = $1`, id, &states, &msg, states.ToOK()); err != nil {
|
2018-11-16 10:39:21 +01:00
|
|
|
log.Printf("[%d] could not update row '%d': %s", thread, id, err)
|
2018-11-16 20:09:31 +01:00
|
|
|
tx.Rollback()
|
|
|
|
continue
|
2018-11-16 10:39:21 +01:00
|
|
|
}
|
2018-12-11 20:21:14 +01:00
|
|
|
if _, err := tx.Exec(`insert into notifications(check_id, states, output, mapping_id, notifier_id)
|
|
|
|
select $1, $2, $3, $4, cn.notifier_id
|
|
|
|
from checks_notify cn
|
|
|
|
where cn.check_id = $1`, &id, &states, &msg, &mapId); err != nil {
|
|
|
|
log.Printf("[%d] could not create notification for '%d': %s", thread, id, err)
|
|
|
|
tx.Rollback()
|
|
|
|
continue
|
2018-11-16 10:39:21 +01:00
|
|
|
}
|
|
|
|
tx.Commit()
|
|
|
|
}
|
|
|
|
}
|
2018-11-18 21:38:04 +01:00
|
|
|
|
|
|
|
func (s *States) Value() (driver.Value, error) {
|
|
|
|
last := len(*s)
|
|
|
|
if last == 0 {
|
|
|
|
return "{}", nil
|
|
|
|
}
|
|
|
|
result := strings.Builder{}
|
|
|
|
_, err := result.WriteString("{")
|
|
|
|
if err != nil {
|
|
|
|
return "", fmt.Errorf("could not write to buffer: %s", err)
|
|
|
|
}
|
|
|
|
for i, state := range *s {
|
|
|
|
if _, err := fmt.Fprintf(&result, "%d", state); err != nil {
|
|
|
|
return "", fmt.Errorf("could not write to buffer: %s", err)
|
|
|
|
}
|
|
|
|
if i < last-1 {
|
|
|
|
if _, err := result.WriteString(","); err != nil {
|
|
|
|
return "", fmt.Errorf("could not write to buffer: %s", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if _, err := result.WriteString("}"); err != nil {
|
|
|
|
return "", fmt.Errorf("could not write to buffer: %s", err)
|
|
|
|
}
|
|
|
|
return result.String(), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *States) Scan(src interface{}) error {
|
|
|
|
switch src := src.(type) {
|
|
|
|
case []byte:
|
|
|
|
tmp := bytes.Trim(src, "{}")
|
|
|
|
states := bytes.Split(tmp, []byte(","))
|
|
|
|
result := make([]int, len(states))
|
|
|
|
for i, state := range states {
|
|
|
|
var err error
|
|
|
|
result[i], err = strconv.Atoi(string(state))
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("could not parse element %s: %s", state, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
*s = result
|
|
|
|
return nil
|
|
|
|
default:
|
|
|
|
return fmt.Errorf("could not convert %T to states", src)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Append prepends the new state before all others.
|
|
|
|
func (s *States) Add(state int) {
|
|
|
|
vals := *s
|
|
|
|
statePos := 5
|
|
|
|
if len(vals) < 6 {
|
|
|
|
statePos = len(vals)
|
|
|
|
}
|
|
|
|
*s = append([]int{state}, vals[:statePos]...)
|
|
|
|
return
|
|
|
|
}
|
2018-11-20 21:47:35 +01:00
|
|
|
|
|
|
|
// ToOK returns true when the state returns from != 0 to 0.
|
|
|
|
func (s *States) ToOK() bool {
|
|
|
|
vals := *s
|
|
|
|
if len(vals) == 0 {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
if len(vals) <= 1 {
|
|
|
|
return vals[0] == 0
|
|
|
|
}
|
|
|
|
if vals[0] == 0 && vals[1] > 0 {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|