2018-11-16 10:39:21 +01:00
package main
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"flag"
"io/ioutil"
"log"
"os/exec"
"sync"
"syscall"
"time"
"github.com/lib/pq"
)
// configPath is the value of the -config command line flag: the location of
// the configuration file, "moncheck.conf" unless overridden.
var configPath = flag.String("config", "moncheck.conf", "path to the config file")
// Config mirrors the JSON configuration file.
type Config struct {
	// DB is handed unchanged to sql.Open as the data source name.
	DB string `json:"db"`
	// Wait is parsed with time.ParseDuration and passed to every checker.
	Wait string `json:"wait"`
}
// main reads the JSON file named by the -config flag, parses the wait
// duration from it, opens a database handle and starts 25 checker goroutines.
// Any failure during this setup terminates the program via log.Fatalf. Once
// the goroutines are started, main blocks forever.
func main() {
	flag.Parse()

	data, readErr := ioutil.ReadFile(*configPath)
	if readErr != nil {
		log.Fatalf("could not read config: %s", readErr)
	}

	var cfg Config
	if parseErr := json.Unmarshal(data, &cfg); parseErr != nil {
		log.Fatalf("could not parse config: %s", parseErr)
	}

	wait, durErr := time.ParseDuration(cfg.Wait)
	if durErr != nil {
		log.Fatalf("could not parse wait duration: %s", durErr)
	}

	conn, openErr := sql.Open("postgres", cfg.DB)
	if openErr != nil {
		log.Fatalf("could not open database connection: %s", openErr)
	}

	// Each goroutine receives its index, which check uses to tag log lines.
	const workers = 25
	for worker := 0; worker < workers; worker++ {
		go check(worker, conn, wait)
	}

	// Nothing ever calls Done on this group, so Wait never returns and the
	// process stays alive while the checkers run.
	var forever sync.WaitGroup
	forever.Add(1)
	forever.Wait()
}
func check ( thread int , db * sql . DB , waitDuration time . Duration ) {
for {
tx , err := db . Begin ( )
if err != nil {
log . Printf ( "[%d] could not start transaction: %s" , thread , err )
continue
}
2018-11-16 20:09:31 +01:00
rows , err := tx . Query ( "select check_id, cmdLine, states, notify from active_checks where next_time < now() and enabled order by next_time for update skip locked limit 1;" )
2018-11-16 10:39:21 +01:00
if err != nil {
log . Printf ( "[%d] could not start query: %s" , thread , err )
tx . Rollback ( )
continue
}
var (
id int64
cmdLine [ ] string
states [ ] int64
notify bool
)
found := false
for rows . Next ( ) {
if err := rows . Err ( ) ; err != nil {
log . Printf ( "could not fetch row: %s" , err )
tx . Rollback ( )
break
}
if err := rows . Scan ( & id , pq . Array ( & cmdLine ) , pq . Array ( & states ) , & notify ) ; err != nil {
log . Printf ( "could not scan values: %s" , err )
tx . Rollback ( )
break
}
found = true
}
if ! found {
time . Sleep ( waitDuration )
tx . Rollback ( )
continue
}
statePos := 5
if len ( states ) < 6 {
statePos = len ( states )
}
ctx , cancel := context . WithTimeout ( context . Background ( ) , 1 * time . Second )
cmd := exec . CommandContext ( ctx , cmdLine [ 0 ] , cmdLine [ 1 : ] ... )
output := bytes . NewBuffer ( [ ] byte { } )
cmd . Stdout = output
cmd . Stderr = output
if err != nil && err == context . Canceled {
log . Printf ( "[%d] check took too long: %s" , id , err )
// TODO which state to choose?
// TODO add notification handler
// TODO all this casting should be done better
states = append ( [ ] int64 { 1 } , states [ : statePos ] ... )
} else if err != nil {
cancel ( )
exitErr , ok := err . ( * exec . ExitError )
if ! ok {
log . Printf ( "[%d]error running check: %s" , id , err )
states = append ( [ ] int64 { 1 } , states [ : statePos ] ... )
} else {
status , ok := exitErr . Sys ( ) . ( syscall . WaitStatus )
if ! ok {
} else {
states = append ( [ ] int64 { int64 ( status . ExitStatus ( ) ) } , states [ : statePos ] ... )
}
}
} else {
cancel ( )
states = append ( [ ] int64 { 0 } , states [ : statePos ] ... )
}
2018-11-16 20:09:31 +01:00
if _ , err := tx . Exec ( "update active_checks set next_time = now() + intval, states = $2 where check_id = $1" , id , pq . Array ( & states ) ) ; err != nil {
2018-11-16 10:39:21 +01:00
log . Printf ( "[%d] could not update row '%d': %s" , thread , id , err )
2018-11-16 20:09:31 +01:00
tx . Rollback ( )
continue
2018-11-16 10:39:21 +01:00
}
if notify {
2018-11-16 20:09:31 +01:00
if _ , err := tx . Exec ( "insert into notifications(check_id, states, output) values ($1, $2, $3);" , & id , pq . Array ( & states ) , output . Bytes ( ) ) ; err != nil {
2018-11-16 10:39:21 +01:00
log . Printf ( "[%d] could not create notification for '%d': %s" , thread , id , err )
2018-11-16 20:09:31 +01:00
tx . Rollback ( )
continue
2018-11-16 10:39:21 +01:00
}
}
tx . Commit ( )
}
}