Code

rrdtool plugin: Implemented the `WritesPerSecond' option.
authorFlorian Forster <octo@noris.net>
Fri, 22 Aug 2008 14:12:41 +0000 (16:12 +0200)
committerFlorian Forster <octo@noris.net>
Fri, 22 Aug 2008 14:12:41 +0000 (16:12 +0200)
This option lets you slow down the `queue thread' within the rrdtool
plugin, so that the system stays responsive while writing all values
to disk. When FLUSH'ing values and during shutdown this limit is not
in effect.

src/collectd.conf.pod
src/rrdtool.c

index d9a4b04c0625ca014c1ced9afa73846fd643182c..ea508fbba0ff3eab0c1b3d89898140ef2b2d3fac 100644 (file)
@@ -1449,7 +1449,7 @@ Set the "XFiles Factor". The default is 0.1. If unsure, don't set this option.
 
 =item B<CacheFlush> I<Seconds>
 
-When the C<rrdtool plugin> uses a cache (by setting B<CacheTimeout>, see below)
+When the C<rrdtool> plugin uses a cache (by setting B<CacheTimeout>, see below)
 it writes all values for a certain RRD-file if the oldest value is older than
 (or equal to) the number of seconds specified. If some RRD-file is not updated
 anymore for some reason (the computer was shut down, the network is broken,
@@ -1468,6 +1468,30 @@ reduces IO-operations and thus lessens the load produced by updating the files.
 The trade off is that the graphs kind of "drag behind" and that more memory is
 used.
 
+=item B<WritesPerSecond> B<Updates>
+
+When collecting many statistics with collectd and the C<rrdtool> plugin, you
+will run serious performance problems. The B<CacheFlush> setting and the
+internal update queue assert that collectd continues to work just fine even
+under heavy load, but the system may become very unresponsive and slow. This is
+a problem especially if create graphs from the RRD files on the same machine,
+for example using the C<graph.cgi> script included in the
+C<contrib/collection3/> directory.
+
+This setting is designed for very large setups. Setting this option to a value
+between 25 and 80 updates per second, depending on your hardware, will leave
+the server responsive enough to draw graphs even while all the cached values
+are written to disk. Flushed values, i.E<nbsp>e. values that are forced to disk
+by the B<FLUSH> command, are B<not> effected by this limit. They are still
+written as fast as possible, so that web frontends have up to date data when
+generating graphs.
+
+For example: If you have 100,000 RRD files and set B<WritesPerSecond> to 30
+updates per second, writing all values to disk will take approximately
+56E<nbsp>minutes. Together with the flushing ability that's integrated into
+"collection3" you'll end up with a responsive and fast system, up to date
+graphs and basically a "backup" of your values every hour.
+
 =back
 
 =head2 Plugin C<sensors>
index 7cf243671254cf261b77fe3e79d21885f321384e..2e1727a888e7d38bc9aca799f70f212ea617dbd2 100644 (file)
@@ -42,7 +42,8 @@ struct rrd_cache_s
        enum
        {
                FLAG_NONE   = 0x00,
-               FLAG_QUEUED = 0x01
+               FLAG_QUEUED = 0x01,
+               FLAG_FLUSHQ = 0x02
        } flags;
 };
 typedef struct rrd_cache_s rrd_cache_t;
@@ -94,7 +95,8 @@ static const char *config_keys[] =
        "HeartBeat",
        "RRARows",
        "RRATimespan",
-       "XFF"
+       "XFF",
+       "WritesPerSecond"
 };
 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
 
@@ -106,6 +108,7 @@ static int     stepsize  = 0;
 static int     heartbeat = 0;
 static int     rrarows   = 1200;
 static double  xff       = 0.1;
