blob: 4b04ba05ae0305e4fd976970cc8e10d8bac9a465 [file] [log] [blame]
package publishers
import (
"database/sql"
"fmt"
"log"
"os"
"strings"
"github.com/bazelbuild/continuous-integration/metrics/data"
"github.com/bazelbuild/continuous-integration/metrics/metrics"
_ "github.com/go-sql-driver/mysql"
)
const insertTemplate = "INSERT INTO %s (%s) VALUES (%s)"
type statement struct {
prepared *sql.Stmt
text string
}
type CloudSql struct {
conn *sql.DB
statements map[string]*statement
gc *CloudSqlGc
}
func (c *CloudSql) Name() string {
return "Cloud SQL"
}
func (c *CloudSql) RegisterMetric(metric metrics.Metric) error {
err := c.ensureTableExists(metric)
if err != nil {
return err
}
return c.prepareInsertStatement(metric)
}
func (c *CloudSql) ensureTableExists(metric metrics.Metric) error {
// TODO(fweikert): implement
return nil
}
func (c *CloudSql) prepareInsertStatement(metric metrics.Metric) error {
name := metric.Name()
if _, ok := c.statements[name]; ok {
return fmt.Errorf("Metrics %s has already been registered for publisher %s.", name, c.Name())
}
columnNames := metrics.GetColumnNames(metric.Columns())
placeholder := strings.TrimRight(strings.Repeat("?, ", len(columnNames)), ", ")
insert := fmt.Sprintf(insertTemplate, name, strings.Join(columnNames, ", "), placeholder)
nonKeyColumnNames := make([]string, 0)
for _, c := range metric.Columns() {
if !c.IsKey {
nonKeyColumnNames = append(nonKeyColumnNames, c.Name)
}
}
if len(nonKeyColumnNames) != len(columnNames) {
updates := make([]string, len(nonKeyColumnNames))
for i, c := range nonKeyColumnNames {
updates[i] = fmt.Sprintf("%s=VALUES(%s)", c, c)
}
insert = fmt.Sprintf("%s ON DUPLICATE KEY UPDATE %s", insert, strings.Join(updates, ", "))
}
stmt, err := c.createStatement(name, insert)
if err != nil {
return err
}
c.statements[name] = stmt
return nil
}
func (c *CloudSql) createStatement(metricName string, text string) (*statement, error) {
stmt, err := c.conn.Prepare(text)
if err != nil {
return nil, fmt.Errorf("Failed to prepare statement for metric %s: %v\n\tStatement: %s", metricName, err, text)
}
return &statement{prepared: stmt, text: text}, nil
}
func (c *CloudSql) Publish(metric metrics.Metric, newData data.DataSet) error {
metricName := metric.Name()
stmt := c.statements[metricName]
if stmt == nil {
return fmt.Errorf("Could not find prepared insert statement for metric %s. Have you called RegisterMetric() first?", metricName)
}
for _, row := range newData.GetData().Data {
_, err := stmt.prepared.Exec(row...)
if err != nil {
values := make([]string, len(row))
for i, v := range row {
values[i] = fmt.Sprintf("%v", v)
}
return fmt.Errorf("Could not insert new data for metric %s: %v\n\tStatement: %s\n\tValues: %s", metricName, err, stmt.text, strings.Join(values, ", "))
}
}
if gcm, ok := metric.(metrics.GarbageCollectedMetric); ok {
collected, err := c.gc.Run(gcm)
if err != nil {
return err
}
log.Printf("Cloud SQL: Garbage collection deleted %d entries for metric %s", collected, metricName)
}
return nil
}
func CreateCloudSqlPublisher(user, password, instance, database string, localPort int) (*CloudSql, error) {
conn, err := sql.Open("mysql", getConnectionString(user, password, instance, database, localPort))
if err != nil {
return nil, fmt.Errorf("Could not establish connection to database: %v", err)
}
if err := conn.Ping(); err != nil {
conn.Close()
return nil, fmt.Errorf("Connection to database is bad: %v", err)
}
gc, err := CreateCloudSqlGc(conn)
if err != nil {
conn.Close()
return nil, fmt.Errorf("Failed to create garbage collection module for Cloud SQL: %v", err)
}
return &CloudSql{
conn: conn,
statements: make(map[string]*statement),
gc: gc,
}, nil
}
func getConnectionString(user, password, instance, database string, localPort int) string {
cred := fmt.Sprintf("%s:%s@", user, password)
if os.Getenv("GAE_INSTANCE") != "" {
// Running in production.
return fmt.Sprintf("%sunix(/cloudsql/%s)/%s", cred, instance, database)
}
return fmt.Sprintf("%stcp([localhost]:%d)/%s", cred, localPort, database)
}