X-Git-Url: https://git.tokkee.org/?a=blobdiff_plain;f=src%2Fpinba.c;h=6879733501cbedaa32cbfaa4afde3ca527771894;hb=31ee0e5282b59f89ac5c9bcaebb993450e45dba1;hp=40cc55e560eda7af50aee64fbb6de3d9504b0169;hpb=4dadeaf742196b930cd634e3a2bdb10e11e278bd;p=collectd.git diff --git a/src/pinba.c b/src/pinba.c index 40cc55e5..68797335 100644 --- a/src/pinba.c +++ b/src/pinba.c @@ -1,6 +1,8 @@ /** - * collectd - src/pinba.c + * collectd - src/pinba.c (based on code from pinba_engine 0.0.5) + * Copyright (c) 2007-2009 Antony Dovgal * Copyright (C) 2010 Phoenix Kayo + * Copyright (C) 2010 Florian Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -16,11 +18,11 @@ * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA * * Authors: + * Antony Dovgal * Phoenix Kayo + * Florian Forster **/ -#define _XOPEN_SOURCE 500 - #include "collectd.h" #include "common.h" #include "plugin.h" @@ -28,95 +30,121 @@ #include #include -#include -#include +#include +#include #include "pinba.pb-c.h" -typedef uint8_t u_char; - -#include - /* - * Service declaration section + * Defines */ #ifndef PINBA_UDP_BUFFER_SIZE # define PINBA_UDP_BUFFER_SIZE 65536 #endif -typedef struct _pinba_statres_ pinba_statres; -struct _pinba_statres_ { - const char *name; - double req_per_sec; - double req_time; - double ru_utime; - double ru_stime; - double doc_size; - double mem_peak; -}; +#ifndef PINBA_DEFAULT_NODE +# define PINBA_DEFAULT_NODE "::0" +#endif -typedef struct _pinba_socket_ pinba_socket; -struct _pinba_socket_ { - int listen_sock; - struct event *accept_event; -}; +#ifndef PINBA_DEFAULT_SERVICE +# define PINBA_DEFAULT_SERVICE "30002" +#endif -typedef double pinba_time; -typedef uint32_t pinba_size; +#ifndef PINBA_MAX_SOCKETS +# define PINBA_MAX_SOCKETS 16 +#endif -static pinba_time now (void) +/* + * Private data structures + */ +/* {{{ */ +struct pinba_socket_s { - static struct timeval tv; - - gettimeofday (&tv, /* tz = */ NULL); - - return (double)tv.tv_sec+((double)tv.tv_usec/(double)1000000); -} - -static pthread_rwlock_t temp_lock; - -static struct event_base *temp_base = NULL; + struct pollfd fd[PINBA_MAX_SOCKETS]; + nfds_t fd_num; +}; +typedef struct pinba_socket_s pinba_socket_t; -static pinba_socket *temp_sock = NULL; +/* Fixed point counter value. n is the decimal part multiplied by 10^9. */ +struct float_counter_s +{ + uint64_t i; + uint64_t n; /* nanos */ +}; +typedef struct float_counter_s float_counter_t; -static pthread_t temp_thrd; +struct pinba_statnode_s +{ + /* collector name, used as plugin instance */ + char *name; -typedef struct _pinba_statnode_ pinba_statnode; -struct _pinba_statnode_{ - /* collector name */ - char* name; /* query data */ char *host; char *server; char *script; - /* collected data */ - pinba_time last_coll; - pinba_size req_count; - pinba_time req_time; - pinba_time ru_utime; - pinba_time ru_stime; - pinba_size doc_size; - pinba_size mem_peak; + + derive_t req_count; + + float_counter_t req_time; + float_counter_t ru_utime; + float_counter_t ru_stime; + + derive_t doc_size; + gauge_t mem_peak; }; +typedef struct pinba_statnode_s pinba_statnode_t; +/* }}} */ -static unsigned int stat_nodes_count=0; +/* + * Module global variables + */ +/* {{{ */ +static pinba_statnode_t *stat_nodes = NULL; +static unsigned int stat_nodes_num = 0; +static pthread_mutex_t stat_nodes_lock; -static pinba_statnode *stat_nodes = NULL; +static char *conf_node = NULL; +static char *conf_service = NULL; -char service_status=0; -char *service_address = PINBA_DEFAULT_ADDRESS; -unsigned int service_port=PINBA_DEFAULT_PORT; +static _Bool collector_thread_running = 0; +static _Bool collector_thread_do_shutdown = 0; +static pthread_t collector_thread_id; +/* }}} */ -static void service_statnode_reset (pinba_statnode *node) /* {{{ */ +/* + * Functions + */ +static void float_counter_add (float_counter_t *fc, float val) /* {{{ */ { - node->last_coll=now(); - node->req_count=0; - node->req_time=0.0; - node->ru_utime=0.0; - node->ru_stime=0.0; - node->doc_size=0; - node->mem_peak=0; -} /* }}} void service_statnode_reset */ + uint64_t tmp; + + if (val < 0.0) + return; + + tmp = (uint64_t) val; + val -= (double) tmp; + + fc->i += tmp; + fc->n += (uint64_t) ((val * 1000000000.0) + .5); + + if (fc->n >= 1000000000) + { + fc->i += 1; + fc->n -= 1000000000; + assert (fc->n < 1000000000); + } +} /* }}} void float_counter_add */ + +static derive_t float_counter_get (const float_counter_t *fc, /* {{{ */ + uint64_t factor) +{ + derive_t ret; + + ret = (derive_t) (fc->i * factor); + ret += (derive_t) (fc->n / (1000000000 / factor)); + + return (ret); +} /* }}} derive_t float_counter_get */ static void strset (char **str, const char *new) /* {{{ */ { @@ -138,516 +166,585 @@ static void service_statnode_add(const char *name, /* {{{ */ const char *server, const char *script) { - pinba_statnode *node; - DEBUG("adding node `%s' to collector { %s, %s, %s }", name, host?host:"", server?server:"", script?script:""); + pinba_statnode_t *node; - stat_nodes=realloc(stat_nodes, sizeof(pinba_statnode)*(stat_nodes_count+1)); - if(!stat_nodes){ - ERROR("Realloc failed!"); - exit(-1); + node = realloc (stat_nodes, + sizeof (*stat_nodes) * (stat_nodes_num + 1)); + if (node == NULL) + { + ERROR ("pinba plugin: realloc failed"); + return; } - - node=&stat_nodes[stat_nodes_count]; - - /* reset stat data */ - service_statnode_reset(node); + stat_nodes = node; + + node = stat_nodes + stat_nodes_num; + memset (node, 0, sizeof (*node)); /* reset strings */ - node->name=NULL; - node->host=NULL; - node->server=NULL; - node->script=NULL; + node->name = NULL; + node->host = NULL; + node->server = NULL; + node->script = NULL; + + node->mem_peak = NAN; /* fill query data */ - strset(&node->name, name); - strset(&node->host, host); - strset(&node->server, server); - strset(&node->script, script); + strset (&node->name, name); + strset (&node->host, host); + strset (&node->server, server); + strset (&node->script, script); /* increment counter */ - stat_nodes_count++; + stat_nodes_num++; } /* }}} void service_statnode_add */ -static void service_statnode_free (void) -{ - unsigned int i; - - if(stat_nodes_count < 1) - return; - - for (i = 0; i < stat_nodes_count; i++) - { - sfree (stat_nodes[i].name); - sfree (stat_nodes[i].host); - sfree (stat_nodes[i].server); - sfree (stat_nodes[i].script); - } - - sfree (stat_nodes); - stat_nodes_count = 0; - - pthread_rwlock_destroy (&temp_lock); -} - -static void service_statnode_init (void) +/* Copy the data from the global "stat_nodes" list into the buffer pointed to + * by "res", doing the derivation in the process. Returns the next index or + * zero if the end of the list has been reached. */ +static unsigned int service_statnode_collect (pinba_statnode_t *res, /* {{{ */ + unsigned int index) { - /* only total info collect by default */ - service_statnode_free(); + pinba_statnode_t *node; - DEBUG("initializing collector.."); - pthread_rwlock_init(&temp_lock, 0); -} - -static void service_statnode_begin (void) -{ - service_statnode_init(); - pthread_rwlock_wrlock(&temp_lock); - - service_statnode_add("total", NULL, NULL, NULL); -} - -static void service_statnode_end (void) -{ - pthread_rwlock_unlock(&temp_lock); -} - -static unsigned int service_statnode_collect (pinba_statres *res, - unsigned int i) -{ - pinba_statnode* node; - - if(stat_nodes_count==0) return 0; + if (stat_nodes_num == 0) + return 0; /* begin collecting */ - if(i==0){ - pthread_rwlock_wrlock(&temp_lock); - } - - /* find non-empty node */ - //for(node=stat_nodes+i; node->req_count==0 && ++i=stat_nodes_count){ - pthread_rwlock_unlock(&temp_lock); + if (index >= stat_nodes_num) + { + pthread_mutex_unlock (&stat_nodes_lock); return 0; } + + node = stat_nodes + index; + memcpy (res, node, sizeof (*res)); + + /* reset node */ + node->mem_peak = NAN; - node=stat_nodes+i; - - pinba_time delta=now()-node->last_coll; - - res->name=node->name; - - res->req_per_sec=node->req_count/delta; - - if(node->req_count==0)node->req_count=1; - res->req_time=node->req_time/node->req_count; - res->ru_utime=node->ru_utime/node->req_count; - res->ru_stime=node->ru_stime/node->req_count; - res->ru_stime=node->ru_stime/node->req_count; - res->doc_size=node->doc_size/node->req_count; - res->mem_peak=node->mem_peak/node->req_count; - - service_statnode_reset(node); - return ++i; -} + return (index + 1); +} /* }}} unsigned int service_statnode_collect */ -static void service_statnode_process (pinba_statnode *node, +static void service_statnode_process (pinba_statnode_t *node, /* {{{ */ Pinba__Request* request) { node->req_count++; - node->req_time+=request->request_time; - node->ru_utime+=request->ru_utime; - node->ru_stime+=request->ru_stime; - node->doc_size+=request->document_size; - node->mem_peak+=request->memory_peak; -} - -static void service_process_request (Pinba__Request *request) + + float_counter_add (&node->req_time, request->request_time); + float_counter_add (&node->ru_utime, request->ru_utime); + float_counter_add (&node->ru_stime, request->ru_stime); + + node->doc_size += request->document_size; + + if (isnan (node->mem_peak) + || (node->mem_peak < ((gauge_t) request->memory_peak))) + node->mem_peak = (gauge_t) request->memory_peak; + +} /* }}} void service_statnode_process */ + +static void service_process_request (Pinba__Request *request) /* {{{ */ { unsigned int i; - pthread_rwlock_wrlock (&temp_lock); + pthread_mutex_lock (&stat_nodes_lock); - for (i = 0; i < stat_nodes_count; i++) + for (i = 0; i < stat_nodes_num; i++) { - if(stat_nodes[i].host && strcmp(request->hostname, stat_nodes[i].host)) + if ((stat_nodes[i].host != NULL) + && (strcmp (request->hostname, stat_nodes[i].host) != 0)) continue; - if(stat_nodes[i].server && strcmp(request->server_name, stat_nodes[i].server)) + + if ((stat_nodes[i].server != NULL) + && (strcmp (request->server_name, stat_nodes[i].server) != 0)) continue; - if(stat_nodes[i].script && strcmp(request->script_name, stat_nodes[i].script)) + + if ((stat_nodes[i].script != NULL) + && (strcmp (request->script_name, stat_nodes[i].script) != 0)) continue; service_statnode_process(&stat_nodes[i], request); } - pthread_rwlock_unlock(&temp_lock); -} + pthread_mutex_unlock(&stat_nodes_lock); +} /* }}} void service_process_request */ -static void *pinba_main (void *arg) +static int pb_del_socket (pinba_socket_t *s, /* {{{ */ + nfds_t index) { - DEBUG("entering listen-loop.."); - - service_status=1; - event_base_dispatch(temp_base); - - /* unreachable */ - return NULL; -} + if (index >= s->fd_num) + return (EINVAL); + + close (s->fd[index].fd); + s->fd[index].fd = -1; + + /* When deleting the last element in the list, no memmove is necessary. */ + if (index < (s->fd_num - 1)) + { + memmove (&s->fd[index], &s->fd[index + 1], + sizeof (s->fd[0]) * (s->fd_num - (index + 1))); + } -static void pinba_socket_free (pinba_socket *socket) /* {{{ */ + s->fd_num--; + return (0); +} /* }}} int pb_del_socket */ + +static int pb_add_socket (pinba_socket_t *s, /* {{{ */ + const struct addrinfo *ai) { - if (!socket) - return; - - if (socket->listen_sock >= 0) + int fd; + int tmp; + int status; + + if (s->fd_num == PINBA_MAX_SOCKETS) { - close(socket->listen_sock); - socket->listen_sock = -1; + WARNING ("pinba plugin: Sorry, you have hit the built-in limit of " + "%i sockets. Please complain to the collectd developers so we can " + "raise the limit.", PINBA_MAX_SOCKETS); + return (-1); } - - if (socket->accept_event) + + fd = socket (ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (fd < 0) { - event_del(socket->accept_event); - free(socket->accept_event); - socket->accept_event = NULL; + char errbuf[1024]; + ERROR ("pinba plugin: socket(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (0); } - - free(socket); -} /* }}} void pinba_socket_free */ -static int pinba_process_stats_packet (const unsigned char *buf, - int buf_len) -{ - Pinba__Request *request; - - request = pinba__request__unpack(NULL, buf_len, buf); - - if (!request) { - return P_FAILURE; - } else { - service_process_request(request); - - pinba__request__free_unpacked(request, NULL); - - return P_SUCCESS; + tmp = 1; + status = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &tmp, sizeof (tmp)); + if (status != 0) + { + char errbuf[1024]; + WARNING ("pinba plugin: setsockopt(SO_REUSEADDR) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); } -} -static void pinba_udp_read_callback_fn (int sock, short event, void *arg) -{ - if (event & EV_READ) { - int ret; - unsigned char buf[PINBA_UDP_BUFFER_SIZE]; - struct sockaddr_in from; - socklen_t fromlen = sizeof(struct sockaddr_in); - - ret = recvfrom(sock, buf, PINBA_UDP_BUFFER_SIZE-1, MSG_DONTWAIT, (struct sockaddr *)&from, &fromlen); - if (ret > 0) { - if (pinba_process_stats_packet(buf, ret) != P_SUCCESS) { - DEBUG("failed to parse data received from %s", inet_ntoa(from.sin_addr)); - } - } else if (ret < 0) { - if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { - return; - } - WARNING("recv() failed: %s (%d)", strerror(errno), errno); - } else { - WARNING("recv() returned 0"); - } + status = bind (fd, ai->ai_addr, ai->ai_addrlen); + if (status != 0) + { + char errbuf[1024]; + ERROR ("pinba plugin: bind(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (0); } -} -static pinba_socket *pinba_socket_open (const char *ip, /* {{{ */ - int listen_port) + s->fd[s->fd_num].fd = fd; + s->fd[s->fd_num].events = POLLIN | POLLPRI; + s->fd[s->fd_num].revents = 0; + s->fd_num++; + + return (0); +} /* }}} int pb_add_socket */ + +static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */ + const char *service) { - struct sockaddr_in addr; - pinba_socket *s; - int sfd, flags, yes = 1; - - if ((sfd = socket(AF_INET, SOCK_DGRAM, 0)) == -1) { - ERROR("socket() failed: %s (%d)", strerror(errno), errno); - return NULL; - } - - if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { - close(sfd); - return NULL; - } - - if(setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { - close(sfd); - return NULL; - } - - s = (pinba_socket *)calloc(1, sizeof(pinba_socket)); - if (!s) { - return NULL; - } - s->listen_sock = sfd; - - memset(&addr, 0, sizeof(addr)); - - addr.sin_family = AF_INET; - addr.sin_port = htons(listen_port); - addr.sin_addr.s_addr = htonl(INADDR_ANY); - - if (ip && *ip) { - struct in_addr tmp; - - if (inet_aton(ip, &tmp)) { - addr.sin_addr.s_addr = tmp.s_addr; - } else { - WARNING("inet_aton(%s) failed, listening on ANY IP-address", ip); - } - } - - if (bind(s->listen_sock, (struct sockaddr *)&addr, sizeof(addr))) { - pinba_socket_free(s); - ERROR("bind() failed: %s (%d)", strerror(errno), errno); - return NULL; + pinba_socket_t *s; + struct addrinfo *ai_list; + struct addrinfo *ai_ptr; + struct addrinfo ai_hints; + int status; + + memset (&ai_hints, 0, sizeof (ai_hints)); + ai_hints.ai_flags = AI_PASSIVE; + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_DGRAM; + ai_hints.ai_addr = NULL; + ai_hints.ai_canonname = NULL; + ai_hints.ai_next = NULL; + + if (node == NULL) + node = PINBA_DEFAULT_NODE; + + if (service == NULL) + service = PINBA_DEFAULT_SERVICE; + + ai_list = NULL; + status = getaddrinfo (node, service, + &ai_hints, &ai_list); + if (status != 0) + { + ERROR ("pinba plugin: getaddrinfo(3) failed: %s", + gai_strerror (status)); + return (NULL); } - - s->accept_event = (struct event *)calloc(1, sizeof(struct event)); - if (!s->accept_event) { - ERROR("calloc() failed: %s (%d)", strerror(errno), errno); - pinba_socket_free(s); - return NULL; + assert (ai_list != NULL); + + s = malloc (sizeof (*s)); + if (s == NULL) + { + freeaddrinfo (ai_list); + ERROR ("pinba plugin: malloc failed."); + return (NULL); } + memset (s, 0, sizeof (*s)); + + for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + status = pb_add_socket (s, ai_ptr); + if (status != 0) + break; + } /* for (ai_list) */ - event_set(s->accept_event, s->listen_sock, EV_READ | EV_PERSIST, pinba_udp_read_callback_fn, s); - event_base_set(temp_base, s->accept_event); - event_add(s->accept_event, NULL); - - return s; -} /* }}} */ + freeaddrinfo (ai_list); -static int service_cleanup (void) -{ - DEBUG("closing socket.."); - if(temp_sock){ - pthread_rwlock_wrlock(&temp_lock); - pinba_socket_free(temp_sock); - pthread_rwlock_unlock(&temp_lock); + if (s->fd_num < 1) + { + WARNING ("pinba plugin: Unable to open socket for address %s.", node); + sfree (s); + s = NULL; } - - DEBUG("shutdowning event.."); - event_base_free(temp_base); - - DEBUG("shutting down.."); - return (0); -} + return (s); +} /* }}} pinba_socket_open */ -static int service_start(void) +static void pinba_socket_free (pinba_socket_t *socket) /* {{{ */ { - DEBUG("starting up.."); - - DEBUG("initializing event.."); - temp_base = event_base_new(); - - DEBUG("opening socket.."); - - temp_sock = pinba_socket_open(service_address, service_port); - - if (!temp_sock) { - service_cleanup(); - return 1; - } + nfds_t i; + + if (!socket) + return; - if (pthread_create(&temp_thrd, NULL, pinba_main, NULL)) { - service_cleanup(); - return 1; + for (i = 0; i < socket->fd_num; i++) + { + if (socket->fd[i].fd < 0) + continue; + close (socket->fd[i].fd); + socket->fd[i].fd = -1; } - return 0; -} + sfree(socket); +} /* }}} void pinba_socket_free */ -static int service_stop (void) +static int pinba_process_stats_packet (const uint8_t *buffer, /* {{{ */ + size_t buffer_size) { - pthread_cancel(temp_thrd); - pthread_join(temp_thrd, NULL); - service_status=0; - DEBUG("terminating listen-loop.."); + Pinba__Request *request; - service_cleanup(); + request = pinba__request__unpack (NULL, buffer_size, buffer); - return 0; -} + if (!request) + return (-1); -static void service_config (const char *address, unsigned int port) /* {{{ */ + service_process_request(request); + pinba__request__free_unpacked (request, NULL); + + return (0); +} /* }}} int pinba_process_stats_packet */ + +static int pinba_udp_read_callback_fn (int sock) /* {{{ */ { - int need_restart = 0; + uint8_t buffer[PINBA_UDP_BUFFER_SIZE]; + size_t buffer_size; + int status; - if (address && service_address && (strcmp(service_address, address) != 0)) + while (42) { - strset (&service_address, address); - need_restart++; - } + buffer_size = sizeof (buffer); + status = recvfrom (sock, buffer, buffer_size - 1, MSG_DONTWAIT, /* from = */ NULL, /* from len = */ 0); + if (status < 0) + { + char errbuf[1024]; + + if ((errno == EINTR) +#ifdef EWOULDBLOCK + || (errno == EWOULDBLOCK) +#endif + || (errno == EAGAIN)) + { + continue; + } + + WARNING("pinba plugin: recvfrom(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (-1); + } + else if (status == 0) + { + DEBUG ("pinba plugin: recvfrom(2) returned unexpected status zero."); + return (-1); + } + else /* if (status > 0) */ + { + assert (((size_t) status) < buffer_size); + buffer_size = (size_t) status; + buffer[buffer_size] = 0; + + status = pinba_process_stats_packet (buffer, buffer_size); + if (status != 0) + DEBUG("pinba plugin: Parsing packet failed."); + return (status); + } + } /* while (42) */ + + /* not reached */ + assert (23 == 42); + return (-1); +} /* }}} void pinba_udp_read_callback_fn */ + +static int receive_loop (void) /* {{{ */ +{ + pinba_socket_t *s; - if ((port > 0) && (port < 65536) && (service_port != port)) + s = pinba_socket_open (conf_node, conf_service); + if (s == NULL) { - service_port=port; - need_restart++; + ERROR ("pinba plugin: Collector thread is exiting prematurely."); + return (-1); } - if(service_status && need_restart) + while (!collector_thread_do_shutdown) { - service_stop(); - service_start(); - } -} /* }}} void service_config */ + int status; + nfds_t i; + + if (s->fd_num < 1) + break; + + status = poll (s->fd, s->fd_num, /* timeout = */ 1000); + if (status == 0) /* timeout */ + { + continue; + } + else if (status < 0) + { + char errbuf[1024]; + + if ((errno == EINTR) || (errno == EAGAIN)) + continue; + + ERROR ("pinba plugin: poll(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + pinba_socket_free (s); + return (-1); + } + + for (i = 0; i < s->fd_num; i++) + { + if (s->fd[i].revents & (POLLERR | POLLHUP | POLLNVAL)) + { + pb_del_socket (s, i); + i--; + } + else if (s->fd[i].revents & (POLLIN | POLLPRI)) + { + pinba_udp_read_callback_fn (s->fd[i].fd); + } + } /* for (s->fd) */ + } /* while (!collector_thread_do_shutdown) */ + + pinba_socket_free (s); + s = NULL; + + return (0); +} /* }}} int receive_loop */ + +static void *collector_thread (void *arg) /* {{{ */ +{ + receive_loop (); + + memset (&collector_thread_id, 0, sizeof (collector_thread_id)); + collector_thread_running = 0; + pthread_exit (NULL); + return (NULL); +} /* }}} void *collector_thread */ /* * Plugin declaration section */ - -static int config_set (char **var, const char *value) +static int pinba_config_view (const oconfig_item_t *ci) /* {{{ */ { - /* code from nginx plugin for collectd */ - if (*var != NULL) { - free (*var); - *var = NULL; + char *name = NULL; + char *host = NULL; + char *server = NULL; + char *script = NULL; + int status; + int i; + + status = cf_util_get_string (ci, &name); + if (status != 0) + return (status); + + for (i = 0; i < ci->children_num; i++) + { + oconfig_item_t *child = ci->children + i; + + if (strcasecmp ("Host", child->key) == 0) + status = cf_util_get_string (child, &host); + else if (strcasecmp ("Server", child->key) == 0) + status = cf_util_get_string (child, &server); + else if (strcasecmp ("Script", child->key) == 0) + status = cf_util_get_string (child, &script); + else + { + WARNING ("pinba plugin: Unknown config option: %s", child->key); + status = -1; + } + + if (status != 0) + break; } - - if ((*var = strdup (value)) == NULL) return (1); - else return (0); -} -static int plugin_config (oconfig_item_t *ci) + if (status == 0) + service_statnode_add (name, host, server, script); + + sfree (name); + sfree (host); + sfree (server); + sfree (script); + + return (status); +} /* }}} int pinba_config_view */ + +static int plugin_config (oconfig_item_t *ci) /* {{{ */ { - unsigned int i, o; - int pinba_port = 0; - char *pinba_address = NULL; - - INFO("Pinba Configure.."); - - service_statnode_begin(); + int i; - /* Set default values */ - config_set(&pinba_address, PINBA_DEFAULT_ADDRESS); - pinba_port = PINBA_DEFAULT_PORT; - - for (i = 0; i < ci->children_num; i++) { + /* The lock should not be necessary in the config callback, but let's be + * sure.. */ + pthread_mutex_lock (&stat_nodes_lock); + + for (i = 0; i < ci->children_num; i++) + { oconfig_item_t *child = ci->children + i; - if (strcasecmp ("Address", child->key) == 0) { - if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_STRING)){ - WARNING ("pinba plugin: `Address' needs exactly one string argument."); - return (-1); - } - config_set(&pinba_address, child->values[0].value.string); - } else if (strcasecmp ("Port", child->key) == 0) { - if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_NUMBER)){ - WARNING ("pinba plugin: `Port' needs exactly one number argument."); - return (-1); - } - pinba_port=child->values[0].value.number; - } else if (strcasecmp ("View", child->key) == 0) { - const char *name=NULL, *host=NULL, *server=NULL, *script=NULL; - if ((child->values_num != 1) || (child->values[0].type != OCONFIG_TYPE_STRING) || strlen(child->values[0].value.string)==0){ - WARNING ("pinba plugin: `View' needs exactly one non-empty string argument."); - return (-1); - } - name = child->values[0].value.string; - for(o=0; ochildren_num; o++){ - oconfig_item_t *node = child->children + o; - if (strcasecmp ("Host", node->key) == 0) { - if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){ - WARNING ("pinba plugin: `View->Host' needs exactly one non-empty string argument."); - return (-1); - } - host = node->values[0].value.string; - } else if (strcasecmp ("Server", node->key) == 0) { - if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){ - WARNING ("pinba plugin: `View->Server' needs exactly one non-empty string argument."); - return (-1); - } - server = node->values[0].value.string; - } else if (strcasecmp ("Script", node->key) == 0) { - if ((node->values_num != 1) || (node->values[0].type != OCONFIG_TYPE_STRING) || strlen(node->values[0].value.string)==0){ - WARNING ("pinba plugin: `View->Script' needs exactly one non-empty string argument."); - return (-1); - } - script = node->values[0].value.string; - } else { - WARNING ("pinba plugin: In `' context allowed only `Host', `Server' and `Script' options but not the `%s'.", node->key); - return (-1); - } - } - /* add new statnode */ - service_statnode_add(name, host, server, script); - } else { - WARNING ("pinba plugin: In `' context allowed only `Address', `Port' and `Observe' options but not the `%s'.", child->key); - return (-1); - } + + if (strcasecmp ("Address", child->key) == 0) + cf_util_get_string (child, &conf_node); + else if (strcasecmp ("Port", child->key) == 0) + cf_util_get_service (child, &conf_service); + else if (strcasecmp ("View", child->key) == 0) + pinba_config_view (child); + else + WARNING ("pinba plugin: Unknown config option: %s", child->key); } + + pthread_mutex_unlock(&stat_nodes_lock); - service_statnode_end(); - - service_config(pinba_address, pinba_port); -} /* int pinba_config */ + return (0); +} /* }}} int pinba_config */ -static int plugin_init (void) +static int plugin_init (void) /* {{{ */ { - INFO("Pinba Starting.."); - service_start(); - return 0; -} + int status; + + if (stat_nodes == NULL) + { + /* Collect the "total" data by default. */ + service_statnode_add ("total", + /* host = */ NULL, + /* server = */ NULL, + /* script = */ NULL); + } + + if (collector_thread_running) + return (0); + + status = plugin_thread_create (&collector_thread_id, + /* attrs = */ NULL, + collector_thread, + /* args = */ NULL); + if (status != 0) + { + char errbuf[1024]; + ERROR ("pinba plugin: pthread_create(3) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (-1); + } + collector_thread_running = 1; -static int plugin_shutdown (void) + return (0); +} /* }}} */ + +static int plugin_shutdown (void) /* {{{ */ { - INFO("Pinba Stopping.."); - service_stop(); - service_statnode_free(); - return 0; -} + if (collector_thread_running) + { + int status; + + DEBUG ("pinba plugin: Shutting down collector thread."); + collector_thread_do_shutdown = 1; + + status = pthread_join (collector_thread_id, /* retval = */ NULL); + if (status != 0) + { + char errbuf[1024]; + ERROR ("pinba plugin: pthread_join(3) failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + } + + collector_thread_running = 0; + collector_thread_do_shutdown = 0; + } /* if (collector_thread_running) */ -static int -plugin_submit (const char *plugin_instance, - const char *type, - const pinba_statres *res) { - value_t values[6]; + return (0); +} /* }}} int plugin_shutdown */ + +static int plugin_submit (const pinba_statnode_t *res) /* {{{ */ +{ + value_t value; value_list_t vl = VALUE_LIST_INIT; - values[0].gauge = res->req_per_sec; - values[1].gauge = res->req_time; - values[2].gauge = res->ru_utime; - values[3].gauge = res->ru_stime; - values[4].gauge = res->doc_size; - values[5].gauge = res->mem_peak; - - vl.values = values; - vl.values_len = 6; + vl.values = &value; + vl.values_len = 1; sstrncpy (vl.host, hostname_g, sizeof (vl.host)); sstrncpy (vl.plugin, "pinba", sizeof (vl.plugin)); - sstrncpy (vl.plugin_instance, plugin_instance, - sizeof(vl.plugin_instance)); - sstrncpy (vl.type, type, sizeof (vl.type)); - INFO("Pinba Dispatch"); + sstrncpy (vl.plugin_instance, res->name, sizeof (vl.plugin_instance)); + + value.derive = res->req_count; + sstrncpy (vl.type, "total_requests", sizeof (vl.type)); + plugin_dispatch_values (&vl); + + value.derive = float_counter_get (&res->req_time, /* factor = */ 1000); + sstrncpy (vl.type, "total_time_in_ms", sizeof (vl.type)); + plugin_dispatch_values (&vl); + + value.derive = res->doc_size; + sstrncpy (vl.type, "total_bytes", sizeof (vl.type)); + plugin_dispatch_values (&vl); + + value.derive = float_counter_get (&res->ru_utime, /* factor = */ 100); + sstrncpy (vl.type, "cpu", sizeof (vl.type)); + sstrncpy (vl.type_instance, "user", sizeof (vl.type_instance)); + plugin_dispatch_values (&vl); + + value.derive = float_counter_get (&res->ru_stime, /* factor = */ 100); + sstrncpy (vl.type, "cpu", sizeof (vl.type)); + sstrncpy (vl.type_instance, "system", sizeof (vl.type_instance)); + plugin_dispatch_values (&vl); + + value.gauge = res->mem_peak; + sstrncpy (vl.type, "memory", sizeof (vl.type)); + sstrncpy (vl.type_instance, "peak", sizeof (vl.type_instance)); plugin_dispatch_values (&vl); return (0); -} +} /* }}} int plugin_submit */ -static int plugin_read (void) +static int plugin_read (void) /* {{{ */ { unsigned int i=0; - static pinba_statres res; + pinba_statnode_t data; - while ((i = service_statnode_collect (&res, i)) != 0) + while ((i = service_statnode_collect (&data, i)) != 0) { - plugin_submit(res.name, "pinba_view", &res); + plugin_submit (&data); } return 0; -} +} /* }}} int plugin_read */ -void module_register (void) +void module_register (void) /* {{{ */ { plugin_register_complex_config ("pinba", plugin_config); plugin_register_init ("pinba", plugin_init); plugin_register_read ("pinba", plugin_read); plugin_register_shutdown ("pinba", plugin_shutdown); -} /* void module_register */ +} /* }}} void module_register */ -/* vim: set sw=2 sts=2 et : */ +/* vim: set sw=2 sts=2 et fdm=marker : */