+static double  write_rate = 0.0;
 
 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
  * ALWAYS lock `cache_lock' first! */
@@ -117,6 +120,8 @@ static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
 
 static rrd_queue_t    *queue_head = NULL;
 static rrd_queue_t    *queue_tail = NULL;
+static rrd_queue_t    *flushq_head = NULL;
+static rrd_queue_t    *flushq_tail = NULL;
 static pthread_t       queue_thread = 0;
 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t  queue_cond = PTHREAD_COND_INITIALIZER;
@@ -605,6 +610,11 @@ static int value_list_to_filename (char *buffer, int buffer_len,
 
 static void *rrd_queue_thread (void *data)
 {
+        struct timeval tv_next_update;
+        struct timeval tv_now;
+
+        gettimeofday (&tv_next_update, /* timezone = */ NULL);
+
        while (42)
        {
                rrd_queue_t *queue_entry;
@@ -613,27 +623,79 @@ static void *rrd_queue_thread (void *data)
                int    values_num;
                int    i;
 
-               /* XXX: If you need to lock both, cache_lock and queue_lock, at
-                * the same time, ALWAYS lock `cache_lock' first! */
-
-               /* wait until an entry is available */
-               pthread_mutex_lock (&queue_lock);
-               while ((queue_head == NULL) && (do_shutdown == 0))
-                       pthread_cond_wait (&queue_cond, &queue_lock);
-
-               /* We're in the shutdown phase */
-               if (queue_head == NULL)
-               {
-                       pthread_mutex_unlock (&queue_lock);
-                       break;
-               }
-
-               /* Dequeue the first entry */
-               queue_entry = queue_head;
-               if (queue_head == queue_tail)
-                       queue_head = queue_tail = NULL;
-               else
-                       queue_head = queue_head->next;
+                pthread_mutex_lock (&queue_lock);
+                /* Wait for values to arrive */
+                while (true)
+                {
+                  struct timespec ts_wait;
+                  int status;
+
+                  while ((flushq_head == NULL) && (queue_head == NULL)
+                      && (do_shutdown == 0))
+                    pthread_cond_wait (&queue_cond, &queue_lock);
+
+                  if ((flushq_head == NULL) && (queue_head == NULL))
+                    break;
+
+                  /* Don't delay if there's something to flush */
+                  if (flushq_head != NULL)
+                    break;
+
+                  /* Don't delay if we're shutting down */
+                  if (do_shutdown != 0)
+                    break;
+
+                  /* Don't delay if no delay was configured. */
+                  if (write_rate <= 0.0)
+                    break;
+
+                  gettimeofday (&tv_now, /* timezone = */ NULL);
+                  status = timeval_sub_timespec (&tv_next_update, &tv_now,
+                      &ts_wait);
+                  /* We're good to go */
+                  if (status != 0)
+                    break;
+
+                  /* We're supposed to wait a bit with this update, so we'll
+                   * wait for the next addition to the queue or to the end of
+                   * the wait period - whichever comes first. */
+                  ts_wait.tv_sec = tv_next_update.tv_sec;
+                  ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
+
+                  status = pthread_cond_timedwait (&queue_cond, &queue_lock,
+                      &ts_wait);
+                  if (status == ETIMEDOUT)
+                    break;
+                } /* while (true) */
+
+                /* XXX: If you need to lock both, cache_lock and queue_lock, at
+                 * the same time, ALWAYS lock `cache_lock' first! */
+
+                /* We're in the shutdown phase */
+                if ((flushq_head == NULL) && (queue_head == NULL))
+                {
+                  pthread_mutex_unlock (&queue_lock);
+                  break;
+                }
+
+                if (flushq_head != NULL)
+                {
+                  /* Dequeue the first flush entry */
+                  queue_entry = flushq_head;
+                  if (flushq_head == flushq_tail)
+                    flushq_head = flushq_tail = NULL;
+                  else
+                    flushq_head = flushq_head->next;
+                }
+                else /* if (queue_head != NULL) */
+                {
+                  /* Dequeue the first regular entry */
+                  queue_entry = queue_head;
+                  if (queue_head == queue_tail)
+                    queue_head = queue_tail = NULL;
+                  else
+                    queue_head = queue_head->next;
+                }
 
                /* Unlock the queue again */
                pthread_mutex_unlock (&queue_lock);
