Code

pinba plugin: Removed the dependency to libevent.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Fri, 9 Apr 2010 14:21:23 +0000 (16:21 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Fri, 9 Apr 2010 14:21:23 +0000 (16:21 +0200)
The filedescriptors are now watched using poll(2).

src/pinba.c

index f5eb69dcfc9e532d2a3a919b5b224450b20fa107..c1bc108f97c319d3f6e3663bc037925587d01b42 100644 (file)
@@ -2,6 +2,7 @@
  * collectd - src/pinba.c (based on code from pinba_engine 0.0.5)
  * Copyright (c) 2007-2009  Antony Dovgal
  * Copyright (C) 2010       Phoenix Kayo
+ * Copyright (C) 2010       Florian Forster
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License as published by the
@@ -19,6 +20,7 @@
  * Authors:
  *   Antony Dovgal <tony at daylessday.org>
  *   Phoenix Kayo <kayo.k11.4 at gmail.com>
+ *   Florian Forster <octo at verplant.org>
  **/
 
 #include "collectd.h"
 
 #include "pinba.pb-c.h"
 
-typedef uint8_t u_char;
-
-#include <event.h>
-
 /*
  *  Service declaration section
  */
@@ -75,9 +73,6 @@ struct _pinba_statres_ {
 };
 
 struct pinba_socket_s {
-  int listen_sock;
-  struct event *accept_event;
-
   struct pollfd fd[PINBA_MAX_SOCKETS];
   nfds_t fd_num;
 };
@@ -86,12 +81,6 @@ typedef struct pinba_socket_s pinba_socket_t;
 typedef double pinba_time_t;
 typedef uint32_t pinba_size_t;
 
-static struct event_base *temp_base = NULL;
-
-static pinba_socket_t *temp_sock = NULL;
-
-static pthread_t temp_thrd;
-
 typedef struct pinba_statnode_s pinba_statnode_t;
 struct pinba_statnode_s
 {
@@ -119,6 +108,10 @@ char service_status=0;
 char *service_address = PINBA_DEFAULT_ADDRESS;
 unsigned int service_port=PINBA_DEFAULT_PORT;
 
+static _Bool collector_thread_running = 0;
+static _Bool collector_thread_do_shutdown = 0;
+static pthread_t collector_thread_id;
+
 static pinba_time_t now (void) /* {{{ */
 {
   static struct timeval tv;
@@ -304,105 +297,25 @@ static void service_process_request (Pinba__Request *request)
   pthread_mutex_unlock(&stat_nodes_lock);
 }
 
-static void *pinba_main (void *arg)
+static int pb_del_socket (pinba_socket_t *s, /* {{{ */
+    nfds_t index)
 {
-  DEBUG("entering listen-loop..");
-  
-  service_status=1;
-  event_base_dispatch(temp_base);
-  
-  /* unreachable */
-  return NULL;
-}
+  if (index >= s->fd_num)
+    return (EINVAL);
 
-static void pinba_socket_free (pinba_socket_t *socket) /* {{{ */
-{
-  if (!socket)
-    return;
-  
-  if (socket->listen_sock >= 0)
-  {
-    close(socket->listen_sock);
-    socket->listen_sock = -1;
-  }
-  
-  if (socket->accept_event)
+  close (s->fd[index].fd);
+  s->fd[index].fd = -1;
+
+  /* When deleting the last element in the list, no memmove is necessary. */
+  if (index < (s->fd_num - 1))
   {
-    event_del(socket->accept_event);
-    free(socket->accept_event);
-    socket->accept_event = NULL;
+    memmove (&s->fd[index], &s->fd[index + 1],
+        sizeof (s->fd[0]) * (s->fd_num - (index + 1)));
   }
-  
-  free(socket);
-} /* }}} void pinba_socket_free */
-
-static int pinba_process_stats_packet (const uint8_t *buffer, /* {{{ */
-    size_t buffer_size)
-{
-  Pinba__Request *request;  
-  
-  request = pinba__request__unpack (NULL, buffer_size, buffer);
-  
-  if (!request)
-    return (-1);
 
-  service_process_request(request);
-  pinba__request__free_unpacked (request, NULL);
-    
+  s->fd_num--;
   return (0);
-} /* }}} int pinba_process_stats_packet */
-
-static void pinba_udp_read_callback_fn (int sock, short event, void *arg) /* {{{ */
-{
-  uint8_t buffer[PINBA_UDP_BUFFER_SIZE];
-  size_t buffer_size;
-  int status;
-
-  if ((event & EV_READ) == 0)
-    return;
-
-  while (42)
-  {
-    buffer_size = sizeof (buffer);
-    status = recvfrom (sock, buffer, buffer_size - 1, MSG_DONTWAIT, /* from = */ NULL, /* from len = */ 0);
-    if (status < 0)
-    {
-      char errbuf[1024];
-
-      if ((errno == EINTR)
-#ifdef EWOULDBLOCK
-          || (errno == EWOULDBLOCK)
-#endif
-          || (errno == EAGAIN))
-      {
-        continue;
-      }
-
-      WARNING("pinba plugin: recvfrom(2) failed: %s",
-          sstrerror (errno, errbuf, sizeof (errbuf)));
-      return;
-    }
-    else if (status == 0)
-    {
-      DEBUG ("pinba plugin: recvfrom(2) returned unexpected status zero.");
-      return;
-    }
-    else /* if (status > 0) */
-    {
-      assert (((size_t) status) < buffer_size);
-      buffer_size = (size_t) status;
-      buffer[buffer_size] = 0;
-
-      status = pinba_process_stats_packet (buffer, buffer_size);
-      if (status != 0)
-        DEBUG("pinba plugin: Parsing packet failed.");
-      return;
-    }
-
-    /* not reached */
-    assert (23 == 42);
-  } /* while (42) */
-} /* }}} void pinba_udp_read_callback_fn */
+} /* }}} int pb_del_socket */
 
 static int pb_add_socket (pinba_socket_t *s, /* {{{ */
     const struct addrinfo *ai)
@@ -513,81 +426,156 @@ static pinba_socket_t *pinba_socket_open (const char *node, /* {{{ */
   return (s);
 } /* }}} pinba_socket_open */
 
-static int service_cleanup (void)
+static void pinba_socket_free (pinba_socket_t *socket) /* {{{ */
 {
-  DEBUG("closing socket..");
-  if(temp_sock){
-    pthread_mutex_lock(&stat_nodes_lock);
-    pinba_socket_free(temp_sock);
-    pthread_mutex_unlock(&stat_nodes_lock);
-  }
-  
-  DEBUG("shutdowning event..");
-  event_base_free(temp_base);
-  
-  DEBUG("shutting down..");
+  nfds_t i;
 
-  return (0);
-}
-
-static int service_start(void)
-{
-  DEBUG("starting up..");
-  
-  DEBUG("initializing event..");
-  temp_base = event_base_new();
-  
-  DEBUG("opening socket..");
-  
-  temp_sock = pinba_socket_open(service_address, service_port);
-  
-  if (!temp_sock) {
-    service_cleanup();
-    return 1;
-  }
+  if (!socket)
+    return;
   
-  if (pthread_create(&temp_thrd, NULL, pinba_main, NULL)) {
-    service_cleanup();
-    return 1;
+  for (i = 0; i < socket->fd_num; i++)
+  {
+    if (socket->fd[i].fd < 0)
+      continue;
+    close (socket->fd[i].fd);
+    socket->fd[i].fd = -1;
   }
   
-  return 0;
-}
+  sfree(socket);
+} /* }}} void pinba_socket_free */
 
-static int service_stop (void)
+static int pinba_process_stats_packet (const uint8_t *buffer, /* {{{ */
+    size_t buffer_size)
 {
-  pthread_cancel(temp_thrd);
-  pthread_join(temp_thrd, NULL);
-  service_status=0;
-  DEBUG("terminating listen-loop..");
+  Pinba__Request *request;  
   
-  service_cleanup();
+  request = pinba__request__unpack (NULL, buffer_size, buffer);
   
-  return 0;
-}
+  if (!request)
+    return (-1);
 
-static void service_config (const char *address, unsigned int port) /* {{{ */
+  service_process_request(request);
+  pinba__request__free_unpacked (request, NULL);
+    
+  return (0);
+} /* }}} int pinba_process_stats_packet */
+
+static int pinba_udp_read_callback_fn (int sock) /* {{{ */
 {
-  int need_restart = 0;
+  uint8_t buffer[PINBA_UDP_BUFFER_SIZE];
+  size_t buffer_size;
+  int status;
 
-  if (address && service_address && (strcmp(service_address, address) != 0))
+  while (42)
   {
-    strset (&service_address, address);
-    need_restart++;
-  }
+    buffer_size = sizeof (buffer);
+    status = recvfrom (sock, buffer, buffer_size - 1, MSG_DONTWAIT, /* from = */ NULL, /* from len = */ 0);
+    if (status < 0)
+    {
+      char errbuf[1024];
+
+      if ((errno == EINTR)
+#ifdef EWOULDBLOCK
+          || (errno == EWOULDBLOCK)
+#endif
+          || (errno == EAGAIN))
+      {
+        continue;
+      }
+
+      WARNING("pinba plugin: recvfrom(2) failed: %s",
+          sstrerror (errno, errbuf, sizeof (errbuf)));
+      return (-1);
+    }
+    else if (status == 0)
+    {
+      DEBUG ("pinba plugin: recvfrom(2) returned unexpected status zero.");
+      return (-1);
+    }
+    else /* if (status > 0) */
+    {
+      assert (((size_t) status) < buffer_size);
+      buffer_size = (size_t) status;
+      buffer[buffer_size] = 0;
 
-  if ((port > 0) && (port < 65536) && (service_port != port))
+      status = pinba_process_stats_packet (buffer, buffer_size);
+      if (status != 0)
+        DEBUG("pinba plugin: Parsing packet failed.");
+      return (status);
+    }
+  } /* while (42) */
+
+  /* not reached */
+  assert (23 == 42);
+  return (-1);
+} /* }}} void pinba_udp_read_callback_fn */
+
+static int receive_loop (void) /* {{{ */
+{
+  pinba_socket_t *s;
+
+  s = pinba_socket_open (service_address, service_port);
+  if (s == NULL)
   {
-    service_port=port;
-    need_restart++;
+    ERROR ("pinba plugin: Collector thread is exiting prematurely.");
+    return (-1);
   }
 
-  if(service_status && need_restart)
+  while (!collector_thread_do_shutdown)
   {
-    service_stop();
-    service_start();
-  }
-} /* }}} void service_config */
+    int status;
+    nfds_t i;
+
+    if (s->fd_num < 1)
+      break;
+
+    status = poll (s->fd, s->fd_num, /* timeout = */ 1000);
+    if (status == 0) /* timeout */
+    {
+      continue;
+    }
+    else if (status < 0)
+    {
+      char errbuf[1024];
+
+      if ((errno == EINTR) || (errno == EAGAIN))
+        continue;
+
+      ERROR ("pinba plugin: poll(2) failed: %s",
+          sstrerror (errno, errbuf, sizeof (errbuf)));
+      pinba_socket_free (s);
+      return (-1);
+    }
+
+    for (i = 0; i < s->fd_num; i++)
+    {
+      if (s->fd[i].revents & (POLLERR | POLLHUP | POLLNVAL))
+      {
+        pb_del_socket (s, i);
+        i--;
+      }
+      else if (s->fd[i].revents & (POLLIN | POLLPRI))
+      {
+        pinba_udp_read_callback_fn (s->fd[i].fd);
+      }
+    } /* for (s->fd) */
+  } /* while (!collector_thread_do_shutdown) */
+
+  pinba_socket_free (s);
+  s = NULL;
+
+  return (0);
+} /* }}} int receive_loop */
+
+static void *collector_thread (void *arg) /* {{{ */
+{
+  receive_loop ();
+
+  memset (&collector_thread_id, 0, sizeof (collector_thread_id));
+  collector_thread_running = 0;
+  pthread_exit (NULL);
+  return (NULL);
+} /* }}} void *collector_thread */
 
 /*
  * Plugin declaration section
@@ -675,24 +663,55 @@ static int plugin_config (oconfig_item_t *ci)
   
   service_statnode_end();
   
-  service_config(pinba_address, pinba_port);
   return (0);
 } /* int pinba_config */
 
