Code

collectd, plugin: Added support for "flush" callbacks.
authorSebastian Harl <sh@tokkee.org>
Tue, 26 Feb 2008 17:12:07 +0000 (18:12 +0100)
committerFlorian Forster <octo@huhu.verplant.org>
Wed, 27 Feb 2008 07:21:13 +0000 (08:21 +0100)
A flush callback may be provided to make it possible to flush internal caches
(e.g. the rrdtool plugin's data cache) from outside the plugin. On SIGUSR1,
flush callback is invoked for all plugins. As flushing large amounts of data
might take some time a new thread is started to handle the request
asynchronously.

Thanks to Stefan Völkel for proposing this.

Signed-off-by: Sebastian Harl <sh@tokkee.org>
Signed-off-by: Florian Forster <octo@huhu.verplant.org>
src/collectd.c
src/plugin.c
src/plugin.h

index 984ff757f3911fdc156717261bbb9c195907930a..4c9aafc212d61d843d751ee0628bf9c5eab9aad7 100644 (file)
@@ -27,6 +27,8 @@
 #include <sys/socket.h>
 #include <netdb.h>
 
+#include <pthread.h>
+
 #include "plugin.h"
 #include "configfile.h"
 
@@ -41,6 +43,15 @@ kstat_ctl_t *kc;
 
 static int loop = 0;
 
+static void *do_flush (void *arg)
+{
+       INFO ("Flushing all data.");
+       plugin_flush_all (-1);
+       INFO ("Finished flushing all data.");
+       pthread_exit (NULL);
+       return NULL;
+}
+
 static void sigIntHandler (int signal)
 {
        loop++;
@@ -51,6 +62,18 @@ static void sigTermHandler (int signal)
        loop++;
 }
 
+static void sigUsr1Handler (int signal)
+{
+       pthread_t      thread;
+       pthread_attr_t attr;
+
+       /* flushing the data might take a while,
+        * so it should be done asynchronously */
+       pthread_attr_init (&attr);
+       pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+       pthread_create (&thread, &attr, do_flush, NULL);
+}
+
 static int init_hostname (void)
 {
        const char *str;
@@ -367,6 +390,7 @@ int main (int argc, char **argv)
 {
        struct sigaction sigIntAction;
        struct sigaction sigTermAction;
+       struct sigaction sigUsr1Action;
        char *configfile = CONFIGFILE;
        int test_config  = 0;
        const char *basedir;
@@ -519,6 +543,10 @@ int main (int argc, char **argv)
        sigTermAction.sa_handler = sigTermHandler;
        sigaction (SIGTERM, &sigTermAction, NULL);
 
+       memset (&sigUsr1Action, '\0', sizeof (sigUsr1Action));
+       sigUsr1Action.sa_handler = sigUsr1Handler;
+       sigaction (SIGUSR1, &sigUsr1Action, NULL);
+
        /*
         * run the actual loops
         */
index 1dd6daf33a9963ef18a897fc5e4221308c3ba80e..ca6193be7973e9dbd147bd4252afaad38a0ad8ca 100644 (file)
@@ -53,6 +53,7 @@ typedef struct read_func_s read_func_t;
 static llist_t *list_init;
 static llist_t *list_read;
 static llist_t *list_write;
+static llist_t *list_flush;
 static llist_t *list_shutdown;
 static llist_t *list_log;
 static llist_t *list_notification;
@@ -433,6 +434,11 @@ int plugin_register_write (const char *name,
        return (register_callback (&list_write, name, (void *) callback));
 } /* int plugin_register_write */
 
+int plugin_register_flush (const char *name, int (*callback) (const int))
+{
+       return (register_callback (&list_flush, name, (void *) callback));
+} /* int plugin_register_flush */
+
 int plugin_register_shutdown (char *name,
                int (*callback) (void))
 {
@@ -527,6 +533,11 @@ int plugin_unregister_write (const char *name)
        return (plugin_unregister (list_write, name));
 }
 
+int plugin_unregister_flush (const char *name)
+{
+       return (plugin_unregister (list_flush, name));
+}
+
 int plugin_unregister_shutdown (const char *name)
 {
        return (plugin_unregister (list_shutdown, name));
@@ -639,6 +650,24 @@ void plugin_read_all (void)
        pthread_mutex_unlock (&read_lock);
 } /* void plugin_read_all */
 
+void plugin_flush_all (int timeout)
+{
+       int (*callback) (int);
+       llentry_t *le;
+
+       if (list_flush == NULL)
+               return;
+
+       le = llist_head (list_flush);
+       while (le != NULL)
+       {
+               callback = (int (*) (int)) le->value;
+               le = le->next;
+
+               (*callback) (timeout);
+       }
+} /* void plugin_flush_all */
+
 void plugin_shutdown_all (void)
 {
        int (*callback) (void);
index 25c745cbbccede74eb1398da38653d129566b248..aea0e4dfce6bc97235cce626741495ed80b8faf2 100644 (file)
@@ -149,6 +149,7 @@ int plugin_load (const char *name);
 
 void plugin_init_all (void);
 void plugin_read_all (void);
+void plugin_flush_all (int timeout);
 void plugin_shutdown_all (void);
 
 /*
@@ -167,6 +168,8 @@ int plugin_register_read (const char *name,
                int (*callback) (void));
 int plugin_register_write (const char *name,
                int (*callback) (const data_set_t *ds, const value_list_t *vl));
+int plugin_register_flush (const char *name,
+               int (*callback) (const int));
 int plugin_register_shutdown (char *name,
                int (*callback) (void));
 int plugin_register_data_set (const data_set_t *ds);
@@ -180,6 +183,7 @@ int plugin_unregister_complex_config (const char *name);
 int plugin_unregister_init (const char *name);
 int plugin_unregister_read (const char *name);
 int plugin_unregister_write (const char *name);
+int plugin_unregister_flush (const char *name);
 int plugin_unregister_shutdown (const char *name);
 int plugin_unregister_data_set (const char *name);
 int plugin_unregister_log (const char *name);