Code

postgresql plugin: Added generic support for writing values to a database.
authorSebastian Harl <sh@tokkee.org>
Sat, 18 Aug 2012 15:23:42 +0000 (17:23 +0200)
committerSebastian Harl <sh@tokkee.org>
Sat, 18 Aug 2012 15:23:42 +0000 (17:23 +0200)
This has been implemented by requiring the user to specify an SQL statement to
be used for storing a value-list in PostgreSQL. Usually, this should be done
by creating custom functions to take care of that.

The user specified statement will then be called by collectd with eight
arguments: time, host, plugin, plugin instance (or NULL), type, type instance
(or NULL), array of value names (data source names) and an array of the
values.

Two elements have been added to the config parser: A <Writer> block may be
used to name an SQL statement (specified using the 'Statement' option) to be
used for writing data. In a <Database> block, the new option 'Writer' may be
used to apply a writer to a database connection.

The current approach has two benefits: for one, a user may chose whatever
database layout best suits her needs. Also, it is very easy to experiment with
different approaches on how to structure the data in a database without the
need to modify the plugin. This can be done in SQL, which (hopefully) is the
language that people working with PostgreSQL databases like most ;-)
If it happens to turn out that some approach is rather superior, it may still
be re-implemented in a specific plugin in later versions.

src/collectd.conf.pod
src/postgresql.c

index 66ead9c57e83ddc5fdf784b3c5d8468c1dda290c..7d287cc2fdf21f4a41dd4377f0431557376df003 100644 (file)
@@ -3450,6 +3450,13 @@ which are available in a PostgreSQL database or use future or special
 statistics provided by PostgreSQL without the need to upgrade your collectd
 installation.
 
+Starting with version 5.2, the C<postgresql> plugin supports writing data to
+PostgreSQL databases as well. This has been implemented in a generic way. You
+need to specify an SQL statement which will then be executed by collectd in
+order to write the data (see below for details). The benefit of that approach
+is that there is no fixed database layout. Rather, the layout may be optimized
+for the current setup.
+
 The B<PostgreSQL Documentation> manual can be found at
 L<http://www.postgresql.org/docs/manuals/>.
 
@@ -3479,6 +3486,10 @@ L<http://www.postgresql.org/docs/manuals/>.
       </Result>
     </Query>
 
+    <Writer sqlstore>
+      Statement "SELECT collectd_insert($1, $2, $3, $4, $5, $6, $7, $8);"
+    </Writer>
+
     <Database foo>
       Host "hostname"
       Port "5432"
@@ -3495,6 +3506,11 @@ L<http://www.postgresql.org/docs/manuals/>.
       Query backend # predefined
       Query rt36_tickets
     </Database>
+
+    <Database qux>
+      # ...
+      Writer sqlstore
+    </Database>
   </Plugin>
 
 The B<Query> block defines one database query which may later be used by a
@@ -3656,6 +3672,71 @@ This query collects the on-disk size of the database in bytes.
 
 =back
 
+The B<Writer> block defines a PostgreSQL writer backend. It accepts a single
+mandatory argument specifying the name of the writer. This will then be used
+in the B<Database> specification in order to activate the writer instance. The
+names of all writers have to be unique. The following options may be
+specified:
+
+=over 4
+
+=item B<Statement> I<sql statement>
+
+This mandatory option specifies the SQL statement that will be executed for
+each submitted value. A single SQL statement is allowed only. Anything after
+the first semicolon will be ignored.
+
+Eight parameters will be passed to the statement and should be specified as
+tokens B<$1>, B<$2>, through B<$8> in the statement string. The following
+values are made available through those parameters:
+
+=over 4
+
+=item B<$1>
+
+The timestamp of the queried value as a floating point number.
+
+=item B<$2>
+
+The hostname of the queried value.
+
+=item B<$3>
+
+The plugin name of the queried value.
+
+=item B<$4>
+
+The plugin instance of the queried value. This value may be B<NULL> if there
+is no plugin instance.
+
+=item B<$5>
+
+The type of the queried value (cf. L<types.db(5)>).
+
+=item B<$6>
+
+The type instance of the queried value. This value may be B<NULL> if there is
+no type instance.
+
+=item B<$7>
+
+An array of names for the submitted values (i.E<nbsp>e., the name of the data
+sources of the submitted value-list).
+
+=item B<$8>
+
+An array of the submitted values. The dimensions of the value name and value
+arrays match.
+
+=back
+
+In general, it is advisable to create and call a custom function in the
+PostgreSQL database for this purpose. Any procedural language supported by
+PostgreSQL will do (see chapter "Server Programming" in the PostgreSQL manual
+for details).
+
+=back
+
 The B<Database> block defines one PostgreSQL database for which to collect
 statistics. It accepts a single mandatory argument which specifies the
 database name. None of the other options are required. PostgreSQL will use
