Code

snmp plugin: Use several reading threads to parallelize the reading of SNMP hosts.
authorFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sat, 23 Jun 2007 17:45:27 +0000 (19:45 +0200)
committerFlorian Forster <octo@leeloo.lan.home.verplant.org>
Sat, 23 Jun 2007 17:45:27 +0000 (19:45 +0200)
src/Makefile.am
src/snmp.c

index 2459bcf1678325806956acb41ead998566935e04..d92ef96edac56e26969c601522bd60f56d228cd3 100644 (file)
@@ -479,6 +479,9 @@ snmp_la_LDFLAGS = -module -avoid-version
 if BUILD_WITH_LIBNETSNMP
 snmp_la_LDFLAGS += -lnetsnmp
 endif
+if BUILD_WITH_LIBPTHREAD
+snmp_la_LDFLAGS += -lpthread
+endif
 collectd_LDADD += "-dlopen" snmp.la
 collectd_DEPENDENCIES += snmp.la
 endif
index ea9bd43ea52f78891c9e1e70ad55333d88856c8a..2c4c930cb652dce0c5f3a7642a12b5f347a561d3 100644 (file)
@@ -23,6 +23,8 @@
 #include "common.h"
 #include "plugin.h"
 
+#include <pthread.h>
+
 #include <net-snmp/net-snmp-config.h>
 #include <net-snmp/net-snmp-includes.h>
 
@@ -66,6 +68,12 @@ struct host_definition_s
   int16_t skip_left;
   data_definition_t **data_list;
   int data_list_len;
+  enum          /****************************************************/
+  {             /* This host..                                      */
+    STATE_IDLE, /* - just sits there until `skip_left < interval_g' */
+    STATE_WAIT, /* - waits to be queried.                           */
+    STATE_BUSY  /* - is currently being queried.                    */
+  } state;      /****************************************************/
   struct host_definition_s *next;
 };
 typedef struct host_definition_s host_definition_t;
@@ -91,9 +99,17 @@ typedef struct csnmp_table_values_s csnmp_table_values_t;
 /*
  * Private variables
  */
+static int do_shutdown = 0;
+
+pthread_t *threads = NULL;
+int threads_num = 0;
+
 static data_definition_t *data_head = NULL;
 static host_definition_t *host_head = NULL;
 
+static pthread_mutex_t host_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  host_cond = PTHREAD_COND_INITIALIZER;
+
 /*
  * Private functions
  */
@@ -485,6 +501,7 @@ static int csnmp_config_add_host (oconfig_item_t *ci)
   hd->sess_handle = NULL;
   hd->skip_num = 0;
   hd->skip_left = 0;
+  hd->state = STATE_IDLE;
 
   for (i = 0; i < ci->children_num; i++)
   {
@@ -628,34 +645,6 @@ static void csnmp_host_open_session (host_definition_t *host)
   }
 } /* void csnmp_host_open_session */
 
-static int csnmp_init (void)
-{
-  host_definition_t *host;
-
-  call_snmp_init_once ();
-
-  for (host = host_head; host != NULL; host = host->next)
-  {
-    /* We need to initialize `skip_num' here, because `interval_g' isn't
-     * initialized during `configure'. */
-    host->skip_left = interval_g;
-    if (host->skip_num == 0)
-    {
-      host->skip_num = interval_g;
-    }
-    else if (host->skip_num < interval_g)
-    {
-      host->skip_num = interval_g;
-      WARNING ("snmp plugin: Data for host `%s' will be collected every %i seconds.",
-         host->name, host->skip_num);
-    }
-
-    csnmp_host_open_session (host);
-  } /* for (host) */
-
-  return (0);
-} /* int csnmp_init */
-
 static value_t csnmp_value_list_to_value (struct variable_list *vl, int type)
 {
   value_t ret;
@@ -1144,6 +1133,85 @@ static int csnmp_read_host (host_definition_t *host)
   return (0);
 } /* int csnmp_read_host */
 
