Copyright 2021, 2022 Red Hat, Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
|
package push
|
Documentation in literate-programming-style is available at:
https://redhatinsights.github.io/insights-operator-utils/packages/metrics/push/push.html
|
import (
"context"
"errors"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
"github.com/rs/zerolog/log"
)
|
collectors stores the prometheus collectors that will be pushed to the push gateway
|
var collectors [ ] prometheus . Collector
|
UnregisterMetrics unregister all prometheus collectors safe in the collectors variable
|
func UnregisterMetrics ( ) error {
var errMsg = "cannot unregister metric"
log . Debug ( ) .
Int ( "total" , len ( collectors ) ) .
Int ( "progress" , 0 ) .
Msg ( "Unregistering metrics" )
for i , c := range collectors {
if ok := prometheus . Unregister ( c ) ; ! ok {
log . Warn ( ) . Msg ( errMsg )
return errors . New ( errMsg )
}
log . Debug ( ) . Msg ( "metric unregistered" )
log . Debug ( ) .
Int ( "total" , len ( collectors ) ) .
Int ( "progress" , i + 1 ) .
Msg ( "Unregistering metrics" )
}
collectors = [ ] prometheus . Collector { }
return nil
}
|
InitMetrics fills the collector variables with some Prometheus metrics and automatically registers them.
|
func InitMetrics ( initFunctions [ ] func ( ) ( prometheus . Collector , error ) ) ( err error ) {
|
Reset the collector slice
|
if len ( collectors ) > 0 {
if err := UnregisterMetrics ( ) ; err != nil {
return err
}
}
collectors = [ ] prometheus . Collector { }
for _ , f := range initFunctions {
if coll , err := f ( ) ; err == nil {
collectors = append ( collectors , coll )
} else {
return err
}
}
return nil
}
|
SendMetrics pushes the metrics to the configured prometheus push gateway
|
func SendMetrics ( job , gatewayURL , gatewayAuthToken string ) error {
|
Creates a pusher to the gateway "$PUSHGWURL/metrics/job/$(jobname)
|
log . Debug ( ) .
Str ( "Job" , job ) .
Str ( "url" , gatewayURL ) .
Msg ( "Pushing metrics" )
pusher := push . New ( gatewayURL , job )
err := pushCollectors ( pusher , gatewayAuthToken , collectors )
if err != nil {
log . Err ( err ) . Msg ( "Couldn't push prometheus metrics" )
return err
}
log . Info ( ) . Msg ( "Metrics pushed successfully." )
return nil
}
|
SendMetricsInLoop pushes the metrics in a loop until context is done
|
func SendMetricsInLoop ( ctx context . Context , job , gatewayURL , gatewayAuthToken string , timeBetweenPush time . Duration ) {
if timeBetweenPush < time . Second * 1 {
log . Warn ( ) . Msgf ( "You are trying to push the metrics every %f seconds. This may overload the push gateway, so this operation is blocked." , timeBetweenPush . Seconds ( ) )
return
}
ticker := time . NewTicker ( timeBetweenPush )
for {
select {
case <- ticker . C :
log . Debug ( ) . Msg ( "Pushing metrics" )
_ = SendMetrics ( job , gatewayURL , gatewayAuthToken )
case <- ctx . Done ( ) :
return
}
}
}
|