X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fpostgresql.c;h=98ceb6d3dc78cec10bd7ed4966ee5d50a7204567;hb=1e74bb66c61d965c4aa261adfaf058f3356cff09;hp=7bc450eaae51bc20c413c2e36269c4760432d62e;hpb=0a1424bcab7d7e8d4d99d5e317fbe31fefd5de8f;p=collectd.git diff --git a/src/postgresql.c b/src/postgresql.c index 7bc450ea..98ceb6d3 100644 --- a/src/postgresql.c +++ b/src/postgresql.c @@ -1,7 +1,7 @@ /** * collectd - src/postgresql.c - * Copyright (C) 2008, 2009 Sebastian Harl - * Copyright (C) 2009 Florian Forster + * Copyright (C) 2008-2012 Sebastian Harl + * Copyright (C) 2009 Florian Forster * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -42,6 +42,7 @@ #include "configfile.h" #include "plugin.h" +#include "utils_cache.h" #include "utils_db_query.h" #include "utils_complain.h" @@ -55,6 +56,7 @@ #define log_err(...) ERROR ("postgresql: " __VA_ARGS__) #define log_warn(...) WARNING ("postgresql: " __VA_ARGS__) #define log_info(...) INFO ("postgresql: " __VA_ARGS__) +#define log_debug(...) DEBUG ("postgresql: " __VA_ARGS__) #ifndef C_PSQL_DEFAULT_CONF # define C_PSQL_DEFAULT_CONF PKGDATADIR "/postgresql_default.conf" @@ -99,6 +101,7 @@ typedef enum { C_PSQL_PARAM_DB, C_PSQL_PARAM_USER, C_PSQL_PARAM_INTERVAL, + C_PSQL_PARAM_INSTANCE, } c_psql_param_t; /* Parameter configuration. Stored as `user data' in the query objects. */ @@ -110,6 +113,7 @@ typedef struct { typedef struct { char *name; char *statement; + _Bool store_rates; } c_psql_writer_t; typedef struct { @@ -134,17 +138,25 @@ typedef struct { cdtime_t interval; + /* writer "caching" settings */ + cdtime_t commit_interval; + cdtime_t next_commit; + char *host; char *port; char *database; char *user; char *password; + char *instance; + char *sslmode; char *krbsrvname; char *service; + + int ref_cnt; } c_psql_database_t; static char *def_queries[] = { @@ -158,22 +170,77 @@ static char *def_queries[] = { }; static int def_queries_num = STATIC_ARRAY_SIZE (def_queries); -static udb_query_t **queries = NULL; -static size_t queries_num = 0; +static c_psql_database_t **databases = NULL; +static size_t databases_num = 0; + +static udb_query_t **queries = NULL; +static size_t queries_num = 0; + +static c_psql_writer_t *writers = NULL; +static size_t writers_num = 0; -static c_psql_writer_t *writers = NULL; -static size_t writers_num = 0; +static int c_psql_begin (c_psql_database_t *db) +{ + PGresult *r = PQexec (db->conn, "BEGIN"); + + int status = 1; + + if (r != NULL) { + if (PGRES_COMMAND_OK == PQresultStatus (r)) { + db->next_commit = cdtime() + db->commit_interval; + status = 0; + } + else + log_warn ("Failed to initiate ('BEGIN') transaction: %s", + PQerrorMessage (db->conn)); + PQclear (r); + } + return status; +} /* c_psql_begin */ + +static int c_psql_commit (c_psql_database_t *db) +{ + PGresult *r = PQexec (db->conn, "COMMIT"); + + int status = 1; + + if (r != NULL) { + if (PGRES_COMMAND_OK == PQresultStatus (r)) { + db->next_commit = 0; + log_debug ("Successfully committed transaction."); + status = 0; + } + else + log_warn ("Failed to commit transaction: %s", + PQerrorMessage (db->conn)); + PQclear (r); + } + return status; +} /* c_psql_commit */ static c_psql_database_t *c_psql_database_new (const char *name) { - c_psql_database_t *db; + c_psql_database_t **tmp; + c_psql_database_t *db; - db = (c_psql_database_t *)malloc (sizeof (*db)); + db = (c_psql_database_t *)malloc (sizeof(*db)); if (NULL == db) { log_err ("Out of memory."); return NULL; } + tmp = (c_psql_database_t **)realloc (databases, + (databases_num + 1) * sizeof (*databases)); + if (NULL == tmp) { + log_err ("Out of memory."); + sfree (db); + return NULL; + } + + databases = tmp; + databases[databases_num] = db; + ++databases_num; + db->conn = NULL; C_COMPLAIN_INIT (&db->conn_complaint); @@ -194,17 +261,24 @@ static c_psql_database_t *c_psql_database_new (const char *name) db->interval = 0; + db->commit_interval = 0; + db->next_commit = 0; + db->database = sstrdup (name); db->host = NULL; db->port = NULL; db->user = NULL; db->password = NULL; + db->instance = sstrdup (name); + db->sslmode = NULL; db->krbsrvname = NULL; db->service = NULL; + + db->ref_cnt = 0; return db; } /* c_psql_database_new */ @@ -214,6 +288,17 @@ static void c_psql_database_delete (void *data) c_psql_database_t *db = data; + --db->ref_cnt; + /* readers and writers may access this database */ + if (db->ref_cnt > 0) + return; + + /* wait for the lock to be released by the last writer */ + pthread_mutex_lock (&db->db_lock); + + if (db->next_commit > 0) + c_psql_commit (db); + PQfinish (db->conn); db->conn = NULL; @@ -228,6 +313,8 @@ static void c_psql_database_delete (void *data) sfree (db->writers); db->writers_num = 0; + pthread_mutex_unlock (&db->db_lock); + pthread_mutex_destroy (&db->db_lock); sfree (db->database); @@ -236,11 +323,18 @@ static void c_psql_database_delete (void *data) sfree (db->user); sfree (db->password); + sfree (db->instance); + sfree (db->sslmode); sfree (db->krbsrvname); sfree (db->service); + + /* don't care about freeing or reordering the 'databases' array + * this is done in 'shutdown'; also, don't free the database instance + * object just to make sure that in case anybody accesses it before + * shutdown won't segfault */ return; } /* c_psql_database_delete */ @@ -251,7 +345,7 @@ static int c_psql_connect (c_psql_database_t *db) int buf_len = sizeof (conninfo); int status; - if (! db) + if ((! db) || (! db->database)) return -1; status = ssnprintf (buf, buf_len, "dbname = '%s'", db->database); @@ -299,8 +393,9 @@ static int c_psql_check_connection (c_psql_database_t *db) if (CONNECTION_OK != PQstatus (db->conn)) { c_complain (LOG_ERR, &db->conn_complaint, - "Failed to connect to database %s: %s", - db->database, PQerrorMessage (db->conn)); + "Failed to connect to database %s (%s): %s", + db->database, db->instance, + PQerrorMessage (db->conn)); return -1; } @@ -365,9 +460,13 @@ static PGresult *c_psql_exec_query_params (c_psql_database_t *db, case C_PSQL_PARAM_INTERVAL: ssnprintf (interval, sizeof (interval), "%.3f", (db->interval > 0) - ? CDTIME_T_TO_DOUBLE (db->interval) : interval_g); + ? CDTIME_T_TO_DOUBLE (db->interval) + : plugin_get_interval ()); params[i] = interval; break; + case C_PSQL_PARAM_INSTANCE: + params[i] = db->instance; + break; default: assert (0); } @@ -406,9 +505,10 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, else if ((NULL == data) || (0 == data->params_num)) res = c_psql_exec_query_noparams (db, q); else { - log_err ("Connection to database \"%s\" does not support parameters " - "(protocol version %d) - cannot execute query \"%s\".", - db->database, db->proto_version, + log_err ("Connection to database \"%s\" (%s) does not support " + "parameters (protocol version %d) - " + "cannot execute query \"%s\".", + db->database, db->instance, db->proto_version, udb_query_get_name (q)); return -1; } @@ -474,7 +574,7 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, host = db->host; status = udb_query_prepare_result (q, prep_area, host, "postgresql", - db->database, column_names, (size_t) column_num, db->interval); + db->instance, column_names, (size_t) column_num, db->interval); if (0 != status) { log_err ("udb_query_prepare_result failed with status %i.", status); @@ -525,6 +625,7 @@ static int c_psql_read (user_data_t *ud) db = ud->data; assert (NULL != db->database); + assert (NULL != db->instance); assert (NULL != db->queries); pthread_mutex_lock (&db->db_lock); @@ -596,23 +697,94 @@ static char *values_name_to_sqlarray (const data_set_t *ds, return string; } /* values_name_to_sqlarray */ +static char *values_type_to_sqlarray (const data_set_t *ds, + char *string, size_t string_len, _Bool store_rates) +{ + char *str_ptr; + size_t str_len; + + int i; + + str_ptr = string; + str_len = string_len; + + for (i = 0; i < ds->ds_num; ++i) { + int status; + + if (store_rates) + status = ssnprintf(str_ptr, str_len, ",'gauge'"); + else + status = ssnprintf(str_ptr, str_len, ",'%s'", + DS_TYPE_TO_STRING (ds->ds[i].type)); + + if (status < 1) { + str_len = 0; + break; + } + else if ((size_t)status >= str_len) { + str_len = 0; + break; + } + else { + str_ptr += status; + str_len -= (size_t)status; + } + } + + if (str_len <= 2) { + log_err ("c_psql_write: Failed to stringify value types"); + return NULL; + } + + /* overwrite the first comma */ + string[0] = '{'; + str_ptr[0] = '}'; + str_ptr[1] = '\0'; + + return string; +} /* values_type_to_sqlarray */ + static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl, - char *string, size_t string_len) + char *string, size_t string_len, _Bool store_rates) { char *str_ptr; size_t str_len; + gauge_t *rates = NULL; + int i; str_ptr = string; str_len = string_len; for (i = 0; i < vl->values_len; ++i) { - int status; + int status = 0; + + if ((ds->ds[i].type != DS_TYPE_GAUGE) + && (ds->ds[i].type != DS_TYPE_COUNTER) + && (ds->ds[i].type != DS_TYPE_DERIVE) + && (ds->ds[i].type != DS_TYPE_ABSOLUTE)) { + log_err ("c_psql_write: Unknown data source type: %i", + ds->ds[i].type); + sfree (rates); + return NULL; + } if (ds->ds[i].type == DS_TYPE_GAUGE) status = ssnprintf (str_ptr, str_len, ",%f", vl->values[i].gauge); + else if (store_rates) { + if (rates == NULL) + rates = uc_get_rate (ds, vl); + + if (rates == NULL) { + log_err ("c_psql_write: Failed to determine rate"); + return NULL; + } + + status = ssnprintf (str_ptr, str_len, + ",%lf", rates[i]); + } else if (ds->ds[i].type == DS_TYPE_COUNTER) status = ssnprintf (str_ptr, str_len, ",%llu", vl->values[i].counter); @@ -622,14 +794,11 @@ static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl, else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) status = ssnprintf (str_ptr, str_len, ",%"PRIu64, vl->values[i].absolute); - else { - log_err ("c_psql_write: Unknown data source type: %i", - ds->ds[i].type); - return NULL; - } - if (status < 1) - return NULL; + if (status < 1) { + str_len = 0; + break; + } else if ((size_t)status >= str_len) { str_len = 0; break; @@ -640,6 +809,8 @@ static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl, } } + sfree (rates); + if (str_len <= 2) { log_err ("c_psql_write: Failed to stringify value list"); return NULL; @@ -660,9 +831,10 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, char time_str[32]; char values_name_str[1024]; + char values_type_str[1024]; char values_str[1024]; - const char *params[8]; + const char *params[9]; int success = 0; int i; @@ -685,9 +857,6 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, values_name_str, sizeof (values_name_str)) == NULL) return -1; - if (values_to_sqlarray (ds, vl, values_str, sizeof (values_str)) == NULL) - return -1; - #define VALUE_OR_NULL(v) ((((v) == NULL) || (*(v) == '\0')) ? NULL : (v)) params[0] = time_str; @@ -697,7 +866,6 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, params[4] = vl->type; params[5] = VALUE_OR_NULL(vl->type_instance); params[6] = values_name_str; - params[7] = values_str; #undef VALUE_OR_NULL @@ -708,12 +876,33 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, return -1; } + if ((db->commit_interval > 0) + && (db->next_commit == 0)) + c_psql_begin (db); + for (i = 0; i < db->writers_num; ++i) { c_psql_writer_t *writer; PGresult *res; writer = db->writers[i]; + if (values_type_to_sqlarray (ds, + values_type_str, sizeof (values_type_str), + writer->store_rates) == NULL) { + pthread_mutex_unlock (&db->db_lock); + return -1; + } + + if (values_to_sqlarray (ds, vl, + values_str, sizeof (values_str), + writer->store_rates) == NULL) { + pthread_mutex_unlock (&db->db_lock); + return -1; + } + + params[7] = values_type_str; + params[8] = values_str; + res = PQexecParams (db->conn, writer->statement, STATIC_ARRAY_SIZE (params), NULL, (const char *const *)params, @@ -721,10 +910,10 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, if ((PGRES_COMMAND_OK != PQresultStatus (res)) && (PGRES_TUPLES_OK != PQresultStatus (res))) { + PQclear (res); + if ((CONNECTION_OK != PQstatus (db->conn)) && (0 == c_psql_check_connection (db))) { - PQclear (res); - /* try again */ res = PQexecParams (db->conn, writer->statement, STATIC_ARRAY_SIZE (params), NULL, @@ -733,6 +922,7 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, if ((PGRES_COMMAND_OK == PQresultStatus (res)) || (PGRES_TUPLES_OK == PQresultStatus (res))) { + PQclear (res); success = 1; continue; } @@ -745,12 +935,23 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, writer->statement, params[0], params[1], params[2], params[3], params[4], params[5], params[6], params[7]); + + /* this will abort any current transaction -> restart */ + if (db->next_commit > 0) + c_psql_commit (db); + pthread_mutex_unlock (&db->db_lock); return -1; } + + PQclear (res); success = 1; } + if ((db->next_commit > 0) + && (cdtime () > db->next_commit)) + c_psql_commit (db); + pthread_mutex_unlock (&db->db_lock); if (! success) @@ -758,10 +959,62 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, return 0; } /* c_psql_write */ +/* We cannot flush single identifiers as all we do is to commit the currently + * running transaction, thus making sure that all written data is actually + * visible to everybody. */ +static int c_psql_flush (cdtime_t timeout, + __attribute__((unused)) const char *ident, + user_data_t *ud) +{ + c_psql_database_t **dbs = databases; + size_t dbs_num = databases_num; + size_t i; + + if ((ud != NULL) && (ud->data != NULL)) { + dbs = (void *)&ud->data; + dbs_num = 1; + } + + for (i = 0; i < dbs_num; ++i) { + c_psql_database_t *db = dbs[i]; + + /* don't commit if the timeout is larger than the regular commit + * interval as in that case all requested data has already been + * committed */ + if ((db->next_commit > 0) && (db->commit_interval > timeout)) + c_psql_commit (db); + } + return 0; +} /* c_psql_flush */ + static int c_psql_shutdown (void) { + size_t i = 0; + + _Bool had_flush = 0; + plugin_unregister_read_group ("postgresql"); + for (i = 0; i < databases_num; ++i) { + c_psql_database_t *db = databases[i]; + + if (db->writers_num > 0) { + char cb_name[DATA_MAX_NAME_LEN]; + ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", + db->database); + + if (! had_flush) { + plugin_unregister_flush ("postgresql"); + had_flush = 1; + } + + plugin_unregister_flush (cb_name); + plugin_unregister_write (cb_name); + } + + sfree (db); + } + udb_query_free (queries, queries_num); queries = NULL; queries_num = 0; @@ -770,6 +1023,10 @@ static int c_psql_shutdown (void) writers = NULL; writers_num = 0; + sfree (databases); + databases = NULL; + databases_num = 0; + return 0; } /* c_psql_shutdown */ @@ -808,6 +1065,8 @@ static int config_query_param_add (udb_query_t *q, oconfig_item_t *ci) data->params[data->params_num] = C_PSQL_PARAM_USER; else if (0 == strcasecmp (param_str, "interval")) data->params[data->params_num] = C_PSQL_PARAM_INTERVAL; + else if (0 == strcasecmp (param_str, "instance")) + data->params[data->params_num] = C_PSQL_PARAM_INSTANCE; else { log_err ("Invalid parameter \"%s\".", param_str); return 1; @@ -903,12 +1162,15 @@ static int c_psql_config_writer (oconfig_item_t *ci) writer->name = sstrdup (ci->values[0].value.string); writer->statement = NULL; + writer->store_rates = 1; for (i = 0; i < ci->children_num; ++i) { oconfig_item_t *c = ci->children + i; if (strcasecmp ("Statement", c->key) == 0) status = cf_util_get_string (c, &writer->statement); + else if (strcasecmp ("StoreRates", c->key) == 0) + status = cf_util_get_boolean (c, &writer->store_rates); else log_warn ("Ignoring unknown config key \"%s\".", c->key); } @@ -931,6 +1193,8 @@ static int c_psql_config_database (oconfig_item_t *ci) struct timespec cb_interval = { 0, 0 }; user_data_t ud; + static _Bool have_flush = 0; + int i; if ((1 != ci->values_num) @@ -956,6 +1220,8 @@ static int c_psql_config_database (oconfig_item_t *ci) cf_util_get_string (c, &db->user); else if (0 == strcasecmp (c->key, "Password")) cf_util_get_string (c, &db->password); + else if (0 == strcasecmp (c->key, "Instance")) + cf_util_get_string (c, &db->instance); else if (0 == strcasecmp (c->key, "SSLMode")) cf_util_get_string (c, &db->sslmode); else if (0 == strcasecmp (c->key, "KRBSrvName")) @@ -970,6 +1236,8 @@ static int c_psql_config_database (oconfig_item_t *ci) &db->writers, &db->writers_num); else if (0 == strcasecmp (c->key, "Interval")) cf_util_get_cdtime (c, &db->interval); + else if (strcasecmp ("CommitInterval", c->key) == 0) + cf_util_get_cdtime (c, &db->commit_interval); else log_warn ("Ignoring unknown config key \"%s\".", c->key); } @@ -1012,17 +1280,35 @@ static int c_psql_config_database (oconfig_item_t *ci) ud.data = db; ud.free_func = c_psql_database_delete; - ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->database); + ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->instance); if (db->queries_num > 0) { CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval); + ++db->ref_cnt; plugin_register_complex_read ("postgresql", cb_name, c_psql_read, /* interval = */ (db->interval > 0) ? &cb_interval : NULL, &ud); } if (db->writers_num > 0) { + ++db->ref_cnt; plugin_register_write (cb_name, c_psql_write, &ud); + + if (! have_flush) { + /* flush all */ + plugin_register_flush ("postgresql", + c_psql_flush, /* user data = */ NULL); + have_flush = 1; + } + + /* flush this connection only */ + ++db->ref_cnt; + plugin_register_flush (cb_name, c_psql_flush, &ud); + } + else if (db->commit_interval > 0) { + log_warn ("Database '%s': You do not have any writers assigned to " + "this database connection. Setting 'CommitInterval' does " + "not have any effect.", db->database); } return 0; } /* c_psql_config_database */