index 0a5e66c25d934acd4de939cd049e6070a3121ca9..d6e04a85ad2d75baac05b2d3807211e870dfe930 100644 (file)
 #include "utils_db_query.h"
 #include "utils_complain.h"
 
+#if HAVE_PTHREAD_H
+# include <pthread.h>
+#endif
+
 #include <pg_config_manual.h>
 #include <libpq-fe.h>
 
@@ -103,6 +107,11 @@ typedef struct {
        int             params_num;
 } c_psql_user_data_t;
 
+typedef struct {
+       char *name;
+       char *statement;
+} c_psql_writer_t;
+
 typedef struct {
        PGconn      *conn;
        c_complain_t conn_complaint;
@@ -117,6 +126,12 @@ typedef struct {
        udb_query_t    **queries;
        size_t           queries_num;
 
+       c_psql_writer_t **writers;
+       size_t            writers_num;
+
+       /* make sure we don't access the database object in parallel */
+       pthread_mutex_t   db_lock;
+
        cdtime_t interval;
 
        char *host;
@@ -146,6 +161,9 @@ static int def_queries_num = STATIC_ARRAY_SIZE (def_queries);
 static udb_query_t      **queries       = NULL;
 static size_t             queries_num   = 0;
 
+static c_psql_writer_t   *writers       = NULL;
+static size_t             writers_num   = 0;
+
 static c_psql_database_t *c_psql_database_new (const char *name)
 {
        c_psql_database_t *db;
@@ -169,6 +187,11 @@ static c_psql_database_t *c_psql_database_new (const char *name)
        db->queries        = NULL;
        db->queries_num    = 0;
 
+       db->writers        = NULL;
+       db->writers_num    = 0;
+
+       pthread_mutex_init (&db->db_lock, /* attrs = */ NULL);
+
        db->interval   = 0;
 
        db->database   = sstrdup (name);
@@ -202,6 +225,11 @@ static void c_psql_database_delete (void *data)
        sfree (db->queries);
        db->queries_num = 0;
 
+       sfree (db->writers);
+       db->writers_num = 0;
+
+       pthread_mutex_destroy (&db->db_lock);
+
        sfree (db->database);
        sfree (db->host);
        sfree (db->port);
@@ -351,6 +379,7 @@ static PGresult *c_psql_exec_query_params (c_psql_database_t *db,
                        NULL, NULL, /* return text data */ 0);
 } /* c_psql_exec_query_params */
 
+/* db->db_lock must be locked when calling this function */
 static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q,
                udb_query_preparation_area_t *prep_area)
 {
@@ -384,23 +413,32 @@ static int c_psql_exec_query (c_psql_database_t *db, udb_query_t *q,
                return -1;
        }
 
+       /* give c_psql_write() a chance to acquire the lock if called recursively
+        * through dispatch_values(); this will happen if, both, queries and
+        * writers are configured for a single connection */
+       pthread_mutex_unlock (&db->db_lock);
+
        column_names = NULL;
        column_values = NULL;
 
-#define BAIL_OUT(status) \
-       sfree (column_names); \
-       sfree (column_values); \
-       PQclear (res); \
-       return status
-
        if (PGRES_TUPLES_OK != PQresultStatus (res)) {
+               pthread_mutex_lock (&db->db_lock);
+
                log_err ("Failed to execute SQL query: %s",
                                PQerrorMessage (db->conn));
                log_info ("SQL query was: %s",
                                udb_query_get_statement (q));
-               BAIL_OUT (-1);
+               PQclear (res);
+               return -1;
        }
 
+#define BAIL_OUT(status) \
+       sfree (column_names); \
+       sfree (column_values); \
+       PQclear (res); \
+       pthread_mutex_lock (&db->db_lock); \
+       return status
+
        rows_num = PQntuples (res);
        if (1 > rows_num) {
                BAIL_OUT (0);
@@ -487,9 +525,14 @@ static int c_psql_read (user_data_t *ud)
        db = ud->data;
 
        assert (NULL != db->database);
+       assert (NULL != db->queries);
 
-       if (0 != c_psql_check_connection (db))
+       pthread_mutex_lock (&db->db_lock);
+
+       if (0 != c_psql_check_connection (db)) {
+               pthread_mutex_unlock (&db->db_lock);
                return -1;
+       }
 
        for (i = 0; i < db->queries_num; ++i)
        {
@@ -507,11 +550,212 @@ static int c_psql_read (user_data_t *ud)
                        success = 1;
        }
 
+       pthread_mutex_unlock (&db->db_lock);
+
        if (! success)
                return -1;
        return 0;
 } /* c_psql_read */
 
+static char *values_name_to_sqlarray (const data_set_t *ds,
+               char *string, size_t string_len)
+{
+       char  *str_ptr;
+       size_t str_len;
+
+       int i;
+
+       str_ptr = string;
+       str_len = string_len;
+
+       for (i = 0; i < ds->ds_num; ++i) {
+               int status = ssnprintf (str_ptr, str_len, ",'%s'", ds->ds[i].name);
+
+               if (status < 1)
+                       return NULL;
+               else if ((size_t)status >= str_len) {
+                       str_len = 0;
+                       break;
+               }
+               else {
+                       str_ptr += status;
+                       str_len -= (size_t)status;
+               }
+       }
+
+       if (str_len <= 2) {
+               log_err ("c_psql_write: Failed to stringify value names");
+               return NULL;
+       }
+
+       /* overwrite the first comma */
+       string[0] = '{';
+       str_ptr[0] = '}';
+       str_ptr[1] = '\0';
+
+       return string;
+} /* values_name_to_sqlarray */
+
+static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl,
+               char *string, size_t string_len)
+{
+       char  *str_ptr;
+       size_t str_len;
+
+       int i;
+
+       str_ptr = string;
+       str_len = string_len;
+
+       for (i = 0; i < vl->values_len; ++i) {
+               int status;
+
+               if (ds->ds[i].type == DS_TYPE_GAUGE)
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%f", vl->values[i].gauge);
+               else if (ds->ds[i].type == DS_TYPE_COUNTER)
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%llu", vl->values[i].counter);
+               else if (ds->ds[i].type == DS_TYPE_DERIVE)
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%"PRIi64, vl->values[i].derive);
+               else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
+                       status = ssnprintf (str_ptr, str_len,
+                                       ",%"PRIu64, vl->values[i].absolute);
+               else {
+                       log_err ("c_psql_write: Unknown data source type: %i",
+                                       ds->ds[i].type);
+                       return NULL;
+               }
+
+               if (status < 1)
+                       return NULL;
+               else if ((size_t)status >= str_len) {
+                       str_len = 0;
+                       break;
+               }
+               else {
+                       str_ptr += status;
+                       str_len -= (size_t)status;
+               }
+       }
+
+       if (str_len <= 2) {
+               log_err ("c_psql_write: Failed to stringify value list");
+               return NULL;
+       }
+
+       /* overwrite the first comma */
+       string[0] = '{';
+       str_ptr[0] = '}';
+       str_ptr[1] = '\0';
+
+       return string;
+} /* values_to_sqlarray */
+
+static int c_psql_write (const data_set_t *ds, const value_list_t *vl,
+               user_data_t *ud)
+{
+       c_psql_database_t *db;
+
+       char time_str[1024];
+       char values_name_str[1024];
+       char values_str[1024];
+
+       const char *params[8];
+
+       int success = 0;
+       int i;
+
+       if ((ud == NULL) || (ud->data == NULL)) {
+               log_err ("c_psql_write: Invalid user data.");
+               return -1;
+       }
+
+       db = ud->data;
+       assert (db->database != NULL);
+       assert (db->writers != NULL);
+
+       ssnprintf (time_str, sizeof (time_str),
+                       "%f", CDTIME_T_TO_DOUBLE (vl->time));
+
+       if (values_name_to_sqlarray (ds,
+                               values_name_str, sizeof (values_name_str)) == NULL)
+               return -1;
+
+       if (values_to_sqlarray (ds, vl, values_str, sizeof (values_str)) == NULL)
+               return -1;
+
+#define VALUE_OR_NULL(v) ((((v) == NULL) || (*(v) == '\0')) ? NULL : (v))
+
+       params[0] = time_str;
+       params[1] = vl->host;
+       params[2] = vl->plugin;
+       params[3] = VALUE_OR_NULL(vl->plugin_instance);
+       params[4] = vl->type;
+       params[5] = VALUE_OR_NULL(vl->type_instance);
+       params[6] = values_name_str;
+       params[7] = values_str;
+
+#undef VALUE_OR_NULL
+
+       pthread_mutex_lock (&db->db_lock);
+
+       if (0 != c_psql_check_connection (db)) {
+               pthread_mutex_unlock (&db->db_lock);
+               return -1;
+       }
+
+       for (i = 0; i < db->writers_num; ++i) {
+               c_psql_writer_t *writer;
+               PGresult *res;
+
+               writer = db->writers[i];
+
+               res = PQexecParams (db->conn, writer->statement,
+                               STATIC_ARRAY_SIZE (params), NULL,
+                               (const char *const *)params,
+                               NULL, NULL, /* return text data */ 0);
+
+               if ((PGRES_COMMAND_OK != PQresultStatus (res))
+                               && (PGRES_TUPLES_OK != PQresultStatus (res))) {
+                       if ((CONNECTION_OK != PQstatus (db->conn))
+                                       && (0 == c_psql_check_connection (db))) {
+                               PQclear (res);
+
+                               /* try again */
+                               res = PQexecParams (db->conn, writer->statement,
+                                               STATIC_ARRAY_SIZE (params), NULL,
+                                               (const char *const *)params,
+                                               NULL, NULL, /* return text data */ 0);
+
+                               if ((PGRES_COMMAND_OK == PQresultStatus (res))
+                                               || (PGRES_TUPLES_OK == PQresultStatus (res))) {
+                                       success = 1;
+                                       continue;
+                               }
+                       }
+
+                       log_err ("Failed to execute SQL query: %s",
+                                       PQerrorMessage (db->conn));
+                       log_info ("SQL query was: '%s', "
+                                       "params: %s, %s, %s, %s, %s, %s, %s, %s",
+                                       writer->statement,
+                                       params[0], params[1], params[2], params[3],
+                                       params[4], params[5], params[6], params[7]);
+                       pthread_mutex_unlock (&db->db_lock);
+                       return -1;
+               }
+               success = 1;
+       }
+
+       pthread_mutex_unlock (&db->db_lock);
+
+       if (! success)
+               return -1;
+       return 0;
+} /* c_psql_write */
+
 static int c_psql_shutdown (void)
 {
        plugin_unregister_read_group ("postgresql");
@@ -520,6 +764,10 @@ static int c_psql_shutdown (void)
        queries = NULL;
        queries_num = 0;
 
+       sfree (writers);
+       writers = NULL;
+       writers_num = 0;
+
        return 0;
 } /* c_psql_shutdown */
 
@@ -579,6 +827,100 @@ static int config_query_callback (udb_query_t *q, oconfig_item_t *ci)
        return (-1);
 } /* config_query_callback */
 
