| package clients |
| |
| import ( |
| "fmt" |
| "log" |
| "sync" |
| "time" |
| |
| "github.com/bazelbuild/continuous-integration/metrics/data" |
| "github.com/buildkite/go-buildkite/buildkite" |
| ) |
| |
| type BuildkiteClient interface { |
| GetMostRecentBuilds(*data.PipelineID, int) ([]buildkite.Build, error) |
| GetAgents(string) ([]buildkite.Agent, error) |
| } |
| |
| type cacheEntry struct { |
| sampled time.Time |
| values []interface{} |
| } |
| |
| type Clock interface { |
| CurrentTime() time.Time |
| } |
| |
| type DefaultClock struct{} |
| |
| func (DefaultClock) CurrentTime() time.Time { |
| return time.Now() |
| } |
| |
| type CachedBuildkiteClient struct { |
| api BuildkiteAPI |
| |
| mu sync.Mutex |
| cacheTimeout time.Duration |
| agentCache map[string]cacheEntry |
| buildCache map[string]cacheEntry |
| clock Clock |
| } |
| |
| func CreateCachedBuildkiteClient(api BuildkiteAPI, cacheTimeout time.Duration) *CachedBuildkiteClient { |
| return &CachedBuildkiteClient{ |
| api: api, |
| cacheTimeout: cacheTimeout, |
| agentCache: make(map[string]cacheEntry), |
| buildCache: make(map[string]cacheEntry), |
| clock: DefaultClock{}, |
| } |
| } |
| |
| func (client *CachedBuildkiteClient) GetMostRecentBuilds(pipeline *data.PipelineID, atLeastNBuilds int) ([]buildkite.Build, error) { |
| var listFunc func(int, int) ([]buildkite.Build, int, error) |
| if pipeline.Slug == "all" { |
| listFunc = func(page, perPage int) ([]buildkite.Build, int, error) { |
| return client.api.ListBuildyByOrg(pipeline.Org, page, perPage) |
| } |
| } else { |
| listFunc = func(page, perPage int) ([]buildkite.Build, int, error) { |
| return client.api.ListBuildsByPipeline(pipeline.Org, pipeline.Slug, page, perPage) |
| } |
| } |
| |
| wrapperFunc := func(page, perPage int) ([]interface{}, int, error) { |
| builds, lastPage, err := listFunc(page, perPage) |
| interfaces := make([]interface{}, len(builds)) |
| for i, b := range builds { |
| interfaces[i] = b |
| } |
| return interfaces, lastPage, err |
| } |
| |
| results, err := client.getResults(wrapperFunc, atLeastNBuilds, client.buildCache, pipeline.String()) |
| if err != nil { |
| return nil, fmt.Errorf("Failed to retrieve builds for pipeline %s: %v", pipeline, err) |
| } |
| |
| builds := make([]buildkite.Build, len(results)) |
| for i, r := range results { |
| builds[i] = r.(buildkite.Build) |
| } |
| return builds, nil |
| } |
| |
| func (client *CachedBuildkiteClient) GetAgents(org string) ([]buildkite.Agent, error) { |
| list := func(page, perPage int) ([]interface{}, int, error) { |
| agents, lastPage, err := client.api.ListAgents(org, page, perPage) |
| interfaces := make([]interface{}, len(agents)) |
| for i, a := range agents { |
| interfaces[i] = a |
| } |
| return interfaces, lastPage, err |
| } |
| |
| results, err := client.getResults(list, -1, client.agentCache, org) |
| if err != nil { |
| return nil, fmt.Errorf("Failed to retrieve agents: %v", err) |
| } |
| |
| agents := make([]buildkite.Agent, len(results)) |
| for i, r := range results { |
| agents[i] = r.(buildkite.Agent) |
| } |
| return agents, nil |
| } |
| |
| func (client *CachedBuildkiteClient) getResults(listFunc func(int, int) ([]interface{}, int, error), lastN int, cache map[string]cacheEntry, cacheKey string) ([]interface{}, error) { |
| client.mu.Lock() |
| defer client.mu.Unlock() |
| if entry, ok := cache[cacheKey]; ok { |
| if client.clock.CurrentTime().Sub(entry.sampled) <= client.cacheTimeout { |
| if lastN < 0 { |
| return entry.values[:], nil |
| } else if lastN <= len(entry.values) { |
| return entry.values[:lastN], nil |
| } |
| } |
| delete(cache, cacheKey) |
| } |
| |
| results, err := client.getUncachedResults(listFunc, lastN, cacheKey) |
| if err != nil { |
| return nil, err |
| } |
| cache[cacheKey] = cacheEntry{ |
| values: results, |
| sampled: client.clock.CurrentTime(), |
| } |
| return results, nil |
| } |
| |
| func (client *CachedBuildkiteClient) getUncachedResults(listFunc func(int, int) ([]interface{}, int, error), lastN int, cacheKey string) ([]interface{}, error) { |
| all_results := make([]interface{}, 0) |
| perPage := 100 |
| if 0 < lastN && lastN < perPage { |
| perPage = lastN |
| } |
| currPage := 1 |
| lastPage := 1 |
| |
| for currPage <= lastPage { |
| log.Printf("Buildkite: Fetching page %d for '%s' (last=%d).\n", currPage, cacheKey, lastPage) |
| var results []interface{} |
| var err error |
| results, lastPage, err = listFunc(currPage, perPage) |
| if err != nil { |
| return nil, fmt.Errorf("Could not get page %d: %v", currPage, err) |
| } |
| |
| all_results = append(all_results, results...) |
| currPage += 1 |
| |
| if lastN > -1 && len(all_results) >= lastN { |
| break |
| } |
| } |
| |
| if 0 < lastN && lastN < len(all_results) { |
| all_results = all_results[:lastN] |
| } |
| |
| return all_results, nil |
| } |
| |
| func (client *CachedBuildkiteClient) setClock(clock Clock) { |
| client.clock = clock |
| } |