From 54a5ed57aa80b0214040a537d22b4d116119c546 Mon Sep 17 00:00:00 2001 From: Sebastian Harl Date: Thu, 25 Oct 2012 11:00:09 +0200 Subject: [PATCH] postgresql plugin: Added 'CommitInterval' config option. If specified, this option causes a writer to put several updates into a single transaction. This transaction will last for the specified amount of time (in seconds). By default, each update would be executed in a separate transaction causing quite some overhead. --- src/postgresql.c | 75 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 73 insertions(+), 2 deletions(-) diff --git a/src/postgresql.c b/src/postgresql.c index 594fa32e..9b264d9e 100644 --- a/src/postgresql.c +++ b/src/postgresql.c @@ -56,6 +56,7 @@ #define log_err(...) ERROR ("postgresql: " __VA_ARGS__) #define log_warn(...) WARNING ("postgresql: " __VA_ARGS__) #define log_info(...) INFO ("postgresql: " __VA_ARGS__) +#define log_debug(...) DEBUG ("postgresql: " __VA_ARGS__) #ifndef C_PSQL_DEFAULT_CONF # define C_PSQL_DEFAULT_CONF PKGDATADIR "/postgresql_default.conf" @@ -136,6 +137,10 @@ typedef struct { cdtime_t interval; + /* writer "caching" settings */ + cdtime_t commit_interval; + cdtime_t next_commit; + char *host; char *port; char *database; @@ -196,6 +201,9 @@ static c_psql_database_t *c_psql_database_new (const char *name) db->interval = 0; + db->commit_interval = 0; + db->next_commit = 0; + db->database = sstrdup (name); db->host = NULL; db->port = NULL; @@ -230,6 +238,10 @@ static void c_psql_database_delete (void *data) sfree (db->writers); db->writers_num = 0; + /* wait for the lock to be released by the last writer */ + pthread_mutex_lock (&db->db_lock); + pthread_mutex_unlock (&db->db_lock); + pthread_mutex_destroy (&db->db_lock); sfree (db->database); @@ -725,6 +737,45 @@ static char *values_to_sqlarray (const data_set_t *ds, const value_list_t *vl, return string; } /* values_to_sqlarray */ +static int c_psql_begin (c_psql_database_t *db) +{ + PGresult *r = PQexec (db->conn, "BEGIN"); + + int status = 1; + + if (r != NULL) { + if (PGRES_COMMAND_OK == PQresultStatus (r)) { + db->next_commit = cdtime() + db->commit_interval; + status = 0; + } + else + log_warn ("Failed to initiate ('BEGIN') transaction: %s", + PQerrorMessage (db->conn)); + PQclear (r); + } + return status; +} /* c_psql_begin */ + +static int c_psql_commit (c_psql_database_t *db) +{ + PGresult *r = PQexec (db->conn, "COMMIT"); + + int status = 1; + + if (r != NULL) { + if (PGRES_COMMAND_OK == PQresultStatus (r)) { + db->next_commit = cdtime () + db->commit_interval; + log_debug ("Successfully committed transaction."); + status = 0; + } + else + log_warn ("Failed to commit transaction: %s", + PQerrorMessage (db->conn)); + PQclear (r); + } + return status; +} /* c_psql_commit */ + static int c_psql_write (const data_set_t *ds, const value_list_t *vl, user_data_t *ud) { @@ -777,6 +828,10 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, return -1; } + if ((db->commit_interval > 0) + && (db->next_commit == 0)) + c_psql_begin (db); + for (i = 0; i < db->writers_num; ++i) { c_psql_writer_t *writer; PGresult *res; @@ -785,13 +840,17 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, if (values_type_to_sqlarray (ds, values_type_str, sizeof (values_type_str), - writer->store_rates) == NULL) + writer->store_rates) == NULL) { + pthread_mutex_unlock (&db->db_lock); return -1; + } if (values_to_sqlarray (ds, vl, values_str, sizeof (values_str), - writer->store_rates) == NULL) + writer->store_rates) == NULL) { + pthread_mutex_unlock (&db->db_lock); return -1; + } params[7] = values_type_str; params[8] = values_str; @@ -833,6 +892,11 @@ static int c_psql_write (const data_set_t *ds, const value_list_t *vl, success = 1; } + if ((db->next_commit > 0) + && (cdtime () > db->next_commit)) + if (c_psql_commit (db) == 0) + c_psql_begin (db); + pthread_mutex_unlock (&db->db_lock); if (! success) @@ -1055,6 +1119,8 @@ static int c_psql_config_database (oconfig_item_t *ci) &db->writers, &db->writers_num); else if (0 == strcasecmp (c->key, "Interval")) cf_util_get_cdtime (c, &db->interval); + else if (strcasecmp ("CommitInterval", c->key) == 0) + cf_util_get_cdtime (c, &db->commit_interval); else log_warn ("Ignoring unknown config key \"%s\".", c->key); } @@ -1109,6 +1175,11 @@ static int c_psql_config_database (oconfig_item_t *ci) if (db->writers_num > 0) { plugin_register_write (cb_name, c_psql_write, &ud); } + else if (db->commit_interval > 0) { + log_warn ("Database '%s': You do not have any writers assigned to " + "this database connection. Setting 'CommitInterval' does " + "not have any effect.", db->database); + } return 0; } /* c_psql_config_database */ -- 2.30.2