Gibheer
3b222e06ed
The default worker count was set to 25, which might be too many depending on the number of cores available. Keep 25 as the default and let people decide how many workers they want to use.
253 lines
5.9 KiB
Go
253 lines
5.9 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"database/sql/driver"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"log"
|
|
"os"
|
|
"os/exec"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/lib/pq"
|
|
)
|
|
|
|
// configPath is the -config command line flag: the location of the JSON
// config file that main reads at startup. It defaults to "moncheck.conf".
var configPath = flag.String("config", "moncheck.conf", "path to the config file")
|
|
|
|
// Config is the layout of the JSON config file read by main. Keys missing
// from the file keep the defaults main sets before decoding.
type Config struct {
	// DB is the data source name handed to sql.Open for the postgres driver.
	DB string `json:"db"`

	// Timeout is a time.ParseDuration string bounding the run time of a
	// single check command.
	Timeout string `json:"timeout"`

	// Wait is a time.ParseDuration string: how long a worker sleeps when no
	// check is due.
	Wait string `json:"wait"`

	// Path lists the directories that main joins with ":" to form the PATH
	// of the process.
	Path []string `json:"path"`

	// Workers is the number of check goroutines started by main.
	Workers int `json:"workers"`
}

// States is the list of recent states of a check, newest first. It is read
// from and written to the states column of active_checks.
type States []int
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
raw, err := ioutil.ReadFile(*configPath)
|
|
if err != nil {
|
|
log.Fatalf("could not read config: %s", err)
|
|
}
|
|
config := Config{Timeout: "30s", Wait: "30s", Workers: 25}
|
|
if err := json.Unmarshal(raw, &config); err != nil {
|
|
log.Fatalf("could not parse config: %s", err)
|
|
}
|
|
|
|
if err := os.Setenv("PATH", strings.Join(config.Path, ":")); err != nil {
|
|
log.Fatalf("could not set PATH: %s", err)
|
|
}
|
|
|
|
waitDuration, err := time.ParseDuration(config.Wait)
|
|
if err != nil {
|
|
log.Fatalf("could not parse wait duration: %s", err)
|
|
}
|
|
timeout, err := time.ParseDuration(config.Timeout)
|
|
if err != nil {
|
|
log.Fatalf("could not parse timeout: %s", err)
|
|
}
|
|
|
|
db, err := sql.Open("postgres", config.DB)
|
|
if err != nil {
|
|
log.Fatalf("could not open database connection: %s", err)
|
|
}
|
|
|
|
for i := 0; i < config.Workers; i++ {
|
|
go check(i, db, waitDuration, timeout)
|
|
}
|
|
wg := sync.WaitGroup{}
|
|
wg.Add(1)
|
|
wg.Wait()
|
|
}
|
|
|
|
func check(thread int, db *sql.DB, waitDuration, timeout time.Duration) {
|
|
for {
|
|
tx, err := db.Begin()
|
|
if err != nil {
|
|
log.Printf("[%d] could not start transaction: %s", thread, err)
|
|
continue
|
|
}
|
|
rows, err := tx.Query(`select check_id, cmdLine, states, mapping_id
|
|
from active_checks
|
|
where next_time < now()
|
|
and enabled
|
|
order by next_time
|
|
for update skip locked
|
|
limit 1;`)
|
|
if err != nil {
|
|
log.Printf("[%d] could not start query: %s", thread, err)
|
|
tx.Rollback()
|
|
continue
|
|
}
|
|
var (
|
|
id int64
|
|
cmdLine []string
|
|
states States
|
|
mapId int
|
|
state int
|
|
)
|
|
found := false
|
|
for rows.Next() {
|
|
if err := rows.Err(); err != nil {
|
|
log.Printf("could not fetch row: %s", err)
|
|
tx.Rollback()
|
|
break
|
|
}
|
|
err := rows.Scan(&id, pq.Array(&cmdLine), &states, &mapId)
|
|
if err != nil {
|
|
log.Printf("could not scan values: %s", err)
|
|
tx.Rollback()
|
|
break
|
|
}
|
|
found = true
|
|
}
|
|
if !found {
|
|
time.Sleep(waitDuration)
|
|
tx.Rollback()
|
|
continue
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
cmd := exec.CommandContext(ctx, cmdLine[0], cmdLine[1:]...)
|
|
output := bytes.NewBuffer([]byte{})
|
|
cmd.Stdout = output
|
|
cmd.Stderr = output
|
|
err = cmd.Run()
|
|
if err != nil && ctx.Err() == context.DeadlineExceeded {
|
|
cancel()
|
|
state = 2
|
|
fmt.Fprintf(output, "check took longer than %s", timeout)
|
|
} else if err != nil && cmd.ProcessState == nil {
|
|
log.Printf("[%d] error running check: %s", id, err)
|
|
state = 3
|
|
} else if err != nil {
|
|
cancel()
|
|
status, ok := cmd.ProcessState.Sys().(syscall.WaitStatus)
|
|
if !ok {
|
|
log.Printf("[%d]error running check: %s", id, err)
|
|
state = 2
|
|
} else {
|
|
state = status.ExitStatus()
|
|
}
|
|
} else {
|
|
cancel()
|
|
state = 0
|
|
}
|
|
|
|
mappedState := state
|
|
err = db.QueryRow(`select target
|
|
from mapping_level
|
|
where mapping_id = $1 and source = $2`, mapId, state).Scan(&mappedState)
|
|
if err != nil {
|
|
log.Printf("[%d] could not fetch error mapping for check '%d' and source code '%d': %s", thread, id, state, err)
|
|
tx.Rollback()
|
|
continue
|
|
}
|
|
states.Add(mappedState)
|
|
msg := output.String()
|
|
|
|
if _, err := tx.Exec(`update active_checks ac
|
|
set next_time = now() + intval, states = $2, msg = $3, acknowledged = case when $4 then false else acknowledged end
|
|
where check_id = $1`, id, &states, &msg, states.ToOK()); err != nil {
|
|
log.Printf("[%d] could not update row '%d': %s", thread, id, err)
|
|
tx.Rollback()
|
|
continue
|
|
}
|
|
if _, err := tx.Exec(`insert into notifications(check_id, states, output, mapping_id, notifier_id)
|
|
select $1, $2, $3, $4, cn.notifier_id
|
|
from checks_notify cn
|
|
where cn.check_id = $1`, &id, &states, &msg, &mapId); err != nil {
|
|
log.Printf("[%d] could not create notification for '%d': %s", thread, id, err)
|
|
tx.Rollback()
|
|
continue
|
|
}
|
|
tx.Commit()
|
|
}
|
|
}
|
|
|
|
func (s *States) Value() (driver.Value, error) {
|
|
last := len(*s)
|
|
if last == 0 {
|
|
return "{}", nil
|
|
}
|
|
result := strings.Builder{}
|
|
_, err := result.WriteString("{")
|
|
if err != nil {
|
|
return "", fmt.Errorf("could not write to buffer: %s", err)
|
|
}
|
|
for i, state := range *s {
|
|
if _, err := fmt.Fprintf(&result, "%d", state); err != nil {
|
|
return "", fmt.Errorf("could not write to buffer: %s", err)
|
|
}
|
|
if i < last-1 {
|
|
if _, err := result.WriteString(","); err != nil {
|
|
return "", fmt.Errorf("could not write to buffer: %s", err)
|
|
}
|
|
}
|
|
}
|
|
if _, err := result.WriteString("}"); err != nil {
|
|
return "", fmt.Errorf("could not write to buffer: %s", err)
|
|
}
|
|
return result.String(), nil
|
|
}
|
|
|
|
func (s *States) Scan(src interface{}) error {
|
|
switch src := src.(type) {
|
|
case []byte:
|
|
tmp := bytes.Trim(src, "{}")
|
|
states := bytes.Split(tmp, []byte(","))
|
|
result := make([]int, len(states))
|
|
for i, state := range states {
|
|
var err error
|
|
result[i], err = strconv.Atoi(string(state))
|
|
if err != nil {
|
|
return fmt.Errorf("could not parse element %s: %s", state, err)
|
|
}
|
|
}
|
|
*s = result
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("could not convert %T to states", src)
|
|
}
|
|
}
|
|
|
|
// Append prepends the new state before all others.
|
|
func (s *States) Add(state int) {
|
|
vals := *s
|
|
statePos := 5
|
|
if len(vals) < 6 {
|
|
statePos = len(vals)
|
|
}
|
|
*s = append([]int{state}, vals[:statePos]...)
|
|
return
|
|
}
|
|
|
|
// ToOK returns true when the state returns from != 0 to 0.
|
|
func (s *States) ToOK() bool {
|
|
vals := *s
|
|
if len(vals) == 0 {
|
|
return false
|
|
}
|
|
if len(vals) <= 1 {
|
|
return vals[0] == 0
|
|
}
|
|
if vals[0] == 0 && vals[1] > 0 {
|
|
return true
|
|
}
|
|
return false
|
|
}
|