From: oetiker Date: Sun, 14 Sep 2008 10:35:51 +0000 (+0000) Subject: did not pick up all the changes for rrdcached in the first round ... so here is the... X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=a186ea0668474b9bc5bb982e523648d70d0778b1;p=rrdtool-all.git did not pick up all the changes for rrdcached in the first round ... so here is the second batch. git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk@1505 a5681a0c-68f1-0310-ab6d-d61299d08faa --- diff --git a/program/CONTRIBUTORS b/program/CONTRIBUTORS index a2dc2320..ebb76e3d 100644 --- a/program/CONTRIBUTORS +++ b/program/CONTRIBUTORS @@ -32,6 +32,7 @@ Joel Becker AIX Joey Miller php3 and php4 bindings Jost.Krieger Kai Siering +Kevin Brintnall bugfixes in and additions to rrdcached, including journaling support Larry Leszczynski Mark Plaksin rrd_graph_v Matt Chambers --full-size-mode for rrdgraph diff --git a/program/doc/rrdcached.pod b/program/doc/rrdcached.pod index 8ad37beb..7631351b 100644 --- a/program/doc/rrdcached.pod +++ b/program/doc/rrdcached.pod @@ -6,7 +6,7 @@ rrdcached - Data caching daemon for rrdtool =head1 SYNOPSIS -B [B<-l> I
] [B<-w> I] [B<-f> I] +B [B<-l> I
] [B<-w> I] [B<-z> I] [B<-f> I] [B<-j> I] =head1 DESCRIPTION @@ -42,6 +42,13 @@ C, will be used. Data is written to disk every I seconds. If this option is not specified the default interval of 300Eseconds will be used. +=item B<-z> I + +If specified, rrdcached will delay writing of each RRD for a random number +of seconds in the rangeE[0,I). This will avoid too many +writes being queued simultaneously. This value should be no greater than +the value specified in B<-w>. By default, there is no delay. + =item B<-f> I Every I seconds the entire cache is searched for old values which are @@ -54,6 +61,19 @@ cases. This timeout defaults to 3600Eseconds. Sets the name and location of the PID-file. If not specified, the default, C/run/rrdcached.pid> will be used. +=item B<-j> I + +Write updates to a journal in I. In the event of a program or system +crash, this will allow the daemon to write any updates that were pending +at the time of the crash. + +On startup, the daemon will check for journal files in this directory. If +found, all updates therein will be read into memory before the daemon +starts accepting new connections. + +The journal will be rotated with the same frequency as the flush timer +given by B<-f>. On clean shutdown, the journal files are removed. + =item B<-b> I The daemon will change into a specific directory at startup. All files passed @@ -293,12 +313,16 @@ name of the value, a colon, one or more spaces and the actual value. Example: - 5 Statistics follow + 9 Statistics follow QueueLength: 0 + UpdatesReceived: 30 + FlushesReceived: 2 UpdatesWritten: 13 DataSetsWritten: 390 TreeNodesNumber: 13 TreeDepth: 4 + JournalBytes: 190 + JournalRotate: 0 =item B I I [I ...] @@ -306,6 +330,13 @@ Adds more data to a filename. This is B operation the daemon was designed for, so describing the mechanism again is unnecessary. Read L above for a detailed explanation. +=item B I + +This command is written to the journal after a file is successfully +written out to disk. It is used during journal replay to determine which +updates have already been applied. It is I valid in the journal; it +is not accepted from the other command channels. + =back =head2 Performance Values @@ -318,18 +349,18 @@ The following counters are returned by the B command: Number of nodes currently enqueued in the update queue. -=item B I<(unsigned 64bit integer)> +=item B I<(unsigned 64bit integer)> -Depth of the tree used for fast key lookup. +Number of UPDATE commands received. -=item B I<(unsigned 64bit integer)> +=item B I<(unsigned 64bit integer)> -Number of nodes in the cache. +Number of FLUSH commands received. =item B I<(unsigned 64bit integer)> -Total number of updates, i.Ee. calls to C, since the daemon -was started. +Total number of updates, i.Ee. calls to C, since the +daemon was started. =item B I<(unsigned 64bit integer)> @@ -338,6 +369,22 @@ data set is one or more values passed to the B command. For example: C is one data set with two values. The term "data set" is used to prevent confusion whether individual values or groups of values are counted. +=item B I<(unsigned 64bit integer)> + +Number of nodes in the cache. + +=item B I<(unsigned 64bit integer)> + +Depth of the tree used for fast key lookup. + +=item B I<(unsigned 64bit integer)> + +Total number of bytes written to the journal since startup. + +=item B I<(unsigned 64bit integer)> + +Number of times the journal has been rotated since startup. + =back =head1 BUGS @@ -348,7 +395,14 @@ No known bugs at the moment. L, L -=head1 AUHOR +=head1 AUTHOR B and this manual page have been written by Florian Forster EoctoEatEverplant.orgE. + +=head1 CONTRIBUTORS + +kevin brintnall Ekbrint@rufus.netE + +=cut + diff --git a/program/doc/rrdflush.pod b/program/doc/rrdflush.pod new file mode 100644 index 00000000..f691c211 --- /dev/null +++ b/program/doc/rrdflush.pod @@ -0,0 +1,54 @@ +=head1 NAME + +rrdflush - Flush the values for a spcific RRD file from memory. + +=head1 SYNOPSIS + +B B I +S<[B<--daemon> I
]> + +=head1 DESCRIPTION + +The B function connects to L, the RRD caching daemon, and +issues a "flush" command for the given file. The daemon will put this file to +the head of the update queue so it is written "soon". The status will be +returned after the node has been B by the update thread. By the time +execution of this command ends it is very likely that the update thread has +just updated the requested file, though this is not guaranteed. + +=over 8 + +=item I + +The name of the B that is to be written to disk. + +=item B<--daemon> I
+ +Address of the L daemon. If not specified, the RRDCACHED_ADDRESS +environment variable must be set (see below). To specify a UNIX domain socket +use the prefix C, see example below. Other addresses are interpreted as +normal network addresses, i.Ee. IPv4 or IPv6 addresses in most cases. + + rrdtool flush --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd + +=back + +=head1 ENVIRONMENT VARIABLES + +The following environment variables may be used to change the behavior of +Cflush>: + +=over 4 + +=item B + +If this environment variable is set it will have the same effect as specifying +the C<--daemon> option on the command line. If both are present, the command +line argument takes precedence. + +=back + +=head1 AUTHOR + +Florian Forster EoctoEatEverplant.orgE + diff --git a/program/src/librrd.sym.in b/program/src/librrd.sym.in index 22afc865..ae5da6f7 100644 --- a/program/src/librrd.sym.in +++ b/program/src/librrd.sym.in @@ -51,7 +51,10 @@ rrd_version rrd_write rrd_xport rrdc_connect +rrdc_is_connected rrdc_disconnect rrdc_flush +rrdc_stats_free +rrdc_stats_get rrdc_update @RRD_GETOPT_LONG@ diff --git a/program/src/rrd_client.c b/program/src/rrd_client.c index f1253f8e..d1ad5e06 100644 --- a/program/src/rrd_client.c +++ b/program/src/rrd_client.c @@ -21,6 +21,7 @@ #include "rrd.h" #include "rrd_client.h" +#include "rrd_tool.h" #include #include @@ -32,8 +33,23 @@ #include #include +#ifndef ENODATA +#define ENODATA ENOENT +#endif + +struct rrdc_response_s +{ + int status; + char *message; + char **lines; + size_t lines_num; +}; +typedef struct rrdc_response_s rrdc_response_t; + static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; static int sd = -1; +static char *sd_path = NULL; /* cache the path for sd */ +static void _disconnect(void); static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */ { @@ -57,16 +73,15 @@ static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */ if (status == 0) { - close (sd); - sd = -1; + _disconnect(); errno = EPROTO; return (-1); } assert ((0 > status) || (buffer_free >= (size_t) status)); - buffer_free = buffer_free - status; - buffer_used = buffer_used + status; + buffer_free -= status; + buffer_used += status; if (buffer[buffer_used - 1] == '\n') break; @@ -78,7 +93,7 @@ static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */ return (-1); } - buffer[buffer_used - 1] = 0; + buffer[buffer_used - 1] = '\0'; return (buffer_used); } /* }}} ssize_t sread */ @@ -100,13 +115,13 @@ static ssize_t swrite (const void *buf, size_t count) /* {{{ */ if (status < 0) { - close (sd); - sd = -1; + _disconnect(); + rrd_set_error("lost connection to rrdcached"); return (status); } - nleft = nleft - status; - ptr = ptr + status; + nleft -= status; + ptr += status; } return (0); @@ -177,26 +192,143 @@ static int buffer_add_value (const char *value, /* {{{ */ return (buffer_add_string (temp, buffer_ret, buffer_size_ret)); } /* }}} int buffer_add_value */ -static int rrdc_connect_unix (const char *path) /* {{{ */ +static int response_parse (char *buffer, size_t buffer_size, /* {{{ */ + rrdc_response_t **ret_response) { - struct sockaddr_un sa; - int status; + rrdc_response_t *ret; - assert (path != NULL); + char *dummy; + char *saveptr; - pthread_mutex_lock (&lock); + char *line_ptr; + size_t line_counter; - if (sd >= 0) + if (buffer == NULL) + return (EINVAL); + if (buffer_size <= 0) + return (EINVAL); + + if (buffer[buffer_size - 1] != 0) + return (-1); + + ret = (rrdc_response_t *) malloc (sizeof (rrdc_response_t)); + if (ret == NULL) + return (ENOMEM); + memset (ret, 0, sizeof (*ret)); + + line_counter = 0; + + dummy = buffer; + saveptr = NULL; + while ((line_ptr = strtok_r (dummy, "\r\n", &saveptr)) != NULL) { - pthread_mutex_unlock (&lock); - return (0); + dummy = NULL; + + if (ret->message == NULL) + { + ret->status = strtol (buffer, &ret->message, 0); + if (buffer == ret->message) + { + free (ret); + return (EPROTO); + } + + /* Skip leading whitespace of the status message */ + ret->message += strspn (ret->message, " \t"); + + if (ret->status > 0) + { + ret->lines = (char **) malloc (sizeof (char *) * ret->status); + if (ret->lines == NULL) + { + free (ret); + return (ENOMEM); + } + memset (ret->lines, 0, sizeof (char *) * ret->status); + ret->lines_num = (size_t) ret->status; + } + else + { + ret->lines = NULL; + ret->lines_num = 0; + } + } + else /* if (ret->message != NULL) */ + { + if (line_counter < ret->lines_num) + ret->lines[line_counter] = line_ptr; + line_counter++; + } + } /* while (strtok_r) */ + + if (ret->lines_num != line_counter) + { + errno = EPROTO; + if (ret->lines != NULL) + free (ret->lines); + free (ret); + return (-1); + } + + *ret_response = ret; + return (0); +} /* }}} int response_parse */ + +static void response_free (rrdc_response_t *res) /* {{{ */ +{ + if (res == NULL) + return; + + if (res->lines != NULL) + { + res->lines_num = 0; + free (res->lines); + res->lines = NULL; } + free (res); +} /* }}} void response_free */ + + +/* determine whether we are connected to the specified daemon_addr if + * NULL, return whether we are connected at all + */ +int rrdc_is_connected(const char *daemon_addr) /* {{{ */ +{ + if (sd < 0) + return 0; + else if (daemon_addr == NULL) + { + /* here we have to handle the case i.e. + * UPDATE --daemon ...; UPDATEV (no --daemon) ... + * In other words: we have a cached connection, + * but it is not specified in the current command. + * Daemon is only implied in this case if set in ENV + */ + if (getenv(ENV_RRDCACHED_ADDRESS) != NULL) + return 1; + else + return 0; + } + else if (strcmp(daemon_addr, sd_path) == 0) + return 1; + else + return 0; + +} /* }}} int rrdc_is_connected */ + +static int rrdc_connect_unix (const char *path) /* {{{ */ +{ + struct sockaddr_un sa; + int status; + + assert (path != NULL); + assert (sd == -1); + sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0); if (sd < 0) { status = errno; - pthread_mutex_unlock (&lock); return (status); } @@ -208,38 +340,22 @@ static int rrdc_connect_unix (const char *path) /* {{{ */ if (status != 0) { status = errno; - pthread_mutex_unlock (&lock); return (status); } - pthread_mutex_unlock (&lock); - return (0); } /* }}} int rrdc_connect_unix */ -int rrdc_connect (const char *addr) /* {{{ */ +static int rrdc_connect_network (const char *addr) /* {{{ */ { struct addrinfo ai_hints; struct addrinfo *ai_res; struct addrinfo *ai_ptr; - int status; - - if (addr == NULL) - addr = RRDCACHED_DEFAULT_ADDRESS; - if (strncmp ("unix:", addr, strlen ("unix:")) == 0) - return (rrdc_connect_unix (addr + strlen ("unix:"))); - else if (addr[0] == '/') - return (rrdc_connect_unix (addr)); - - pthread_mutex_lock (&lock); - - if (sd >= 0) - { - pthread_mutex_unlock (&lock); - return (0); - } + assert (addr != NULL); + assert (sd == -1); + int status; memset (&ai_hints, 0, sizeof (ai_hints)); ai_hints.ai_flags = 0; #ifdef AI_ADDRCONFIG @@ -251,10 +367,7 @@ int rrdc_connect (const char *addr) /* {{{ */ ai_res = NULL; status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res); if (status != 0) - { - pthread_mutex_unlock (&lock); return (status); - } for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) { @@ -270,31 +383,76 @@ int rrdc_connect (const char *addr) /* {{{ */ if (status != 0) { status = errno; - close (sd); - sd = -1; + _disconnect(); continue; } assert (status == 0); break; } /* for (ai_ptr) */ - pthread_mutex_unlock (&lock); return (status); -} /* }}} int rrdc_connect */ +} /* }}} int rrdc_connect_network */ -int rrdc_disconnect (void) /* {{{ */ +int rrdc_connect (const char *addr) /* {{{ */ { - pthread_mutex_lock (&lock); + int status = 0; - if (sd < 0) + if (addr == NULL) + addr = getenv (ENV_RRDCACHED_ADDRESS); + + if (addr == NULL) + return 0; + + pthread_mutex_lock(&lock); + + if (sd >= 0 && sd_path != NULL && strcmp(addr, sd_path) == 0) { + /* connection to the same daemon; use cached connection */ pthread_mutex_unlock (&lock); return (0); } + else + { + _disconnect(); + } + + if (strncmp ("unix:", addr, strlen ("unix:")) == 0) + status = rrdc_connect_unix (addr + strlen ("unix:")); + else if (addr[0] == '/') + status = rrdc_connect_unix (addr); + else + status = rrdc_connect_network(addr); + + if (status == 0 && sd >= 0) + sd_path = strdup(addr); + else + rrd_set_error("Unable to connect to rrdcached: %s", + (status < 0) + ? "Internal error" + : rrd_strerror (status)); + + pthread_mutex_unlock (&lock); + return (status); +} /* }}} int rrdc_connect */ + +static void _disconnect(void) /* {{{ */ +{ + if (sd >= 0) + close(sd); + + if (sd_path != NULL) + free(sd_path); - close (sd); sd = -1; + sd_path = NULL; +} /* }}} static void _disconnect(void) */ + +int rrdc_disconnect (void) /* {{{ */ +{ + pthread_mutex_lock (&lock); + + _disconnect(); pthread_mutex_unlock (&lock); @@ -431,6 +589,204 @@ int rrdc_flush (const char *filename) /* {{{ */ return (status); } /* }}} int rrdc_flush */ + + +/* convenience function; if there is a daemon specified, or if we can + * detect one from the environment, then flush the file. Otherwise, no-op + */ +int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename) /* {{{ */ +{ + int status = 0; + + rrdc_connect(opt_daemon); + + if (rrdc_is_connected(opt_daemon)) + { + status = rrdc_flush (filename); + if (status != 0) + { + rrd_set_error ("rrdc_flush (%s) failed with status %i.", + filename, status); + } + } /* if (daemon_addr) */ + + return status; +} /* }}} int rrdc_flush_if_daemon */ + + +int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */ +{ + rrdc_stats_t *head; + rrdc_stats_t *tail; + + rrdc_response_t *response; + + char buffer[4096]; + size_t buffer_size; + int status; + size_t i; + + pthread_mutex_lock (&lock); + + if (sd < 0) + { + pthread_mutex_unlock (&lock); + return (ENOTCONN); + } + + /* Protocol example: {{{ + * -> STATS + * <- 5 Statistics follow + * <- QueueLength: 0 + * <- UpdatesWritten: 0 + * <- DataSetsWritten: 0 + * <- TreeNodesNumber: 0 + * <- TreeDepth: 0 + * }}} */ + status = swrite ("STATS\n", strlen ("STATS\n")); + if (status != 0) + { + pthread_mutex_unlock (&lock); + return (status); + } + + status = sread (buffer, sizeof (buffer)); + if (status < 0) + { + status = errno; + pthread_mutex_unlock (&lock); + return (status); + } + else if (status == 0) + { + pthread_mutex_unlock (&lock); + return (ENODATA); + } + + pthread_mutex_unlock (&lock); + + /* Assert NULL termination */ + buffer_size = (size_t) status; + if (buffer[buffer_size - 1] != 0) + { + if (buffer_size < sizeof (buffer)) + { + buffer[buffer_size] = 0; + buffer_size++; + } + else + { + return (ENOBUFS); + } + } + + status = response_parse (buffer, buffer_size, &response); + if (status != 0) + return (status); + + if (response->status <= 0) + { + response_free (response); + return (EIO); + } + + head = NULL; + tail = NULL; + for (i = 0; i < response->lines_num; i++) + { + char *key; + char *value; + char *endptr; + rrdc_stats_t *s; + + key = response->lines[i]; + value = strchr (key, ':'); + if (value == NULL) + continue; + *value = 0; + value++; + + while ((value[0] == ' ') || (value[0] == '\t')) + value++; + + s = (rrdc_stats_t *) malloc (sizeof (rrdc_stats_t)); + if (s == NULL) + continue; + memset (s, 0, sizeof (*s)); + + s->name = strdup (key); + + endptr = NULL; + if ((strcmp ("QueueLength", key) == 0) + || (strcmp ("TreeNodesNumber", key) == 0) + || (strcmp ("TreeDepth", key) == 0)) + { + s->type = RRDC_STATS_TYPE_GAUGE; + s->value.gauge = strtod (value, &endptr); + } + else if ((strcmp ("UpdatesWritten", key) == 0) + || (strcmp ("DataSetsWritten", key) == 0)) + { + s->type = RRDC_STATS_TYPE_COUNTER; + s->value.counter = (uint64_t) strtoll (value, &endptr, /* base = */ 0); + } + else + { + free (s); + continue; + } + + /* Conversion failed */ + if (endptr == value) + { + free (s); + continue; + } + + if (head == NULL) + { + head = s; + tail = s; + s->next = NULL; + } + else + { + tail->next = s; + tail = s; + } + } /* for (i = 0; i < response->lines_num; i++) */ + + response_free (response); + + if (head == NULL) + return (EPROTO); + + *ret_stats = head; + return (0); +} /* }}} int rrdc_stats_get */ + +void rrdc_stats_free (rrdc_stats_t *ret_stats) /* {{{ */ +{ + rrdc_stats_t *this; + + this = ret_stats; + while (this != NULL) + { + rrdc_stats_t *next; + + next = this->next; + + if (this->name != NULL) + { + free (this->name); + this->name = NULL; + } + free (this); + + this = next; + } /* while (this != NULL) */ +} /* }}} void rrdc_stats_free */ + /* * vim: set sw=2 sts=2 ts=8 et fdm=marker : */ diff --git a/program/src/rrd_client.h b/program/src/rrd_client.h index 92d4c07c..1776c2b3 100644 --- a/program/src/rrd_client.h +++ b/program/src/rrd_client.h @@ -22,6 +22,8 @@ #ifndef __RRD_CLIENT_H #define __RRD_CLIENT_H 1 +#include + #ifndef RRDCACHED_DEFAULT_ADDRESS # define RRDCACHED_DEFAULT_ADDRESS "unix:/tmp/rrdcached.sock" #endif @@ -30,11 +32,36 @@ #define ENV_RRDCACHED_ADDRESS "RRDCACHED_ADDRESS" int rrdc_connect (const char *addr); +int rrdc_is_connected(const char *daemon_addr); int rrdc_disconnect (void); int rrdc_update (const char *filename, int values_num, const char * const *values); int rrdc_flush (const char *filename); +int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename); + + +struct rrdc_stats_s +{ + const char *name; + uint16_t type; +#define RRDC_STATS_TYPE_GAUGE 0x0001 +#define RRDC_STATS_TYPE_COUNTER 0x0002 + uint16_t flags; + union + { + uint64_t counter; + double gauge; + } value; + struct rrdc_stats_s *next; +}; +typedef struct rrdc_stats_s rrdc_stats_t; + +int rrdc_stats_get (rrdc_stats_t **ret_stats); +void rrdc_stats_free (rrdc_stats_t *ret_stats); #endif /* __RRD_CLIENT_H */ +/* + * vim: set sw=2 sts=2 ts=8 et fdm=marker : + */ diff --git a/program/src/rrd_daemon.c b/program/src/rrd_daemon.c index bc299f8d..0816526b 100644 --- a/program/src/rrd_daemon.c +++ b/program/src/rrd_daemon.c @@ -1,6 +1,7 @@ /** * RRDTool - src/rrd_daemon.c * Copyright (C) 2008 Florian octo Forster + * Copyright (C) 2008 Kevin Brintnall * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the @@ -17,8 +18,10 @@ * * Authors: * Florian octo Forster + * kevin brintnall **/ +#if 0 /* * First tell the compiler to stick to the C99 and POSIX standards as close as * possible. @@ -54,6 +57,7 @@ # undef _GNU_SOURCE #endif /* }}} */ +#endif /* 0 */ /* * Now for some includes.. @@ -112,8 +116,8 @@ struct cache_item_s char **values; int values_num; time_t last_flush_time; -#define CI_FLAGS_IN_TREE 0x01 -#define CI_FLAGS_IN_QUEUE 0x02 +#define CI_FLAGS_IN_TREE (1<<0) +#define CI_FLAGS_IN_QUEUE (1<<1) int flags; cache_item_t *next; @@ -135,9 +139,14 @@ enum queue_side_e }; typedef enum queue_side_e queue_side_t; +/* max length of socket command or response */ +#define CMD_MAX 4096 + /* * Variables */ +static int stay_foreground = 0; + static listen_socket_t *listen_fds = NULL; static size_t listen_fds_num = 0; @@ -145,9 +154,9 @@ static int do_shutdown = 0; static pthread_t queue_thread; -static pthread_t *connetion_threads = NULL; -static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER; -static int connetion_threads_num = 0; +static pthread_t *connection_threads = NULL; +static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static int connection_threads_num = 0; /* Cache stuff */ static GTree *cache_tree = NULL; @@ -159,6 +168,7 @@ static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; static int config_write_interval = 300; +static int config_write_jitter = 0; static int config_flush_interval = 3600; static char *config_pid_file = NULL; static char *config_base_dir = NULL; @@ -167,27 +177,45 @@ static char **config_listen_address_list = NULL; static int config_listen_address_list_len = 0; static uint64_t stats_queue_length = 0; +static uint64_t stats_updates_received = 0; +static uint64_t stats_flush_received = 0; static uint64_t stats_updates_written = 0; static uint64_t stats_data_sets_written = 0; +static uint64_t stats_journal_bytes = 0; +static uint64_t stats_journal_rotate = 0; static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; +/* Journaled updates */ +static char *journal_cur = NULL; +static char *journal_old = NULL; +static FILE *journal_fh = NULL; +static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER; +static int journal_write(char *cmd, char *args); +static void journal_done(void); +static void journal_rotate(void); + /* * Functions */ static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ { + RRDD_LOG(LOG_NOTICE, "caught SIGINT"); do_shutdown++; + pthread_cond_broadcast(&cache_cond); } /* }}} void sig_int_handler */ static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ { + RRDD_LOG(LOG_NOTICE, "caught SIGTERM"); do_shutdown++; + pthread_cond_broadcast(&cache_cond); } /* }}} void sig_term_handler */ static int write_pidfile (void) /* {{{ */ { pid_t pid; char *file; + int fd; FILE *fh; pid = getpid (); @@ -196,10 +224,19 @@ static int write_pidfile (void) /* {{{ */ ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; - fh = fopen (file, "w"); + fd = open(file, O_CREAT|O_EXCL|O_WRONLY, S_IRUSR|S_IRGRP|S_IROTH); + if (fd < 0) + { + RRDD_LOG(LOG_ERR, "FATAL: cannot create '%s' (%s)", + file, rrd_strerror(errno)); + return (-1); + } + + fh = fdopen (fd, "w"); if (fh == NULL) { RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file); + close(fd); return (-1); } @@ -282,6 +319,9 @@ static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ size_t nleft; ssize_t status; + /* special case for journal replay */ + if (fd < 0) return 0; + ptr = (const char *) buf; nleft = count; @@ -295,13 +335,25 @@ static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ if (status < 0) return (status); - nleft = nleft - status; - ptr = ptr + status; + nleft -= status; + ptr += status; } return (0); } /* }}} ssize_t swrite */ +static void _wipe_ci_values(cache_item_t *ci, time_t when) +{ + ci->values = NULL; + ci->values_num = 0; + + ci->last_flush_time = when; + if (config_write_jitter > 0) + ci->last_flush_time += (random() % config_write_jitter); + + ci->flags &= ~(CI_FLAGS_IN_QUEUE); +} + /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! @@ -517,8 +569,14 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ flush_old_values (config_write_interval); /* Determine the time of the next cache flush. */ - while (next_flush.tv_sec < now.tv_sec) + while (next_flush.tv_sec <= now.tv_sec) next_flush.tv_sec += config_flush_interval; + + /* unlock the cache while we rotate so we don't block incoming + * updates if the fsync() blocks on disk I/O */ + pthread_mutex_unlock(&cache_lock); + journal_rotate(); + pthread_mutex_lock(&cache_lock); } /* Now, check if there's something to store away. If not, wait until @@ -552,14 +610,13 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ continue; } + assert(ci->values != NULL); + assert(ci->values_num > 0); + values = ci->values; values_num = ci->values_num; - ci->values = NULL; - ci->values_num = 0; - - ci->last_flush_time = time (NULL); - ci->flags &= ~(CI_FLAGS_IN_QUEUE); + _wipe_ci_values(ci, time(NULL)); cache_queue_head = ci->next; if (cache_queue_head == NULL) @@ -573,18 +630,23 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ pthread_mutex_unlock (&cache_lock); + rrd_clear_error (); status = rrd_update_r (file, NULL, values_num, (void *) values); if (status != 0) { - RRDD_LOG (LOG_ERR, "queue_thread_main: " - "rrd_update_r failed with status %i.", - status); + RRDD_LOG (LOG_NOTICE, "queue_thread_main: " + "rrd_update_r (%s) failed with status %i. (%s)", + file, status, rrd_get_error()); } - free (file); + journal_write("wrote", file); + for (i = 0; i < values_num; i++) free (values[i]); + free(values); + free(file); + if (status == 0) { pthread_mutex_lock (&stats_lock); @@ -602,6 +664,10 @@ static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */ pthread_mutex_unlock (&cache_lock); + assert(cache_queue_head == NULL); + RRDD_LOG(LOG_INFO, "clean shutdown; all RRDs flushed"); + journal_done(); + return (NULL); } /* }}} void *queue_thread_main */ @@ -625,13 +691,13 @@ static int buffer_get_field (char **buffer_ret, /* {{{ */ return (-1); /* This is ensured by `handle_request'. */ - assert (buffer[buffer_size - 1] == ' '); + assert (buffer[buffer_size - 1] == '\0'); status = -1; while (buffer_pos < buffer_size) { /* Check for end-of-field or end-of-buffer */ - if (buffer[buffer_pos] == ' ') + if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0') { field[field_size] = 0; field_size++; @@ -808,19 +874,27 @@ static int handle_request_stats (int fd, /* {{{ */ size_t buffer_size __attribute__((unused))) { int status; - char outbuf[4096]; + char outbuf[CMD_MAX]; uint64_t copy_queue_length; + uint64_t copy_updates_received; + uint64_t copy_flush_received; uint64_t copy_updates_written; uint64_t copy_data_sets_written; + uint64_t copy_journal_bytes; + uint64_t copy_journal_rotate; uint64_t tree_nodes_number; uint64_t tree_depth; pthread_mutex_lock (&stats_lock); copy_queue_length = stats_queue_length; + copy_updates_received = stats_updates_received; + copy_flush_received = stats_flush_received; copy_updates_written = stats_updates_written; copy_data_sets_written = stats_data_sets_written; + copy_journal_bytes = stats_journal_bytes; + copy_journal_rotate = stats_journal_rotate; pthread_mutex_unlock (&stats_lock); pthread_mutex_lock (&cache_lock); @@ -838,13 +912,21 @@ static int handle_request_stats (int fd, /* {{{ */ return (status); \ } - strncpy (outbuf, "5 Statistics follow\n", sizeof (outbuf)); + strncpy (outbuf, "9 Statistics follow\n", sizeof (outbuf)); RRDD_STATS_SEND; snprintf (outbuf, sizeof (outbuf), "QueueLength: %"PRIu64"\n", copy_queue_length); RRDD_STATS_SEND; + snprintf (outbuf, sizeof (outbuf), + "UpdatesReceived: %"PRIu64"\n", copy_updates_received); + RRDD_STATS_SEND; + + snprintf (outbuf, sizeof (outbuf), + "FlushesReceived: %"PRIu64"\n", copy_flush_received); + RRDD_STATS_SEND; + snprintf (outbuf, sizeof (outbuf), "UpdatesWritten: %"PRIu64"\n", copy_updates_written); RRDD_STATS_SEND; @@ -861,6 +943,14 @@ static int handle_request_stats (int fd, /* {{{ */ "TreeDepth: %"PRIu64"\n", tree_depth); RRDD_STATS_SEND; + snprintf (outbuf, sizeof(outbuf), + "JournalBytes: %"PRIu64"\n", copy_journal_bytes); + RRDD_STATS_SEND; + + snprintf (outbuf, sizeof(outbuf), + "JournalRotate: %"PRIu64"\n", copy_journal_rotate); + RRDD_STATS_SEND; + return (0); #undef RRDD_STATS_SEND } /* }}} int handle_request_stats */ @@ -870,7 +960,7 @@ static int handle_request_flush (int fd, /* {{{ */ { char *file; int status; - char result[4096]; + char result[CMD_MAX]; status = buffer_get_field (&buffer, &buffer_size, &file); if (status != 0) @@ -879,11 +969,24 @@ static int handle_request_flush (int fd, /* {{{ */ } else { + pthread_mutex_lock(&stats_lock); + stats_flush_received++; + pthread_mutex_unlock(&stats_lock); + status = flush_file (file); if (status == 0) snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file); else if (status == ENOENT) - snprintf (result, sizeof (result), "-1 No such file: %s.\n", file); + { + /* no file in our tree; see whether it exists at all */ + struct stat statbuf; + + memset(&statbuf, 0, sizeof(statbuf)); + if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode)) + snprintf (result, sizeof (result), "0 Nothing to flush: %s.\n", file); + else + snprintf (result, sizeof (result), "-1 No such file: %s.\n", file); + } else if (status < 0) strncpy (result, "-1 Internal error.\n", sizeof (result)); else @@ -912,7 +1015,7 @@ static int handle_request_update (int fd, /* {{{ */ time_t now; cache_item_t *ci; - char answer[4096]; + char answer[CMD_MAX]; #define RRDD_UPDATE_SEND \ answer[sizeof (answer) - 1] = 0; \ @@ -935,6 +1038,10 @@ static int handle_request_update (int fd, /* {{{ */ return (0); } + pthread_mutex_lock(&stats_lock); + stats_updates_received++; + pthread_mutex_unlock(&stats_lock); + pthread_mutex_lock (&cache_lock); ci = g_tree_lookup (cache_tree, file); @@ -947,11 +1054,11 @@ static int handle_request_update (int fd, /* {{{ */ if (status != 0) { pthread_mutex_unlock (&cache_lock); - RRDD_LOG (LOG_ERR, "handle_request_update: stat (%s) failed.", file); + RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file); status = errno; if (status == ENOENT) - snprintf (answer, sizeof (answer), "-1 No such file: %s", file); + snprintf (answer, sizeof (answer), "-1 No such file: %s\n", file); else snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n", status); @@ -962,7 +1069,16 @@ static int handle_request_update (int fd, /* {{{ */ { pthread_mutex_unlock (&cache_lock); - snprintf (answer, sizeof (answer), "-1 Not a regular file: %s", file); + snprintf (answer, sizeof (answer), "-1 Not a regular file: %s\n", file); + RRDD_UPDATE_SEND; + return (0); + } + if (access(file, R_OK|W_OK) != 0) + { + pthread_mutex_unlock (&cache_lock); + + snprintf (answer, sizeof (answer), "-1 Cannot read/write %s: %s\n", + file, rrd_strerror(errno)); RRDD_UPDATE_SEND; return (0); } @@ -991,9 +1107,7 @@ static int handle_request_update (int fd, /* {{{ */ return (0); } - ci->values = NULL; - ci->values_num = 0; - ci->last_flush_time = now; + _wipe_ci_values(ci, now); ci->flags = CI_FLAGS_IN_TREE; g_tree_insert (cache_tree, (void *) ci->file, (void *) ci); @@ -1056,31 +1170,48 @@ static int handle_request_update (int fd, /* {{{ */ #undef RRDD_UPDATE_SEND } /* }}} int handle_request_update */ -static int handle_request (int fd) /* {{{ */ +/* we came across a "WROTE" entry during journal replay. + * throw away any values that we have accumulated for this file + */ +static int handle_request_wrote (int fd __attribute__((unused)), /* {{{ */ + const char *buffer, + size_t buffer_size __attribute__((unused))) { - char buffer[4096]; - size_t buffer_size; - char *buffer_ptr; - char *command; - int status; + int i; + cache_item_t *ci; + const char *file = buffer; - status = (int) sread (fd, buffer, sizeof (buffer)); - if (status == 0) + pthread_mutex_lock(&cache_lock); + + ci = g_tree_lookup(cache_tree, file); + if (ci == NULL) { - return (1); + pthread_mutex_unlock(&cache_lock); + return (0); } - else if (status < 0) + + if (ci->values) { - RRDD_LOG (LOG_ERR, "handle_request: sread failed."); - return (-1); + for (i=0; i < ci->values_num; i++) + free(ci->values[i]); + + free(ci->values); } - buffer_size = (size_t) status; - assert (buffer_size <= sizeof (buffer)); - assert (buffer[buffer_size - 1] == 0); - /* Place the normal field separator at the end to simplify - * `buffer_get_field's work. */ - buffer[buffer_size - 1] = ' '; + _wipe_ci_values(ci, time(NULL)); + + pthread_mutex_unlock(&cache_lock); + return (0); +} /* }}} int handle_request_wrote */ + +/* if fd < 0, we are in journal replay mode */ +static int handle_request (int fd, char *buffer, size_t buffer_size) /* {{{ */ +{ + char *buffer_ptr; + char *command; + int status; + + assert (buffer[buffer_size - 1] == '\0'); buffer_ptr = buffer; command = NULL; @@ -1093,8 +1224,17 @@ static int handle_request (int fd) /* {{{ */ if (strcasecmp (command, "update") == 0) { + /* don't re-write updates in replay mode */ + if (fd >= 0) + journal_write(command, buffer_ptr); + return (handle_request_update (fd, buffer_ptr, buffer_size)); } + else if (strcasecmp (command, "wrote") == 0 && fd < 0) + { + /* this is only valid in replay mode */ + return (handle_request_wrote (fd, buffer_ptr, buffer_size)); + } else if (strcasecmp (command, "flush") == 0) { return (handle_request_flush (fd, buffer_ptr, buffer_size)); @@ -1109,7 +1249,7 @@ static int handle_request (int fd) /* {{{ */ } else { - char result[4096]; + char result[CMD_MAX]; snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command); result[sizeof (result) - 1] = 0; @@ -1125,36 +1265,174 @@ static int handle_request (int fd) /* {{{ */ return (0); } /* }}} int handle_request */ -static void *connection_thread_main (void *args /* {{{ */ - __attribute__((unused))) +/* MUST NOT hold journal_lock before calling this */ +static void journal_rotate(void) /* {{{ */ +{ + FILE *old_fh = NULL; + + if (journal_cur == NULL || journal_old == NULL) + return; + + pthread_mutex_lock(&journal_lock); + + /* we rotate this way (rename before close) so that the we can release + * the journal lock as fast as possible. Journal writes to the new + * journal can proceed immediately after the new file is opened. The + * fclose can then block without affecting new updates. + */ + if (journal_fh != NULL) + { + old_fh = journal_fh; + rename(journal_cur, journal_old); + ++stats_journal_rotate; + } + + journal_fh = fopen(journal_cur, "a"); + pthread_mutex_unlock(&journal_lock); + + if (old_fh != NULL) + fclose(old_fh); + + if (journal_fh == NULL) + RRDD_LOG(LOG_CRIT, + "JOURNALING DISABLED: Cannot open journal file '%s' : (%s)", + journal_cur, rrd_strerror(errno)); + +} /* }}} static void journal_rotate */ + +static void journal_done(void) /* {{{ */ +{ + if (journal_cur == NULL) + return; + + pthread_mutex_lock(&journal_lock); + if (journal_fh != NULL) + { + fclose(journal_fh); + journal_fh = NULL; + } + + RRDD_LOG(LOG_INFO, "removing journals"); + + unlink(journal_old); + unlink(journal_cur); + pthread_mutex_unlock(&journal_lock); + +} /* }}} static void journal_done */ + +static int journal_write(char *cmd, char *args) /* {{{ */ +{ + int chars; + + if (journal_fh == NULL) + return 0; + + pthread_mutex_lock(&journal_lock); + chars = fprintf(journal_fh, "%s %s\n", cmd, args); + pthread_mutex_unlock(&journal_lock); + + if (chars > 0) + { + pthread_mutex_lock(&stats_lock); + stats_journal_bytes += chars; + pthread_mutex_unlock(&stats_lock); + } + + return chars; +} /* }}} static int journal_write */ + +static int journal_replay (const char *file) /* {{{ */ +{ + FILE *fh; + int entry_cnt = 0; + int fail_cnt = 0; + uint64_t line = 0; + char entry[CMD_MAX]; + + if (file == NULL) return 0; + + fh = fopen(file, "r"); + if (fh == NULL) + { + if (errno != ENOENT) + RRDD_LOG(LOG_ERR, "journal_replay: cannot open journal file: '%s' (%s)", + file, rrd_strerror(errno)); + return 0; + } + else + RRDD_LOG(LOG_NOTICE, "replaying from journal: %s", file); + + while(!feof(fh)) + { + size_t entry_len; + + ++line; + fgets(entry, sizeof(entry), fh); + entry_len = strlen(entry); + + /* check \n termination in case journal writing crashed mid-line */ + if (entry_len == 0) + continue; + else if (entry[entry_len - 1] != '\n') + { + RRDD_LOG(LOG_NOTICE, "Malformed journal entry at line %"PRIu64, line); + ++fail_cnt; + continue; + } + + entry[entry_len - 1] = '\0'; + + if (handle_request(-1, entry, entry_len) == 0) + ++entry_cnt; + else + ++fail_cnt; + } + + fclose(fh); + + if (entry_cnt > 0) + { + RRDD_LOG(LOG_INFO, "Replayed %d entries (%d failures)", + entry_cnt, fail_cnt); + return 1; + } + else + return 0; + +} /* }}} static int journal_replay */ + +static void *connection_thread_main (void *args) /* {{{ */ { pthread_t self; int i; int fd; fd = *((int *) args); + free (args); - pthread_mutex_lock (&connetion_threads_lock); + pthread_mutex_lock (&connection_threads_lock); { pthread_t *temp; - temp = (pthread_t *) realloc (connetion_threads, - sizeof (pthread_t) * (connetion_threads_num + 1)); + temp = (pthread_t *) realloc (connection_threads, + sizeof (pthread_t) * (connection_threads_num + 1)); if (temp == NULL) { RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed."); } else { - connetion_threads = temp; - connetion_threads[connetion_threads_num] = pthread_self (); - connetion_threads_num++; + connection_threads = temp; + connection_threads[connection_threads_num] = pthread_self (); + connection_threads_num++; } } - pthread_mutex_unlock (&connetion_threads_lock); + pthread_mutex_unlock (&connection_threads_lock); while (do_shutdown == 0) { + char buffer[CMD_MAX]; + struct pollfd pollfd; int status; @@ -1188,7 +1466,18 @@ static void *connection_thread_main (void *args /* {{{ */ break; } - status = handle_request (fd); + status = (int) sread (fd, buffer, sizeof (buffer)); + if (status <= 0) + { + close (fd); + + if (status < 0) + RRDD_LOG(LOG_ERR, "connection_thread_main: sread failed."); + + break; + } + + status = handle_request (fd, buffer, /*buffer_size=*/ status); if (status != 0) { close (fd); @@ -1198,25 +1487,24 @@ static void *connection_thread_main (void *args /* {{{ */ self = pthread_self (); /* Remove this thread from the connection threads list */ - pthread_mutex_lock (&connetion_threads_lock); + pthread_mutex_lock (&connection_threads_lock); /* Find out own index in the array */ - for (i = 0; i < connetion_threads_num; i++) - if (pthread_equal (connetion_threads[i], self) != 0) + for (i = 0; i < connection_threads_num; i++) + if (pthread_equal (connection_threads[i], self) != 0) break; - assert (i < connetion_threads_num); + assert (i < connection_threads_num); /* Move the trailing threads forward. */ - if (i < (connetion_threads_num - 1)) + if (i < (connection_threads_num - 1)) { - memmove (connetion_threads + i, - connetion_threads + i + 1, - sizeof (pthread_t) * (connetion_threads_num - i - 1)); + memmove (connection_threads + i, + connection_threads + i + 1, + sizeof (pthread_t) * (connection_threads_num - i - 1)); } - connetion_threads_num--; - pthread_mutex_unlock (&connetion_threads_lock); + connection_threads_num--; + pthread_mutex_unlock (&connection_threads_lock); - free (args); return (NULL); } /* }}} void *connection_thread_main */ @@ -1400,6 +1688,8 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ } memset (pollfds, 0, sizeof (*pollfds) * pollfds_num); + RRDD_LOG(LOG_INFO, "listening for connections"); + while (do_shutdown == 0) { assert (pollfds_num == ((int) listen_fds_num)); @@ -1471,29 +1761,29 @@ static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */ } /* for (pollfds_num) */ } /* while (do_shutdown == 0) */ + RRDD_LOG(LOG_INFO, "starting shutdown"); + close_listen_sockets (); - pthread_mutex_lock (&connetion_threads_lock); - while (connetion_threads_num > 0) + pthread_mutex_lock (&connection_threads_lock); + while (connection_threads_num > 0) { pthread_t wait_for; - wait_for = connetion_threads[0]; + wait_for = connection_threads[0]; - pthread_mutex_unlock (&connetion_threads_lock); + pthread_mutex_unlock (&connection_threads_lock); pthread_join (wait_for, /* retval = */ NULL); - pthread_mutex_lock (&connetion_threads_lock); + pthread_mutex_lock (&connection_threads_lock); } - pthread_mutex_unlock (&connetion_threads_lock); + pthread_mutex_unlock (&connection_threads_lock); return (NULL); } /* }}} void *listen_thread_main */ static int daemonize (void) /* {{{ */ { - pid_t child; int status; - char *base_dir; /* These structures are static, because `sigaction' behaves weird if the are * overwritten.. */ @@ -1501,39 +1791,45 @@ static int daemonize (void) /* {{{ */ static struct sigaction sa_term; static struct sigaction sa_pipe; - child = fork (); - if (child < 0) - { - fprintf (stderr, "daemonize: fork(2) failed.\n"); - return (-1); - } - else if (child > 0) + if (!stay_foreground) { - return (1); - } + pid_t child; + char *base_dir; - /* Change into the /tmp directory. */ - base_dir = (config_base_dir != NULL) - ? config_base_dir - : "/tmp"; - status = chdir (base_dir); - if (status != 0) - { - fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir); - return (-1); - } + child = fork (); + if (child < 0) + { + fprintf (stderr, "daemonize: fork(2) failed.\n"); + return (-1); + } + else if (child > 0) + { + return (1); + } - /* Become session leader */ - setsid (); + /* Change into the /tmp directory. */ + base_dir = (config_base_dir != NULL) + ? config_base_dir + : "/tmp"; + status = chdir (base_dir); + if (status != 0) + { + fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir); + return (-1); + } + + /* Become session leader */ + setsid (); - /* Open the first three file descriptors to /dev/null */ - close (2); - close (1); - close (0); + /* Open the first three file descriptors to /dev/null */ + close (2); + close (1); + close (0); - open ("/dev/null", O_RDWR); - dup (0); - dup (0); + open ("/dev/null", O_RDWR); + dup (0); + dup (0); + } /* if (!stay_foreground) */ /* Install signal handlers */ memset (&sa_int, 0, sizeof (sa_int)); @@ -1549,6 +1845,7 @@ static int daemonize (void) /* {{{ */ sigaction (SIGPIPE, &sa_pipe, NULL); openlog ("rrdcached", LOG_PID, LOG_DAEMON); + RRDD_LOG(LOG_INFO, "starting up"); cache_tree = g_tree_new ((GCompareFunc) strcmp); if (cache_tree == NULL) @@ -1557,18 +1854,8 @@ static int daemonize (void) /* {{{ */ return (-1); } - memset (&queue_thread, 0, sizeof (queue_thread)); - status = pthread_create (&queue_thread, /* attr = */ NULL, - queue_thread_main, /* args = */ NULL); - if (status != 0) - { - RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed."); - return (-1); - } - - write_pidfile (); - - return (0); + status = write_pidfile (); + return status; } /* }}} int daemonize */ static int cleanup (void) /* {{{ */ @@ -1580,6 +1867,7 @@ static int cleanup (void) /* {{{ */ remove_pidfile (); + RRDD_LOG(LOG_INFO, "goodbye"); closelog (); return (0); @@ -1590,10 +1878,14 @@ static int read_options (int argc, char **argv) /* {{{ */ int option; int status = 0; - while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1) + while ((option = getopt(argc, argv, "gl:f:w:b:z:p:j:h?")) != -1) { switch (option) { + case 'g': + stay_foreground=1; + break; + case 'l': { char **temp; @@ -1647,6 +1939,22 @@ static int read_options (int argc, char **argv) /* {{{ */ } break; + case 'z': + { + int temp; + + temp = atoi(optarg); + if (temp > 0) + config_write_jitter = temp; + else + { + fprintf (stderr, "Invalid write jitter: -z %s\n", optarg); + status = 2; + } + + break; + } + case 'b': { size_t len; @@ -1688,6 +1996,41 @@ static int read_options (int argc, char **argv) /* {{{ */ } break; + case 'j': + { + struct stat statbuf; + const char *dir = optarg; + + status = stat(dir, &statbuf); + if (status != 0) + { + fprintf(stderr, "Cannot stat '%s' : %s\n", dir, rrd_strerror(errno)); + return 6; + } + + if (!S_ISDIR(statbuf.st_mode) + || access(dir, R_OK|W_OK|X_OK) != 0) + { + fprintf(stderr, "Must specify a writable directory with -j! (%s)\n", + errno ? rrd_strerror(errno) : ""); + return 6; + } + + journal_cur = malloc(PATH_MAX + 1); + journal_old = malloc(PATH_MAX + 1); + if (journal_cur == NULL || journal_old == NULL) + { + fprintf(stderr, "malloc failure for journal files\n"); + return 6; + } + else + { + snprintf(journal_cur, PATH_MAX, "%s/rrd.journal", dir); + snprintf(journal_old, PATH_MAX, "%s/rrd.journal.old", dir); + } + } + break; + case 'h': case '?': printf ("RRDd %s Copyright (C) 2008 Florian octo Forster\n" @@ -1697,6 +2040,7 @@ static int read_options (int argc, char **argv) /* {{{ */ "Valid options are:\n" " -l
Socket address to listen to.\n" " -w Interval in which to write data.\n" + " -z Delay writes up to seconds to spread load" \ " -f Interval in which to flush dead data.\n" " -p Location of the PID-file.\n" " -b Base directory to change to.\n" @@ -1710,6 +2054,14 @@ static int read_options (int argc, char **argv) /* {{{ */ } /* switch (option) */ } /* while (getopt) */ + /* advise the user when values are not sane */ + if (config_flush_interval < 2 * config_write_interval) + fprintf(stderr, "WARNING: flush interval (-f) should be at least" + " 2x write interval (-w) !\n"); + if (config_write_jitter > config_write_interval) + fprintf(stderr, "WARNING: write delay (-z) should NOT be larger than" + " write interval (-w) !\n"); + return (status); } /* }}} int read_options */ @@ -1742,8 +2094,40 @@ int main (int argc, char **argv) return (1); } - listen_thread_main (NULL); + if (journal_cur != NULL) + { + int had_journal = 0; + + pthread_mutex_lock(&journal_lock); + + RRDD_LOG(LOG_INFO, "checking for journal files"); + had_journal += journal_replay(journal_old); + had_journal += journal_replay(journal_cur); + + if (had_journal) + flush_old_values(-1); + + pthread_mutex_unlock(&journal_lock); + journal_rotate(); + + RRDD_LOG(LOG_INFO, "journal processing complete"); + } + + /* start the queue thread */ + memset (&queue_thread, 0, sizeof (queue_thread)); + status = pthread_create (&queue_thread, + NULL, /* attr */ + queue_thread_main, + NULL); /* args */ + if (status != 0) + { + RRDD_LOG (LOG_ERR, "FATAL: cannot create queue thread"); + cleanup(); + return (1); + } + + listen_thread_main (NULL); cleanup (); return (0); diff --git a/program/src/rrd_dump.c b/program/src/rrd_dump.c index 552c636c..a32f4fb3 100644 --- a/program/src/rrd_dump.c +++ b/program/src/rrd_dump.c @@ -493,43 +493,9 @@ int rrd_dump( return (-1); } - if (opt_daemon == NULL) - { - char *temp; - - temp = getenv (ENV_RRDCACHED_ADDRESS); - if (temp != NULL) - { - opt_daemon = strdup (temp); - if (opt_daemon == NULL) - { - rrd_set_error("strdup failed."); - return (-1); - } - } - } - - if (opt_daemon != NULL) - { - int status; - - status = rrdc_connect (opt_daemon); - if (status != 0) - { - rrd_set_error ("rrdc_connect failed with status %i.", status); - return (-1); - } - - status = rrdc_flush (argv[optind]); - if (status != 0) - { - rrd_set_error ("rrdc_flush (%s) failed with status %i.", - argv[optind], status); - return (-1); - } - - rrdc_disconnect (); - } /* if (opt_daemon) */ + rc = rrdc_flush_if_daemon(opt_daemon, argv[optind]); + if (opt_daemon) free(opt_daemon); + if (rc) return (rc); if ((argc - optind) == 2) { rc = rrd_dump_opt_r(argv[optind], argv[optind + 1], opt_noheader); diff --git a/program/src/rrd_fetch.c b/program/src/rrd_fetch.c index 563e76b3..568e2622 100644 --- a/program/src/rrd_fetch.c +++ b/program/src/rrd_fetch.c @@ -167,41 +167,9 @@ int rrd_fetch( return -1; } - if (opt_daemon == NULL) - { - char *temp; - - temp = getenv (ENV_RRDCACHED_ADDRESS); - if (temp != NULL) - { - opt_daemon = strdup (temp); - if (opt_daemon == NULL) - { - rrd_set_error("strdup failed."); - return (-1); - } - } - } - - if (opt_daemon != NULL) - { - status = rrdc_connect (opt_daemon); - if (status != 0) - { - rrd_set_error ("rrdc_connect failed with status %i.", status); - return (-1); - } - - status = rrdc_flush (argv[optind]); - if (status != 0) - { - rrd_set_error ("rrdc_flush (%s) failed with status %i.", - argv[optind], status); - return (-1); - } - - rrdc_disconnect (); - } /* if (opt_daemon) */ + status = rrdc_flush_if_daemon(opt_daemon, argv[optind]); + if (opt_daemon) free (opt_daemon); + if (status) return (-1); cf = argv[optind + 1]; diff --git a/program/src/rrd_flush.c b/program/src/rrd_flush.c new file mode 100644 index 00000000..218a65a4 --- /dev/null +++ b/program/src/rrd_flush.c @@ -0,0 +1,95 @@ +/** + * RRDTool - src/rrd_flush.c + * Copyright (C) 2008 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + **/ + +#include "rrd_tool.h" +#include "rrd_client.h" + +int rrd_cmd_flush (int argc, char **argv) +{ + char *opt_daemon = NULL; + int status; + + /* initialize getopt */ + optind = 0; + opterr = 0; + + while (42) + { + int opt; + static struct option long_options[] = + { + {"daemon", required_argument, 0, 'd'}, + {0, 0, 0, 0} + }; + + opt = getopt_long(argc, argv, "d:", long_options, NULL); + + if (opt == -1) + break; + + switch (opt) + { + case 'd': + if (opt_daemon != NULL) + free (opt_daemon); + opt_daemon = strdup (optarg); + if (opt_daemon == NULL) + { + rrd_set_error ("strdup failed."); + return (-1); + } + break; + + default: + rrd_set_error ("Usage: rrdtool %s [--daemon ] ", + argv[0]); + return (-1); + } + } /* while (42) */ + + if ((argc - optind) != 1) + { + rrd_set_error ("Usage: rrdtool %s [--daemon ] ", argv[0]); + return (-1); + } + + /* try to connect to rrdcached */ + status = rrdc_connect(opt_daemon); + if (opt_daemon) free(opt_daemon); + if (status != 0) return status; + + if (! rrdc_is_connected(opt_daemon)) + { + rrd_set_error ("Daemon address unknown. Please use the \"--daemon\" " + "option to set an address on the command line or set the " + "\"%s\" environment variable.", + ENV_RRDCACHED_ADDRESS); + return (-1); + } + + status = rrdc_flush(argv[optind]); + + return ((status == 0) ? 0 : -1); +} /* int rrd_flush */ + +/* + * vim: set sw=4 sts=4 et fdm=marker : + */ diff --git a/program/src/rrd_graph.c b/program/src/rrd_graph.c index 9acf82e5..ea5d8c3b 100644 --- a/program/src/rrd_graph.c +++ b/program/src/rrd_graph.c @@ -307,11 +307,8 @@ int im_free( if (im == NULL) return 0; - if (im->use_rrdcached) - { - rrdc_disconnect (); - im->use_rrdcached = 0; - } + if (im->daemon_addr != NULL) + free(im->daemon_addr); for (i = 0; i < (unsigned) im->gdes_c; i++) { if (im->gdes[i].data_first) { @@ -845,7 +842,7 @@ int data_fetch( * - a connection to the daemon has been established * - this is the first occurrence of that RRD file */ - if (im->use_rrdcached) + if (rrdc_is_connected(im->daemon_addr)) { int status; @@ -869,7 +866,7 @@ int data_fetch( return (-1); } } - } /* if (im->use_rrdcached) */ + } /* if (rrdc_is_connected()) */ if ((rrd_fetch_fn(im->gdes[i].rrd, im->gdes[i].cf, @@ -3748,6 +3745,7 @@ void rrd_graph_init( #endif #endif im->base = 1000; + im->daemon_addr = NULL; im->draw_x_grid = 1; im->draw_y_grid = 1; im->extra_flags = 0; @@ -3763,7 +3761,6 @@ void rrd_graph_init( im->grinfo_current = (rrd_info_t *) NULL; im->imgformat = IF_PNG; im->imginfo = NULL; - im->use_rrdcached = 0; im->lazy = 0; im->logarithmic = 0; im->maxval = DNAN; @@ -4254,21 +4251,20 @@ void rrd_graph_options( break; case 'd': { - int status; - if (im->use_rrdcached) + if (im->daemon_addr != NULL) { rrd_set_error ("You cannot specify --daemon " "more than once."); return; } - status = rrdc_connect (optarg); - if (status != 0) + + im->daemon_addr = strdup(optarg); + if (im->daemon_addr == NULL) { - rrd_set_error ("rrdc_connect(%s) failed with status %i.", - optarg, status); - return; + rrd_set_error("strdup failed"); + return; } - im->use_rrdcached = 1; + break; } case '?': @@ -4280,24 +4276,9 @@ void rrd_graph_options( } } /* while (1) */ - if (im->use_rrdcached == 0) - { - char *temp; - - temp = getenv (ENV_RRDCACHED_ADDRESS); - if (temp != NULL) - { - int status; - - status = rrdc_connect (temp); - if (status != 0) - { - rrd_set_error ("rrdc_connect(%s) failed with status %i.", - temp, status); - return; - } - im->use_rrdcached = 1; - } + { /* try to connect to rrdcached */ + int status = rrdc_connect(im->daemon_addr); + if (status != 0) return; } pango_cairo_context_set_font_options(pango_layout_get_context(im->layout), im->font_options); diff --git a/program/src/rrd_graph.h b/program/src/rrd_graph.h index cffce4b0..8b86e864 100644 --- a/program/src/rrd_graph.h +++ b/program/src/rrd_graph.h @@ -213,7 +213,7 @@ typedef struct image_desc_t { char *imginfo; /* construct an