+static void *csnmp_read_thread (void *data)
+{
+  host_definition_t *host;
+
+  pthread_mutex_lock (&host_lock);
+  while (do_shutdown == 0)
+  {
+    pthread_cond_wait (&host_cond, &host_lock);
+
+    for (host = host_head; host != NULL; host = host->next)
+    {
+      if (do_shutdown != 0)
+       break;
+      if (host->state != STATE_WAIT)
+       continue;
+
+      host->state = STATE_BUSY;
+      pthread_mutex_unlock (&host_lock);
+      csnmp_read_host (host);
+      pthread_mutex_lock (&host_lock);
+      host->state = STATE_IDLE;
+    } /* for (host) */
+  } /* while (do_shutdown == 0) */
+  pthread_mutex_unlock (&host_lock);
+
+  pthread_exit ((void *) 0);
+  return ((void *) 0);
+} /* void *csnmp_read_thread */
+
+static int csnmp_init (void)
+{
+  host_definition_t *host;
+  int i;
+
+  if (host_head == NULL)
+    return (-1);
+
+  call_snmp_init_once ();
+
+  threads_num = 0;
+  for (host = host_head; host != NULL; host = host->next)
+  {
+    threads_num++;
+    /* We need to initialize `skip_num' here, because `interval_g' isn't
+     * initialized during `configure'. */
+    host->skip_left = interval_g;
+    if (host->skip_num == 0)
+    {
+      host->skip_num = interval_g;
+    }
+    else if (host->skip_num < interval_g)
+    {
+      host->skip_num = interval_g;
+      WARNING ("snmp plugin: Data for host `%s' will be collected every %i seconds.",
+         host->name, host->skip_num);
+    }
+
+    csnmp_host_open_session (host);
+  } /* for (host) */
+
+  /* Now start the reading threads */
+  if (threads_num > 3)
+  {
+    threads_num = 3 + ((threads_num - 3) / 10);
+    if (threads_num > 10)
+      threads_num = 10;
+  }
+
+  threads = (pthread_t *) malloc (threads_num * sizeof (pthread_t));
+  if (threads == NULL)
+    return (-1);
+  memset (threads, '\0', threads_num * sizeof (pthread_t));
+
+  for (i = 0; i < threads_num; i++)
+      pthread_create (threads + i, NULL, csnmp_read_thread, (void *) 0);
+
+  return (0);
+} /* int csnmp_init */
+
 static int csnmp_read (void)
 {
   host_definition_t *host;
@@ -1157,25 +1225,88 @@ static int csnmp_read (void)
 
   now = time (NULL);
 
+  pthread_mutex_lock (&host_lock);
   for (host = host_head; host != NULL; host = host->next)
   {
+    if (host->state != STATE_IDLE)
+      continue;
+
     host->skip_left -= interval_g;
     if (host->skip_left >= interval_g)
       continue;
 
-    csnmp_read_host (host);
+    host->state = STATE_WAIT;
 
     host->skip_left = host->skip_num;
   } /* for (host) */
 
+  pthread_cond_broadcast (&host_cond);
+  pthread_mutex_unlock (&host_lock);
+
   return (0);
 } /* int csnmp_read */
 
+static int csnmp_shutdown (void)
+{
+  host_definition_t *host_this;
+  host_definition_t *host_next;
+
+  data_definition_t *data_this;
+  data_definition_t *data_next;
+
+  int i;
+
+  pthread_mutex_lock (&host_lock);
+  do_shutdown = 1;
+  pthread_cond_broadcast (&host_cond);
+  pthread_mutex_unlock (&host_lock);
+
+  for (i = 0; i < threads_num; i++)
+    pthread_join (threads[i], NULL);
+
+  /* Now that all the threads have exited, let's free all the global variables.
+   * This isn't really neccessary, I guess, but I think it's good stile to do
+   * so anyway. */
+  host_this = host_head;
+  host_head = NULL;
+  while (host_this != NULL)
+  {
+    host_next = host_this->next;
+
+    csnmp_host_close_session (host_this);
+
+    sfree (host_this->name);
+    sfree (host_this->address);
+    sfree (host_this->community);
+    sfree (host_this->data_list);
+    sfree (host_this);
+
+    host_this = host_next;
+  }
+
+  data_this = data_head;
+  data_head = NULL;
+  while (data_this != NULL)
+  {
+    data_next = data_this->next;
+
+    sfree (data_this->name);
+    sfree (data_this->type);
+    sfree (data_this->values);
+    sfree (data_this);
+
+    data_this = data_next;
+  }
+
+  return (0);
+} /* int csnmp_shutdown */
+
 void module_register (void)
 {
   plugin_register_complex_config ("snmp", csnmp_config);
   plugin_register_init ("snmp", csnmp_init);
   plugin_register_read ("snmp", csnmp_read);
+  plugin_register_shutdown ("snmp", csnmp_shutdown);
 } /* void module_register */
 
 /*