Implement cloud_build_status metric (#823)
This metric continuously monitors Cloud Build builds via Cloud Pub/Sub and records their results.
Whenever the metric's Collect() method is invoked, it returns all build results received since the previous collection.
The metric also supports the Stackdriver publisher, which allows us to use Stackdriver alerting in the future.
It uses one metric type per source (=repo + branch), and publishes a simple boolean (true = build success).
diff --git a/metrics/README.md b/metrics/README.md
index 78ee93e..94cb52b 100644
--- a/metrics/README.md
+++ b/metrics/README.md
@@ -11,6 +11,7 @@
CREATE TABLE build_success (org VARCHAR(255), pipeline VARCHAR(255), build INT, linux VARCHAR(255), macos VARCHAR(255), windows VARCHAR(255), rbe VARCHAR(255), PRIMARY KEY(org, pipeline, build));
CREATE TABLE builds_per_change (org VARCHAR(255), pipeline VARCHAR(255), changelist INT, builds INT, PRIMARY KEY(org, pipeline, changelist));
+CREATE TABLE cloud_build_status (timestamp DATETIME, build VARCHAR(255), source VARCHAR(255), success BOOL, PRIMARY KEY(timestamp, build));
CREATE TABLE critical_path (org VARCHAR(255), pipeline VARCHAR(255), build INT, wait_time_seconds FLOAT, run_time_seconds FLOAT, longest_task_name VARCHAR(255), longest_task_time_seconds FLOAT, result VARCHAR(255), PRIMARY KEY(org, pipeline, build));
CREATE TABLE flakiness (org VARCHAR(255), pipeline VARCHAR(255), build INT, target VARCHAR(255), passed_count INT, failed_count INT, PRIMARY KEY(org, pipeline, build, target));
CREATE TABLE mac_performance (org VARCHAR(255), pipeline VARCHAR(255), build INT, wait_time_seconds FLOAT, run_time_seconds FLOAT, skipped BOOL, PRIMARY KEY(org, pipeline, build));
@@ -23,6 +24,20 @@
CREATE TABLE zombie_instances (cloud_project VARCHAR(255), zone VARCHAR(255), instance VARCHAR(255), status VARCHAR(255), seconds_online FLOAT, timestamp DATETIME, PRIMARY KEY(cloud_project, zone, instance));
```
+## PubSub Setup for Cloud Build Status
+
+The `cloud_build_status` metric requires a PubSub subscription to the `cloud-builds` topic in the `bazel-public` project.
+Moreover, the service account needs to have `Pub/Sub Subscriber` permissions in the `bazel-public` project.
+
+Run the following commands to see if there is already a subscription:
+
+- `gcloud config set project bazel-public`
+- `gcloud pubsub subscriptions list | grep build-status`
+
+The output should contain `projects/bazel-public/subscriptions/build-status`. If that's not the case, please run
+
+- `gcloud pubsub subscriptions create build-status --topic cloud-builds`
+
## Service Deployment
Make sure you have access to the `staging.bazel-untrusted.appspot.com` GCS bucket, then run:
diff --git a/metrics/main.go b/metrics/main.go
index 7e92489..a9d6798 100644
--- a/metrics/main.go
+++ b/metrics/main.go
@@ -1,6 +1,7 @@
package main
import (
+ "context"
"flag"
"fmt"
"log"
@@ -130,6 +131,13 @@
zombieInstances := metrics.CreateZombieInstances(computeClient, settings.CloudProjects, bk, settings.BuildkiteOrgs, minutes(3))
srv.AddMetric(zombieInstances, minutes(5), defaultPublisher)
+ ctx := context.Background()
+ cloudBuildStatus, err := metrics.CreateCloudBuildStatus(ctx, settings.CloudBuildProject, settings.CloudBuildSubscription)
+ if err != nil {
+ log.Fatalf("Failed to set up CloudBuildStatus metric: %v", err)
+ }
+ srv.AddMetric(cloudBuildStatus, minutes(5), defaultPublisher, stackdriver)
+
if *testMode {
logInTestMode("Running all jobs exactly once...")
srv.RunJobsOnce()
diff --git a/metrics/metrics/cloud_build_status.go b/metrics/metrics/cloud_build_status.go
new file mode 100644
index 0000000..48fb585
--- /dev/null
+++ b/metrics/metrics/cloud_build_status.go
@@ -0,0 +1,244 @@
+package metrics
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "log"
+ "strings"
+ "sync"
+ "time"
+
+ pubsub "cloud.google.com/go/pubsub/apiv1"
+ "github.com/bazelbuild/continuous-integration/metrics/data"
+ timestamp "github.com/golang/protobuf/ptypes/timestamp"
+ metricpb "google.golang.org/genproto/googleapis/api/metric"
+ monitoredres "google.golang.org/genproto/googleapis/api/monitoredres"
+ monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
+ pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
+)
+
+const (
+ cloudBuildBaseMetricType = "custom.googleapis.com/bazel/cloudbuild"
+)
+
+var (
+ buildSuccessState = "SUCCESS"
+)
+
+type buildResult struct {
+ ID string `json:"id"`
+ ProjectID string `json:"projectId"`
+ Status string `json:"status"`
+ FinishTime *time.Time `json:"finishTime"`
+ Source struct {
+ RepoSource struct {
+ RepoName string `json:"repoName"`
+ BranchName string `json:"branchName"`
+ } `json:"repoSource"`
+ } `json:"source"`
+}
+
+func (br *buildResult) toRow() (*cloudBuildStatusRow, error) {
+ if br.FinishTime == nil {
+ return nil, fmt.Errorf("build %s hasn't finished yet", br.ID)
+ }
+ src := br.Source.RepoSource
+ return &cloudBuildStatusRow{
+ ts: *br.FinishTime,
+ build: br.ID,
+ repo: src.RepoName,
+ branch: src.BranchName,
+ success: br.success(),
+ }, nil
+}
+
+func (br *buildResult) success() bool {
+ return br.Status == buildSuccessState
+}
+
+func (br *buildResult) finished() bool {
+ return br.FinishTime != nil
+}
+
+type CloudBuildStatus struct {
+ subscriber *pubsub.SubscriberClient
+ subscription string
+ columns []Column
+
+ mux sync.Mutex
+ results []*buildResult
+ errors []string
+}
+
+func (cbs *CloudBuildStatus) Name() string {
+ return "cloud_build_status"
+}
+
+func (cbs *CloudBuildStatus) Columns() []Column {
+ return cbs.columns
+}
+
+func (*CloudBuildStatus) Type() MetricType {
+ return TimeBasedMetric
+}
+
+func (*CloudBuildStatus) RelevantDelta() int {
+ return 2 * 24 * 60 * 60 // Two days in seconds
+}
+
+// CREATE TABLE cloud_build_status (timestamp DATETIME, build VARCHAR(255), source VARCHAR(255), success BOOL, PRIMARY KEY(timestamp, build));
+func CreateCloudBuildStatus(ctx context.Context, projectID, subscriptionID string) (*CloudBuildStatus, error) {
+ subscriber, err := pubsub.NewSubscriberClient(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ columns := []Column{Column{"timestamp", true}, Column{"build", true}, Column{"source", false}, Column{"success", false}}
+ subscription := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subscriptionID)
+ results := make([]*buildResult, 0)
+ errors := make([]string, 0)
+ cbs := &CloudBuildStatus{
+ subscriber: subscriber,
+ subscription: subscription,
+ columns: columns,
+ results: results,
+ errors: errors,
+ }
+ go cbs.listen(ctx)
+ return cbs, nil
+}
+
+func (cbs *CloudBuildStatus) listen(ctx context.Context) {
+ req := pubsubpb.PullRequest{
+ Subscription: cbs.subscription,
+ MaxMessages: 10,
+ }
+
+ for {
+ res, err := cbs.subscriber.Pull(ctx, &req)
+ if err != nil {
+ cbs.recordError(err)
+ continue
+ }
+
+ for _, m := range res.ReceivedMessages {
+ if err := cbs.handleMessage(m.Message.Data); err != nil {
+ cbs.recordError(err)
+ }
+ err := cbs.subscriber.Acknowledge(ctx, &pubsubpb.AcknowledgeRequest{
+ Subscription: cbs.subscription,
+ AckIds: []string{m.AckId},
+ })
+ if err != nil {
+ cbs.recordError(err)
+ }
+ }
+ }
+}
+
+func (cbs *CloudBuildStatus) handleMessage(data []byte) error {
+ result := new(buildResult)
+ if err := json.Unmarshal(data, result); err != nil {
+ return fmt.Errorf("invalid JSON message: %v", err)
+ }
+
+ if result.finished() {
+ cbs.mux.Lock()
+ cbs.results = append(cbs.results, result)
+ cbs.mux.Unlock()
+ }
+ return nil
+}
+
+func (cbs *CloudBuildStatus) recordError(err error) {
+ cbs.mux.Lock()
+ cbs.errors = append(cbs.errors, err.Error())
+ cbs.mux.Unlock()
+}
+
+func (cbs *CloudBuildStatus) Collect() (data.DataSet, error) {
+ cbs.mux.Lock()
+ defer cbs.mux.Unlock()
+
+ if len(cbs.errors) > 0 {
+ err := fmt.Errorf("failed to collect data due to previous errors:\n%s", strings.Join(cbs.errors, "\n"))
+ cbs.errors = make([]string, 0)
+ return nil, err
+ }
+ result := &cloudBuildStatusSet{headers: GetColumnNames(cbs.columns)}
+ if len(cbs.results) > 0 {
+ for _, r := range cbs.results {
+ if row, err := r.toRow(); err != nil {
+ return nil, err
+ } else {
+ result.rows = append(result.rows, row)
+ }
+ }
+ cbs.results = make([]*buildResult, 0)
+ }
+ return result, nil
+}
+
+// TODO(fweikert): refactor Stackdriver code here and in platform_load
+type cloudBuildStatusRow struct {
+ ts time.Time
+ repo string
+ branch string
+ build string
+ success bool
+}
+
+type cloudBuildStatusSet struct {
+ headers []string
+ rows []*cloudBuildStatusRow
+}
+
+func (s *cloudBuildStatusSet) GetData() *data.LegacyDataSet {
+ rawSet := data.CreateDataSet(s.headers)
+ for _, row := range s.rows {
+ source := fmt.Sprintf("%s/%s", row.repo, row.branch)
+ rawRow := []interface{}{row.ts, row.build, source, row.success}
+ rawSet.Data = append(rawSet.Data, rawRow)
+ }
+ return rawSet
+}
+
+func (s *cloudBuildStatusSet) CreateTimeSeriesRequest(projectID string) *monitoringpb.CreateTimeSeriesRequest {
+ series := make([]*monitoringpb.TimeSeries, len(s.rows))
+ for i, row := range s.rows {
+ series[i] = row.createTimeSeries()
+ }
+ return &monitoringpb.CreateTimeSeriesRequest{
+ Name: "projects/" + projectID,
+ TimeSeries: series,
+ }
+}
+
+func (r *cloudBuildStatusRow) createTimeSeries() *monitoringpb.TimeSeries {
+	ts := &timestamp.Timestamp{
+ Seconds: r.ts.Unix(),
+ }
+ t := fmt.Sprintf("%s/%s/%s", cloudBuildBaseMetricType, r.repo, r.branch)
+ t = strings.Replace(t, "-", "_", -1)
+ log.Printf("Publishing time series for metric '%s'\n", t)
+ return &monitoringpb.TimeSeries{
+ Metric: &metricpb.Metric{
+ Type: t,
+ },
+ Resource: &monitoredres.MonitoredResource{
+ Type: "global",
+ },
+ Points: []*monitoringpb.Point{{
+ Interval: &monitoringpb.TimeInterval{
+ StartTime: ts,
+ EndTime: ts,
+ },
+ Value: &monitoringpb.TypedValue{
+ Value: &monitoringpb.TypedValue_BoolValue{
+ BoolValue: r.success,
+ },
+ },
+ }},
+ }
+}
diff --git a/metrics/publishers/stackdriver.go b/metrics/publishers/stackdriver.go
index 5c3125f..a4a528a 100644
--- a/metrics/publishers/stackdriver.go
+++ b/metrics/publishers/stackdriver.go
@@ -2,6 +2,7 @@
import (
"fmt"
+ "log"
"github.com/bazelbuild/continuous-integration/metrics/clients"
"github.com/bazelbuild/continuous-integration/metrics/data"
@@ -29,7 +30,13 @@
return fmt.Errorf("Metric '%s' does not produce a valid StackDriverTimeSeriesDataSet instance", metricName)
}
- err := sd.client.WriteTimeSeries(set.CreateTimeSeriesRequest(sd.projectID))
+ req := set.CreateTimeSeriesRequest(sd.projectID)
+ if len(req.TimeSeries) == 0 {
+ log.Printf("No new data points for metric %s\n", metric.Name())
+ return nil
+ }
+
+ err := sd.client.WriteTimeSeries(req)
if err != nil {
return fmt.Errorf("Could not write time series for metric '%s' in project '%s': %v", metricName, sd.projectID, err)
}
diff --git a/metrics/settings.go b/metrics/settings.go
index 5c87881..10bd0d8 100644
--- a/metrics/settings.go
+++ b/metrics/settings.go
@@ -26,6 +26,8 @@
CloudSqlDatabase string
CloudSqlLocalPort int
CloudProjects []string
+ CloudBuildProject string
+ CloudBuildSubscription string
}
func ReadSettingsFromDatastore(projectID, settingsName string) (*Settings, error) {