2018-11-16 10:39:21 +01:00
package main
import (
"bytes"
"context"
"database/sql"
2018-11-18 21:38:04 +01:00
"database/sql/driver"
2018-11-16 10:39:21 +01:00
"encoding/json"
"flag"
2018-11-18 21:38:04 +01:00
"fmt"
2018-11-16 10:39:21 +01:00
"io/ioutil"
"log"
"os/exec"
2018-11-18 21:38:04 +01:00
"strconv"
"strings"
2018-11-16 10:39:21 +01:00
"sync"
"syscall"
"time"
"github.com/lib/pq"
)
// configPath holds the value of the -config command-line flag: the location
// of the JSON configuration file, "moncheck.conf" unless overridden.
var configPath = flag.String("config", "moncheck.conf", "path to the config file")
type (
	// Config is the JSON configuration file read at startup (see -config).
	Config struct {
		// DB is the connection string passed to sql.Open for the "postgres"
		// driver.
		DB string `json:"db"`
		// Wait is how long a worker sleeps when no check is due, written in
		// time.ParseDuration syntax (e.g. "5s").
		Wait string `json:"wait"`
	}

	// States is the history of a check's result states, newest first. It is
	// stored in the active_checks.states column and implements driver.Valuer
	// and sql.Scanner through its pointer methods.
	States []int
)
// main loads the JSON config named by -config, connects to the Postgres
// database it points to and starts a fixed pool of check workers. check never
// returns, so main blocks on the WaitGroup for the life of the process.
func main() {
	flag.Parse()

	raw, err := ioutil.ReadFile(*configPath)
	if err != nil {
		log.Fatalf("could not read config: %s", err)
	}
	config := Config{}
	if err := json.Unmarshal(raw, &config); err != nil {
		log.Fatalf("could not parse config: %s", err)
	}
	waitDuration, err := time.ParseDuration(config.Wait)
	if err != nil {
		log.Fatalf("could not parse wait duration: %s", err)
	}

	db, err := sql.Open("postgres", config.DB)
	if err != nil {
		log.Fatalf("could not open database connection: %s", err)
	}
	// sql.Open only validates its arguments; Ping actually connects, so a bad
	// DSN or an unreachable server is reported at startup instead of as an
	// endless stream of per-worker errors.
	if err := db.Ping(); err != nil {
		log.Fatalf("could not connect to database: %s", err)
	}
	defer db.Close()

	// workers is the number of checks that may run concurrently.
	const workers = 25
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(thread int) {
			defer wg.Done()
			check(thread, db, waitDuration)
		}(i)
	}
	wg.Wait()
}
func check ( thread int , db * sql . DB , waitDuration time . Duration ) {
for {
tx , err := db . Begin ( )
if err != nil {
log . Printf ( "[%d] could not start transaction: %s" , thread , err )
continue
}
2018-11-16 20:09:31 +01:00
rows , err := tx . Query ( "select check_id, cmdLine, states, notify from active_checks where next_time < now() and enabled order by next_time for update skip locked limit 1;" )
2018-11-16 10:39:21 +01:00
if err != nil {
log . Printf ( "[%d] could not start query: %s" , thread , err )
tx . Rollback ( )
continue
}
var (
id int64
cmdLine [ ] string
2018-11-18 21:38:04 +01:00
states States
2018-11-16 10:39:21 +01:00
notify bool
)
found := false
for rows . Next ( ) {
if err := rows . Err ( ) ; err != nil {
log . Printf ( "could not fetch row: %s" , err )
tx . Rollback ( )
break
}
2018-11-18 21:38:04 +01:00
if err := rows . Scan ( & id , pq . Array ( & cmdLine ) , & states , & notify ) ; err != nil {
2018-11-16 10:39:21 +01:00
log . Printf ( "could not scan values: %s" , err )
tx . Rollback ( )
break
}
found = true
}
if ! found {
time . Sleep ( waitDuration )
tx . Rollback ( )
continue
}
ctx , cancel := context . WithTimeout ( context . Background ( ) , 1 * time . Second )
cmd := exec . CommandContext ( ctx , cmdLine [ 0 ] , cmdLine [ 1 : ] ... )
output := bytes . NewBuffer ( [ ] byte { } )
cmd . Stdout = output
cmd . Stderr = output
2018-11-18 21:38:04 +01:00
err = cmd . Run ( )
if err != nil && ctx . Err ( ) == context . DeadlineExceeded {
2018-11-16 10:39:21 +01:00
log . Printf ( "[%d] check took too long: %s" , id , err )
2018-11-18 21:38:04 +01:00
cancel ( )
2018-11-16 10:39:21 +01:00
// TODO which state to choose?
// TODO add notification handler
// TODO all this casting should be done better
2018-11-18 21:38:04 +01:00
states . Add ( 99 )
output . Write ( [ ] byte ( ctx . Err ( ) . Error ( ) ) )
2018-11-16 10:39:21 +01:00
} else if err != nil {
cancel ( )
2018-11-18 21:38:04 +01:00
status , ok := cmd . ProcessState . Sys ( ) . ( syscall . WaitStatus )
2018-11-16 10:39:21 +01:00
if ! ok {
log . Printf ( "[%d]error running check: %s" , id , err )
2018-11-18 21:38:04 +01:00
states . Add ( 1 )
2018-11-16 10:39:21 +01:00
} else {
2018-11-18 21:38:04 +01:00
log . Printf ( "%s" , cmd . ProcessState . String ( ) )
states . Add ( status . ExitStatus ( ) )
2018-11-16 10:39:21 +01:00
}
} else {
cancel ( )
2018-11-18 21:38:04 +01:00
states . Add ( 0 )
2018-11-16 10:39:21 +01:00
}
2018-11-18 21:38:04 +01:00
msg := output . String ( )
2018-11-16 10:39:21 +01:00
2018-11-18 21:38:04 +01:00
if _ , err := tx . Exec ( "update active_checks set next_time = now() + intval, states = $2 where check_id = $1" , id , & states ) ; err != nil {
2018-11-16 10:39:21 +01:00
log . Printf ( "[%d] could not update row '%d': %s" , thread , id , err )
2018-11-16 20:09:31 +01:00
tx . Rollback ( )
continue
2018-11-16 10:39:21 +01:00
}
2018-11-18 21:38:04 +01:00
log . Printf ( "meh %s" , output . String ( ) )
2018-11-16 10:39:21 +01:00
if notify {
2018-11-18 21:38:04 +01:00
if _ , err := tx . Exec ( "insert into notifications(check_id, states, output) values ($1, $2, $3);" , & id , & states , & msg ) ; err != nil {
2018-11-16 10:39:21 +01:00
log . Printf ( "[%d] could not create notification for '%d': %s" , thread , id , err )
2018-11-16 20:09:31 +01:00
tx . Rollback ( )
continue
2018-11-16 10:39:21 +01:00
}
}
tx . Commit ( )
}
}
2018-11-18 21:38:04 +01:00
// Value implements driver.Valuer. It encodes the history as an integer array
// literal in Postgres text form, e.g. {3,0,1}; an empty or nil history becomes
// {}. Writes to a strings.Builder never fail, so the error is always nil.
func (s *States) Value() (driver.Value, error) {
	var b strings.Builder
	b.WriteByte('{')
	for i, state := range *s {
		if i > 0 {
			b.WriteByte(',')
		}
		b.WriteString(strconv.Itoa(state))
	}
	b.WriteByte('}')
	return b.String(), nil
}
// Scan implements sql.Scanner. It parses an integer array in Postgres text
// form, such as {3,0,1}, into s. Both []byte and string sources are accepted.
// The empty array {} and a NULL value both yield an empty history. Each
// element must be an integer (surrounding spaces are ignored); anything else,
// e.g. a NULL element, is an error and leaves s unchanged.
func (s *States) Scan(src interface{}) error {
	switch src := src.(type) {
	case nil:
		*s = nil
		return nil
	case string:
		return s.Scan([]byte(src))
	case []byte:
		inner := bytes.TrimSpace(bytes.Trim(src, "{}"))
		// Splitting an empty slice would yield one empty element, which
		// Atoi rejects; "{}" must decode to an empty history instead.
		if len(inner) == 0 {
			*s = States{}
			return nil
		}
		parts := bytes.Split(inner, []byte(","))
		result := make(States, len(parts))
		for i, part := range parts {
			n, err := strconv.Atoi(string(bytes.TrimSpace(part)))
			if err != nil {
				return fmt.Errorf("could not parse element %s: %s", part, err)
			}
			result[i] = n
		}
		*s = result
		return nil
	default:
		return fmt.Errorf("could not convert %T to states", src)
	}
}
// Add records state as the newest entry of the history. The new state is
// placed at the front, followed by at most the five most recent previous
// states, so the history never exceeds six entries. A new slice is always
// allocated; the previous backing array is left untouched.
func (s *States) Add(state int) {
	const keptPrevious = 5 // previous states kept alongside the new one
	prev := *s
	if len(prev) > keptPrevious {
		prev = prev[:keptPrevious]
	}
	*s = append(States{state}, prev...)
}