@@ -653,6 +715,20 @@ static void *rrd_queue_thread (void *data)
 
                pthread_mutex_unlock (&cache_lock);
 
+               /* Update `tv_next_update' */
+               if (write_rate > 0.0) 
+                {
+                  gettimeofday (&tv_now, /* timezone = */ NULL);
+                  tv_next_update.tv_sec = tv_now.tv_sec;
+                  tv_next_update.tv_usec = tv_now.tv_usec
+                    + ((suseconds_t) (1000000 * write_rate));
+                  while (tv_next_update.tv_usec > 1000000)
+                  {
+                    tv_next_update.tv_sec++;
+                    tv_next_update.tv_usec -= 1000000;
+                  }
+                }
+
                /* Write the values to the RRD-file */
                srrd_update (queue_entry->filename, NULL,
                                values_num, (const char **)values);
@@ -677,7 +753,8 @@ static void *rrd_queue_thread (void *data)
        return ((void *) 0);
 } /* void *rrd_queue_thread */
 
-static int rrd_queue_cache_entry (const char *filename, rrd_queue_dir_t dir)
+static int rrd_queue_enqueue (const char *filename,
+    rrd_queue_t **head, rrd_queue_t **tail)
 {
   rrd_queue_t *queue_entry;
 
@@ -695,55 +772,60 @@ static int rrd_queue_cache_entry (const char *filename, rrd_queue_dir_t dir)
   queue_entry->next = NULL;
 
   pthread_mutex_lock (&queue_lock);
-  if (dir == QUEUE_INSERT_FRONT)
-  {
-    queue_entry->next = queue_head;
-    queue_head = queue_entry;
-    if (queue_tail == NULL)
-      queue_tail = queue_head;
-  }
-  else /* (dir == QUEUE_INSERT_BACK) */
-  {
-    if (queue_tail == NULL)
-      queue_head = queue_entry;
-    else
-      queue_tail->next = queue_entry;
-    queue_tail = queue_entry;
-  }
+
+  if (*tail == NULL)
+    *head = queue_entry;
+  else
+    (*tail)->next = queue_entry;
+  *tail = queue_entry;
+
   pthread_cond_signal (&queue_cond);
   pthread_mutex_unlock (&queue_lock);
 
-  DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename);
-
   return (0);
-} /* int rrd_queue_cache_entry */
+} /* int rrd_queue_enqueue */
 
