From 893f2b75eb8c0cedc1e665b69f9ccc1b7e82a874 Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Sat, 18 Aug 2012 17:23:42 +0200 Subject: [PATCH] postgresql plugin: Added generic support for writing values to a database. This has been implemented by requiring the user to specify an SQL statement to be used for storing a value-list in PostgreSQL. Usually, this should be done by creating custom functions to take care of that. The user specified statement will then be called by collectd with eight arguments: time, host, plugin, plugin instance (or NULL), type, type instance (or NULL), array of value names (data source names) and an array of the values. Two elements have been added to the config parser: A block may be used to name an SQL statement (specified using the 'Statement' option) to be used for writing data. In a block, the new option 'Writer' may be used to apply a writer to a database connection. The current approach has two benefits: for one, a user may chose whatever database layout best suits her needs. Also, it is very easy to experiment with different approaches on how to structure the data in a database without the need to modify the plugin. This can be done in SQL, which (hopefully) is the language that people working with PostgreSQL databases like most ;-) If it happens to turn out that some approach is rather superior, it may still be re-implemented in a specific plugin in later versions. --- src/collectd.conf.pod | 81 +++++++++ src/postgresql.c | 378 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 446 insertions(+), 13 deletions(-) diff --git a/src/collectd.conf.pod b/src/collectd.conf.pod index 66ead9c5..7d287cc2 100644 --- a/src/collectd.conf.pod +++ b/src/collectd.conf.pod @@ -3450,6 +3450,13 @@ which are available in a PostgreSQL database or use future or special statistics provided by PostgreSQL without the need to upgrade your collectd installation. +Starting with version 5.2, the C plugin supports writing data to +PostgreSQL databases as well. This has been implemented in a generic way. You +need to specify an SQL statement which will then be executed by collectd in +order to write the data (see below for details). The benefit of that approach +is that there is no fixed database layout. Rather, the layout may be optimized +for the current setup. + The B manual can be found at L. @@ -3479,6 +3486,10 @@ L. + + Statement "SELECT collectd_insert($1, $2, $3, $4, $5, $6, $7, $8);" + + Host "hostname" Port "5432" @@ -3495,6 +3506,11 @@ L. Query backend # predefined Query rt36_tickets + + + # ... + Writer sqlstore + The B block defines one database query which may later be used by a @@ -3656,6 +3672,71 @@ This query collects the on-disk size of the database in bytes. =back +The B block defines a PostgreSQL writer backend. It accepts a single +mandatory argument specifying the name of the writer. This will then be used +in the B specification in order to activate the writer instance. The +names of all writers have to be unique. The following options may be +specified: + +=over 4 + +=item B I + +This mandatory option specifies the SQL statement that will be executed for +each submitted value. A single SQL statement is allowed only. Anything after +the first semicolon will be ignored. + +Eight parameters will be passed to the statement and should be specified as +tokens B<$1>, B<$2>, through B<$8> in the statement string. The following +values are made available through those parameters: + +=over 4 + +=item B<$1> + +The timestamp of the queried value as a floating point number. + +=item B<$2> + +The hostname of the queried value. + +=item B<$3> + +The plugin name of the queried value. + +=item B<$4> + +The plugin instance of the queried value. This value may be B if there +is no plugin instance. + +=item B<$5> + +The type of the queried value (cf. L). + +=item B<$6> + +The type instance of the queried value. This value may be B if there is +no type instance. + +=item B<$7> + +An array of names for the submitted values (i.Ee., the name of the data +sources of the submitted value-list). + +=item B<$8> + +An array of the submitted values. The dimensions of the value name and value +arrays match. + +=back + +In general, it is advisable to create and call a custom function in the +PostgreSQL database for this purpose. Any procedural language supported by +PostgreSQL will do (see chapter "Server Programming" in the PostgreSQL manual +for details). + +=back + The B block defines one PostgreSQL database for which to collect statistics. It accepts a single mandatory argument which specifies the database name. None of the other options are required. PostgreSQL will use diff --git a/src/postgresql.c b/src/postgresql.c index 0a5e66c2..d6e04a85 100644 --- a/src/postgresql.c +++ b/src/postgresql.c @@ -45,6 +45,10 @@ #include "utils_db_query.h" #include "utils_complain.h" +#if HAVE_PTHREAD_H +# include +#endif + #include #include @@ -103,6 +107,11 @@ typedef struct { int params_num; } c_psql_user_data_t; +typedef struct { + char *name; + char *statement; +} c_psql_writer_t; + typedef struct { PGconn *conn; c_complain_t conn_complaint; @@ -117,6 +126,12 @@ typedef struct { udb_query_t **queries; size_t queries_num; + c_psql_writer_t **writers; + size_t writers_num; + + /* make sure we don't access the database object in parallel */ + pthread_mutex_t db_lock; + cdtime_t interval; char *host; @@ -146,6 +161,9 @@ static int def_queries_num = STATIC_ARRAY_SIZE (def_queries); static udb_query_t **queries = NULL; static size_t queries_num = 0; +static c_psql_writer_t *writers = NULL; +static size_t writers_num = 0; + static c_psql_database_t *c_psql_database_new (const char *name) { c_psql_database_t *db; @@ -169,6 +187,11 @@ static c_psql_database_t *c_psql_database_new (const char *name) db->queries = NULL; db->queries_num = 0; + db->writers = NULL; + db->writers_num = 0; + + pthread_mutex_init (&db->db_lock, /* attrs = */ NULL); + db->interval = 0; db->database = sstrdup (name); @@ -202,6 +225,11 @@ static void c_psql_database_delete (void *data) sfree (db->queries); db->queries_num = 0; + sfree (db->writers); + db->writers_num = 0; + + pthread_mutex_destroy (&db->db_lock); + sfree (db->database); sfree (db->host); sfree (db->port); @@ -351,6 +379,7 @@ static PGresult *c_psql_exec_query_params (c_psql_database_t *db, NULL, NULL, /* return text data */ 0); } /* c_psql_exec_query_params */ +/* db->db_lock must be locked when calling this function */ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, udb_query_preparation_area_t *prep_area) { @@ -384,23 +413,32 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q, return -1; } + /* give c_psql_write() a chance to acquire the lock if called recursively + * through dispatch_values(); this will happen if, both, queries and + * writers are configured for a single connection */ + pthread_mutex_unlock (&db->db_lock); + column_names = NULL; column_values = NULL; -#define BAIL_OUT(status) \ - sfree (column_names); \ - sfree (column_values); \ - PQclear (res); \ - return status - if (PGRES_TUPLES_OK != PQresultStatus (res)) { + pthread_mutex_lock (&db->db_lock); + log_err ("Failed to execute SQL query: %s", PQerrorMessage (db->conn)); log_info ("SQL query was: %s", udb_query_get_statement (q)); - BAIL_OUT (-1); + PQclear (res); + return -1; } +#define BAIL_OUT(status) \ + sfree (column_names); \ + sfree (column_values); \ + PQclear (res); \ + pthread_mutex_lock (&db->db_lock); \ + return status + rows_num = PQntuples (res); if (1 > rows_num) { BAIL_OUT (0); @@ -487,9 +525,14 @@ static int c_psql_read (user_data_t *ud) db = ud->data; assert (NULL != db->database); + assert (NULL != db->queries); - if (0 != c_psql_check_connection (db)) + pthread_mutex_lock (&db->db_lock); + + if (0 != c_psql_check_connection (db)) { + pthread_mutex_unlock (&db->db_lock); return -1; + } for (i = 0; i < db->queries_num; ++i) { @@ -507,11 +550,212 @@ static int c_psql_read (user_data_t *ud) success = 1; } + pthread_mutex_unlock (&db->db_lock); + if (! success) return -1; return 0; } /* c_psql_read */ +static char *values_name_to_sqlarray (const data_set_t *ds, + char *string, size_t string_len) +{ + char *str_ptr; + size_t str_len; + + int i; + + str_ptr = string; + str_len = string_len; + + for (i = 0; i < ds->ds_num; ++i) { + int status = ssnprintf (str_ptr, str_len, ",'%s'", ds->ds[i].name); + + if (status < 1) + return NULL; + else if ((size_t)status >= str_len) { + str_len = 0; + break; + } + else { + str_ptr += status; + str_len -= (size_t)status; + } + } + + if (str_len <= 2) { + log_err ("c_psql_write: Failed to stringify value names"); + return NULL; + } + + /* overwrite the first comma */ + string[0] = '{'; + str_ptr[0] = '}'; + str_ptr[1] = '\0'; + + return string; +} /* values_name_to_sqlarray */ + +static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl, + char *string, size_t string_len) +{ + char *str_ptr; + size_t str_len; + + int i; + + str_ptr = string; + str_len = string_len; + + for (i = 0; i < vl->values_len; ++i) { + int status; + + if (ds->ds[i].type == DS_TYPE_GAUGE) + status = ssnprintf (str_ptr, str_len, + ",%f", vl->values[i].gauge); + else if (ds->ds[i].type == DS_TYPE_COUNTER) + status = ssnprintf (str_ptr, str_len, + ",%llu", vl->values[i].counter); + else if (ds->ds[i].type == DS_TYPE_DERIVE) + status = ssnprintf (str_ptr, str_len, + ",%"PRIi64, vl->values[i].derive); + else if (ds->ds[i].type == DS_TYPE_ABSOLUTE) + status = ssnprintf (str_ptr, str_len, + ",%"PRIu64, vl->values[i].absolute); + else { + log_err ("c_psql_write: Unknown data source type: %i", + ds->ds[i].type); + return NULL; + } + + if (status < 1) + return NULL; + else if ((size_t)status >= str_len) { + str_len = 0; + break; + } + else { + str_ptr += status; + str_len -= (size_t)status; + } + } + + if (str_len <= 2) { + log_err ("c_psql_write: Failed to stringify value list"); + return NULL; + } + + /* overwrite the first comma */ + string[0] = '{'; + str_ptr[0] = '}'; + str_ptr[1] = '\0'; + + return string; +} /* values_to_sqlarray */ + +static int c_psql_write (const data_set_t *ds, const value_list_t *vl, + user_data_t *ud) +{ + c_psql_database_t *db; + + char time_str[1024]; + char values_name_str[1024]; + char values_str[1024]; + + const char *params[8]; + + int success = 0; + int i; + + if ((ud == NULL) || (ud->data == NULL)) { + log_err ("c_psql_write: Invalid user data."); + return -1; + } + + db = ud->data; + assert (db->database != NULL); + assert (db->writers != NULL); + + ssnprintf (time_str, sizeof (time_str), + "%f", CDTIME_T_TO_DOUBLE (vl->time)); + + if (values_name_to_sqlarray (ds, + values_name_str, sizeof (values_name_str)) == NULL) + return -1; + + if (values_to_sqlarray (ds, vl, values_str, sizeof (values_str)) == NULL) + return -1; + +#define VALUE_OR_NULL(v) ((((v) == NULL) || (*(v) == '\0')) ? NULL : (v)) + + params[0] = time_str; + params[1] = vl->host; + params[2] = vl->plugin; + params[3] = VALUE_OR_NULL(vl->plugin_instance); + params[4] = vl->type; + params[5] = VALUE_OR_NULL(vl->type_instance); + params[6] = values_name_str; + params[7] = values_str; + +#undef VALUE_OR_NULL + + pthread_mutex_lock (&db->db_lock); + + if (0 != c_psql_check_connection (db)) { + pthread_mutex_unlock (&db->db_lock); + return -1; + } + + for (i = 0; i < db->writers_num; ++i) { + c_psql_writer_t *writer; + PGresult *res; + + writer = db->writers[i]; + + res = PQexecParams (db->conn, writer->statement, + STATIC_ARRAY_SIZE (params), NULL, + (const char *const *)params, + NULL, NULL, /* return text data */ 0); + + if ((PGRES_COMMAND_OK != PQresultStatus (res)) + && (PGRES_TUPLES_OK != PQresultStatus (res))) { + if ((CONNECTION_OK != PQstatus (db->conn)) + && (0 == c_psql_check_connection (db))) { + PQclear (res); + + /* try again */ + res = PQexecParams (db->conn, writer->statement, + STATIC_ARRAY_SIZE (params), NULL, + (const char *const *)params, + NULL, NULL, /* return text data */ 0); + + if ((PGRES_COMMAND_OK == PQresultStatus (res)) + || (PGRES_TUPLES_OK == PQresultStatus (res))) { + success = 1; + continue; + } + } + + log_err ("Failed to execute SQL query: %s", + PQerrorMessage (db->conn)); + log_info ("SQL query was: '%s', " + "params: %s, %s, %s, %s, %s, %s, %s, %s", + writer->statement, + params[0], params[1], params[2], params[3], + params[4], params[5], params[6], params[7]); + pthread_mutex_unlock (&db->db_lock); + return -1; + } + success = 1; + } + + pthread_mutex_unlock (&db->db_lock); + + if (! success) + return -1; + return 0; +} /* c_psql_write */ + static int c_psql_shutdown (void) { plugin_unregister_read_group ("postgresql"); @@ -520,6 +764,10 @@ static int c_psql_shutdown (void) queries = NULL; queries_num = 0; + sfree (writers); + writers = NULL; + writers_num = 0; + return 0; } /* c_psql_shutdown */ @@ -579,6 +827,100 @@ static int config_query_callback (udb_query_t *q, oconfig_item_t *ci) return (-1); } /* config_query_callback */ +static int config_add_writer (oconfig_item_t *ci, + c_psql_writer_t *src_writers, size_t src_writers_num, + c_psql_writer_t ***dst_writers, size_t *dst_writers_num) +{ + char *name; + + size_t i; + + if ((ci == NULL) || (dst_writers == NULL) || (dst_writers_num == NULL)) + return -1; + + if ((ci->values_num != 1) + || (ci->values[0].type != OCONFIG_TYPE_STRING)) { + log_err ("`Writer' expects a single string argument."); + return 1; + } + + name = ci->values[0].value.string; + + for (i = 0; i < src_writers_num; ++i) { + c_psql_writer_t **tmp; + + if (strcasecmp (name, src_writers[i].name) != 0) + continue; + + tmp = (c_psql_writer_t **)realloc (*dst_writers, + sizeof (**dst_writers) * (*dst_writers_num + 1)); + if (tmp == NULL) { + log_err ("Out of memory."); + return -1; + } + + tmp[*dst_writers_num] = src_writers + i; + + *dst_writers = tmp; + ++(*dst_writers_num); + break; + } + + if (i >= src_writers_num) { + log_err ("No such writer: `%s'", name); + return -1; + } + + return 0; +} /* config_add_writer */ + +static int c_psql_config_writer (oconfig_item_t *ci) +{ + c_psql_writer_t *writer; + c_psql_writer_t *tmp; + + int status = 0; + int i; + + if ((ci->values_num != 1) + || (ci->values[0].type != OCONFIG_TYPE_STRING)) { + log_err (" expects a single string argument."); + return 1; + } + + tmp = (c_psql_writer_t *)realloc (writers, + sizeof (*writers) * (writers_num + 1)); + if (tmp == NULL) { + log_err ("Out of memory."); + return -1; + } + + writers = tmp; + writer = writers + writers_num; + ++writers_num; + + writer->name = sstrdup (ci->values[0].value.string); + writer->statement = NULL; + + for (i = 0; i < ci->children_num; ++i) { + oconfig_item_t *c = ci->children + i; + + if (strcasecmp ("Statement", c->key) == 0) + status = cf_util_get_string (c, &writer->statement); + else + log_warn ("Ignoring unknown config key \"%s\".", c->key); + } + + if (status != 0) { + sfree (writer->statement); + sfree (writer->name); + sfree (writer); + return status; + } + + return 0; +} /* c_psql_config_writer */ + static int c_psql_config_database (oconfig_item_t *ci) { c_psql_database_t *db; @@ -621,6 +963,9 @@ static int c_psql_config_database (oconfig_item_t *ci) else if (0 == strcasecmp (c->key, "Query")) udb_query_pick_from_list (c, queries, queries_num, &db->queries, &db->queries_num); + else if (0 == strcasecmp (c->key, "Writer")) + config_add_writer (c, writers, writers_num, + &db->writers, &db->writers_num); else if (0 == strcasecmp (c->key, "Interval")) cf_util_get_cdtime (c, &db->interval); else @@ -628,7 +973,7 @@ static int c_psql_config_database (oconfig_item_t *ci) } /* If no `Query' options were given, add the default queries.. */ - if (db->queries_num == 0) { + if ((db->queries_num == 0) && (db->writers_num == 0)){ for (i = 0; i < def_queries_num; i++) udb_query_pick_from_list_by_name (def_queries[i], queries, queries_num, @@ -667,11 +1012,16 @@ static int c_psql_config_database (oconfig_item_t *ci) ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->database); - CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval); + if (db->queries_num > 0) { + CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval); - plugin_register_complex_read ("postgresql", cb_name, c_psql_read, - /* interval = */ (db->interval > 0) ? &cb_interval : NULL, - &ud); + plugin_register_complex_read ("postgresql", cb_name, c_psql_read, + /* interval = */ (db->interval > 0) ? &cb_interval : NULL, + &ud); + } + if (db->writers_num > 0) { + plugin_register_write (cb_name, c_psql_write, &ud); + } return 0; } /* c_psql_config_database */ @@ -703,6 +1053,8 @@ static int c_psql_config (oconfig_item_t *ci) if (0 == strcasecmp (c->key, "Query")) udb_query_create (&queries, &queries_num, c, /* callback = */ config_query_callback); + else if (0 == strcasecmp (c->key, "Writer")) + c_psql_config_writer (c); else if (0 == strcasecmp (c->key, "Database")) c_psql_config_database (c); else -- 2.30.2