X-Git-Url: https://git.tokkee.org/?p=collectd.git;a=blobdiff_plain;f=src%2Fplugin.c;h=d3767d1283b9678bec65e8665d5feb7b8c64bef7;hp=b0c82e397c4e7cd6aebf052d77b3b8489d6d57d0;hb=a320c4a23fee0e900a7cd2310c771b8dbd2a2d82;hpb=0e835c8e64bb4bc9119948a3324a8a370bd10356 diff --git a/src/plugin.c b/src/plugin.c index b0c82e39..d3767d12 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -1,6 +1,6 @@ /** * collectd - src/plugin.c - * Copyright (C) 2005-2011 Florian octo Forster + * Copyright (C) 2005-2013 Florian octo Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -21,22 +21,22 @@ **/ #include "collectd.h" -#include "utils_complain.h" - -#include - -#if HAVE_PTHREAD_H -# include -#endif - #include "common.h" #include "plugin.h" #include "configfile.h" +#include "filter_chain.h" #include "utils_avltree.h" +#include "utils_cache.h" +#include "utils_complain.h" #include "utils_llist.h" #include "utils_heap.h" -#include "utils_cache.h" -#include "filter_chain.h" +#include "utils_time.h" + +#if HAVE_PTHREAD_H +# include +#endif + +#include /* * Private structures @@ -63,12 +63,21 @@ struct read_func_s char rf_group[DATA_MAX_NAME_LEN]; char rf_name[DATA_MAX_NAME_LEN]; int rf_type; - struct timespec rf_interval; - struct timespec rf_effective_interval; - struct timespec rf_next_read; + cdtime_t rf_interval; + cdtime_t rf_effective_interval; + cdtime_t rf_next_read; }; typedef struct read_func_s read_func_t; +struct write_queue_s; +typedef struct write_queue_s write_queue_t; +struct write_queue_s +{ + value_list_t *vl; + plugin_ctx_t ctx; + write_queue_t *next; +}; + /* * Private variables */ @@ -95,12 +104,22 @@ static pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; static pthread_t *read_threads = NULL; static int read_threads_num = 0; +static write_queue_t *write_queue_head; +static write_queue_t *write_queue_tail; +static _Bool write_loop = 1; +static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; +static pthread_t *write_threads = NULL; +static size_t write_threads_num = 0; + static pthread_key_t plugin_ctx_key; static _Bool plugin_ctx_key_initialized = 0; /* * Static functions */ +static int plugin_dispatch_values_internal (value_list_t *vl); + static const char *plugin_get_dir (void) { if (plugindir == NULL) @@ -341,13 +360,6 @@ static int plugin_load_file (char *file, uint32_t flags) return (0); } -static _Bool timeout_reached(struct timespec timeout) -{ - struct timeval now; - gettimeofday(&now, NULL); - return (now.tv_sec >= timeout.tv_sec && now.tv_usec >= (timeout.tv_nsec / 1000)); -} - static void *plugin_read_thread (void __attribute__((unused)) *args) { while (read_loop != 0) @@ -373,18 +385,15 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) } pthread_mutex_unlock (&read_lock); - if ((rf->rf_interval.tv_sec == 0) && (rf->rf_interval.tv_nsec == 0)) + if (rf->rf_interval == 0) { /* this should not happen, because the interval is set * for each plugin when loading it * XXX: issue a warning? */ - now = cdtime (); - - CDTIME_T_TO_TIMESPEC (plugin_get_interval (), &rf->rf_interval); - + rf->rf_interval = plugin_get_interval (); rf->rf_effective_interval = rf->rf_interval; - CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read); + rf->rf_next_read = cdtime (); } /* sleep until this entry is due, @@ -396,11 +405,15 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) * pthread_cond_timedwait returns. */ rc = 0; while ((read_loop != 0) - && !timeout_reached(rf->rf_next_read) + && (cdtime () < rf->rf_next_read) && rc == 0) { + struct timespec ts = { 0 }; + + CDTIME_T_TO_TIMESPEC (rf->rf_next_read, &ts); + rc = pthread_cond_timedwait (&read_cond, &read_lock, - &rf->rf_next_read); + &ts); } /* Must hold `read_lock' when accessing `rf->rf_type'. */ @@ -456,20 +469,14 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) * intervals in which it will be called. */ if (status != 0) { - rf->rf_effective_interval.tv_sec *= 2; - rf->rf_effective_interval.tv_nsec *= 2; - NORMALIZE_TIMESPEC (rf->rf_effective_interval); - - if (rf->rf_effective_interval.tv_sec >= 86400) - { - rf->rf_effective_interval.tv_sec = 86400; - rf->rf_effective_interval.tv_nsec = 0; - } + rf->rf_effective_interval *= 2; + if (rf->rf_effective_interval > TIME_T_TO_CDTIME_T (86400)) + rf->rf_effective_interval = TIME_T_TO_CDTIME_T (86400); NOTICE ("read-function of plugin `%s' failed. " - "Will suspend it for %i seconds.", + "Will suspend it for %.3f seconds.", rf->rf_name, - (int) rf->rf_effective_interval.tv_sec); + CDTIME_T_TO_DOUBLE (rf->rf_effective_interval)); } else { @@ -481,32 +488,26 @@ static void *plugin_read_thread (void __attribute__((unused)) *args) now = cdtime (); DEBUG ("plugin_read_thread: Effective interval of the " - "%s plugin is %i.%09i.", + "%s plugin is %.3f seconds.", rf->rf_name, - (int) rf->rf_effective_interval.tv_sec, - (int) rf->rf_effective_interval.tv_nsec); + CDTIME_T_TO_DOUBLE (rf->rf_effective_interval)); /* Calculate the next (absolute) time at which this function * should be called. */ - rf->rf_next_read.tv_sec = rf->rf_next_read.tv_sec - + rf->rf_effective_interval.tv_sec; - rf->rf_next_read.tv_nsec = rf->rf_next_read.tv_nsec - + rf->rf_effective_interval.tv_nsec; - NORMALIZE_TIMESPEC (rf->rf_next_read); + rf->rf_next_read += rf->rf_effective_interval; /* Check, if `rf_next_read' is in the past. */ - if (TIMESPEC_TO_CDTIME_T (&rf->rf_next_read) < now) + if (rf->rf_next_read < now) { /* `rf_next_read' is in the past. Insert `now' * so this value doesn't trail off into the * past too much. */ - CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read); + rf->rf_next_read = now; } - DEBUG ("plugin_read_thread: Next read of the %s plugin at %i.%09i.", + DEBUG ("plugin_read_thread: Next read of the %s plugin at %.3f.", rf->rf_name, - (int) rf->rf_next_read.tv_sec, - (int) rf->rf_next_read.tv_nsec); + CDTIME_T_TO_DOUBLE (rf->rf_next_read)); /* Re-insert this read function into the heap again. */ c_heap_insert (read_heap, rf); @@ -573,6 +574,244 @@ static void stop_read_threads (void) read_threads_num = 0; } /* void stop_read_threads */ +static void plugin_value_list_free (value_list_t *vl) /* {{{ */ +{ + if (vl == NULL) + return; + + meta_data_destroy (vl->meta); + sfree (vl->values); + sfree (vl); +} /* }}} void plugin_value_list_free */ + +static value_list_t *plugin_value_list_clone (value_list_t const *vl_orig) /* {{{ */ +{ + value_list_t *vl; + + if (vl_orig == NULL) + return (NULL); + + vl = malloc (sizeof (*vl)); + if (vl == NULL) + return (NULL); + memcpy (vl, vl_orig, sizeof (*vl)); + + vl->values = calloc (vl_orig->values_len, sizeof (*vl->values)); + if (vl->values == NULL) + { + plugin_value_list_free (vl); + return (NULL); + } + memcpy (vl->values, vl_orig->values, + vl_orig->values_len * sizeof (*vl->values)); + + vl->meta = meta_data_clone (vl->meta); + if ((vl_orig->meta != NULL) && (vl->meta == NULL)) + { + plugin_value_list_free (vl); + return (NULL); + } + + if (vl->time == 0) + vl->time = cdtime (); + + /* Fill in the interval from the thread context, if it is zero. */ + if (vl->interval == 0) + { + plugin_ctx_t ctx = plugin_get_ctx (); + + if (ctx.interval != 0) + vl->interval = ctx.interval; + else + { + char name[6 * DATA_MAX_NAME_LEN]; + FORMAT_VL (name, sizeof (name), vl); + ERROR ("plugin_value_list_clone: Unable to determine " + "interval from context for " + "value list \"%s\". " + "This indicates a broken plugin. " + "Please report this problem to the " + "collectd mailing list or at " + ".", name); + vl->interval = cf_get_default_interval (); + } + } + + return (vl); +} /* }}} value_list_t *plugin_value_list_clone */ + +static int plugin_write_enqueue (value_list_t const *vl) /* {{{ */ +{ + write_queue_t *q; + + q = malloc (sizeof (*q)); + if (q == NULL) + return (ENOMEM); + q->next = NULL; + + q->vl = plugin_value_list_clone (vl); + if (q->vl == NULL) + { + sfree (q); + return (ENOMEM); + } + + /* Store context of caller (read plugin); otherwise, it would not be + * available to the write plugins when actually dispatching the + * value-list later on. */ + q->ctx = plugin_get_ctx (); + + pthread_mutex_lock (&write_lock); + + if (write_queue_tail == NULL) + { + write_queue_head = q; + write_queue_tail = q; + } + else + { + write_queue_tail->next = q; + write_queue_tail = q; + } + + pthread_cond_signal (&write_cond); + pthread_mutex_unlock (&write_lock); + + return (0); +} /* }}} int plugin_write_enqueue */ + +static value_list_t *plugin_write_dequeue (void) /* {{{ */ +{ + write_queue_t *q; + value_list_t *vl; + + pthread_mutex_lock (&write_lock); + + while (write_loop && (write_queue_head == NULL)) + pthread_cond_wait (&write_cond, &write_lock); + + if (write_queue_head == NULL) + { + pthread_mutex_unlock (&write_lock); + return (NULL); + } + + q = write_queue_head; + write_queue_head = q->next; + if (write_queue_head == NULL) + write_queue_tail = NULL; + + pthread_mutex_unlock (&write_lock); + + (void) plugin_set_ctx (q->ctx); + + vl = q->vl; + sfree (q); + return (vl); +} /* }}} value_list_t *plugin_write_dequeue */ + +static void *plugin_write_thread (void __attribute__((unused)) *args) /* {{{ */ +{ + while (write_loop) + { + value_list_t *vl = plugin_write_dequeue (); + if (vl == NULL) + continue; + + plugin_dispatch_values_internal (vl); + + plugin_value_list_free (vl); + } + + pthread_exit (NULL); + return ((void *) 0); +} /* }}} void *plugin_write_thread */ + +static void start_write_threads (size_t num) /* {{{ */ +{ + size_t i; + + if (write_threads != NULL) + return; + + write_threads = (pthread_t *) calloc (num, sizeof (pthread_t)); + if (write_threads == NULL) + { + ERROR ("plugin: start_write_threads: calloc failed."); + return; + } + + write_threads_num = 0; + for (i = 0; i < num; i++) + { + int status; + + status = pthread_create (write_threads + write_threads_num, + /* attr = */ NULL, + plugin_write_thread, + /* arg = */ NULL); + if (status != 0) + { + char errbuf[1024]; + ERROR ("plugin: start_write_threads: pthread_create failed " + "with status %i (%s).", status, + sstrerror (status, errbuf, sizeof (errbuf))); + return; + } + + write_threads_num++; + } /* for (i) */ +} /* }}} void start_write_threads */ + +static void stop_write_threads (void) /* {{{ */ +{ + write_queue_t *q; + int i; + + if (write_threads == NULL) + return; + + INFO ("collectd: Stopping %zu write threads.", write_threads_num); + + pthread_mutex_lock (&write_lock); + write_loop = 0; + DEBUG ("plugin: stop_write_threads: Signalling `write_cond'"); + pthread_cond_broadcast (&write_cond); + pthread_mutex_unlock (&write_lock); + + for (i = 0; i < write_threads_num; i++) + { + if (pthread_join (write_threads[i], NULL) != 0) + { + ERROR ("plugin: stop_write_threads: pthread_join failed."); + } + write_threads[i] = (pthread_t) 0; + } + sfree (write_threads); + write_threads_num = 0; + + pthread_mutex_lock (&write_lock); + i = 0; + for (q = write_queue_head; q != NULL; ) + { + write_queue_t *q1 = q; + plugin_value_list_free (q->vl); + q = q->next; + sfree (q1); + i++; + } + write_queue_head = NULL; + write_queue_tail = NULL; + pthread_mutex_unlock (&write_lock); + + if (i > 0) + { + WARNING ("plugin: %i value list%s left after shutting down " + "the write threads.", + i, (i == 1) ? " was" : "s were"); + } +} /* }}} void stop_write_threads */ + /* * Public functions */ @@ -604,8 +843,6 @@ int plugin_load (const char *type, uint32_t flags) struct dirent *de; int status; - DEBUG ("type = %s", type); - dir = plugin_get_dir (); ret = 1; @@ -614,7 +851,7 @@ int plugin_load (const char *type, uint32_t flags) status = ssnprintf (typename, sizeof (typename), "%s.so", type); if ((status < 0) || ((size_t) status >= sizeof (typename))) { - WARNING ("snprintf: truncated: `%s.so'", type); + WARNING ("plugin_load: Filename too long: \"%s.so\"", type); return (-1); } typename_len = strlen (typename); @@ -622,7 +859,7 @@ int plugin_load (const char *type, uint32_t flags) if ((dh = opendir (dir)) == NULL) { char errbuf[1024]; - ERROR ("opendir (%s): %s", dir, + ERROR ("plugin_load: opendir (%s) failed: %s", dir, sstrerror (errno, errbuf, sizeof (errbuf))); return (-1); } @@ -636,25 +873,29 @@ int plugin_load (const char *type, uint32_t flags) "%s/%s", dir, de->d_name); if ((status < 0) || ((size_t) status >= sizeof (filename))) { - WARNING ("snprintf: truncated: `%s/%s'", dir, de->d_name); + WARNING ("plugin_load: Filename too long: \"%s/%s\"", + dir, de->d_name); continue; } if (lstat (filename, &statbuf) == -1) { char errbuf[1024]; - WARNING ("stat %s: %s", filename, + WARNING ("plugin_load: stat (\"%s\") failed: %s", + filename, sstrerror (errno, errbuf, sizeof (errbuf))); continue; } else if (!S_ISREG (statbuf.st_mode)) { /* don't follow symlinks */ - WARNING ("stat %s: not a regular file", filename); + WARNING ("plugin_load: %s is not a regular file.", + filename); continue; } - if (plugin_load_file (filename, flags) == 0) + status = plugin_load_file (filename, flags); + if (status == 0) { /* success */ ret = 0; @@ -662,14 +903,16 @@ int plugin_load (const char *type, uint32_t flags) } else { - fprintf (stderr, "Unable to load plugin %s.\n", type); + ERROR ("plugin_load: Load plugin \"%s\" failed with " + "status %i.", type, status); } } closedir (dh); - if (filename[0] == '\0') - fprintf (stderr, "Could not find plugin %s.\n", type); + if (filename[0] == 0) + ERROR ("plugin_load: Could not find plugin \"%s\" in %s", + type, dir); return (ret); } @@ -706,13 +949,9 @@ static int plugin_compare_read_func (const void *arg0, const void *arg1) rf0 = arg0; rf1 = arg1; - if (rf0->rf_next_read.tv_sec < rf1->rf_next_read.tv_sec) + if (rf0->rf_next_read < rf1->rf_next_read) return (-1); - else if (rf0->rf_next_read.tv_sec > rf1->rf_next_read.tv_sec) - return (1); - else if (rf0->rf_next_read.tv_nsec < rf1->rf_next_read.tv_nsec) - return (-1); - else if (rf0->rf_next_read.tv_nsec > rf1->rf_next_read.tv_nsec) + else if (rf0->rf_next_read > rf1->rf_next_read) return (1); else return (0); @@ -726,8 +965,8 @@ static int plugin_insert_read (read_func_t *rf) int status; llentry_t *le; - cdtime_t now = cdtime (); - CDTIME_T_TO_TIMESPEC (now, &rf->rf_next_read); + rf->rf_next_read = cdtime (); + rf->rf_effective_interval = rf->rf_interval; pthread_mutex_lock (&read_lock); @@ -790,43 +1029,12 @@ static int plugin_insert_read (read_func_t *rf) return (0); } /* int plugin_insert_read */ -static int read_cb_wrapper (user_data_t *ud) -{ - int (*callback) (void); - - if (ud == NULL) - return -1; - - callback = ud->data; - return callback(); -} /* int read_cb_wrapper */ - int plugin_register_read (const char *name, int (*callback) (void)) { read_func_t *rf; - plugin_ctx_t ctx = plugin_get_ctx (); int status; - if (ctx.interval != 0) { - /* If ctx.interval is not zero (== use the plugin or global - * interval), we need to use the "complex" read callback, - * because only that allows to specify a different interval. - * Wrap the callback using read_cb_wrapper(). */ - struct timespec interval; - user_data_t user_data; - - user_data.data = callback; - user_data.free_func = NULL; - - CDTIME_T_TO_TIMESPEC (ctx.interval, &interval); - return plugin_register_complex_read (/* group = */ NULL, - name, read_cb_wrapper, &interval, &user_data); - } - - DEBUG ("plugin_register_read: default_interval = %.3f", - CDTIME_T_TO_DOUBLE(plugin_get_interval ())); - rf = malloc (sizeof (*rf)); if (rf == NULL) { @@ -838,13 +1046,11 @@ int plugin_register_read (const char *name, rf->rf_callback = (void *) callback; rf->rf_udata.data = NULL; rf->rf_udata.free_func = NULL; - rf->rf_ctx = ctx; + rf->rf_ctx = plugin_get_ctx (); rf->rf_group[0] = '\0'; sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); rf->rf_type = RF_SIMPLE; - rf->rf_interval.tv_sec = 0; - rf->rf_interval.tv_nsec = 0; - rf->rf_effective_interval = rf->rf_interval; + rf->rf_interval = plugin_get_interval (); status = plugin_insert_read (rf); if (status != 0) @@ -859,7 +1065,6 @@ int plugin_register_complex_read (const char *group, const char *name, user_data_t *user_data) { read_func_t *rf; - plugin_ctx_t ctx = plugin_get_ctx (); int status; rf = malloc (sizeof (*rf)); @@ -878,18 +1083,9 @@ int plugin_register_complex_read (const char *group, const char *name, sstrncpy (rf->rf_name, name, sizeof (rf->rf_name)); rf->rf_type = RF_COMPLEX; if (interval != NULL) - { - rf->rf_interval = *interval; - } - else if (ctx.interval != 0) - { - CDTIME_T_TO_TIMESPEC (ctx.interval, &rf->rf_interval); - } - rf->rf_effective_interval = rf->rf_interval; - - DEBUG ("plugin_register_read: interval = %i.%09i", - (int) rf->rf_interval.tv_sec, - (int) rf->rf_interval.tv_nsec); + rf->rf_interval = TIMESPEC_TO_CDTIME_T (interval); + else + rf->rf_interval = plugin_get_interval (); /* Set user data */ if (user_data == NULL) @@ -902,7 +1098,7 @@ int plugin_register_complex_read (const char *group, const char *name, rf->rf_udata = *user_data; } - rf->rf_ctx = ctx; + rf->rf_ctx = plugin_get_ctx (); status = plugin_insert_read (rf); if (status != 0) @@ -1190,6 +1386,15 @@ void plugin_init_all (void) chain_name = global_option_get ("PostCacheChain"); post_cache_chain = fc_chain_get_by_name (chain_name); + { + char const *tmp = global_option_get ("WriteThreads"); + int num = atoi (tmp); + + if (num < 1) + num = 5; + + start_write_threads ((size_t) num); + } if ((list_init == NULL) && (read_heap == NULL)) return; @@ -1459,6 +1664,8 @@ void plugin_shutdown_all (void) plugin_set_ctx (old_ctx); } + stop_write_threads (); + /* Write plugins which use the `user_data' pointer usually need the * same data available to the flush callback. If this is the case, set * the free_function to NULL when registering the flush callback and to @@ -1516,7 +1723,7 @@ int plugin_dispatch_missing (const value_list_t *vl) /* {{{ */ return (0); } /* int }}} plugin_dispatch_missing */ -int plugin_dispatch_values (value_list_t *vl) +static int plugin_dispatch_values_internal (value_list_t *vl) { int status; static c_complain_t no_write_complaint = C_COMPLAIN_INIT_STATIC; @@ -1567,29 +1774,10 @@ int plugin_dispatch_values (value_list_t *vl) return (-1); } - if (vl->time == 0) - vl->time = cdtime (); - - if (vl->interval <= 0) - { - plugin_ctx_t ctx = plugin_get_ctx (); - - if (ctx.interval != 0) - vl->interval = ctx.interval; - else - { - char name[6 * DATA_MAX_NAME_LEN]; - FORMAT_VL (name, sizeof (name), vl); - ERROR ("plugin_dispatch_values: Unable to determine " - "interval from context for " - "value list \"%s\". " - "This indicates a broken plugin. " - "Please report this problem to the " - "collectd mailing list or at " - ".", name); - vl->interval = cf_get_default_interval (); - } - } + /* Assured by plugin_value_list_clone(). The time is determined at + * _enqueue_ time. */ + assert (vl->time != 0); + assert (vl->interval != 0); DEBUG ("plugin_dispatch_values: time = %.3f; interval = %.3f; " "host = %s; " @@ -1710,53 +1898,24 @@ int plugin_dispatch_values (value_list_t *vl) } return (0); -} /* int plugin_dispatch_values */ +} /* int plugin_dispatch_values_internal */ -int plugin_dispatch_values_secure (const value_list_t *vl) +int plugin_dispatch_values (value_list_t const *vl) { - value_list_t vl_copy; - int status; - - if (vl == NULL) - return EINVAL; - - memcpy (&vl_copy, vl, sizeof (vl_copy)); - - /* Write callbacks must not change the values and meta pointers, so we can - * savely skip copying those and make this more efficient. */ - if ((pre_cache_chain == NULL) && (post_cache_chain == NULL)) - return (plugin_dispatch_values (&vl_copy)); - - /* Set pointers to NULL, just to be on the save side. */ - vl_copy.values = NULL; - vl_copy.meta = NULL; - - vl_copy.values = malloc (sizeof (*vl_copy.values) * vl->values_len); - if (vl_copy.values == NULL) - { - ERROR ("plugin_dispatch_values_secure: malloc failed."); - return (ENOMEM); - } - memcpy (vl_copy.values, vl->values, sizeof (*vl_copy.values) * vl->values_len); - - if (vl->meta != NULL) - { - vl_copy.meta = meta_data_clone (vl->meta); - if (vl_copy.meta == NULL) - { - ERROR ("plugin_dispatch_values_secure: meta_data_clone failed."); - free (vl_copy.values); - return (ENOMEM); - } - } /* if (vl->meta) */ - - status = plugin_dispatch_values (&vl_copy); + int status; - meta_data_destroy (vl_copy.meta); - free (vl_copy.values); + status = plugin_write_enqueue (vl); + if (status != 0) + { + char errbuf[1024]; + ERROR ("plugin_dispatch_values: plugin_write_enqueue failed " + "with status %i (%s).", status, + sstrerror (status, errbuf, sizeof (errbuf))); + return (status); + } - return (status); -} /* int plugin_dispatch_values_secure */ + return (0); +} int plugin_dispatch_notification (const notification_t *notif) { @@ -1880,6 +2039,12 @@ const data_set_t *plugin_get_ds (const char *name) { data_set_t *ds; + if (data_sets == NULL) + { + ERROR ("plugin_get_ds: No data sets are defined yet."); + return (NULL); + } + if (c_avl_get (data_sets, name, (void *) &ds) != 0) { DEBUG ("No such dataset registered: %s", name);