X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fpinba.c;h=6879733501cbedaa32cbfaa4afde3ca527771894;hb=31ee0e5282b59f89ac5c9bcaebb993450e45dba1;hp=1f10e31a00b3c3879b953416e39a3fabc607bfc4;hpb=edd9adbd031584bd54ed3a472d10c7afb0130d3f;p=collectd.git diff --git a/src/pinba.c b/src/pinba.c index 1f10e31a..68797335 100644 --- a/src/pinba.c +++ b/src/pinba.c @@ -36,101 +36,115 @@ #include "pinba.pb-c.h" /* - * Service declaration section + * Defines */ #ifndef PINBA_UDP_BUFFER_SIZE # define PINBA_UDP_BUFFER_SIZE 65536 #endif -#ifndef PINBA_DEFAULT_ADDRESS -# define PINBA_DEFAULT_ADDRESS "127.0.0.1" /* FIXME */ +#ifndef PINBA_DEFAULT_NODE +# define PINBA_DEFAULT_NODE "::0" #endif -#ifndef PINBA_DEFAULT_PORT -# define PINBA_DEFAULT_PORT 12345 /* FIXME */ +#ifndef PINBA_DEFAULT_SERVICE +# define PINBA_DEFAULT_SERVICE "30002" #endif #ifndef PINBA_MAX_SOCKETS # define PINBA_MAX_SOCKETS 16 #endif -#ifndef NI_MAXSERV -# define NI_MAXSERV 32 -#endif - /* * Private data structures */ -typedef struct _pinba_statres_ pinba_statres; -struct _pinba_statres_ { - const char *name; - double req_per_sec; - double req_time; - double ru_utime; - double ru_stime; - double doc_size; - double mem_peak; -}; - -struct pinba_socket_s { +/* {{{ */ +struct pinba_socket_s +{ struct pollfd fd[PINBA_MAX_SOCKETS]; nfds_t fd_num; }; typedef struct pinba_socket_s pinba_socket_t; -typedef double pinba_time_t; -typedef uint32_t pinba_size_t; +/* Fixed point counter value. n is the decimal part multiplied by 10^9. */ +struct float_counter_s +{ + uint64_t i; + uint64_t n; /* nanos */ +}; +typedef struct float_counter_s float_counter_t; -typedef struct pinba_statnode_s pinba_statnode_t; struct pinba_statnode_s { - /* collector name */ + /* collector name, used as plugin instance */ char *name; + /* query data */ char *host; char *server; char *script; - /* collected data */ - pinba_time_t last_coll; - pinba_size_t req_count; - pinba_time_t req_time; - pinba_time_t ru_utime; - pinba_time_t ru_stime; - pinba_size_t doc_size; - pinba_size_t mem_peak; + + derive_t req_count; + + float_counter_t req_time; + float_counter_t ru_utime; + float_counter_t ru_stime; + + derive_t doc_size; + gauge_t mem_peak; }; +typedef struct pinba_statnode_s pinba_statnode_t; +/* }}} */ +/* + * Module global variables + */ +/* {{{ */ static pinba_statnode_t *stat_nodes = NULL; static unsigned int stat_nodes_num = 0; static pthread_mutex_t stat_nodes_lock; -char service_status=0; -char *service_address = PINBA_DEFAULT_ADDRESS; -unsigned int service_port=PINBA_DEFAULT_PORT; +static char *conf_node = NULL; +static char *conf_service = NULL; static _Bool collector_thread_running = 0; static _Bool collector_thread_do_shutdown = 0; static pthread_t collector_thread_id; +/* }}} */ -static pinba_time_t now (void) /* {{{ */ +/* + * Functions + */ +static void float_counter_add (float_counter_t *fc, float val) /* {{{ */ { - static struct timeval tv; - - gettimeofday (&tv, /* tz = */ NULL); - - return (double)tv.tv_sec+((double)tv.tv_usec/(double)1000000); -} /* }}} pinba_time_t now */ + uint64_t tmp; + + if (val < 0.0) + return; + + tmp = (uint64_t) val; + val -= (double) tmp; + + fc->i += tmp; + fc->n += (uint64_t) ((val * 1000000000.0) + .5); -static void service_statnode_reset (pinba_statnode_t *node) /* {{{ */ + if (fc->n >= 1000000000) + { + fc->i += 1; + fc->n -= 1000000000; + assert (fc->n < 1000000000); + } +} /* }}} void float_counter_add */ + +static derive_t float_counter_get (const float_counter_t *fc, /* {{{ */ + uint64_t factor) { - node->last_coll=now(); - node->req_count=0; - node->req_time=0.0; - node->ru_utime=0.0; - node->ru_stime=0.0; - node->doc_size=0; - node->mem_peak=0; -} /* }}} void service_statnode_reset */ + derive_t ret; + + ret = (derive_t) (fc->i * factor); + ret += (derive_t) (fc->n / (1000000000 / factor)); + + return (ret); +} /* }}} derive_t float_counter_get */ static void strset (char **str, const char *new) /* {{{ */ { @@ -153,83 +167,44 @@ static void service_statnode_add(const char *name, /* {{{ */ const char *script) { pinba_statnode_t *node; - DEBUG("adding node `%s' to collector { %s, %s, %s }", name, host?host:"", server?server:"", script?script:""); - stat_nodes=realloc(stat_nodes, sizeof(pinba_statnode_t)*(stat_nodes_num+1)); - if(!stat_nodes){ - ERROR("Realloc failed!"); - exit(-1); + node = realloc (stat_nodes, + sizeof (*stat_nodes) * (stat_nodes_num + 1)); + if (node == NULL) + { + ERROR ("pinba plugin: realloc failed"); + return; } - - node=&stat_nodes[stat_nodes_num]; - - /* reset stat data */ - service_statnode_reset(node); + stat_nodes = node; + + node = stat_nodes + stat_nodes_num; + memset (node, 0, sizeof (*node)); /* reset strings */ - node->name=NULL; - node->host=NULL; - node->server=NULL; - node->script=NULL; + node->name = NULL; + node->host = NULL; + node->server = NULL; + node->script = NULL; + + node->mem_peak = NAN; /* fill query data */ - strset(&node->name, name); - strset(&node->host, host); - strset(&node->server, server); - strset(&node->script, script); + strset (&node->name, name); + strset (&node->host, host); + strset (&node->server, server); + strset (&node->script, script); /* increment counter */ stat_nodes_num++; } /* }}} void service_statnode_add */ -static void service_statnode_free (void) /* {{{ */ -{ - unsigned int i; - - if(stat_nodes_num < 1) - return; - - for (i = 0; i < stat_nodes_num; i++) - { - sfree (stat_nodes[i].name); - sfree (stat_nodes[i].host); - sfree (stat_nodes[i].server); - sfree (stat_nodes[i].script); - } - - sfree (stat_nodes); - stat_nodes_num = 0; - - pthread_mutex_destroy (&stat_nodes_lock); -} /* }}} void service_statnode_free */ - -static void service_statnode_init (void) /* {{{ */ -{ - /* only total info collect by default */ - service_statnode_free(); - - DEBUG("initializing collector.."); - pthread_mutex_init(&stat_nodes_lock, 0); -} /* }}} void service_statnode_init */ - -static void service_statnode_begin (void) /* {{{ */ -{ - service_statnode_init(); - pthread_mutex_lock(&stat_nodes_lock); - - service_statnode_add("total", NULL, NULL, NULL); -} /* }}} void service_statnode_begin */ - -static void service_statnode_end (void) /* {{{ */ -{ - pthread_mutex_unlock(&stat_nodes_lock); -} /* }}} void service_statnode_end */ - -static unsigned int service_statnode_collect (pinba_statres *res, /* {{{ */ +/* Copy the data from the global "stat_nodes" list into the buffer pointed to + * by "res", doing the derivation in the process. Returns the next index or + * zero if the end of the list has been reached. */ +static unsigned int service_statnode_collect (pinba_statnode_t *res, /* {{{ */ unsigned int index) { - pinba_statnode_t* node; - pinba_time_t delta; + pinba_statnode_t *node; if (stat_nodes_num == 0) return 0; @@ -244,24 +219,13 @@ static unsigned int service_statnode_collect (pinba_statres *res, /* {{{ */ pthread_mutex_unlock (&stat_nodes_lock); return 0; } - + node = stat_nodes + index; - delta = now() - node->last_coll; - - res->name = node->name; - res->req_per_sec = node->req_count / delta; - - if (node->req_count == 0) - node->req_count = 1; - - res->req_time = node->req_time / node->req_count; - res->ru_utime = node->ru_utime / node->req_count; - res->ru_stime = node->ru_stime / node->req_count; - res->ru_stime = node->ru_stime / node->req_count; - res->doc_size = node->doc_size / node->req_count; - res->mem_peak = node->mem_peak / node->req_count; + memcpy (res, node, sizeof (*res)); + + /* reset node */ + node->mem_peak = NAN; - service_statnode_reset (node); return (index + 1); } /* }}} unsigned int service_statnode_collect */ @@ -269,11 +233,17 @@ static void service_statnode_process (pinba_statnode_t *node, /* {{{ */ Pinba__Request* request) { node->req_count++; - node->req_time+=request->request_time; - node->ru_utime+=request->ru_utime; - node->ru_stime+=request->ru_stime; - node->doc_size+=request->document_size; - node->mem_peak+=request->memory_peak; + + float_counter_add (&node->req_time, request->request_time); + float_counter_add (&node->ru_utime, request->ru_utime); + float_counter_add (&node->ru_stime, request->ru_stime); + + node->doc_size += request->document_size; + + if (isnan (node->mem_peak) + || (node->mem_peak < ((gauge_t) request->memory_peak))) + node->mem_peak = (gauge_t) request->memory_peak; + } /* }}} void service_statnode_process */ static void service_process_request (Pinba__Request *request) /* {{{ */ @@ -284,11 +254,16 @@ static void service_process_request (Pinba__Request *request) /* {{{ */ for (i = 0; i < stat_nodes_num; i++) { - if(stat_nodes[i].host && strcmp(request->hostname, stat_nodes[i].host)) + if ((stat_nodes[i].host != NULL) + && (strcmp (request->hostname, stat_nodes[i].host) != 0)) continue; - if(stat_nodes[i].server && strcmp(request->server_name, stat_nodes[i].server)) + + if ((stat_nodes[i].server != NULL) + && (strcmp (request->server_name, stat_nodes[i].server) != 0)) continue; - if(stat_nodes[i].script && strcmp(request->script_name, stat_nodes[i].script)) + + if ((stat_nodes[i].script != NULL) + && (strcmp (request->script_name, stat_nodes[i].script) != 0)) continue; service_statnode_process(&stat_nodes[i], request); @@ -368,7 +343,7 @@ static int pb_add_socket (pinba_socket_t *s, /* {{{ */ } /* }}} int pb_add_socket */ static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */ - int listen_port) + const char *service) { pinba_socket_t *s; struct addrinfo *ai_list; @@ -376,9 +351,6 @@ static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */ struct addrinfo ai_hints; int status; - char service[NI_MAXSERV]; /* FIXME */ - snprintf (service, sizeof (service), "%i", listen_port); - memset (&ai_hints, 0, sizeof (ai_hints)); ai_hints.ai_flags = AI_PASSIVE; ai_hints.ai_family = AF_UNSPEC; @@ -387,6 +359,12 @@ static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */ ai_hints.ai_canonname = NULL; ai_hints.ai_next = NULL; + if (node == NULL) + node = PINBA_DEFAULT_NODE; + + if (service == NULL) + service = PINBA_DEFAULT_SERVICE; + ai_list = NULL; status = getaddrinfo (node, service, &ai_hints, &ai_list); @@ -399,7 +377,7 @@ static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */ assert (ai_list != NULL); s = malloc (sizeof (*s)); - if (s != NULL) + if (s == NULL) { freeaddrinfo (ai_list); ERROR ("pinba plugin: malloc failed."); @@ -514,7 +492,7 @@ static int receive_loop (void) /* {{{ */ { pinba_socket_t *s; - s = pinba_socket_open (service_address, service_port); + s = pinba_socket_open (conf_node, conf_service); if (s == NULL) { ERROR ("pinba plugin: Collector thread is exiting prematurely."); @@ -580,100 +558,94 @@ static void *collector_thread (void *arg) /* {{{ */ /* * Plugin declaration section */ - -static int config_set (char **var, const char *value) /* {{{ */ +static int pinba_config_view (const oconfig_item_t *ci) /* {{{ */ { - /* code from nginx plugin for collectd */ - if (*var != NULL) { - free (*var); - *var = NULL; + char *name = NULL; + char *host = NULL; + char *server = NULL; + char *script = NULL; + int status; + int i; + + status = cf_util_get_string (ci, &name); + if (status != 0) + return (status); + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Host", child->key) == 0) + status = cf_util_get_string (child, &host); + else if (strcasecmp ("Server", child->key) == 0) + status = cf_util_get_string (child, &server); + else if (strcasecmp ("Script", child->key) == 0) + status = cf_util_get_string (child, &script); + else + { + WARNING ("pinba plugin: Unknown config option: %s", child->key); + status = -1; + } + + if (status != 0) + break; } - - if ((*var = strdup (value)) == NULL) return (1); - else return (0); -} /* }}} int config_set */ + + if (status == 0) + service_statnode_add (name, host, server, script); + + sfree (name); + sfree (host); + sfree (server); + sfree (script); + + return (status); +} /* }}} int pinba_config_view */ static int plugin_config (oconfig_item_t *ci) /* {{{ */ { - unsigned int i, o; - int pinba_port = 0; - char *pinba_address = NULL; - - INFO("Pinba Configure.."); - - service_statnode_begin(); - - /* Set default values */ - config_set(&pinba_address, PINBA_DEFAULT_ADDRESS); - pinba_port = PINBA_DEFAULT_PORT; + int i; - for (i = 0; i < ci->children_num; i++) { + /* The lock should not be necessary in the config callback, but let's be + * sure.. */ + pthread_mutex_lock (&stat_nodes_lock); + + for (i = 0; i < ci->children_num; i++) + { oconfig_item_t *child = ci->children + i; - if (strcasecmp ("Address", child->key) == 0) { - if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_STRING)){ - WARNING ("pinba plugin: `Address' needs exactly one string argument."); - return (-1); - } - config_set(&pinba_address, child->values[0].value.string); - } else if (strcasecmp ("Port", child->key) == 0) { - if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_NUMBER)){ - WARNING ("pinba plugin: `Port' needs exactly one number argument."); - return (-1); - } - pinba_port=child->values[0].value.number; - } else if (strcasecmp ("View", child->key) == 0) { - const char *name=NULL, *host=NULL, *server=NULL, *script=NULL; - if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_STRING) || strlen(child->values[0].value.string)==0){ - WARNING ("pinba plugin: `View' needs exactly one non-empty string argument."); - return (-1); - } - name = child->values[0].value.string; - for(o=0; ochildren_num; o++){ - oconfig_item_t *node = child->children + o; - if (strcasecmp ("Host", node->key) == 0) { - if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){ - WARNING ("pinba plugin: `View->Host' needs exactly one non-empty string argument."); - return (-1); - } - host = node->values[0].value.string; - } else if (strcasecmp ("Server", node->key) == 0) { - if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){ - WARNING ("pinba plugin: `View->Server' needs exactly one non-empty string argument."); - return (-1); - } - server = node->values[0].value.string; - } else if (strcasecmp ("Script", node->key) == 0) { - if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){ - WARNING ("pinba plugin: `View->Script' needs exactly one non-empty string argument."); - return (-1); - } - script = node->values[0].value.string; - } else { - WARNING ("pinba plugin: In `' context allowed only `Host', `Server' and `Script' options but not the `%s'.", node->key); - return (-1); - } - } - /* add new statnode */ - service_statnode_add(name, host, server, script); - } else { - WARNING ("pinba plugin: In `' context allowed only `Address', `Port' and `Observe' options but not the `%s'.", child->key); - return (-1); - } + + if (strcasecmp ("Address", child->key) == 0) + cf_util_get_string (child, &conf_node); + else if (strcasecmp ("Port", child->key) == 0) + cf_util_get_service (child, &conf_service); + else if (strcasecmp ("View", child->key) == 0) + pinba_config_view (child); + else + WARNING ("pinba plugin: Unknown config option: %s", child->key); } - - service_statnode_end(); + + pthread_mutex_unlock(&stat_nodes_lock); return (0); -} /* int pinba_config */ +} /* }}} int pinba_config */ static int plugin_init (void) /* {{{ */ { int status; + if (stat_nodes == NULL) + { + /* Collect the "total" data by default. */ + service_statnode_add ("total", + /* host = */ NULL, + /* server = */ NULL, + /* script = */ NULL); + } + if (collector_thread_running) return (0); - status = pthread_create (&collector_thread_id, + status = plugin_thread_create (&collector_thread_id, /* attrs = */ NULL, collector_thread, /* args = */ NULL); @@ -713,27 +685,42 @@ static int plugin_shutdown (void) /* {{{ */ return (0); } /* }}} int plugin_shutdown */ -static int plugin_submit (const char *plugin_instance, /* {{{ */ - const char *type, - const pinba_statres *res) { - value_t values[6]; +static int plugin_submit (const pinba_statnode_t *res) /* {{{ */ +{ + value_t value; value_list_t vl = VALUE_LIST_INIT; - values[0].gauge = res->req_per_sec; - values[1].gauge = res->req_time; - values[2].gauge = res->ru_utime; - values[3].gauge = res->ru_stime; - values[4].gauge = res->doc_size; - values[5].gauge = res->mem_peak; - - vl.values = values; - vl.values_len = 6; + vl.values = &value; + vl.values_len = 1; sstrncpy (vl.host, hostname_g, sizeof (vl.host)); sstrncpy (vl.plugin, "pinba", sizeof (vl.plugin)); - sstrncpy (vl.plugin_instance, plugin_instance, - sizeof(vl.plugin_instance)); - sstrncpy (vl.type, type, sizeof (vl.type)); - INFO("Pinba Dispatch"); + sstrncpy (vl.plugin_instance, res->name, sizeof (vl.plugin_instance)); + + value.derive = res->req_count; + sstrncpy (vl.type, "total_requests", sizeof (vl.type)); + plugin_dispatch_values (&vl); + + value.derive = float_counter_get (&res->req_time, /* factor = */ 1000); + sstrncpy (vl.type, "total_time_in_ms", sizeof (vl.type)); + plugin_dispatch_values (&vl); + + value.derive = res->doc_size; + sstrncpy (vl.type, "total_bytes", sizeof (vl.type)); + plugin_dispatch_values (&vl); + + value.derive = float_counter_get (&res->ru_utime, /* factor = */ 100); + sstrncpy (vl.type, "cpu", sizeof (vl.type)); + sstrncpy (vl.type_instance, "user", sizeof (vl.type_instance)); + plugin_dispatch_values (&vl); + + value.derive = float_counter_get (&res->ru_stime, /* factor = */ 100); + sstrncpy (vl.type, "cpu", sizeof (vl.type)); + sstrncpy (vl.type_instance, "system", sizeof (vl.type_instance)); + plugin_dispatch_values (&vl); + + value.gauge = res->mem_peak; + sstrncpy (vl.type, "memory", sizeof (vl.type)); + sstrncpy (vl.type_instance, "peak", sizeof (vl.type_instance)); plugin_dispatch_values (&vl); return (0); @@ -742,11 +729,11 @@ static int plugin_submit (const char *plugin_instance, /* {{{ */ static int plugin_read (void) /* {{{ */ { unsigned int i=0; - static pinba_statres res; + pinba_statnode_t data; - while ((i = service_statnode_collect (&res, i)) != 0) + while ((i = service_statnode_collect (&data, i)) != 0) { - plugin_submit(res.name, "pinba_view", &res); + plugin_submit (&data); } return 0;