From: oetiker Date: Sun, 14 Sep 2008 09:49:03 +0000 (+0000) Subject: RRDcached patch. This implements an infrastructure, where rrd updates can be X-Git-Url: https://git.tokkee.org/?a=commitdiff_plain;h=ac630adec930653637199258efd99024d49325c7;p=rrdtool.git RRDcached patch. This implements an infrastructure, where rrd updates can be sent to a daemon which caches them prior to bulk-updateing rrd files. See the rrdcached manual page. -- Created by Florian Forster with some help from Kevin Brintnall. git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk/program@1504 a5681a0c-68f1-0310-ab6d-d61299d08faa --- diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 847c1d1..a2dc232 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -21,7 +21,7 @@ David Grimes SQRT/SORT/REV/SHIFT/TREND David L. Barker xport function bug fixes Evan Miller Multiplicative HW Enhancements Frank Strauss TCL bindings -Florian octo Forster rrd_restore libxml2 rewrite deprecated function export +Florian octo Forster rrd_restore libxml2 rewrite, deprecated function export, rrdcached Henrik Storner functions for min/max values of data in graph Hermann Hueni (SunOS porting) Jakob Ilves HPUX 11 diff --git a/configure.ac b/configure.ac index cc2e998..451502c 100644 --- a/configure.ac +++ b/configure.ac @@ -130,8 +130,9 @@ CONFIGURE_PART(Audit Compilation Environment) dnl Check for the compiler and static/shared library creation. -AC_PROG_CC AC_PROG_CPP +AC_PROG_CC +AM_PROG_CC_C_O AC_PROG_LIBTOOL dnl Try to detect/use GNU features @@ -466,7 +467,7 @@ EX_CHECK_ALL(cairo, cairo_font_options_create, cairo.h, EX_CHECK_ALL(cairo, cairo_svg_surface_create, cairo-svg.h, cairo-svg, 1.4.6, http://cairographics.org/releases/, "") EX_CHECK_ALL(cairo, cairo_pdf_surface_create, cairo-pdf.h, cairo-pdf, 1.4.6, http://cairographics.org/releases/, "") EX_CHECK_ALL(cairo, cairo_ps_surface_create, cairo-ps.h, cairo-ps, 1.4.6, http://cairographics.org/releases/, "") -dnl EX_CHECK_ALL(glib-2.0, glib_check_version, glib.h, glib-2.0, 2.12.12, ftp://ftp.gtk.org/pub/glib/2.12/, "") +EX_CHECK_ALL(glib-2.0, glib_check_version, glib.h, glib-2.0, 2.12.12, ftp://ftp.gtk.org/pub/glib/2.12/, "") EX_CHECK_ALL(pango-1.0, pango_cairo_context_set_font_options, pango/pango.h, pangocairo, 1.17, http://ftp.gnome.org/pub/GNOME/sources/pango/1.17, "") EX_CHECK_ALL(xml2, xmlParseFile, libxml/parser.h, libxml-2.0, 2.6.31, http://xmlsoft.org/downloads.html, /usr/include/libxml2) diff --git a/doc/Makefile.am b/doc/Makefile.am index 16fd617..8e52084 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -10,8 +10,8 @@ CLEANFILES = *.1 *.html *.txt *-dircache RRD?.pod *.pdf *~ core *itemcache *.rej POD = bin_dec_hex.pod rrddump.pod rrdgraph_examples.pod rrdrestore.pod rrdupdate.pod \ cdeftutorial.pod rrdfetch.pod rrdgraph_graph.pod rrdthreads.pod rrdxport.pod \ - rpntutorial.pod rrdfirst.pod rrdgraph_rpn.pod rrdtool.pod \ - rrd-beginners.pod rrdinfo.pod rrdtune.pod rrdbuild.pod \ + rpntutorial.pod rrdfirst.pod rrdgraph_rpn.pod rrdtool.pod rrdcached.pod \ + rrd-beginners.pod rrdinfo.pod rrdtune.pod rrdbuild.pod rrdflush.pod \ rrdcgi.pod rrdgraph.pod rrdlast.pod rrdlastupdate.pod \ rrdcreate.pod rrdgraph_data.pod rrdresize.pod rrdtutorial.pod diff --git a/doc/rrdcached.pod b/doc/rrdcached.pod new file mode 100644 index 0000000..8ad37be --- /dev/null +++ b/doc/rrdcached.pod @@ -0,0 +1,354 @@ +=pod + +=head1 NAME + +rrdcached - Data caching daemon for rrdtool + +=head1 SYNOPSIS + +B [B<-l> I
] [B<-w> I] [B<-f> I] + +=head1 DESCRIPTION + +B is a daemon that receives updates to existing RRD files, +accumulates them and, if enough have been received or a defined time has +passed, writes the updates to the RRD file. A I command may be used to +force writing of values to disk, so that graphing facilities and similar can +work with up-to-date data. + +The daemon was written with big setups in mind. Those setups usually run into +IOErelated problems sooner or later for reasons that are beyond the scope +of this document. Check the wiki at the RRDTool homepage for details. Also +check L below before using this daemon! A detailed +description of how the daemon operates can be found in the L +section below. + +=head1 OPTIONS + +=over 4 + +=item B<-l> I
+ +Tells the daemon to bind to I
and accept incoming connections on that +socket. If I
begins with C, everything following that prefix is +interpreted as the path to a UNIX domain socket. Otherwise the address or node +name are resolved using L. + +If the B<-l> option is not specified the default address, +C, will be used. + +=item B<-w> I + +Data is written to disk every I seconds. If this option is not +specified the default interval of 300Eseconds will be used. + +=item B<-f> I + +Every I seconds the entire cache is searched for old values which are +written to disk. This only concerns files to which updates have stopped, so +setting this to a high value, such as 3600Eseconds, is acceptable in most +cases. This timeout defaults to 3600Eseconds. + +=item B<-p> I + +Sets the name and location of the PID-file. If not specified, the default, +C/run/rrdcached.pid> will be used. + +=item B<-b> I + +The daemon will change into a specific directory at startup. All files passed +to the daemon, that are specified by a B path, will be interpreted +to be relative to this directory. If not given the default, C, will be +used. + + +------------------------+------------------------+ + ! Command line ! File updated ! + +------------------------+------------------------+ + ! foo.rrd ! /tmp/foo.rrd ! + ! foo/bar.rrd ! /tmp/foo/bar.rrd ! + ! /var/lib/rrd/foo.rrd ! /var/lib/rrd/foo.rrd ! + +------------------------+------------------------+ + Paths given on the command line and paths actually + updated by the daemon, assuming the base directory + "/tmp". + +=back + +=head1 EFFECTED RRDTOOL COMMANDS + +The following commands may be made aware of the B using the command +line argument B<--daemon> or the environment variable B: + +=over 4 + +=item B + +=item B + +=item B + +=item B + +=item B + +=item B + +=item B + +=item B + +=item B + +=item B + +=back + +The B command can send values to the daemon instead of writing them to +the disk itself. All other commands can send a B command (see below) to +the daemon before accessing the files, so they work with up-to-date data even +if the cache timeout is large. + +=head1 HOW IT WORKS + +When receiving an update, B does not write to disk but looks for an +entry for that file in its internal tree. If not found, an entry is created +including the current time (called "First" in the diagram below). This time is +B the time specified on the command line but the time the operating system +considers to be "now". The value and time of the value (called "Time" in the +diagram below) are appended to the tree node. + +When appending a value to a tree node, it is checked whether it's time to write +the values to disk. Values are written to disk if +S= timeout>>, where C is the timeout specified +using the B<-w> option, see L. If the values are "old enough" they +will be enqueued in the "update queue", i.Ee. they will be appended to +the linked list shown below. Because the tree nodes and the elements of the +linked list are the same data structures in memory, any update to a file that +has already been enqueued will be written with the next write to the RRD file, +too. + +A separate "update thread" constantly dequeues the first element in the update +queue and writes all its values to the appropriate file. So as long as the +update queue is not empty files are written at the highest possible rate. + +Since the timeout of files is checked only when new values are added to the +file, "dead" files, i.Ee. files that are not updated anymore, would never +be written to disk. Therefore, every now and then, controlled by the B<-f> +option, the entire tree is walked and all "old" values are enqueued. Since this +only affects "dead" files and walking the tree is relatively expensive, you +should set the "flush interval" to a reasonably high value. The default is +3600Eseconds (one hour). + +The downside of caching values is that they won't show up in graphs generated +from the RRDEfiles. To get around this, the daemon provides the "flush +command" to flush specific files. This means that the file is inserted at the +B of the update queue or moved there if it is already enqueued. The flush +command will return after the update thread has dequeued the file, so there is +a good chance that the file has been updated by the time the client receives +the response from the daemon, but there is no guarantee. + + +------+ +------+ +------+ + ! head ! ! root ! ! tail ! + +---+--+ +---+--+ +---+--+ + ! /\ ! + ! / \ ! + ! /\ /\ ! + ! /\/\ \ `----------------- ... --------, ! + V / `-------, ! V + +---+----+---+ +------+-----+ +---+----+---+ + ! File: foo ! ! File: bar ! ! File: qux ! + ! First: 101 ! ! First: 119 ! ! First: 180 ! + ! Next: ---+--->! Next: ---+---> ... --->! Next: - ! + +============+ +============+ +============+ + ! Time: 100 ! ! Time: 120 ! ! Time: 180 ! + ! Value: 10 ! ! Value: 0.1 ! ! Value: 2,2 ! + +------------+ +------------+ +------------+ + ! Time: 110 ! ! Time: 130 ! ! Time: 190 ! + ! Value: 26 ! ! Value: 0.1 ! ! Value: 7,3 ! + +------------+ +------------+ +------------+ + : : : : : : + +------------+ +------------+ +------------+ + ! Time: 230 ! ! Time: 250 ! ! Time: 310 ! + ! Value: 42 ! ! Value: 0.2 ! ! Value: 1,2 ! + +------------+ +------------+ +------------+ + +The above diagram demonstrates: + +=over + +=item * + +Files/values are stored in a (balanced) tree. + +=item * + +Tree nodes and entries in the update queue are the same data structure. + +=item * + +The local time ("First") and the time specified in updates ("Time") may differ. + +=item * + +Timed out values are inserted at the "tail". + +=item * + +Explicitly flushed values are inserted at the "head". + +=item * + +ASCII art rocks. + +=back + +=head1 SECURITY CONSIDERATIONS + +This daemon is meant to improve IOEperformance for setups with thousands +of RRDEfile to be updated. So security measures built into the daemon can +be summarized easily: B + +There is no authentication and authorization, so B will have to take care +that only authorized clients can talk to the daemon. Since we assume that graph +collection is done on a dedicated machine, i.Ee. the box doesn't do +anything else and especially does not have any interactive logins other than +root, a UNIX domain socket should take care of that. + +If you (want to) use the network capability, i.Ee. let the daemon bind to +an IPv4 or IPv6 socket, it is B job to install a packet filter or similar +mechanism to prevent unauthorized connections. Unless you have a dedicated VLAN +or VPN for this, using the network option is probably a bad idea! + +The daemon will blindly write to any file it gets told, so you really should +create a separate user just for this daemon. Also it does not do any sanity +checks, so if it gets told to write values for a time far in the future, your +files will be messed up good! + +You have been warned. + +=head1 PROTOCOL + +The daemon communicates with clients using a line based ASCII protocol which is +easy to read and easy to type. This makes it easy for scripts to implement the +protocol and possible for users to use L to connect to the daemon +and test stuff "by hand". + +The protocol is line based, this means that each record consists of one or more +lines. A line is terminated by the line feed character C<0x0A>, commonly +written as C<\n>. In the examples below, this character will be written as +CLFE> ("line feed"). + +After the connection has been established, the client is expected to send a +"command". A command consists of the command keyword, possibly some arguments, +and a terminating newline character. For a list of commands, see +L below. + +Example: + + FLUSH /tmp/foo.rrd + +The daemon answers with a line consisting of a status code and a short status +message, separated by one or more space characters. A negative status code +signals an error, a positive status code or zero signal success. If the status +code is greater than zero, it indicates the number of lines that follow the +status line. + +Examples: + + 0 Success + + 2 Two lines follow + This is the first line + And this is the second line + +=head2 Valid Commands + +The following commands are understood by the daemon: + +=over 4 + +=item B I + +Causes the daemon to put I to the B of the update queue +(possibly moving it there if the node is already enqueued). The answer will be +sent B the node has been dequeued. + +=item B [I] + +Returns a short usage message. If no command is given, or I is +B, a list of commands supported by the daemon is returned. Otherwise a +short description, possibly containing a pointer to a manual page, is returned. +Obviously, this is meant for interactive usage and the format in which the +commands and usage summaries are returned is not well defined. + +=item B + +Returns a list of metrics which can be used to measure the daemons performance +and check its status. For a description of the values returned, see +L below. + +The format in which the values are returned is similar to many other line based +protocols: Each value is printed on a separate line, each consisting of the +name of the value, a colon, one or more spaces and the actual value. + +Example: + + 5 Statistics follow + QueueLength: 0 + UpdatesWritten: 13 + DataSetsWritten: 390 + TreeNodesNumber: 13 + TreeDepth: 4 + +=item B I I [I ...] + +Adds more data to a filename. This is B operation the daemon was designed +for, so describing the mechanism again is unnecessary. Read L +above for a detailed explanation. + +=back + +=head2 Performance Values + +The following counters are returned by the B command: + +=over 4 + +=item B I<(unsigned 64bit integer)> + +Number of nodes currently enqueued in the update queue. + +=item B I<(unsigned 64bit integer)> + +Depth of the tree used for fast key lookup. + +=item B I<(unsigned 64bit integer)> + +Number of nodes in the cache. + +=item B I<(unsigned 64bit integer)> + +Total number of updates, i.Ee. calls to C, since the daemon +was started. + +=item B I<(unsigned 64bit integer)> + +Total number of "data sets" written to disk since the daemon was started. A +data set is one or more values passed to the B command. For example: +C is one data set with two values. The term "data set" is used to +prevent confusion whether individual values or groups of values are counted. + +=back + +=head1 BUGS + +No known bugs at the moment. + +=head1 SEE ALSO + +L, L + +=head1 AUHOR + +B and this manual page have been written by Florian Forster +EoctoEatEverplant.orgE. diff --git a/doc/rrddump.pod b/doc/rrddump.pod index a698d84..89e378b 100644 --- a/doc/rrddump.pod +++ b/doc/rrddump.pod @@ -4,11 +4,16 @@ rrddump - dump the contents of an RRD to XML format =head1 SYNOPSIS -B B S<[B<--no-header>|B<-n>]> I E I +B B I +S<[B<--no-header>|B<-n>]> +S<[B<--daemon> I
]> +S I> or -B B S<[B<--no-header>|B<-n>]> I I +B B I I +S<[B<--no-header>|B<-n>]> +S<[B<--daemon> I
]> =head1 DESCRIPTION @@ -31,13 +36,24 @@ The name of the B you want to dump. The (optional) filename that you want to write the XML output to. If not specified, the XML will be printed to stdout. -=item S<[B<--no-header>|B<-n>]> +=item B<--no-header>|B<-n> In rrdtool 1.3, the dump function started producing correct xml-headers. Unfortunately the rrdtool restore function from the 1.2 series can not handle these headers. With this option you can supress the creatinon of the xml headers. +=item B<--daemon> I
+ +Address of the L daemon. If specified, a C command is sent +to the server before reading the RRD files. This allows B to return +fresh data even if the daemon is configured to cache values for a long time. To +specify a UNIX domain socket use the prefix C, see example below. Other +addresses are interpreted as normal network addresses, i.Ee. IPv4 or IPv6 +addresses in most cases. + + rrdtool dump --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd + =back =head1 EXAMPLES @@ -62,6 +78,21 @@ B for details. =back +=head1 ENVIRONMENT VARIABLES + +The following environment variables may be used to change the behavior of +Cdump>: + +=over 4 + +=item B + +If this environment variable is set it will have the same effect as specifying +the C<--daemon> option on the command line. If both are present, the command +line argument takes precedence. + +=back + =head1 AUTHOR Tobias Oetiker Etobi@oetiker.chE diff --git a/doc/rrdfetch.pod b/doc/rrdfetch.pod index 51b5ccd..d187b69 100644 --- a/doc/rrdfetch.pod +++ b/doc/rrdfetch.pod @@ -8,6 +8,7 @@ B B I I S<[B<--resolution>|B<-r> I]> S<[B<--start>|B<-s> I]> S<[B<--end>|B<-e> I]> +S<[B<--daemon> I
]> =head1 DESCRIPTION @@ -48,6 +49,17 @@ the end of the time series in seconds since epoch. See also AT-STYLE TIME SPECIFICATION section for a detailed explanation of how to specify the end time. +=item B<--daemon> I
+ +Address of the L daemon. If specified, a C command is sent +to the server before reading the RRD files. This allows B to return +fresh data even if the daemon is configured to cache values for a long time. To +specify a UNIX domain socket use the prefix C, see example below. Other +addresses are interpreted as normal network addresses, i.Ee. IPv4 or IPv6 +addresses in most cases. + + rrdtool fetch --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd AVERAGE + =back =head2 RESOLUTION INTERVAL @@ -257,6 +269,22 @@ I<931225537> -- 18:45 July 5th, 1999 I<19970703 12:45> -- 12:45 July 3th, 1997 (my favorite, and its even got an ISO number (8601)). +=head1 ENVIRONMENT VARIABLES + +The following environment variables may be used to change the behavior of +Cfetch>: + +=over 4 + +=item B + +If this environment variable is set it will have the same effect as specifying +the C<--daemon> option on the command line. If both are present, the command +line argument takes precedence. + +=back + =head1 AUTHOR -Tobias Oetiker +Tobias Oetiker Etobi@oetiker.chE + diff --git a/doc/rrdgraph.pod b/doc/rrdgraph.pod index c2e9b3d..2c57dbb 100644 --- a/doc/rrdgraph.pod +++ b/doc/rrdgraph.pod @@ -248,7 +248,7 @@ to the more robust B<--alt-y-grid> mode. How many digits should rrdtool assume the y-axis labels to be? You may have to use this option to make enough space once you start -fideling with the y-axis labeling. +fiddling with the y-axis labeling. [B<--units=si>] @@ -267,6 +267,17 @@ Note, that only the image size will be returned, if you run with lazy even when using graphv and even when using PRINT. +[B<--daemon> I
] + +Address of the L daemon. If specified, a C command is sent +to the server before reading the RRD files. This allows the graph to contain +fresh data even if the daemon is configured to cache values for a long time. To +specify a UNIX domain socket use the prefix C, see example below. Other +addresses are interpreted as normal network addresses, i.Ee. IPv4 or IPv6 +addresses in most cases. + + rrdtool graph [...] --daemon unix:/var/run/rrdcached.sock [...] + [B<-f>|B<--imginfo> I] After the image has been created, the graph function uses printf @@ -469,6 +480,21 @@ There is more information returned than in the standard interface. Especially the 'graph_*' keys are new. They help applications that want to know what is where on the graph. +=head1 ENVIRONMENT VARIABLES + +The following environment variables may be used to change the behavior of +Cgraph>: + +=over 4 + +=item B + +If this environment variable is set it will have the same effect as specifying +the C<--daemon> option on the command line. If both are present, the command +line argument takes precedence. + +=back + =head1 SEE ALSO L gives an overview of how B works. diff --git a/doc/rrdinfo.pod b/doc/rrdinfo.pod index e83d8d6..2af411a 100644 --- a/doc/rrdinfo.pod +++ b/doc/rrdinfo.pod @@ -4,7 +4,8 @@ rrdinfo - extract header information from an RRD =head1 SYNOPSIS -B B I +B B I +S<[B<--daemon> I
]> =head1 DESCRIPTION @@ -14,6 +15,25 @@ a parsing friendly format. Check L if you are uncertain about the meaning of the individual keys. +=over 8 + +=item I + +The name of the B you want to examine. + +=item B<--daemon> I
+ +Address of the L daemon. If specified, a C command is sent +to the server before reading the RRD files. This allows B to return +fresh data even if the daemon is configured to cache values for a long time. To +specify a UNIX domain socket use the prefix C, see example below. Other +addresses are interpreted as normal network addresses, i.Ee. IPv4 or IPv6 +addresses in most cases. + + rrdtool info --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd + +=back + =head1 EXAMPLE This is the output generated by running B on a simple RRD which @@ -48,11 +68,18 @@ data sources. rra[0].cdp_prep[1].value = nan rra[0].cdp_prep[1].unknown_datapoints = 0 -=over 8 +=head1 ENVIRONMENT VARIABLES -=item I +The following environment variables may be used to change the behavior of +Cinfo>: -The name of the B you want to examine. +=over 4 + +=item B + +If this environment variable is set it will have the same effect as specifying +the C<--daemon> option on the command line. If both are present, the command +line argument takes precedence. =back diff --git a/doc/rrdlast.pod b/doc/rrdlast.pod index a3eb7fe..ecec65a 100644 --- a/doc/rrdlast.pod +++ b/doc/rrdlast.pod @@ -5,6 +5,7 @@ rrdlast - Return the date of the last data sample in an RRD =head1 SYNOPSIS B B I +S<[B<--daemon> I
]> =head1 DESCRIPTION @@ -17,6 +18,32 @@ update of the RRD. The name of the B that contains the data. +=item B<--daemon> I
+ +Address of the L daemon. If specified, a C command is sent +to the server before reading the RRD files. This allows B to return +fresh data even if the daemon is configured to cache values for a long time. To +specify a UNIX domain socket use the prefix C, see example below. Other +addresses are interpreted as normal network addresses, i.Ee. IPv4 or IPv6 +addresses in most cases. + + rrdtool last --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd + +=back + +=head1 ENVIRONMENT VARIABLES + +The following environment variables may be used to change the behavior of +Clast>: + +=over 4 + +=item B + +If this environment variable is set it will have the same effect as specifying +the C<--daemon> option on the command line. If both are present, the command +line argument takes precedence. + =back =head1 AUTHOR diff --git a/doc/rrdlastupdate.pod b/doc/rrdlastupdate.pod index 37013cf..db506ab 100644 --- a/doc/rrdlastupdate.pod +++ b/doc/rrdlastupdate.pod @@ -5,6 +5,7 @@ rrdlastupdate - Return the most recent update to an RRD =head1 SYNOPSIS B B I +S<[B<--daemon> I
]> =head1 DESCRIPTION @@ -17,11 +18,37 @@ value stored for each datum in the most recent update of an RRD. The name of the B that contains the data. +=item B<--daemon> I
+ +Address of the L daemon. If specified, a C command is sent +to the server before reading the RRD files. This allows B to return +fresh data even if the daemon is configured to cache values for a long time. To +specify a UNIX domain socket use the prefix C, see example below. Other +addresses are interpreted as normal network addresses, i.Ee. IPv4 or IPv6 +addresses in most cases. + + rrdtool lastupdate --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd + +=back + +=head1 ENVIRONMENT VARIABLES + +The following environment variables may be used to change the behavior of +Clastupdate>: + +=over 4 + +=item B + +If this environment variable is set it will have the same effect as specifying +the C<--daemon> option on the command line. If both are present, the command +line argument takes precedence. + =back =head1 AUTHOR -Andy Riebs +Andy Riebs Eandy.riebs@hp.comE diff --git a/doc/rrdtool.pod b/doc/rrdtool.pod index 154afe7..84749af 100644 --- a/doc/rrdtool.pod +++ b/doc/rrdtool.pod @@ -91,7 +91,11 @@ Change the size of individual RRAs. This is dangerous! Check L. =item B -Export data retrieved from one or several RRDs. Check L +Export data retrieved from one or several RRDs. Check L. + +=item B + +Flush the values for a spcific RRD file from memory. Check L. =item B @@ -298,9 +302,17 @@ sockets, tools like netcat, or in a quick interactive test by using B that there is no authentication with this feature! Do not setup such a port unless you are sure what you are doing. +=head1 RRDCACHED, THE CACHING DAEMON + +For very big setups, updating thousands of RRD files often becomes a serious IO +problem. If you run into such problems, you might want to take a look at +L, a caching daemon for RRDTool which may help you lessen the +stress on your disks. + =head1 SEE ALSO -rrdcreate, rrdupdate, rrdgraph, rrddump, rrdfetch, rrdtune, rrdlast, rrdxport +rrdcreate, rrdupdate, rrdgraph, rrddump, rrdfetch, rrdtune, rrdlast, rrdxport, +rrdflush, rrdcached =head1 BUGS diff --git a/doc/rrdupdate.pod b/doc/rrdupdate.pod index cc0b452..82c6603 100644 --- a/doc/rrdupdate.pod +++ b/doc/rrdupdate.pod @@ -6,6 +6,7 @@ rrdupdate - Store a new set of values into the RRD B {B | B} I S<[B<--template>|B<-t> I[B<:>I]...]> +S<[B<--daemon> I
]> S|IB<:>I[B<:>I...]> SB<@>I[B<:>I...]> S<[IB<:>I[B<:>I...] ...]> @@ -29,6 +30,9 @@ RRA (consolidation function and PDPs per CDP), and data source (name). Note that depending on the arguments of the current and previous call to update, the list may have no entries or a large number of entries. +Since B requires direct disk access, the B<--daemon> option cannot be +used with this command. + =item I The name of the B you want to update. @@ -56,6 +60,18 @@ function. If this is done accidentally (and this can only be done using the template switch), B will ignore the value specified for the COMPUTE B. +=item B<--daemon> I
+ +If given, B will try to connect to the caching daemon L +at I
and will fail if the connection cannot be established. If the +connection is successfully established the values will be sent to the daemon +instead of accessing the files directly. If I
begins with C +then everything after this prefix will be considered to be a UNIX domain +socket, see L below. Otherwise the address is interpreted as network +address or node name as understood by L. One practical +consequence is that both, IPv4 and IPv6, may be used if the system supports +it. This option is available for the B command only. + =item B|IB<:>I[B<:>I...] The data used for updating the RRD was acquired at a certain @@ -82,20 +98,65 @@ separator. =back -=head1 EXAMPLE +=head1 ENVIRONMENT VARIABLES + +The following environment variables may be used to change the behavior of +Cupdate>: + +=over + +=item B + +If this environment variable is set it will have the same effect as specifying +the C<--daemon> option on the command line. If both are present, the command +line argument takes precedence. + +=back + +=head1 EXAMPLES + +=over + +=item * C Update the database file demo1.rrd with 3 known and one I<*UNKNOWN*> value. Use the current time as the update time. +=item * + C Update the database file demo2.rrd which expects data from a single data-source, three times. First with an I<*UNKNOWN*> value then with two regular readings. The update interval seems to be around 300 seconds. -=head1 AUTHOR +=item * + +C + +Update the file C with a single data source, using the +current time. If the caching daemon cannot be reached, do B fall back to +direct file access. + +=item * + +C + +Use the UNIX domain socket C to contact the caching daemon. If +the caching daemon is not available, update the file C directly. +B Since a relative path is specified, the following disturbing effect +may occur: If the daemon is available, the file relative to the working +directory B is used. If the daemon is not available, the file +relative to the current working directory of the invoking process is used. +B Don't do relative paths, kids! + +=back + +=head1 AUTHORS -Tobias Oetiker +Tobias Oetiker , +Florian Forster atEverplant.org> diff --git a/doc/rrdxport.pod b/doc/rrdxport.pod index a668a20..11159e4 100644 --- a/doc/rrdxport.pod +++ b/doc/rrdxport.pod @@ -9,6 +9,7 @@ S<[B<-s>|B<--start> I]> S<[B<-e>|B<--end> I]> S<[B<-m>|B<--maxrows> I]> S<[B<--step> I]> +S<[B<--daemon> I
]> S<[BIB<=>IB<:>IB<:>I]> S<[BIB<=>I]> S<[BB<:>I[B<:>I]]> @@ -48,6 +49,17 @@ for details. See L documentation. +=item B<--daemon> I
+ +Address of the L daemon. If specified, a C command is sent +to the server before reading the RRD files. This allows B to return +fresh data even if the daemon is configured to cache values for a long time. To +specify a UNIX domain socket use the prefix C, see example below. Other +addresses are interpreted as normal network addresses, i.Ee. IPv4 or IPv6 +addresses in most cases. + + rrdtool xport --daemon unix:/var/run/rrdcached.sock ... + =item B<--enumds> The generated xml should contain the data values in enumerated tags. @@ -137,6 +149,20 @@ The resulting data section is: XPORT:out2:"if2 out bytes" \ XPORT:sum:"output sum" +=head1 ENVIRONMENT VARIABLES + +The following environment variables may be used to change the behavior of +Cxport>: + +=over 4 + +=item B + +If this environment variable is set it will have the same effect as specifying +the C<--daemon> option on the command line. If both are present, the command +line argument takes precedence. + +=back =head1 AUTHOR diff --git a/src/Makefile.am b/src/Makefile.am index 8ff671d..420aeab 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -23,6 +23,7 @@ UPD_C_FILES = \ rrd_info.c \ rrd_error.c \ rrd_open.c \ + rrd_client.c \ rrd_nan_inf.c \ rrd_rpncalc.c \ rrd_update.c @@ -41,6 +42,7 @@ RRD_C_FILES = \ rrd_xport.c \ rrd_gfx.c \ rrd_dump.c \ + rrd_flush.c \ rrd_fetch.c \ rrd_resize.c \ rrd_tune.c @@ -82,9 +84,9 @@ librrd_th_la_LDFLAGS = $(MULTITHREAD_LDFLAGS) -version-info @LIBVERS@ librrd_th_la_LDFLAGS += -export-symbols librrd.sym librrd_th_la_LIBADD = $(ALL_LIBS) -include_HEADERS = rrd.h rrd_format.h +include_HEADERS = rrd.h rrd_format.h rrd_client.h -bin_PROGRAMS = rrdtool rrdupdate +bin_PROGRAMS = rrdtool rrdupdate rrdcached if BUILD_RRDCGI bin_PROGRAMS += rrdcgi @@ -100,6 +102,11 @@ rrdtool_SOURCES = rrd_tool.c rrdtool_DEPENDENCIES = librrd.la rrdtool_LDADD = librrd.la +rrdcached_SOURCES = rrd_daemon.c +rrdcached_DEPENDENCIES = librrd.la +rrdcached_CPPFLAGS = -DVERSION='"$(VERSION)"' -DLOCALSTATEDIR='"$(localstatedir)"' +rrdcached_LDADD = librrd.la + # strftime is here because we do not usually need it. unices have propper # iso date support EXTRA_DIST= strftime.c strftime.h rrd_getopt.c rrd_getopt1.c rrd_getopt.h \ diff --git a/src/librrd.sym.in b/src/librrd.sym.in index a178f55..22afc86 100644 --- a/src/librrd.sym.in +++ b/src/librrd.sym.in @@ -1,5 +1,6 @@ rrd_clear_error rrd_close +rrd_cmd_flush rrd_create rrd_create_r rrd_dontneed @@ -25,6 +26,7 @@ rrd_init rrd_last rrd_last_r rrd_lastupdate +rrd_lastupdate_r rrd_lock rrd_new_context rrd_open @@ -48,4 +50,8 @@ rrd_update_v rrd_version rrd_write rrd_xport +rrdc_connect +rrdc_disconnect +rrdc_flush +rrdc_update @RRD_GETOPT_LONG@ diff --git a/src/rrd.h b/src/rrd.h index a428370..ad7e15c 100644 --- a/src/rrd.h +++ b/src/rrd.h @@ -171,13 +171,7 @@ extern "C" { time_t rrd_last( int, char **); - int rrd_lastupdate( - int argc, - char **argv, - time_t *last_update, - unsigned long *ds_cnt, - char ***ds_namv, - char ***last_ds); + int rrd_lastupdate(int argc, char **argv); time_t rrd_first( int, char **); @@ -198,6 +192,7 @@ extern "C" { unsigned long *, char ***, rrd_value_t **); + int rrd_cmd_flush (int argc, char **argv); void rrd_freemem( void *mem); @@ -217,20 +212,24 @@ extern "C" { const char *_template, int argc, const char **argv); - int rrd_fetch_r( - const char *filename, - const char *cf, - time_t *start, - time_t *end, - unsigned long *step, - unsigned long *ds_cnt, - char ***ds_namv, - rrd_value_t **data); + int rrd_fetch_r ( + const char *filename, + const char *cf, + time_t *start, + time_t *end, + unsigned long *step, + unsigned long *ds_cnt, + char ***ds_namv, + rrd_value_t **data); int rrd_dump_r( const char *filename, char *outname); - time_t rrd_last_r( - const char *filename); + time_t rrd_last_r (const char *filename); + int rrd_lastupdate_r (const char *filename, + time_t *ret_last_update, + unsigned long *ret_ds_count, + char ***ret_ds_names, + char ***ret_last_ds); time_t rrd_first_r( const char *filename, int rraindex); diff --git a/src/rrd_client.c b/src/rrd_client.c new file mode 100644 index 0000000..f1253f8 --- /dev/null +++ b/src/rrd_client.c @@ -0,0 +1,436 @@ +/** + * RRDTool - src/rrd_client.c + * Copyright (C) 2008 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + **/ + +#include "rrd.h" +#include "rrd_client.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; +static int sd = -1; + +static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */ +{ + char *buffer; + size_t buffer_used; + size_t buffer_free; + ssize_t status; + + buffer = (char *) buffer_void; + buffer_used = 0; + buffer_free = buffer_size; + + while (buffer_free > 0) + { + status = read (sd, buffer + buffer_used, buffer_free); + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (-1); + + if (status == 0) + { + close (sd); + sd = -1; + errno = EPROTO; + return (-1); + } + + assert ((0 > status) || (buffer_free >= (size_t) status)); + + buffer_free = buffer_free - status; + buffer_used = buffer_used + status; + + if (buffer[buffer_used - 1] == '\n') + break; + } + + if (buffer[buffer_used - 1] != '\n') + { + errno = ENOBUFS; + return (-1); + } + + buffer[buffer_used - 1] = 0; + return (buffer_used); +} /* }}} ssize_t sread */ + +static ssize_t swrite (const void *buf, size_t count) /* {{{ */ +{ + const char *ptr; + size_t nleft; + ssize_t status; + + ptr = (const char *) buf; + nleft = count; + + while (nleft > 0) + { + status = write (sd, (const void *) ptr, nleft); + + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + { + close (sd); + sd = -1; + return (status); + } + + nleft = nleft - status; + ptr = ptr + status; + } + + return (0); +} /* }}} ssize_t swrite */ + +static int buffer_add_string (const char *str, /* {{{ */ + char **buffer_ret, size_t *buffer_size_ret) +{ + char *buffer; + size_t buffer_size; + size_t buffer_pos; + size_t i; + int status; + + buffer = *buffer_ret; + buffer_size = *buffer_size_ret; + buffer_pos = 0; + + i = 0; + status = -1; + while (buffer_pos < buffer_size) + { + if (str[i] == 0) + { + buffer[buffer_pos] = ' '; + buffer_pos++; + status = 0; + break; + } + else if ((str[i] == ' ') || (str[i] == '\\')) + { + if (buffer_pos >= (buffer_size - 1)) + break; + buffer[buffer_pos] = '\\'; + buffer_pos++; + buffer[buffer_pos] = str[i]; + buffer_pos++; + } + else + { + buffer[buffer_pos] = str[i]; + buffer_pos++; + } + i++; + } /* while (buffer_pos < buffer_size) */ + + if (status != 0) + return (-1); + + *buffer_ret = buffer + buffer_pos; + *buffer_size_ret = buffer_size - buffer_pos; + + return (0); +} /* }}} int buffer_add_string */ + +static int buffer_add_value (const char *value, /* {{{ */ + char **buffer_ret, size_t *buffer_size_ret) +{ + char temp[4096]; + + if (strncmp (value, "N:", 2) == 0) + snprintf (temp, sizeof (temp), "%lu:%s", + (unsigned long) time (NULL), value + 2); + else + strncpy (temp, value, sizeof (temp)); + temp[sizeof (temp) - 1] = 0; + + return (buffer_add_string (temp, buffer_ret, buffer_size_ret)); +} /* }}} int buffer_add_value */ + +static int rrdc_connect_unix (const char *path) /* {{{ */ +{ + struct sockaddr_un sa; + int status; + + assert (path != NULL); + + pthread_mutex_lock (&lock); + + if (sd >= 0) + { + pthread_mutex_unlock (&lock); + return (0); + } + + sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0); + if (sd < 0) + { + status = errno; + pthread_mutex_unlock (&lock); + return (status); + } + + memset (&sa, 0, sizeof (sa)); + sa.sun_family = AF_UNIX; + strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1); + + status = connect (sd, (struct sockaddr *) &sa, sizeof (sa)); + if (status != 0) + { + status = errno; + pthread_mutex_unlock (&lock); + return (status); + } + + pthread_mutex_unlock (&lock); + + return (0); +} /* }}} int rrdc_connect_unix */ + +int rrdc_connect (const char *addr) /* {{{ */ +{ + struct addrinfo ai_hints; + struct addrinfo *ai_res; + struct addrinfo *ai_ptr; + int status; + + if (addr == NULL) + addr = RRDCACHED_DEFAULT_ADDRESS; + + if (strncmp ("unix:", addr, strlen ("unix:")) == 0) + return (rrdc_connect_unix (addr + strlen ("unix:"))); + else if (addr[0] == '/') + return (rrdc_connect_unix (addr)); + + pthread_mutex_lock (&lock); + + if (sd >= 0) + { + pthread_mutex_unlock (&lock); + return (0); + } + + memset (&ai_hints, 0, sizeof (ai_hints)); + ai_hints.ai_flags = 0; +#ifdef AI_ADDRCONFIG + ai_hints.ai_flags |= AI_ADDRCONFIG; +#endif + ai_hints.ai_family = AF_UNSPEC; + ai_hints.ai_socktype = SOCK_STREAM; + + ai_res = NULL; + status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res); + if (status != 0) + { + pthread_mutex_unlock (&lock); + return (status); + } + + for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next) + { + sd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol); + if (sd < 0) + { + status = errno; + sd = -1; + continue; + } + + status = connect (sd, ai_ptr->ai_addr, ai_ptr->ai_addrlen); + if (status != 0) + { + status = errno; + close (sd); + sd = -1; + continue; + } + + assert (status == 0); + break; + } /* for (ai_ptr) */ + pthread_mutex_unlock (&lock); + + return (status); +} /* }}} int rrdc_connect */ + +int rrdc_disconnect (void) /* {{{ */ +{ + pthread_mutex_lock (&lock); + + if (sd < 0) + { + pthread_mutex_unlock (&lock); + return (0); + } + + close (sd); + sd = -1; + + pthread_mutex_unlock (&lock); + + return (0); +} /* }}} int rrdc_disconnect */ + +int rrdc_update (const char *filename, int values_num, /* {{{ */ + const char * const *values) +{ + char buffer[4096]; + char *buffer_ptr; + size_t buffer_free; + size_t buffer_size; + int status; + int i; + + memset (buffer, 0, sizeof (buffer)); + buffer_ptr = &buffer[0]; + buffer_free = sizeof (buffer); + + status = buffer_add_string ("update", &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + + status = buffer_add_string (filename, &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + + for (i = 0; i < values_num; i++) + { + status = buffer_add_value (values[i], &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + } + + assert (buffer_free < sizeof (buffer)); + buffer_size = sizeof (buffer) - buffer_free; + assert (buffer[buffer_size - 1] == ' '); + buffer[buffer_size - 1] = '\n'; + + pthread_mutex_lock (&lock); + + if (sd < 0) + { + pthread_mutex_unlock (&lock); + return (ENOTCONN); + } + + status = swrite (buffer, buffer_size); + if (status != 0) + { + pthread_mutex_unlock (&lock); + return (status); + } + + status = sread (buffer, sizeof (buffer)); + if (status < 0) + { + status = errno; + pthread_mutex_unlock (&lock); + return (status); + } + else if (status == 0) + { + pthread_mutex_unlock (&lock); + return (ENODATA); + } + + pthread_mutex_unlock (&lock); + + status = atoi (buffer); + return (status); +} /* }}} int rrdc_update */ + +int rrdc_flush (const char *filename) /* {{{ */ +{ + char buffer[4096]; + char *buffer_ptr; + size_t buffer_free; + size_t buffer_size; + int status; + + if (filename == NULL) + return (-1); + + memset (buffer, 0, sizeof (buffer)); + buffer_ptr = &buffer[0]; + buffer_free = sizeof (buffer); + + status = buffer_add_string ("flush", &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + + status = buffer_add_string (filename, &buffer_ptr, &buffer_free); + if (status != 0) + return (ENOBUFS); + + assert (buffer_free < sizeof (buffer)); + buffer_size = sizeof (buffer) - buffer_free; + assert (buffer[buffer_size - 1] == ' '); + buffer[buffer_size - 1] = '\n'; + + pthread_mutex_lock (&lock); + + if (sd < 0) + { + pthread_mutex_unlock (&lock); + return (ENOTCONN); + } + + status = swrite (buffer, buffer_size); + if (status != 0) + { + pthread_mutex_unlock (&lock); + return (status); + } + + status = sread (buffer, sizeof (buffer)); + if (status < 0) + { + status = errno; + pthread_mutex_unlock (&lock); + return (status); + } + else if (status == 0) + { + pthread_mutex_unlock (&lock); + return (ENODATA); + } + + pthread_mutex_unlock (&lock); + + status = atoi (buffer); + return (status); +} /* }}} int rrdc_flush */ + +/* + * vim: set sw=2 sts=2 ts=8 et fdm=marker : + */ diff --git a/src/rrd_client.h b/src/rrd_client.h new file mode 100644 index 0000000..92d4c07 --- /dev/null +++ b/src/rrd_client.h @@ -0,0 +1,40 @@ +/** + * RRDTool - src/rrd_client.h + * Copyright (C) 2008 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + **/ + +#ifndef __RRD_CLIENT_H +#define __RRD_CLIENT_H 1 + +#ifndef RRDCACHED_DEFAULT_ADDRESS +# define RRDCACHED_DEFAULT_ADDRESS "unix:/tmp/rrdcached.sock" +#endif + +#define RRDCACHED_DEFAULT_PORT "42217" +#define ENV_RRDCACHED_ADDRESS "RRDCACHED_ADDRESS" + +int rrdc_connect (const char *addr); +int rrdc_disconnect (void); + +int rrdc_update (const char *filename, int values_num, + const char * const *values); + +int rrdc_flush (const char *filename); + +#endif /* __RRD_CLIENT_H */ diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c new file mode 100644 index 0000000..bc299f8 --- /dev/null +++ b/src/rrd_daemon.c @@ -0,0 +1,1754 @@ +/** + * RRDTool - src/rrd_daemon.c + * Copyright (C) 2008 Florian octo Forster + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Florian octo Forster + **/ + +/* + * First tell the compiler to stick to the C99 and POSIX standards as close as + * possible. + */ +#ifndef __STRICT_ANSI__ /* {{{ */ +# define __STRICT_ANSI__ +#endif + +#ifndef _ISOC99_SOURCE +# define _ISOC99_SOURCE +#endif + +#ifdef _POSIX_C_SOURCE +# undef _POSIX_C_SOURCE +#endif +#define _POSIX_C_SOURCE 200112L + +/* Single UNIX needed for strdup. */ +#ifdef _XOPEN_SOURCE +# undef _XOPEN_SOURCE +#endif +#define _XOPEN_SOURCE 500 + +#ifndef _REENTRANT +# define _REENTRANT +#endif + +#ifndef _THREAD_SAFE +# define _THREAD_SAFE +#endif + +#ifdef _GNU_SOURCE +# undef _GNU_SOURCE +#endif +/* }}} */ + +/* + * Now for some includes.. + */ +#include "rrd.h" /* {{{ */ +#include "rrd_client.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +/* }}} */ + +#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__) + +#ifndef __GNUC__ +# define __attribute__(x) /**/ +#endif + +/* + * Types + */ +struct listen_socket_s +{ + int fd; + char path[PATH_MAX + 1]; +}; +typedef struct listen_socket_s listen_socket_t; + +struct cache_item_s; +typedef struct cache_item_s cache_item_t; +struct cache_item_s +{ + char *file; + char **values; + int values_num; + time_t last_flush_time; +#define CI_FLAGS_IN_TREE 0x01 +#define CI_FLAGS_IN_QUEUE 0x02 + int flags; + + cache_item_t *next; +}; + +struct callback_flush_data_s +{ + time_t now; + time_t abs_timeout; + char **keys; + size_t keys_num; +}; +typedef struct callback_flush_data_s callback_flush_data_t; + +enum queue_side_e +{ + HEAD, + TAIL +}; +typedef enum queue_side_e queue_side_t; + +/* + * Variables + */ +static listen_socket_t *listen_fds = NULL; +static size_t listen_fds_num = 0; + +static int do_shutdown = 0; + +static pthread_t queue_thread; + +static pthread_t *connetion_threads = NULL; +static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER; +static int connetion_threads_num = 0; + +/* Cache stuff */ +static GTree *cache_tree = NULL; +static cache_item_t *cache_queue_head = NULL; +static cache_item_t *cache_queue_tail = NULL; +static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t cache_cond = PTHREAD_COND_INITIALIZER; + +static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; + +static int config_write_interval = 300; +static int config_flush_interval = 3600; +static char *config_pid_file = NULL; +static char *config_base_dir = NULL; + +static char **config_listen_address_list = NULL; +static int config_listen_address_list_len = 0; + +static uint64_t stats_queue_length = 0; +static uint64_t stats_updates_written = 0; +static uint64_t stats_data_sets_written = 0; +static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; + +/* + * Functions + */ +static void sig_int_handler (int s __attribute__((unused))) /* {{{ */ +{ + do_shutdown++; +} /* }}} void sig_int_handler */ + +static void sig_term_handler (int s __attribute__((unused))) /* {{{ */ +{ + do_shutdown++; +} /* }}} void sig_term_handler */ + +static int write_pidfile (void) /* {{{ */ +{ + pid_t pid; + char *file; + FILE *fh; + + pid = getpid (); + + file = (config_pid_file != NULL) + ? config_pid_file + : LOCALSTATEDIR "/run/rrdcached.pid"; + + fh = fopen (file, "w"); + if (fh == NULL) + { + RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file); + return (-1); + } + + fprintf (fh, "%i\n", (int) pid); + fclose (fh); + + return (0); +} /* }}} int write_pidfile */ + +static int remove_pidfile (void) /* {{{ */ +{ + char *file; + int status; + + file = (config_pid_file != NULL) + ? config_pid_file + : LOCALSTATEDIR "/run/rrdcached.pid"; + + status = unlink (file); + if (status == 0) + return (0); + return (errno); +} /* }}} int remove_pidfile */ + +static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */ +{ + char *buffer; + size_t buffer_used; + size_t buffer_free; + ssize_t status; + + buffer = (char *) buffer_void; + buffer_used = 0; + buffer_free = buffer_size; + + while (buffer_free > 0) + { + status = read (fd, buffer + buffer_used, buffer_free); + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (-1); + + if (status == 0) + return (0); + + assert ((0 > status) || (buffer_free >= (size_t) status)); + + buffer_free = buffer_free - status; + buffer_used = buffer_used + status; + + if (buffer[buffer_used - 1] == '\n') + break; + } + + assert (buffer_used > 0); + + if (buffer[buffer_used - 1] != '\n') + { + errno = ENOBUFS; + return (-1); + } + + buffer[buffer_used - 1] = 0; + + /* Fix network line endings. */ + if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r')) + { + buffer_used--; + buffer[buffer_used - 1] = 0; + } + + return (buffer_used); +} /* }}} ssize_t sread */ + +static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */ +{ + const char *ptr; + size_t nleft; + ssize_t status; + + ptr = (const char *) buf; + nleft = count; + + while (nleft > 0) + { + status = write (fd, (const void *) ptr, nleft); + + if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR))) + continue; + + if (status < 0) + return (status); + + nleft = nleft - status; + ptr = ptr + status; + } + + return (0); +} /* }}} ssize_t swrite */ + +/* + * enqueue_cache_item: + * `cache_lock' must be acquired before calling this function! + */ +static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ + queue_side_t side) +{ + int did_insert = 0; + + if (ci == NULL) + return (-1); + + if (ci->values_num == 0) + return (0); + + if (side == HEAD) + { + if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + { + assert (ci->next == NULL); + ci->next = cache_queue_head; + cache_queue_head = ci; + + if (cache_queue_tail == NULL) + cache_queue_tail = cache_queue_head; + + did_insert = 1; + } + else if (cache_queue_head == ci) + { + /* do nothing */ + } + else /* enqueued, but not first entry */ + { + cache_item_t *prev; + + /* find previous entry */ + for (prev = cache_queue_head; prev != NULL; prev = prev->next) + if (prev->next == ci) + break; + assert (prev != NULL); + + /* move to the front */ + prev->next = ci->next; + ci->next = cache_queue_head; + cache_queue_head = ci; + + /* check if we need to adapt the tail */ + if (cache_queue_tail == ci) + cache_queue_tail = prev; + } + } + else /* (side == TAIL) */ + { + /* We don't move values back in the list.. */ + if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0) + return (0); + + assert (ci->next == NULL); + + if (cache_queue_tail == NULL) + cache_queue_head = ci; + else + cache_queue_tail->next = ci; + cache_queue_tail = ci; + + did_insert = 1; + } + + ci->flags |= CI_FLAGS_IN_QUEUE; + + if (did_insert) + { + pthread_mutex_lock (&stats_lock); + stats_queue_length++; + pthread_mutex_unlock (&stats_lock); + } + + return (0); +} /* }}} int enqueue_cache_item */ + +/* + * tree_callback_flush: + * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held + * while this is in progress. + */ +static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ + gpointer data) +{ + cache_item_t *ci; + callback_flush_data_t *cfd; + + ci = (cache_item_t *) value; + cfd = (callback_flush_data_t *) data; + + if ((ci->last_flush_time <= cfd->abs_timeout) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && (ci->values_num > 0)) + { + enqueue_cache_item (ci, TAIL); + } + else if ((do_shutdown != 0) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && (ci->values_num > 0)) + { + enqueue_cache_item (ci, TAIL); + } + else if (((cfd->now - ci->last_flush_time) >= config_flush_interval) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && (ci->values_num <= 0)) + { + char **temp; + + temp = (char **) realloc (cfd->keys, + sizeof (char *) * (cfd->keys_num + 1)); + if (temp == NULL) + { + RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed."); + return (FALSE); + } + cfd->keys = temp; + /* Make really sure this points to the _same_ place */ + assert ((char *) key == ci->file); + cfd->keys[cfd->keys_num] = (char *) key; + cfd->keys_num++; + } + + return (FALSE); +} /* }}} gboolean tree_callback_flush */ + +static int flush_old_values (int max_age) +{ + callback_flush_data_t cfd; + size_t k; + + memset (&cfd, 0, sizeof (cfd)); + /* Pass the current time as user data so that we don't need to call + * `time' for each node. */ + cfd.now = time (NULL); + cfd.keys = NULL; + cfd.keys_num = 0; + + if (max_age > 0) + cfd.abs_timeout = cfd.now - max_age; + else + cfd.abs_timeout = cfd.now + 1; + + /* `tree_callback_flush' will return the keys of all values that haven't + * been touched in the last `config_flush_interval' seconds in `cfd'. + * The char*'s in this array point to the same memory as ci->file, so we + * don't need to free them separately. */ + g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd); + + for (k = 0; k < cfd.keys_num; k++) + { + cache_item_t *ci; + + /* This must not fail. */ + ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]); + assert (ci != NULL); + + /* If we end up here with values available, something's seriously + * messed up. */ + assert (ci->values_num == 0); + + /* Remove the node from the tree */ + g_tree_remove (cache_tree, cfd.keys[k]); + cfd.keys[k] = NULL; + + /* Now free and clean up `ci'. */ + free (ci->file); + ci->file = NULL; + free (ci); + ci = NULL; + } /* for (k = 0; k < cfd.keys_num; k++) */ + + if (cfd.keys != NULL) + { + free (cfd.keys); + cfd.keys = NULL; + } + + return (0); +} /* int flush_old_values */ + +static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */ +{ + struct timeval now; + struct timespec next_flush; + + gettimeofday (&now, NULL); + next_flush.tv_sec = now.tv_sec + config_flush_interval; + next_flush.tv_nsec = 1000 * now.tv_usec; + + pthread_mutex_lock (&cache_lock); + while ((do_shutdown == 0) || (cache_queue_head != NULL)) + { + cache_item_t *ci; + char *file; + char **values; + int values_num; + int status; + int i; + + /* First, check if it's time to do the cache flush. */ + gettimeofday (&now, NULL); + if ((now.tv_sec > next_flush.tv_sec) + || ((now.tv_sec == next_flush.tv_sec) + && ((1000 * now.tv_usec) > next_flush.tv_nsec))) + { + /* Flush all values that haven't been written in the last + * `config_write_interval' seconds. */ + flush_old_values (config_write_interval); + + /* Determine the time of the next cache flush. */ + while (next_flush.tv_sec < now.tv_sec) + next_flush.tv_sec += config_flush_interval; + } + + /* Now, check if there's something to store away. If not, wait until + * something comes in or it's time to do the cache flush. */ + if (cache_queue_head == NULL) + { + status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush); + if ((status != 0) && (status != ETIMEDOUT)) + { + RRDD_LOG (LOG_ERR, "queue_thread_main: " + "pthread_cond_timedwait returned %i.", status); + } + } + + /* We're about to shut down, so lets flush the entire tree. */ + if ((do_shutdown != 0) && (cache_queue_head == NULL)) + flush_old_values (/* max age = */ -1); + + /* Check if a value has arrived. This may be NULL if we timed out or there + * was an interrupt such as a signal. */ + if (cache_queue_head == NULL) + continue; + + ci = cache_queue_head; + + /* copy the relevant parts */ + file = strdup (ci->file); + if (file == NULL) + { + RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed."); + continue; + } + + values = ci->values; + values_num = ci->values_num; + + ci->values = NULL; + ci->values_num = 0; + + ci->last_flush_time = time (NULL); + ci->flags &= ~(CI_FLAGS_IN_QUEUE); + + cache_queue_head = ci->next; + if (cache_queue_head == NULL) + cache_queue_tail = NULL; + ci->next = NULL; + + pthread_mutex_lock (&stats_lock); + assert (stats_queue_length > 0); + stats_queue_length--; + pthread_mutex_unlock (&stats_lock); + + pthread_mutex_unlock (&cache_lock); + + status = rrd_update_r (file, NULL, values_num, (void *) values); + if (status != 0) + { + RRDD_LOG (LOG_ERR, "queue_thread_main: " + "rrd_update_r failed with status %i.", + status); + } + + free (file); + for (i = 0; i < values_num; i++) + free (values[i]); + + if (status == 0) + { + pthread_mutex_lock (&stats_lock); + stats_updates_written++; + stats_data_sets_written += values_num; + pthread_mutex_unlock (&stats_lock); + } + + pthread_mutex_lock (&cache_lock); + pthread_cond_broadcast (&flush_cond); + + /* We're about to shut down, so lets flush the entire tree. */ + if ((do_shutdown != 0) && (cache_queue_head == NULL)) + flush_old_values (/* max age = */ -1); + } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */ + pthread_mutex_unlock (&cache_lock); + + return (NULL); +} /* }}} void *queue_thread_main */ + +static int buffer_get_field (char **buffer_ret, /* {{{ */ + size_t *buffer_size_ret, char **field_ret) +{ + char *buffer; + size_t buffer_pos; + size_t buffer_size; + char *field; + size_t field_size; + int status; + + buffer = *buffer_ret; + buffer_pos = 0; + buffer_size = *buffer_size_ret; + field = *buffer_ret; + field_size = 0; + + if (buffer_size <= 0) + return (-1); + + /* This is ensured by `handle_request'. */ + assert (buffer[buffer_size - 1] == ' '); + + status = -1; + while (buffer_pos < buffer_size) + { + /* Check for end-of-field or end-of-buffer */ + if (buffer[buffer_pos] == ' ') + { + field[field_size] = 0; + field_size++; + buffer_pos++; + status = 0; + break; + } + /* Handle escaped characters. */ + else if (buffer[buffer_pos] == '\\') + { + if (buffer_pos >= (buffer_size - 1)) + break; + buffer_pos++; + field[field_size] = buffer[buffer_pos]; + field_size++; + buffer_pos++; + } + /* Normal operation */ + else + { + field[field_size] = buffer[buffer_pos]; + field_size++; + buffer_pos++; + } + } /* while (buffer_pos < buffer_size) */ + + if (status != 0) + return (status); + + *buffer_ret = buffer + buffer_pos; + *buffer_size_ret = buffer_size - buffer_pos; + *field_ret = field; + + return (0); +} /* }}} int buffer_get_field */ + +static int flush_file (const char *filename) /* {{{ */ +{ + cache_item_t *ci; + + pthread_mutex_lock (&cache_lock); + + ci = (cache_item_t *) g_tree_lookup (cache_tree, filename); + if (ci == NULL) + { + pthread_mutex_unlock (&cache_lock); + return (ENOENT); + } + + /* Enqueue at head */ + enqueue_cache_item (ci, HEAD); + pthread_cond_signal (&cache_cond); + + while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0) + { + ci = NULL; + + pthread_cond_wait (&flush_cond, &cache_lock); + + ci = g_tree_lookup (cache_tree, filename); + if (ci == NULL) + { + RRDD_LOG (LOG_ERR, "flush_file: Tree node went away " + "while waiting for flush."); + pthread_mutex_unlock (&cache_lock); + return (-1); + } + } + + pthread_mutex_unlock (&cache_lock); + return (0); +} /* }}} int flush_file */ + +static int handle_request_help (int fd, /* {{{ */ + char *buffer, size_t buffer_size) +{ + int status; + char **help_text; + size_t help_text_len; + char *command; + size_t i; + + char *help_help[] = + { + "4 Command overview\n", + "FLUSH \n", + "HELP []\n", + "UPDATE [ ...]\n", + "STATS\n" + }; + size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]); + + char *help_flush[] = + { + "4 Help for FLUSH\n", + "Usage: FLUSH \n", + "\n", + "Adds the given filename to the head of the update queue and returns\n", + "after is has been dequeued.\n" + }; + size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]); + + char *help_update[] = + { + "9 Help for UPDATE\n", + "Usage: UPDATE [ ...]\n" + "\n", + "Adds the given file to the internal cache if it is not yet known and\n", + "appends the given value(s) to the entry. See the rrdcached(1) manpage\n", + "for details.\n", + "\n", + "Each has the following form:\n", + " =