From cae0923259441a8f70683ca1b6d98d43b88d4ab7 Mon Sep 17 00:00:00 2001 From: oetiker Date: Sat, 8 Aug 2009 09:27:13 +0000 Subject: [PATCH] Two-phase shutdown for rrdcached ensures that values are flushed. Previously, it was possible for the queue threads to exit before the flush thread completed queueing values. If running with -F, rrdcached may have crashed due to assertion failure before writing all values. -- kevin git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk@1880 a5681a0c-68f1-0310-ab6d-d61299d08faa --- program/src/rrd_daemon.c | 43 +++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/program/src/rrd_daemon.c b/program/src/rrd_daemon.c index 8b1fc9e7..36701f89 100644 --- a/program/src/rrd_daemon.c +++ b/program/src/rrd_daemon.c @@ -215,7 +215,11 @@ static uid_t daemon_uid; static listen_socket_t *listen_fds = NULL; static size_t listen_fds_num = 0; -static int do_shutdown = 0; +enum { + RUNNING, /* normal operation */ + FLUSHING, /* flushing remaining values */ + SHUTDOWN /* shutting down */ +} state = RUNNING; static pthread_t *queue_threads; static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER; @@ -273,7 +277,7 @@ static int handle_request_help (HANDLER_PROTO); static void sig_common (const char *sig) /* {{{ */ { RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig); - do_shutdown++; + state = FLUSHING; pthread_cond_broadcast(&flush_cond); pthread_cond_broadcast(&queue_cond); } /* }}} void sig_common */ @@ -739,13 +743,8 @@ static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ if (ci->flags & CI_FLAGS_IN_QUEUE) return FALSE; - if ((ci->last_flush_time <= cfd->abs_timeout) - && (ci->values_num > 0)) - { - enqueue_cache_item (ci, TAIL); - } - else if ((do_shutdown != 0) - && (ci->values_num > 0)) + if (ci->values_num > 0 + && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING)) { enqueue_cache_item (ci, TAIL); } @@ -814,7 +813,7 @@ static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_lock(&cache_lock); - while (!do_shutdown) + while (state == RUNNING) { gettimeofday (&now, NULL); if ((now.tv_sec > next_flush.tv_sec) @@ -847,6 +846,8 @@ static void *flush_thread_main (void *args __attribute__((unused))) /* {{{ */ if (config_flush_at_shutdown) flush_old_values (-1); /* flush everything */ + state = SHUTDOWN; + pthread_mutex_unlock(&cache_lock); return NULL; @@ -856,7 +857,7 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ { pthread_mutex_lock (&cache_lock); - while (!do_shutdown + while (state != SHUTDOWN || (cache_queue_head != NULL && config_flush_at_shutdown)) { cache_item_t *ci; @@ -866,8 +867,8 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ int status; /* Now, check if there's something to store away. If not, wait until - * something comes in. if we are shutting down, do not wait around. */ - if (cache_queue_head == NULL && !do_shutdown) + * something comes in. */ + if (cache_queue_head == NULL) { status = pthread_cond_wait (&queue_cond, &cache_lock); if ((status != 0) && (status != ETIMEDOUT)) @@ -1372,6 +1373,10 @@ static int handle_request_update (HANDLER_PROTO) /* {{{ */ free_cache_item (ci); ci = tmp; } + + /* state may have changed while we were unlocked */ + if (state == SHUTDOWN) + return -1; } /* }}} */ assert (ci != NULL); @@ -2003,7 +2008,7 @@ static void *connection_thread_main (void *args) /* {{{ */ connection_threads_num++; pthread_mutex_unlock (&connection_threads_lock); - while (do_shutdown == 0) + while (state == RUNNING) { char *cmd; ssize_t cmd_len; @@ -2018,7 +2023,7 @@ static void *connection_thread_main (void *args) /* {{{ */ pollfd.revents = 0; status = poll (&pollfd, 1, /* timeout = */ 500); - if (do_shutdown) + if (state != RUNNING) break; else if (status == 0) /* timeout */ continue; @@ -2323,7 +2328,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ RRDD_LOG(LOG_INFO, "listening for connections"); - while (do_shutdown == 0) + while (state == RUNNING) { for (i = 0; i < pollfds_num; i++) { @@ -2333,7 +2338,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ } status = poll (pollfds, pollfds_num, /* timeout = */ 1000); - if (do_shutdown) + if (state != RUNNING) break; else if (status == 0) /* timeout */ continue; @@ -2396,7 +2401,7 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ continue; } } /* for (pollfds_num) */ - } /* while (do_shutdown == 0) */ + } /* while (state == RUNNING) */ RRDD_LOG(LOG_INFO, "starting shutdown"); @@ -2508,8 +2513,6 @@ error: static int cleanup (void) /* {{{ */ { - do_shutdown++; - pthread_cond_broadcast (&flush_cond); pthread_join (flush_thread, NULL); -- 2.30.2