+static int config_add_writer (oconfig_item_t *ci,
+               c_psql_writer_t *src_writers, size_t src_writers_num,
+               c_psql_writer_t ***dst_writers, size_t *dst_writers_num)
+{
+       char *name;
+
+       size_t i;
+
+       if ((ci == NULL) || (dst_writers == NULL) || (dst_writers_num == NULL))
+               return -1;
+
+       if ((ci->values_num != 1)
+                       || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
+               log_err ("`Writer' expects a single string argument.");
+               return 1;
+       }
+
+       name = ci->values[0].value.string;
+
+       for (i = 0; i < src_writers_num; ++i) {
+               c_psql_writer_t **tmp;
+
+               if (strcasecmp (name, src_writers[i].name) != 0)
+                       continue;
+
+               tmp = (c_psql_writer_t **)realloc (*dst_writers,
+                               sizeof (**dst_writers) * (*dst_writers_num + 1));
+               if (tmp == NULL) {
+                       log_err ("Out of memory.");
+                       return -1;
+               }
+
+               tmp[*dst_writers_num] = src_writers + i;
+
+               *dst_writers = tmp;
+               ++(*dst_writers_num);
+               break;
+       }
+
+       if (i >= src_writers_num) {
+               log_err ("No such writer: `%s'", name);
+               return -1;
+       }
+
+       return 0;
+} /* config_add_writer */
+
+static int c_psql_config_writer (oconfig_item_t *ci)
+{
+       c_psql_writer_t *writer;
+       c_psql_writer_t *tmp;
+
+       int status = 0;
+       int i;
+
+       if ((ci->values_num != 1)
+                       || (ci->values[0].type != OCONFIG_TYPE_STRING)) {
+               log_err ("<Writer> expects a single string argument.");
+               return 1;
+       }
+
+       tmp = (c_psql_writer_t *)realloc (writers,
+                       sizeof (*writers) * (writers_num + 1));
+       if (tmp == NULL) {
+               log_err ("Out of memory.");
+               return -1;
+       }
+
+       writers = tmp;
+       writer  = writers + writers_num;
+       ++writers_num;
+
+       writer->name = sstrdup (ci->values[0].value.string);
+       writer->statement = NULL;
+
+       for (i = 0; i < ci->children_num; ++i) {
+               oconfig_item_t *c = ci->children + i;
+
+               if (strcasecmp ("Statement", c->key) == 0)
+                       status = cf_util_get_string (c, &writer->statement);
+               else
+                       log_warn ("Ignoring unknown config key \"%s\".", c->key);
+       }
+
+       if (status != 0) {
+               sfree (writer->statement);
+               sfree (writer->name);
+               sfree (writer);
+               return status;
+       }
+
+       return 0;
+} /* c_psql_config_writer */
+
 static int c_psql_config_database (oconfig_item_t *ci)
 {
        c_psql_database_t *db;
@@ -621,6 +963,9 @@ static int c_psql_config_database (oconfig_item_t *ci)
                else if (0 == strcasecmp (c->key, "Query"))
                        udb_query_pick_from_list (c, queries, queries_num,
                                        &db->queries, &db->queries_num);
+               else if (0 == strcasecmp (c->key, "Writer"))
+                       config_add_writer (c, writers, writers_num,
+                                       &db->writers, &db->writers_num);
                else if (0 == strcasecmp (c->key, "Interval"))
                        cf_util_get_cdtime (c, &db->interval);
                else
@@ -628,7 +973,7 @@ static int c_psql_config_database (oconfig_item_t *ci)
        }
 
        /* If no `Query' options were given, add the default queries.. */