-static int plugin_init (void)
+static int plugin_init (void) /* {{{ */
 {
-  INFO("Pinba Starting..");
-  service_start();
-  return 0;
-}
+  int status;
+
+  if (collector_thread_running)
+    return (0);
+
+  status = pthread_create (&collector_thread_id,
+      /* attrs = */ NULL,
+      collector_thread,
+      /* args = */ NULL);
+  if (status != 0)
+  {
+    char errbuf[1024];
+    ERROR ("pinba plugin: pthread_create(3) failed: %s",
+        sstrerror (errno, errbuf, sizeof (errbuf)));
+    return (-1);
+  }
+  collector_thread_running = 1;
+
+  return (0);
+} /* }}} */
 
-static int plugin_shutdown (void)
+static int plugin_shutdown (void) /* {{{ */
 {
-  INFO("Pinba Stopping..");
-  service_stop();
-  service_statnode_free();
-  return 0;
-}
+  if (collector_thread_running)
+  {
+    int status;
+
+    DEBUG ("pinba plugin: Shutting down collector thread.");
+    collector_thread_do_shutdown = 1;
+
+    status = pthread_join (collector_thread_id, /* retval = */ NULL);
+    if (status != 0)
+    {
+      char errbuf[1024];
+      ERROR ("pinba plugin: pthread_join(3) failed: %s",
+          sstrerror (status, errbuf, sizeof (errbuf)));
+    }
+
+    collector_thread_running = 0;
+    collector_thread_do_shutdown = 0;
+  } /* if (collector_thread_running) */
+
+  return (0);
+} /* }}} int plugin_shutdown */
 
 static int plugin_submit (const char *plugin_instance,
               const char *type,