From: Florian Forster Date: Fri, 9 Apr 2010 14:21:23 +0000 (+0200) Subject: pinba plugin: Removed the dependency to libevent. X-Git-Tag: collectd-4.10.0~7^2~14 X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=bddede06dcc7a03bca49d4c3b003786ab301793c;p=collectd.git pinba plugin: Removed the dependency to libevent. The filedescriptors are now watched using poll(2). --- diff --git a/src/pinba.c b/src/pinba.c index f5eb69dc..c1bc108f 100644 --- a/src/pinba.c +++ b/src/pinba.c @@ -2,6 +2,7 @@ * collectd - src/pinba.c (based on code from pinba_engine 0.0.5) * Copyright (c) 2007-2009 Antony Dovgal * Copyright (C) 2010 Phoenix Kayo + * Copyright (C) 2010 Florian Forster * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -19,6 +20,7 @@ * Authors: * Antony Dovgal * Phoenix Kayo + * Florian Forster **/ #include "collectd.h" @@ -33,10 +35,6 @@ #include "pinba.pb-c.h" -typedef uint8_t u_char; - -#include - /* * Service declaration section */ @@ -75,9 +73,6 @@ struct _pinba_statres_ { }; struct pinba_socket_s { - int listen_sock; - struct event *accept_event; - struct pollfd fd[PINBA_MAX_SOCKETS]; nfds_t fd_num; }; @@ -86,12 +81,6 @@ typedef struct pinba_socket_s pinba_socket_t; typedef double pinba_time_t; typedef uint32_t pinba_size_t; -static struct event_base *temp_base = NULL; - -static pinba_socket_t *temp_sock = NULL; - -static pthread_t temp_thrd; - typedef struct pinba_statnode_s pinba_statnode_t; struct pinba_statnode_s { @@ -119,6 +108,10 @@ char service_status=0; char *service_address = PINBA_DEFAULT_ADDRESS; unsigned int service_port=PINBA_DEFAULT_PORT; +static _Bool collector_thread_running = 0; +static _Bool collector_thread_do_shutdown = 0; +static pthread_t collector_thread_id; + static pinba_time_t now (void) /* {{{ */ { static struct timeval tv; @@ -304,105 +297,25 @@ static void service_process_request (Pinba__Request *request) pthread_mutex_unlock(&stat_nodes_lock); } -static void *pinba_main (void *arg) +static int pb_del_socket (pinba_socket_t *s, /* {{{ */ + nfds_t index) { - DEBUG("entering listen-loop.."); - - service_status=1; - event_base_dispatch(temp_base); - - /* unreachable */ - return NULL; -} + if (index >= s->fd_num) + return (EINVAL); -static void pinba_socket_free (pinba_socket_t *socket) /* {{{ */ -{ - if (!socket) - return; - - if (socket->listen_sock >= 0) - { - close(socket->listen_sock); - socket->listen_sock = -1; - } - - if (socket->accept_event) + close (s->fd[index].fd); + s->fd[index].fd = -1; + + /* When deleting the last element in the list, no memmove is necessary. */ + if (index < (s->fd_num - 1)) { - event_del(socket->accept_event); - free(socket->accept_event); - socket->accept_event = NULL; + memmove (&s->fd[index], &s->fd[index + 1], + sizeof (s->fd[0]) * (s->fd_num - (index + 1))); } - - free(socket); -} /* }}} void pinba_socket_free */ - -static int pinba_process_stats_packet (const uint8_t *buffer, /* {{{ */ - size_t buffer_size) -{ - Pinba__Request *request; - - request = pinba__request__unpack (NULL, buffer_size, buffer); - - if (!request) - return (-1); - service_process_request(request); - pinba__request__free_unpacked (request, NULL); - + s->fd_num--; return (0); -} /* }}} int pinba_process_stats_packet */ - -static void pinba_udp_read_callback_fn (int sock, short event, void *arg) /* {{{ */ -{ - uint8_t buffer[PINBA_UDP_BUFFER_SIZE]; - size_t buffer_size; - int status; - - if ((event & EV_READ) == 0) - return; - - while (42) - { - buffer_size = sizeof (buffer); - status = recvfrom (sock, buffer, buffer_size - 1, MSG_DONTWAIT, /* from = */ NULL, /* from len = */ 0); - if (status < 0) - { - char errbuf[1024]; - - if ((errno == EINTR) -#ifdef EWOULDBLOCK - || (errno == EWOULDBLOCK) -#endif - || (errno == EAGAIN)) - { - continue; - } - - WARNING("pinba plugin: recvfrom(2) failed: %s", - sstrerror (errno, errbuf, sizeof (errbuf))); - return; - } - else if (status == 0) - { - DEBUG ("pinba plugin: recvfrom(2) returned unexpected status zero."); - return; - } - else /* if (status > 0) */ - { - assert (((size_t) status) < buffer_size); - buffer_size = (size_t) status; - buffer[buffer_size] = 0; - - status = pinba_process_stats_packet (buffer, buffer_size); - if (status != 0) - DEBUG("pinba plugin: Parsing packet failed."); - return; - } - - /* not reached */ - assert (23 == 42); - } /* while (42) */ -} /* }}} void pinba_udp_read_callback_fn */ +} /* }}} int pb_del_socket */ static int pb_add_socket (pinba_socket_t *s, /* {{{ */ const struct addrinfo *ai) @@ -513,81 +426,156 @@ static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */ return (s); } /* }}} pinba_socket_open */ -static int service_cleanup (void) +static void pinba_socket_free (pinba_socket_t *socket) /* {{{ */ { - DEBUG("closing socket.."); - if(temp_sock){ - pthread_mutex_lock(&stat_nodes_lock); - pinba_socket_free(temp_sock); - pthread_mutex_unlock(&stat_nodes_lock); - } - - DEBUG("shutdowning event.."); - event_base_free(temp_base); - - DEBUG("shutting down.."); + nfds_t i; - return (0); -} - -static int service_start(void) -{ - DEBUG("starting up.."); - - DEBUG("initializing event.."); - temp_base = event_base_new(); - - DEBUG("opening socket.."); - - temp_sock = pinba_socket_open(service_address, service_port); - - if (!temp_sock) { - service_cleanup(); - return 1; - } + if (!socket) + return; - if (pthread_create(&temp_thrd, NULL, pinba_main, NULL)) { - service_cleanup(); - return 1; + for (i = 0; i < socket->fd_num; i++) + { + if (socket->fd[i].fd < 0) + continue; + close (socket->fd[i].fd); + socket->fd[i].fd = -1; } - return 0; -} + sfree(socket); +} /* }}} void pinba_socket_free */ -static int service_stop (void) +static int pinba_process_stats_packet (const uint8_t *buffer, /* {{{ */ + size_t buffer_size) { - pthread_cancel(temp_thrd); - pthread_join(temp_thrd, NULL); - service_status=0; - DEBUG("terminating listen-loop.."); + Pinba__Request *request; - service_cleanup(); + request = pinba__request__unpack (NULL, buffer_size, buffer); - return 0; -} + if (!request) + return (-1); -static void service_config (const char *address, unsigned int port) /* {{{ */ + service_process_request(request); + pinba__request__free_unpacked (request, NULL); + + return (0); +} /* }}} int pinba_process_stats_packet */ + +static int pinba_udp_read_callback_fn (int sock) /* {{{ */ { - int need_restart = 0; + uint8_t buffer[PINBA_UDP_BUFFER_SIZE]; + size_t buffer_size; + int status; - if (address && service_address && (strcmp(service_address, address) != 0)) + while (42) { - strset (&service_address, address); - need_restart++; - } + buffer_size = sizeof (buffer); + status = recvfrom (sock, buffer, buffer_size - 1, MSG_DONTWAIT, /* from = */ NULL, /* from len = */ 0); + if (status < 0) + { + char errbuf[1024]; + + if ((errno == EINTR) +#ifdef EWOULDBLOCK + || (errno == EWOULDBLOCK) +#endif + || (errno == EAGAIN)) + { + continue; + } + + WARNING("pinba plugin: recvfrom(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (-1); + } + else if (status == 0) + { + DEBUG ("pinba plugin: recvfrom(2) returned unexpected status zero."); + return (-1); + } + else /* if (status > 0) */ + { + assert (((size_t) status) < buffer_size); + buffer_size = (size_t) status; + buffer[buffer_size] = 0; - if ((port > 0) && (port < 65536) && (service_port != port)) + status = pinba_process_stats_packet (buffer, buffer_size); + if (status != 0) + DEBUG("pinba plugin: Parsing packet failed."); + return (status); + } + } /* while (42) */ + + /* not reached */ + assert (23 == 42); + return (-1); +} /* }}} void pinba_udp_read_callback_fn */ + +static int receive_loop (void) /* {{{ */ +{ + pinba_socket_t *s; + + s = pinba_socket_open (service_address, service_port); + if (s == NULL) { - service_port=port; - need_restart++; + ERROR ("pinba plugin: Collector thread is exiting prematurely."); + return (-1); } - if(service_status && need_restart) + while (!collector_thread_do_shutdown) { - service_stop(); - service_start(); - } -} /* }}} void service_config */ + int status; + nfds_t i; + + if (s->fd_num < 1) + break; + + status = poll (s->fd, s->fd_num, /* timeout = */ 1000); + if (status == 0) /* timeout */ + { + continue; + } + else if (status < 0) + { + char errbuf[1024]; + + if ((errno == EINTR) || (errno == EAGAIN)) + continue; + + ERROR ("pinba plugin: poll(2) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + pinba_socket_free (s); + return (-1); + } + + for (i = 0; i < s->fd_num; i++) + { + if (s->fd[i].revents & (POLLERR | POLLHUP | POLLNVAL)) + { + pb_del_socket (s, i); + i--; + } + else if (s->fd[i].revents & (POLLIN | POLLPRI)) + { + pinba_udp_read_callback_fn (s->fd[i].fd); + } + } /* for (s->fd) */ + } /* while (!collector_thread_do_shutdown) */ + + pinba_socket_free (s); + s = NULL; + + return (0); +} /* }}} int receive_loop */ + +static void *collector_thread (void *arg) /* {{{ */ +{ + receive_loop (); + + memset (&collector_thread_id, 0, sizeof (collector_thread_id)); + collector_thread_running = 0; + pthread_exit (NULL); + return (NULL); +} /* }}} void *collector_thread */ /* * Plugin declaration section @@ -675,24 +663,55 @@ static int plugin_config (oconfig_item_t *ci) service_statnode_end(); - service_config(pinba_address, pinba_port); return (0); } /* int pinba_config */ -static int plugin_init (void) +static int plugin_init (void) /* {{{ */ { - INFO("Pinba Starting.."); - service_start(); - return 0; -} + int status; + + if (collector_thread_running) + return (0); + + status = pthread_create (&collector_thread_id, + /* attrs = */ NULL, + collector_thread, + /* args = */ NULL); + if (status != 0) + { + char errbuf[1024]; + ERROR ("pinba plugin: pthread_create(3) failed: %s", + sstrerror (errno, errbuf, sizeof (errbuf))); + return (-1); + } + collector_thread_running = 1; + + return (0); +} /* }}} */ -static int plugin_shutdown (void) +static int plugin_shutdown (void) /* {{{ */ { - INFO("Pinba Stopping.."); - service_stop(); - service_statnode_free(); - return 0; -} + if (collector_thread_running) + { + int status; + + DEBUG ("pinba plugin: Shutting down collector thread."); + collector_thread_do_shutdown = 1; + + status = pthread_join (collector_thread_id, /* retval = */ NULL); + if (status != 0) + { + char errbuf[1024]; + ERROR ("pinba plugin: pthread_join(3) failed: %s", + sstrerror (status, errbuf, sizeof (errbuf))); + } + + collector_thread_running = 0; + collector_thread_do_shutdown = 0; + } /* if (collector_thread_running) */ + + return (0); +} /* }}} int plugin_shutdown */ static int plugin_submit (const char *plugin_instance, const char *type,