-       if (db->queries_num == 0) {
+       if ((db->queries_num == 0) && (db->writers_num == 0)){
                for (i = 0; i < def_queries_num; i++)
                        udb_query_pick_from_list_by_name (def_queries[i],
                                        queries, queries_num,
@@ -667,11 +1012,16 @@ static int c_psql_config_database (oconfig_item_t *ci)
 
        ssnprintf (cb_name, sizeof (cb_name), "postgresql-%s", db->database);
 
-       CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval);
+       if (db->queries_num > 0) {
+               CDTIME_T_TO_TIMESPEC (db->interval, &cb_interval);
 
-       plugin_register_complex_read ("postgresql", cb_name, c_psql_read,
-                       /* interval = */ (db->interval > 0) ? &cb_interval : NULL,
-                       &ud);
+               plugin_register_complex_read ("postgresql", cb_name, c_psql_read,
+                               /* interval = */ (db->interval > 0) ? &cb_interval : NULL,
+                               &ud);
+       }
+       if (db->writers_num > 0) {
+               plugin_register_write (cb_name, c_psql_write, &ud);
+       }
        return 0;
 } /* c_psql_config_database */
 
@@ -703,6 +1053,8 @@ static int c_psql_config (oconfig_item_t *ci)
                if (0 == strcasecmp (c->key, "Query"))
                        udb_query_create (&queries, &queries_num, c,
                                        /* callback = */ config_query_callback);
+               else if (0 == strcasecmp (c->key, "Writer"))
+                       c_psql_config_writer (c);
                else if (0 == strcasecmp (c->key, "Database"))
                        c_psql_config_database (c);
                else