support new checker api
With this, it is now possible to support multiple check instances and balance the load.
This commit is contained in:
parent
88b50033d0
commit
e2b479c34f
@ -32,6 +32,7 @@ type (
|
|||||||
Wait string `json:"wait"`
|
Wait string `json:"wait"`
|
||||||
Path []string `json:"path"`
|
Path []string `json:"path"`
|
||||||
Workers int `json:"workers"`
|
Workers int `json:"workers"`
|
||||||
|
CheckerID int `json:"checker_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
States []int
|
States []int
|
||||||
@ -73,14 +74,14 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < config.Workers; i++ {
|
for i := 0; i < config.Workers; i++ {
|
||||||
go check(i, db, waitDuration, timeout, hostname)
|
go check(i, db, waitDuration, timeout, hostname, config.CheckerID)
|
||||||
}
|
}
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func check(thread int, db *sql.DB, waitDuration, timeout time.Duration, hostname string) {
|
func check(thread int, db *sql.DB, waitDuration, timeout time.Duration, hostname string, checker_id int) {
|
||||||
for {
|
for {
|
||||||
tx, err := db.Begin()
|
tx, err := db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -91,14 +92,10 @@ func check(thread int, db *sql.DB, waitDuration, timeout time.Duration, hostname
|
|||||||
from active_checks
|
from active_checks
|
||||||
where next_time < now()
|
where next_time < now()
|
||||||
and enabled
|
and enabled
|
||||||
|
and checker_id = $1
|
||||||
order by next_time
|
order by next_time
|
||||||
for update skip locked
|
for update skip locked
|
||||||
limit 1;`)
|
limit 1;`, checker_id)
|
||||||
if err != nil {
|
|
||||||
log.Printf("[%d] could not start query: %s", thread, err)
|
|
||||||
tx.Rollback()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var (
|
var (
|
||||||
id int64
|
id int64
|
||||||
cmdLine []string
|
cmdLine []string
|
||||||
|
@ -226,13 +226,13 @@ var (
|
|||||||
where c.last_refresh < c.updated or c.last_refresh is null
|
where c.last_refresh < c.updated or c.last_refresh is null
|
||||||
limit 1
|
limit 1
|
||||||
for update of c skip locked;`
|
for update of c skip locked;`
|
||||||
SQLRefreshActiveCheck = `insert into active_checks(check_id, cmdline, intval, enabled, msg, mapping_id)
|
SQLRefreshActiveCheck = `insert into active_checks(check_id, cmdline, intval, enabled, msg, mapping_id, checker_id)
|
||||||
select c.id, $2, c.intval, c.enabled, case when ac.msg is null then '' else ac.msg end, case when c.mapping_id is not null then c.mapping_id when n.mapping_id is not null then n.mapping_id else 1 end
|
select c.id, $2, c.intval, c.enabled, case when ac.msg is null then '' else ac.msg end, case when c.mapping_id is not null then c.mapping_id when n.mapping_id is not null then n.mapping_id else 1 end, c.checker_id
|
||||||
from checks c
|
from checks c
|
||||||
left join active_checks ac on c.id = ac.check_id
|
left join active_checks ac on c.id = ac.check_id
|
||||||
left join nodes n on c.node_id = n.id
|
left join nodes n on c.node_id = n.id
|
||||||
where c.id = $1
|
where c.id = $1
|
||||||
on conflict(check_id)
|
on conflict(check_id)
|
||||||
do update set cmdline = $2, intval = excluded.intval, enabled = excluded.enabled, mapping_id = excluded.mapping_id;`
|
do update set cmdline = $2, intval = excluded.intval, enabled = excluded.enabled, mapping_id = excluded.mapping_id, checker_id = excluded.checker_id;`
|
||||||
SQLUpdateLastRefresh = `update checks set last_refresh = now() where id = $1;`
|
SQLUpdateLastRefresh = `update checks set last_refresh = now() where id = $1;`
|
||||||
)
|
)
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
{
|
{
|
||||||
"db": "user=moncheck dbname=monzero",
|
"db": "user=moncheck dbname=monzero",
|
||||||
|
"checker_id": 1,
|
||||||
"timeout": "5s",
|
"timeout": "5s",
|
||||||
"wait": "5s",
|
"wait": "5s",
|
||||||
"path": [
|
"path": [
|
||||||
|
Loading…
Reference in New Issue
Block a user