-static int rrd_queue_move_to_front (const char *filename)
+static int rrd_queue_dequeue (const char *filename,
+    rrd_queue_t **head, rrd_queue_t **tail)
 {
   rrd_queue_t *this;
   rrd_queue_t *prev;
 
-  this = NULL;
-  prev = NULL;
   pthread_mutex_lock (&queue_lock);
-  for (this = queue_head; this != NULL; this = this->next)
+
+  prev = NULL;
+  this = *head;
+
+  while (this != NULL)
   {
     if (strcmp (this->filename, filename) == 0)
       break;
+    
     prev = this;
+    this = this->next;
   }
 
-  /* Check if we found the entry and if it is NOT the first entry. */
-  if ((this != NULL) && (prev != NULL))
+  if (this == NULL)
   {
-    prev->next = this->next;
-    this->next = queue_head;
-    queue_head = this;
+    pthread_mutex_unlock (&queue_lock);
+    return (-1);
   }
+
+  if (prev == NULL)
+    *head = this->next;
+  else
+    prev->next = this->next;
+
+  if (this->next == NULL)
+    *tail = prev;
+
   pthread_mutex_unlock (&queue_lock);
 
+  sfree (this->filename);
+  sfree (this);
+
   return (0);
-} /* int rrd_queue_move_to_front */
+} /* int rrd_queue_dequeue */
 
 static void rrd_cache_flush (int timeout)
 {
@@ -765,13 +847,16 @@ static void rrd_cache_flush (int timeout)
        iter = c_avl_get_iterator (cache);
        while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
        {
-               if (rc->flags == FLAG_QUEUED)
+               if (rc->flags != FLAG_NONE)
                        continue;
                else if ((now - rc->first_value) < timeout)
                        continue;
                else if (rc->values_num > 0)
                {
-                       if (rrd_queue_cache_entry (key, QUEUE_INSERT_BACK) == 0)
+                       int status;
+
+                       status = rrd_queue_enqueue (key, &queue_head,  &queue_tail);
+                       if (status == 0)
                                rc->flags = FLAG_QUEUED;
                }
                else /* ancient and no values -> waste of memory */
@@ -833,31 +918,42 @@ static int rrd_cache_flush_identifier (int timeout, const char *identifier)
   now = time (NULL);
 
   if (datadir == NULL)
-         snprintf (key, sizeof (key), "%s.rrd",
-                         identifier);
+    snprintf (key, sizeof (key), "%s.rrd",
+        identifier);
   else
-         snprintf (key, sizeof (key), "%s/%s.rrd",
-                         datadir, identifier);
+    snprintf (key, sizeof (key), "%s/%s.rrd",
+        datadir, identifier);
   key[sizeof (key) - 1] = 0;
 
   status = c_avl_get (cache, key, (void *) &rc);
   if (status != 0)
   {
     WARNING ("rrdtool plugin: rrd_cache_flush_identifier: "
-       "c_avl_get (%s) failed. Does that file really exist?",
-       key);
+        "c_avl_get (%s) failed. Does that file really exist?",
+        key);
     return (status);
   }
 
-  if (rc->flags == FLAG_QUEUED)
-    status = rrd_queue_move_to_front (key);
+  if (rc->flags == FLAG_FLUSHQ)
+  {
+    status = 0;
+  }
+  else if (rc->flags == FLAG_QUEUED)
+  {
+    rrd_queue_dequeue (key, &queue_head, &queue_tail);
+    status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail);
+    if (status == 0)
+      rc->flags = FLAG_FLUSHQ;
+  }
   else if ((now - rc->first_value) < timeout)
+  {
     status = 0;
+  }
   else if (rc->values_num > 0)
   {
-    status = rrd_queue_cache_entry (key, QUEUE_INSERT_FRONT);
+    status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail);
     if (status == 0)
-      rc->flags = FLAG_QUEUED;
+      rc->flags = FLAG_FLUSHQ;
   }
 
   return (status);
@@ -957,9 +1053,12 @@ static int rrd_cache_insert (const char *filename,
        {
                /* XXX: If you need to lock both, cache_lock and queue_lock, at
                 * the same time, ALWAYS lock `cache_lock' first! */
-               if (rc->flags != FLAG_QUEUED)
+               if (rc->flags == FLAG_NONE)
                {
-                       if (rrd_queue_cache_entry (filename, QUEUE_INSERT_BACK) == 0)
+                       int status;
+
+                       status = rrd_queue_enqueue (filename, &queue_head, &queue_tail);
+                       if (status == 0)
                                rc->flags = FLAG_QUEUED;
                }
                else
@@ -972,7 +1071,6 @@ static int rrd_cache_insert (const char *filename,
                        ((time (NULL) - cache_flush_last) > cache_flush_timeout))
                rrd_cache_flush (cache_flush_timeout);
 
-
        pthread_mutex_unlock (&cache_lock);
 
        return (0);
@@ -1168,6 +1266,25 @@ static int rrd_config (const char *key, const char *value)
                }
                xff = tmp;
        }
+       else if (strcasecmp ("WritesPerSecond", key) == 0)
+       {
+               double wps = atof (value);
+
+               if (wps < 0.0)
+               {
+                       fprintf (stderr, "rrdtool: `WritesPerSecond' must be "
+                                       "greater than or equal to zero.");
+                       return (1);
+               }
+               else if (wps == 0.0)
+               {
+                       write_rate = 0.0;
+               }
+               else
+               {
+                       write_rate = 1.0 / wps;
+               }
+       }
        else
        {
                return (-1);