Code

Now, moving a value to the head of the queue is O(1). Before it was
authoroetiker <oetiker@a5681a0c-68f1-0310-ab6d-d61299d08faa>
Wed, 1 Oct 2008 19:44:36 +0000 (19:44 +0000)
committeroetiker <oetiker@a5681a0c-68f1-0310-ab6d-d61299d08faa>
Wed, 1 Oct 2008 19:44:36 +0000 (19:44 +0000)
O(queue size).  This improves performance of individual flushes when
there is a large number of files in the queue.  As a result, we don't
hold the cache_lock as much.

Revamped enqueue_cache_item to take advantage of the new structure.

Renamed _wipe_ci_values to look nicer with other code.

--kevin

git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk/program@1555 a5681a0c-68f1-0310-ab6d-d61299d08faa

doc/rrdcached.pod
src/rrd_daemon.c

index 98376a11009a0576b51a63d6028d55c3a41a27dc..a30f2930d9c5d5be85c5c4711bfcf5b04b35f0cb 100644 (file)
@@ -224,7 +224,8 @@ to disk.
  +---+----+---+    +------+-----+             +---+----+---+
  ! File:  foo !    ! File:  bar !             ! File:  qux !
  ! First: 101 !    ! First: 119 !             ! First: 180 !
- ! Next:   ---+--->! Next:   ---+---> ... --->! Next:   -  !
+ ! Next:&bar -+--->! Next:&... -+---> ... --->! Next:NULL  !
+ | Prev:NULL  !<---+-Prev:&foo  !<--- ... ----+-Prev: &... !
  +============+    +============+             +============+
  ! Time:  100 !    ! Time:  120 !             ! Time:  180 !
  ! Value:  10 !    ! Value: 0.1 !             ! Value: 2,2 !
index e869a86d613508dfaf928db539086642e99b28d8..604aee350a10aa34d8a8e8ac8f02d50dd418af9d 100644 (file)
@@ -128,6 +128,7 @@ struct cache_item_s
 #define CI_FLAGS_IN_QUEUE (1<<1)
   int flags;
   pthread_cond_t  flushed;
+  cache_item_t *prev;
   cache_item_t *next;
 };
 
@@ -402,7 +403,7 @@ static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
   return (0);
 } /* }}} ssize_t swrite */
 
-static void _wipe_ci_values(cache_item_t *ci, time_t when)
+static void wipe_ci_values(cache_item_t *ci, time_t when)
 {
   ci->values = NULL;
   ci->values_num = 0;
@@ -410,10 +411,30 @@ static void _wipe_ci_values(cache_item_t *ci, time_t when)
   ci->last_flush_time = when;
   if (config_write_jitter > 0)
     ci->last_flush_time += (random() % config_write_jitter);
-
-  ci->flags &= ~(CI_FLAGS_IN_QUEUE);
 }
 
+/* remove_from_queue
+ * remove a "cache_item_t" item from the queue.
+ * must hold 'cache_lock' when calling this
+ */
+static void remove_from_queue(cache_item_t *ci) /* {{{ */
+{
+  if (ci == NULL) return;
+
+  if (ci->prev == NULL)
+    cache_queue_head = ci->next; /* reset head */
+  else
+    ci->prev->next = ci->next;
+
+  if (ci->next == NULL)
+    cache_queue_tail = ci->prev; /* reset the tail */
+  else
+    ci->next->prev = ci->prev;
+
+  ci->next = ci->prev = NULL;
+  ci->flags &= ~CI_FLAGS_IN_QUEUE;
+} /* }}} static void remove_from_queue */
+
 /*
  * enqueue_cache_item:
  * `cache_lock' must be acquired before calling this function!
@@ -421,8 +442,6 @@ static void _wipe_ci_values(cache_item_t *ci, time_t when)
 static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
     queue_side_t side)
 {
-  int did_insert = 0;
-
   if (ci == NULL)
     return (-1);
 
@@ -431,67 +450,47 @@ static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
 
   if (side == HEAD)
   {
-    if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
-    {
-      assert (ci->next == NULL);
-      ci->next = cache_queue_head;
-      cache_queue_head = ci;
-
-      if (cache_queue_tail == NULL)
-        cache_queue_tail = cache_queue_head;
+    if (cache_queue_head == ci)
+      return 0;
 
-      did_insert = 1;
-    }
-    else if (cache_queue_head == ci)
-    {
-      /* do nothing */
-    }
-    else /* enqueued, but not first entry */
-    {
-      cache_item_t *prev;
+    /* remove from the double linked list */
+    if (ci->flags & CI_FLAGS_IN_QUEUE)
+      remove_from_queue(ci);
 
-      /* find previous entry */
-      for (prev = cache_queue_head; prev != NULL; prev = prev->next)
-        if (prev->next == ci)
-          break;
-      assert (prev != NULL);
+    ci->prev = NULL;
+    ci->next = cache_queue_head;
+    if (ci->next != NULL)
+      ci->next->prev = ci;
+    cache_queue_head = ci;
 
-      /* move to the front */
-      prev->next = ci->next;
-      ci->next = cache_queue_head;
-      cache_queue_head = ci;
-
-      /* check if we need to adapt the tail */
-      if (cache_queue_tail == ci)
-        cache_queue_tail = prev;
-    }
+    if (cache_queue_tail == NULL)
+      cache_queue_tail = cache_queue_head;
   }
   else /* (side == TAIL) */
   {
     /* We don't move values back in the list.. */
-    if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
+    if (ci->flags & CI_FLAGS_IN_QUEUE)
       return (0);
 
     assert (ci->next == NULL);
+    assert (ci->prev == NULL);
+
+    ci->prev = cache_queue_tail;
 
     if (cache_queue_tail == NULL)
       cache_queue_head = ci;
     else
       cache_queue_tail->next = ci;
-    cache_queue_tail = ci;
 
-    did_insert = 1;
+    cache_queue_tail = ci;
   }
 
   ci->flags |= CI_FLAGS_IN_QUEUE;
 
-  if (did_insert)
-  {
-    pthread_cond_broadcast(&cache_cond);
-    pthread_mutex_lock (&stats_lock);
-    stats_queue_length++;
-    pthread_mutex_unlock (&stats_lock);
-  }
+  pthread_cond_broadcast(&cache_cond);
+  pthread_mutex_lock (&stats_lock);
+  stats_queue_length++;
+  pthread_mutex_unlock (&stats_lock);
 
   return (0);
 } /* }}} int enqueue_cache_item */
@@ -684,12 +683,8 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
     values = ci->values;
     values_num = ci->values_num;
 
-    _wipe_ci_values(ci, time(NULL));
-
-    cache_queue_head = ci->next;
-    if (cache_queue_head == NULL)
-      cache_queue_tail = NULL;
-    ci->next = NULL;
+    wipe_ci_values(ci, time(NULL));
+    remove_from_queue(ci);
 
     pthread_mutex_lock (&stats_lock);
     assert (stats_queue_length > 0);
@@ -1240,7 +1235,7 @@ static int handle_request_update (int fd, /* {{{ */
       return (0);
     }
 
-    _wipe_ci_values(ci, now);
+    wipe_ci_values(ci, now);
     ci->flags = CI_FLAGS_IN_TREE;
 
     pthread_mutex_lock(&cache_lock);
@@ -1331,7 +1326,8 @@ static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */
     free(ci->values);
   }
 
-  _wipe_ci_values(ci, time(NULL));
+  wipe_ci_values(ci, time(NULL));
+  remove_from_queue(ci);
 
   pthread_mutex_unlock(&cache_lock);
   return (0);