]> git.tokkee.org Git - rrdtool.git/commitdiff

Code

RRDcached patch. This implements an infrastructure, where rrd updates can be
authoroetiker <oetiker@a5681a0c-68f1-0310-ab6d-d61299d08faa>
Sun, 14 Sep 2008 09:49:03 +0000 (09:49 +0000)
committeroetiker <oetiker@a5681a0c-68f1-0310-ab6d-d61299d08faa>
Sun, 14 Sep 2008 09:49:03 +0000 (09:49 +0000)
sent to a daemon which caches them prior to bulk-updateing rrd files. See the rrdcached manual page.
-- Created by Florian Forster with some help from Kevin Brintnall.

git-svn-id: svn://svn.oetiker.ch/rrdtool/trunk/program@1504 a5681a0c-68f1-0310-ab6d-d61299d08faa

30 files changed:
CONTRIBUTORS
configure.ac
doc/Makefile.am
doc/rrdcached.pod [new file with mode: 0644]
doc/rrddump.pod
doc/rrdfetch.pod
doc/rrdgraph.pod
doc/rrdinfo.pod
doc/rrdlast.pod
doc/rrdlastupdate.pod
doc/rrdtool.pod
doc/rrdupdate.pod
doc/rrdxport.pod
src/Makefile.am
src/librrd.sym.in
src/rrd.h
src/rrd_client.c [new file with mode: 0644]
src/rrd_client.h [new file with mode: 0644]
src/rrd_daemon.c [new file with mode: 0644]
src/rrd_dump.c
src/rrd_fetch.c
src/rrd_graph.c
src/rrd_graph.h
src/rrd_info.c
src/rrd_last.c
src/rrd_lastupdate.c
src/rrd_tool.c
src/rrd_tool.h
src/rrd_update.c
src/rrd_xport.c

index 847c1d149f66aa3f65c18beacba34e884378a7ea..a2dc2320536c812ce278db9f7d1c1b5752e0f218 100644 (file)
@@ -21,7 +21,7 @@ David Grimes <dgrimes with navisite.com> SQRT/SORT/REV/SHIFT/TREND
 David L. Barker <dave with ncomtech.com> xport function bug fixes
 Evan Miller <emiller with imvu.com> Multiplicative HW Enhancements
 Frank Strauss <strauss with escape.de> TCL bindings
-Florian octo Forster <rrdtool nospam.verplant.org> rrd_restore libxml2 rewrite deprecated function export
+Florian octo Forster <rrdtool nospam.verplant.org> rrd_restore libxml2 rewrite, deprecated function export, rrdcached
 Henrik Storner <henrik with hswn.dk> functions for min/max values of data in graph
 Hermann Hueni <hueni with glue.ch> (SunOS porting)
 Jakob Ilves <jilves with se.oracle.com> HPUX 11
index cc2e998d3fdeddf1254391c5bd9e808579ca41f0..451502cd2939e59a61d8f192aa88e812ea67fac2 100644 (file)
@@ -130,8 +130,9 @@ CONFIGURE_PART(Audit Compilation Environment)
 
 
 dnl Check for the compiler and static/shared library creation.
-AC_PROG_CC
 AC_PROG_CPP
+AC_PROG_CC
+AM_PROG_CC_C_O
 AC_PROG_LIBTOOL
 
 dnl Try to detect/use GNU features
@@ -466,7 +467,7 @@ EX_CHECK_ALL(cairo,      cairo_font_options_create,     cairo.h,
 EX_CHECK_ALL(cairo,      cairo_svg_surface_create,      cairo-svg.h,            cairo-svg,   1.4.6,  http://cairographics.org/releases/, "")
 EX_CHECK_ALL(cairo,      cairo_pdf_surface_create,      cairo-pdf.h,            cairo-pdf,   1.4.6,  http://cairographics.org/releases/, "")
 EX_CHECK_ALL(cairo,      cairo_ps_surface_create,       cairo-ps.h,             cairo-ps,    1.4.6,  http://cairographics.org/releases/, "")
-dnl EX_CHECK_ALL(glib-2.0,   glib_check_version,        glib.h,                 glib-2.0,    2.12.12, ftp://ftp.gtk.org/pub/glib/2.12/, "")
+EX_CHECK_ALL(glib-2.0,   glib_check_version,            glib.h,                 glib-2.0,    2.12.12, ftp://ftp.gtk.org/pub/glib/2.12/, "")
 EX_CHECK_ALL(pango-1.0,  pango_cairo_context_set_font_options,  pango/pango.h,  pangocairo,  1.17,    http://ftp.gnome.org/pub/GNOME/sources/pango/1.17, "")
 EX_CHECK_ALL(xml2,       xmlParseFile,                  libxml/parser.h,        libxml-2.0,        2.6.31,  http://xmlsoft.org/downloads.html, /usr/include/libxml2)
 
index 16fd61755698fb51666b6a369102d089c144adaf..8e52084578c7b0731bec40e0936659b2b0c22198 100644 (file)
@@ -10,8 +10,8 @@ CLEANFILES = *.1 *.html *.txt *-dircache RRD?.pod *.pdf *~ core *itemcache *.rej
 
 POD = bin_dec_hex.pod        rrddump.pod            rrdgraph_examples.pod  rrdrestore.pod         rrdupdate.pod  \
       cdeftutorial.pod       rrdfetch.pod           rrdgraph_graph.pod     rrdthreads.pod         rrdxport.pod   \
-      rpntutorial.pod        rrdfirst.pod           rrdgraph_rpn.pod       rrdtool.pod                           \
-      rrd-beginners.pod      rrdinfo.pod            rrdtune.pod            rrdbuild.pod                          \
+      rpntutorial.pod        rrdfirst.pod           rrdgraph_rpn.pod       rrdtool.pod            rrdcached.pod  \
+      rrd-beginners.pod      rrdinfo.pod            rrdtune.pod            rrdbuild.pod           rrdflush.pod   \
       rrdcgi.pod             rrdgraph.pod           rrdlast.pod            rrdlastupdate.pod                     \
       rrdcreate.pod          rrdgraph_data.pod      rrdresize.pod          rrdtutorial.pod                       
 
diff --git a/doc/rrdcached.pod b/doc/rrdcached.pod
new file mode 100644 (file)
index 0000000..8ad37be
--- /dev/null
@@ -0,0 +1,354 @@
+=pod
+
+=head1 NAME
+
+rrdcached - Data caching daemon for rrdtool
+
+=head1 SYNOPSIS
+
+B<rrdcached> [B<-l> I<address>] [B<-w> I<timeout>] [B<-f> I<timeout>]
+
+=head1 DESCRIPTION
+
+B<rrdcached> is a daemon that receives updates to existing RRD files,
+accumulates them and, if enough have been received or a defined time has
+passed, writes the updates to the RRD file. A I<flush> command may be used to
+force writing of values to disk, so that graphing facilities and similar can
+work with up-to-date data.
+
+The daemon was written with big setups in mind. Those setups usually run into
+IOE<nbsp>related problems sooner or later for reasons that are beyond the scope
+of this document. Check the wiki at the RRDTool homepage for details. Also
+check L<SECURITY CONSIDERATIONS> below before using this daemon! A detailed
+description of how the daemon operates can be found in the L<HOW IT WORKS>
+section below.
+
+=head1 OPTIONS
+
+=over 4
+
+=item B<-l> I<address>
+
+Tells the daemon to bind to I<address> and accept incoming connections on that
+socket. If I<address> begins with C<unix:>, everything following that prefix is
+interpreted as the path to a UNIX domain socket. Otherwise the address or node
+name are resolved using L<getaddrinfo>.
+
+If the B<-l> option is not specified the default address,
+C<unix:/tmp/rrdcached.sock>, will be used.
+
+=item B<-w> I<timeout>
+
+Data is written to disk every I<timeout> seconds. If this option is not
+specified the default interval of 300E<nbsp>seconds will be used.
+
+=item B<-f> I<timeout>
+
+Every I<timeout> seconds the entire cache is searched for old values which are
+written to disk. This only concerns files to which updates have stopped, so
+setting this to a high value, such as 3600E<nbsp>seconds, is acceptable in most
+cases. This timeout defaults to 3600E<nbsp>seconds.
+
+=item B<-p> I<file>
+
+Sets the name and location of the PID-file. If not specified, the default,
+C<I<$localststedir>/run/rrdcached.pid> will be used.
+
+=item B<-b> I<dir>
+
+The daemon will change into a specific directory at startup. All files passed
+to the daemon, that are specified by a B<relative> path, will be interpreted
+to be relative to this directory. If not given the default, C</tmp>, will be
+used.
+
+  +------------------------+------------------------+
+  ! Command line           ! File updated           !
+  +------------------------+------------------------+
+  ! foo.rrd                ! /tmp/foo.rrd           !
+  ! foo/bar.rrd            ! /tmp/foo/bar.rrd       !
+  ! /var/lib/rrd/foo.rrd   ! /var/lib/rrd/foo.rrd   !
+  +------------------------+------------------------+
+  Paths given on the command  line and paths actually
+  updated by the daemon,  assuming the base directory
+  "/tmp".
+
+=back
+
+=head1 EFFECTED RRDTOOL COMMANDS
+
+The following commands may be made aware of the B<rrdcached> using the command
+line argument B<--daemon> or the environment variable B<RRDCACHED_ADDRESS>:
+
+=over 4
+
+=item B<dump>
+
+=item B<fetch>
+
+=item B<flush>
+
+=item B<graph>
+
+=item B<graphv>
+
+=item B<info>
+
+=item B<last>
+
+=item B<lastupdate>
+
+=item B<update>
+
+=item B<xport>
+
+=back
+
+The B<update> command can send values to the daemon instead of writing them to
+the disk itself. All other commands can send a B<FLUSH> command (see below) to
+the daemon before accessing the files, so they work with up-to-date data even
+if the cache timeout is large.
+
+=head1 HOW IT WORKS
+
+When receiving an update, B<rrdcached> does not write to disk but looks for an
+entry for that file in its internal tree. If not found, an entry is created
+including the current time (called "First" in the diagram below). This time is
+B<not> the time specified on the command line but the time the operating system
+considers to be "now". The value and time of the value (called "Time" in the
+diagram below) are appended to the tree node.
+
+When appending a value to a tree node, it is checked whether it's time to write
+the values to disk. Values are written to disk if
+S<C<now() - First E<gt>= timeout>>, where C<timeout> is the timeout specified
+using the B<-w> option, see L<OPTIONS>. If the values are "old enough" they
+will be enqueued in the "update queue", i.E<nbsp>e. they will be appended to
+the linked list shown below.  Because the tree nodes and the elements of the
+linked list are the same data structures in memory, any update to a file that
+has already been enqueued will be written with the next write to the RRD file,
+too.
+
+A separate "update thread" constantly dequeues the first element in the update
+queue and writes all its values to the appropriate file. So as long as the
+update queue is not empty files are written at the highest possible rate.
+
+Since the timeout of files is checked only when new values are added to the
+file, "dead" files, i.E<nbsp>e. files that are not updated anymore, would never
+be written to disk. Therefore, every now and then, controlled by the B<-f>
+option, the entire tree is walked and all "old" values are enqueued. Since this
+only affects "dead" files and walking the tree is relatively expensive, you
+should set the "flush interval" to a reasonably high value. The default is
+3600E<nbsp>seconds (one hour).
+
+The downside of caching values is that they won't show up in graphs generated
+from the RRDE<nbsp>files. To get around this, the daemon provides the "flush
+command" to flush specific files. This means that the file is inserted at the
+B<head> of the update queue or moved there if it is already enqueued. The flush
+command will return after the update thread has dequeued the file, so there is
+a good chance that the file has been updated by the time the client receives
+the response from the daemon, but there is no guarantee.
+
+ +------+   +------+                               +------+
+ ! head !   ! root !                               ! tail !
+ +---+--+   +---+--+                               +---+--+
+     !         /\                                      !
+     !        /  \                                     !
+     !       /\  /\                                    !
+     !      /\/\ \ `----------------- ... --------,    !
+     V     /      `-------,                       !    V
+ +---+----+---+    +------+-----+             +---+----+---+
+ ! File:  foo !    ! File:  bar !             ! File:  qux !
+ ! First: 101 !    ! First: 119 !             ! First: 180 !
+ ! Next:   ---+--->! Next:   ---+---> ... --->! Next:   -  !
+ +============+    +============+             +============+
+ ! Time:  100 !    ! Time:  120 !             ! Time:  180 !
+ ! Value:  10 !    ! Value: 0.1 !             ! Value: 2,2 !
+ +------------+    +------------+             +------------+
+ ! Time:  110 !    ! Time:  130 !             ! Time:  190 !
+ ! Value:  26 !    ! Value: 0.1 !             ! Value: 7,3 !
+ +------------+    +------------+             +------------+
+ :            :    :            :             :            :
+ +------------+    +------------+             +------------+
+ ! Time:  230 !    ! Time:  250 !             ! Time:  310 !
+ ! Value:  42 !    ! Value: 0.2 !             ! Value: 1,2 !
+ +------------+    +------------+             +------------+
+
+The above diagram demonstrates:
+
+=over
+
+=item *
+
+Files/values are stored in a (balanced) tree.
+
+=item *
+
+Tree nodes and entries in the update queue are the same data structure.
+
+=item *
+
+The local time ("First") and the time specified in updates ("Time") may differ.  
+
+=item *
+
+Timed out values are inserted at the "tail".
+
+=item *
+
+Explicitly flushed values are inserted at the "head".
+
+=item *
+
+ASCII art rocks.
+
+=back
+
+=head1 SECURITY CONSIDERATIONS
+
+This daemon is meant to improve IOE<nbsp>performance for setups with thousands
+of RRDE<nbsp>file to be updated. So security measures built into the daemon can
+be summarized easily: B<There is no security built in!>
+
+There is no authentication and authorization, so B<you> will have to take care
+that only authorized clients can talk to the daemon. Since we assume that graph
+collection is done on a dedicated machine, i.E<nbsp>e. the box doesn't do
+anything else and especially does not have any interactive logins other than
+root, a UNIX domain socket should take care of that.
+
+If you (want to) use the network capability, i.E<nbsp>e. let the daemon bind to
+an IPv4 or IPv6 socket, it is B<your> job to install a packet filter or similar
+mechanism to prevent unauthorized connections. Unless you have a dedicated VLAN
+or VPN for this, using the network option is probably a bad idea!
+
+The daemon will blindly write to any file it gets told, so you really should
+create a separate user just for this daemon. Also it does not do any sanity
+checks, so if it gets told to write values for a time far in the future, your
+files will be messed up good!
+
+You have been warned.
+
+=head1 PROTOCOL
+
+The daemon communicates with clients using a line based ASCII protocol which is
+easy to read and easy to type. This makes it easy for scripts to implement the
+protocol and possible for users to use L<telnet> to connect to the daemon
+and test stuff "by hand".
+
+The protocol is line based, this means that each record consists of one or more
+lines. A line is terminated by the line feed character C<0x0A>, commonly
+written as C<\n>. In the examples below, this character will be written as
+C<E<lt>LFE<gt>> ("line feed").
+
+After the connection has been established, the client is expected to send a
+"command". A command consists of the command keyword, possibly some arguments,
+and a terminating newline character. For a list of commands, see
+L<Valid Commands> below.
+
+Example:
+
+  FLUSH /tmp/foo.rrd<LF>
+
+The daemon answers with a line consisting of a status code and a short status
+message, separated by one or more space characters. A negative status code
+signals an error, a positive status code or zero signal success. If the status
+code is greater than zero, it indicates the number of lines that follow the
+status line.
+
+Examples:
+
+ 0 Success<LF>
+
+ 2 Two lines follow<LF>
+ This is the first line<LF>
+ And this is the second line<LF>
+
+=head2 Valid Commands
+
+The following commands are understood by the daemon:
+
+=over 4
+
+=item B<FLUSH> I<filename>
+
+Causes the daemon to put I<filename> to the B<head> of the update queue
+(possibly moving it there if the node is already enqueued). The answer will be
+sent B<after> the node has been dequeued.
+
+=item B<HELP> [I<command>]
+
+Returns a short usage message. If no command is given, or I<command> is
+B<HELP>, a list of commands supported by the daemon is returned. Otherwise a
+short description, possibly containing a pointer to a manual page, is returned.
+Obviously, this is meant for interactive usage and the format in which the
+commands and usage summaries are returned is not well defined.
+
+=item B<STATS>
+
+Returns a list of metrics which can be used to measure the daemons performance
+and check its status. For a description of the values returned, see
+L<Performance Values> below.
+
+The format in which the values are returned is similar to many other line based
+protocols: Each value is printed on a separate line, each consisting of the
+name of the value, a colon, one or more spaces and the actual value.
+
+Example:
+
+ 5 Statistics follow
+ QueueLength: 0
+ UpdatesWritten: 13
+ DataSetsWritten: 390
+ TreeNodesNumber: 13
+ TreeDepth: 4
+
+=item B<UPDATE> I<filename> I<values> [I<values> ...]
+
+Adds more data to a filename. This is B<the> operation the daemon was designed
+for, so describing the mechanism again is unnecessary. Read L<HOW IT WORKS>
+above for a detailed explanation.
+
+=back
+
+=head2 Performance Values
+
+The following counters are returned by the B<STATS> command:
+
+=over 4
+
+=item B<QueueLength> I<(unsigned 64bit integer)>
+
+Number of nodes currently enqueued in the update queue.
+
+=item B<TreeDepth> I<(unsigned 64bit integer)>
+
+Depth of the tree used for fast key lookup.
+
+=item B<TreeNodesNumber> I<(unsigned 64bit integer)>
+
+Number of nodes in the cache.
+
+=item B<UpdatesWritten> I<(unsigned 64bit integer)>
+
+Total number of updates, i.E<nbsp>e. calls to C<rrd_update_r>, since the daemon
+was started.
+
+=item B<DataSetsWritten> I<(unsigned 64bit integer)>
+
+Total number of "data sets" written to disk since the daemon was started. A
+data set is one or more values passed to the B<UPDATE> command. For example:
+C<N:123:456> is one data set with two values. The term "data set" is used to
+prevent confusion whether individual values or groups of values are counted.
+
+=back
+
+=head1 BUGS
+
+No known bugs at the moment.
+
+=head1 SEE ALSO
+
+L<rrdtool>, L<rrdgraph>
+
+=head1 AUHOR
+
+B<rrdcached> and this manual page have been written by Florian Forster
+E<lt>octoE<nbsp>atE<nbsp>verplant.orgE<gt>.
index a698d841eaa5387580fa9000e0c43020eab8de84..89e378bca1994a3a2b1ace4b89fc76cbde686d8c 100644 (file)
@@ -4,11 +4,16 @@ rrddump - dump the contents of an RRD to XML format
 
 =head1 SYNOPSIS
 
-B<rrdtool> B<dump> S<[B<--no-header>|B<-n>]> I<filename.rrd> E<gt> I<filename.xml>
+B<rrdtool> B<dump> I<filename.rrd>
+S<[B<--no-header>|B<-n>]>
+S<[B<--daemon> I<address>]>
+S<E<gt> I<filename.xml>>
 
 or 
 
-B<rrdtool> B<dump> S<[B<--no-header>|B<-n>]> I<filename.rrd> I<filename.xml>
+B<rrdtool> B<dump> I<filename.rrd> I<filename.xml>
+S<[B<--no-header>|B<-n>]>
+S<[B<--daemon> I<address>]>
 
 =head1 DESCRIPTION
 
@@ -31,13 +36,24 @@ The name of the B<RRD> you want to dump.
 The (optional) filename that you want to write the XML output to.
 If not specified, the XML will be printed to stdout.
 
-=item S<[B<--no-header>|B<-n>]>
+=item B<--no-header>|B<-n>
 
 In rrdtool 1.3, the dump function started producing correct xml-headers.
 Unfortunately the rrdtool restore function from the 1.2 series can not
 handle these headers. With this option you can supress the creatinon of
 the xml headers.
 
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows B<rrdtool> to return
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool dump --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd
+
 =back
 
 =head1 EXAMPLES
@@ -62,6 +78,21 @@ B<rrdrestore> for details.
 
 =back
 
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>dump>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
+
 =head1 AUTHOR
 
 Tobias Oetiker E<lt>tobi@oetiker.chE<gt>
index 51b5ccd99c8441f8f7f12021836ba22a902ccc56..d187b69e9db48b0152bfa519d71dabc187002b66 100644 (file)
@@ -8,6 +8,7 @@ B<rrdtool> B<fetch> I<filename> I<CF>
 S<[B<--resolution>|B<-r> I<resolution>]>
 S<[B<--start>|B<-s> I<start>]>
 S<[B<--end>|B<-e> I<end>]>
+S<[B<--daemon> I<address>]>
 
 =head1 DESCRIPTION
 
@@ -48,6 +49,17 @@ the end of the time series in seconds since epoch. See also AT-STYLE
 TIME SPECIFICATION section for a detailed explanation of how to
 specify the end time.
 
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows B<rrdtool> to return
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool fetch --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd AVERAGE
+
 =back
 
 =head2 RESOLUTION INTERVAL
@@ -257,6 +269,22 @@ I<931225537> -- 18:45  July 5th, 1999
 I<19970703 12:45> -- 12:45  July 3th, 1997
 (my favorite, and its even got an ISO number (8601)).
 
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>fetch>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
+
 =head1 AUTHOR
 
-Tobias Oetiker <tobi@oetiker.ch>
+Tobias Oetiker E<lt>tobi@oetiker.chE<gt>
+
index c2e9b3da212bb33e40818a1da0858dbbd0f1355c..2c57dbb156b7e1687e8f04916273bf2cb05e3de2 100644 (file)
@@ -248,7 +248,7 @@ to the more robust B<--alt-y-grid> mode.
 
 How many digits should rrdtool assume the y-axis labels to be? You
 may have to use this option to make enough space once you start
-fideling with the y-axis labeling.
+fiddling with the y-axis labeling.
 
 [B<--units=si>]
 
@@ -267,6 +267,17 @@ Note, that only the image size will be returned, if you run with lazy even
 when using graphv and even when using PRINT.
 
 
+[B<--daemon> I<address>]
+
+Address of the L<rrdcached> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows the graph to contain
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool graph [...] --daemon unix:/var/run/rrdcached.sock [...]
+
 [B<-f>|B<--imginfo> I<printfstr>]
 
 After the image has been created, the graph function uses printf
@@ -469,6 +480,21 @@ There is more information returned than in the standard interface.
 Especially the 'graph_*' keys are new. They help applications that want to
 know what is where on the graph.
 
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>graph>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
+
 =head1 SEE ALSO
 
 L<rrdgraph> gives an overview of how B<rrdtool graph> works.
index e83d8d6138279af4e4fbd9eb4b38b494357a3e9e..2af411a5595a61694c233288e383d2525ec3a21d 100644 (file)
@@ -4,7 +4,8 @@ rrdinfo - extract header information from an RRD
 
 =head1 SYNOPSIS
 
-B<rrdtool> B<info> I<filename.rrd>
+B<rrdtool> B<info> I<filename>
+S<[B<--daemon> I<address>]>
 
 =head1 DESCRIPTION
 
@@ -14,6 +15,25 @@ a parsing friendly format.
 Check L<rrdcreate> if you are uncertain about the meaning of the
 individual keys.
 
+=over 8
+
+=item I<filename>
+
+The name of the B<RRD> you want to examine.
+
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows B<rrdtool> to return
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool info --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd
+
+=back
+
 =head1 EXAMPLE
 
 This is the output generated by running B<info> on a simple RRD which
@@ -48,11 +68,18 @@ data sources.
  rra[0].cdp_prep[1].value = nan
  rra[0].cdp_prep[1].unknown_datapoints = 0
 
-=over 8
+=head1 ENVIRONMENT VARIABLES
 
-=item I<filename.rrd>
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>info>:
 
-The name of the B<RRD> you want to examine.
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
 
 =back
 
index a3eb7feedf10289cafa5ee7dfb579200de94dd94..ecec65a949df966f8b416259ec4bf0548249e0bc 100644 (file)
@@ -5,6 +5,7 @@ rrdlast - Return the date of the last data sample in an RRD
 =head1 SYNOPSIS
 
 B<rrdtool> B<last> I<filename>
+S<[B<--daemon> I<address>]>
 
 =head1 DESCRIPTION
 
@@ -17,6 +18,32 @@ update of the RRD.
 
 The name of the B<RRD> that contains the data.
 
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows B<rrdtool> to return
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool last --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd
+
+=back
+
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>last>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
 =back
 
 =head1 AUTHOR
index 37013cf7ef21617473cc1e9efabbbd6f71f36eec..db506abd0559a3264c0efea38dbc8bbb656bee6d 100644 (file)
@@ -5,6 +5,7 @@ rrdlastupdate - Return the most recent update to an RRD
 =head1 SYNOPSIS
 
 B<rrdtool> B<lastupdate> I<filename>
+S<[B<--daemon> I<address>]>
 
 =head1 DESCRIPTION
 
@@ -17,11 +18,37 @@ value stored for each datum in the most recent update of an RRD.
 
 The name of the B<RRD> that contains the data.
 
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows B<rrdtool> to return
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool lastupdate --daemon unix:/var/run/rrdcached.sock /var/lib/rrd/foo.rrd
+
+=back
+
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>lastupdate>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
 =back
 
 =head1 AUTHOR
 
-Andy Riebs <andy.riebs@hp.com>
+Andy Riebs E<lt>andy.riebs@hp.comE<gt>
 
 
 
index 154afe7a356501a7fb10d41aa2e0b75a03dbc626..84749af542042fb0a507492f8f9e90e50e8b390f 100644 (file)
@@ -91,7 +91,11 @@ Change the size of individual RRAs. This is dangerous! Check L<rrdresize>.
 
 =item B<xport>
 
-Export data retrieved from one or several RRDs. Check L<rrdxport>
+Export data retrieved from one or several RRDs. Check L<rrdxport>.
+
+=item B<flush>
+
+Flush the values for a spcific RRD file from memory. Check L<rrdflush>.
 
 =item B<rrdcgi>
 
@@ -298,9 +302,17 @@ sockets, tools like netcat, or in a quick interactive test by using
 B<NOTE:> that there is no authentication with this feature! Do not setup
 such a port unless you are sure what you are doing.
 
+=head1 RRDCACHED, THE CACHING DAEMON
+
+For very big setups, updating thousands of RRD files often becomes a serious IO
+problem. If you run into such problems, you might want to take a look at
+L<rrdcached>, a caching daemon for RRDTool which may help you lessen the
+stress on your disks.
+
 =head1 SEE ALSO
 
-rrdcreate, rrdupdate, rrdgraph, rrddump, rrdfetch, rrdtune, rrdlast, rrdxport
+rrdcreate, rrdupdate, rrdgraph, rrddump, rrdfetch, rrdtune, rrdlast, rrdxport,
+rrdflush, rrdcached
 
 =head1 BUGS
 
index cc0b452f76ce0e9faeb3d6cbd54058f5fa86f40f..82c66037279439760745cb895f30a4661658e2d5 100644 (file)
@@ -6,6 +6,7 @@ rrdupdate - Store a new set of values into the RRD
 
 B<rrdtool> {B<update> | B<updatev>} I<filename>
 S<[B<--template>|B<-t> I<ds-name>[B<:>I<ds-name>]...]>
+S<[B<--daemon> I<address>]>
 S<B<N>|I<timestamp>B<:>I<value>[B<:>I<value>...]>
 S<I<at-timestamp>B<@>I<value>[B<:>I<value>...]>
 S<[I<timestamp>B<:>I<value>[B<:>I<value>...] ...]>
@@ -29,6 +30,9 @@ RRA (consolidation function and PDPs per CDP), and data source (name).
 Note that depending on the arguments of the current and previous call to
 update, the list may have no entries or a large number of entries.
 
+Since B<updatev> requires direct disk access, the B<--daemon> option cannot be
+used with this command.
+
 =item I<filename>
 
 The name of the B<RRD> you want to update.
@@ -56,6 +60,18 @@ function. If this is done accidentally (and this can only be done
 using the template switch), B<RRDtool> will ignore the value specified
 for the COMPUTE B<DST>.
 
+=item B<--daemon> I<address>
+
+If given, B<RRDTool> will try to connect to the caching daemon L<rrdcached>
+at I<address> and will fail if the connection cannot be established. If the
+connection is successfully established the values will be sent to the daemon
+instead of accessing the files directly. If I<address> begins with C<unix:>
+then everything after this prefix will be considered to be a UNIX domain
+socket, see L<EXAMPLES> below. Otherwise the address is interpreted as network
+address or node name as understood by L<getaddrinfo>. One practical
+consequence is that both, IPv4 and IPv6, may be used if the system supports
+it. This option is available for the B<update> command only.
+
 =item B<N>|I<timestamp>B<:>I<value>[B<:>I<value>...]
 
 The data used for updating the RRD was acquired at a certain
@@ -82,20 +98,65 @@ separator.
 
 =back
 
-=head1 EXAMPLE
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>update>:
+
+=over
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
+
+=head1 EXAMPLES
+
+=over
+
+=item *
 
 C<rrdtool update demo1.rrd N:3.44:3.15:U:23>
 
 Update the database file demo1.rrd with 3 known and one I<*UNKNOWN*>
 value. Use the current time as the update time.
 
+=item *
+
 C<rrdtool update demo2.rrd 887457267:U 887457521:22 887457903:2.7>
 
 Update the database file demo2.rrd which expects data from a single
 data-source, three times. First with an I<*UNKNOWN*> value then with two
 regular readings. The update interval seems to be around 300 seconds.
 
-=head1 AUTHOR
+=item *
+
+C<rrdtool update --cache /var/lib/rrd/demo3.rrd N:42>
+
+Update the file C</var/lib/rrd/demo3.rrd> with a single data source, using the
+current time. If the caching daemon cannot be reached, do B<not> fall back to
+direct file access.
+
+=item *
+
+C<rrdtool update --daemon unix:/tmp/rrdd.sock demo4.rrd N:23>
+
+Use the UNIX domain socket C</tmp/rrdd.sock> to contact the caching daemon. If
+the caching daemon is not available, update the file C<demo4.rrd> directly.
+B<WARNING:> Since a relative path is specified, the following disturbing effect
+may occur: If the daemon is available, the file relative to the working
+directory B<of the daemon> is used. If the daemon is not available, the file
+relative to the current working directory of the invoking process is used.
+B<This may update two different files depending on whether the daemon could be
+reached or not.> Don't do relative paths, kids!
+
+=back
+
+=head1 AUTHORS
 
-Tobias Oetiker <tobi@oetiker.ch>
+Tobias Oetiker <tobi@oetiker.ch>,
+Florian Forster <octoE<nbsp>atE<nbsp>verplant.org>
 
index a668a20f33434ecaddf514a75e37a80c7e32c07a..11159e479fbe47d15f1e4e86b0654efec4224b9e 100644 (file)
@@ -9,6 +9,7 @@ S<[B<-s>|B<--start> I<seconds>]>
 S<[B<-e>|B<--end> I<seconds>]>
 S<[B<-m>|B<--maxrows> I<rows>]>
 S<[B<--step> I<value>]>
+S<[B<--daemon> I<address>]>
 S<[B<DEF:>I<vname>B<=>I<rrd>B<:>I<ds-name>B<:>I<CF>]>
 S<[B<CDEF:>I<vname>B<=>I<rpn-expression>]>
 S<[B<XPORT>B<:>I<vname>[B<:>I<legend>]]>
@@ -48,6 +49,17 @@ for details.
 
 See L<rrdgraph> documentation.
 
+=item B<--daemon> I<address>
+
+Address of the L<rrdcached> daemon. If specified, a C<flush> command is sent
+to the server before reading the RRD files. This allows B<rrdtool> to return
+fresh data even if the daemon is configured to cache values for a long time. To
+specify a UNIX domain socket use the prefix C<unix:>, see example below. Other
+addresses are interpreted as normal network addresses, i.E<nbsp>e. IPv4 or IPv6
+addresses in most cases.
+
+ rrdtool xport --daemon unix:/var/run/rrdcached.sock ...
+
 =item B<--enumds> 
 
 The generated xml should contain the data values in enumerated tags.
@@ -137,6 +149,20 @@ The resulting data section is:
           XPORT:out2:"if2 out bytes" \
          XPORT:sum:"output sum"
 
+=head1 ENVIRONMENT VARIABLES
+
+The following environment variables may be used to change the behavior of
+C<rrdtoolE<nbsp>xport>:
+
+=over 4
+
+=item B<RRDCACHED_ADDRESS>
+
+If this environment variable is set it will have the same effect as specifying
+the C<--daemon> option on the command line. If both are present, the command
+line argument takes precedence.
+
+=back
 
 =head1 AUTHOR
 
index 8ff671d6a28b1b3ed23a6c8529e17fe2710186ed..420aeab365a65bea2f92d3939c5c5178052e41cf 100644 (file)
@@ -23,6 +23,7 @@ UPD_C_FILES =         \
        rrd_info.c      \
        rrd_error.c     \
        rrd_open.c      \
+       rrd_client.c    \
        rrd_nan_inf.c   \
        rrd_rpncalc.c   \
        rrd_update.c
@@ -41,6 +42,7 @@ RRD_C_FILES =         \
        rrd_xport.c     \
        rrd_gfx.c \
        rrd_dump.c      \
+       rrd_flush.c     \
        rrd_fetch.c     \
        rrd_resize.c \
        rrd_tune.c
@@ -82,9 +84,9 @@ librrd_th_la_LDFLAGS         = $(MULTITHREAD_LDFLAGS) -version-info @LIBVERS@
 librrd_th_la_LDFLAGS         += -export-symbols librrd.sym
 librrd_th_la_LIBADD          = $(ALL_LIBS)
 
-include_HEADERS        = rrd.h rrd_format.h
+include_HEADERS        = rrd.h rrd_format.h rrd_client.h
 
-bin_PROGRAMS   = rrdtool rrdupdate
+bin_PROGRAMS   = rrdtool rrdupdate rrdcached
 
 if BUILD_RRDCGI
 bin_PROGRAMS += rrdcgi
@@ -100,6 +102,11 @@ rrdtool_SOURCES = rrd_tool.c
 rrdtool_DEPENDENCIES = librrd.la
 rrdtool_LDADD  = librrd.la
 
+rrdcached_SOURCES = rrd_daemon.c
+rrdcached_DEPENDENCIES = librrd.la
+rrdcached_CPPFLAGS = -DVERSION='"$(VERSION)"' -DLOCALSTATEDIR='"$(localstatedir)"'
+rrdcached_LDADD = librrd.la
+
 # strftime is here because we do not usually need it. unices have propper
 # iso date support
 EXTRA_DIST= strftime.c strftime.h  rrd_getopt.c rrd_getopt1.c rrd_getopt.h \
index a178f5541ce40259534ba77509bc4f20c126306c..22afc86528a96375d224809b124fb7b676bcddda 100644 (file)
@@ -1,5 +1,6 @@
 rrd_clear_error
 rrd_close
+rrd_cmd_flush
 rrd_create
 rrd_create_r
 rrd_dontneed
@@ -25,6 +26,7 @@ rrd_init
 rrd_last
 rrd_last_r
 rrd_lastupdate
+rrd_lastupdate_r
 rrd_lock
 rrd_new_context
 rrd_open
@@ -48,4 +50,8 @@ rrd_update_v
 rrd_version
 rrd_write
 rrd_xport
+rrdc_connect
+rrdc_disconnect
+rrdc_flush
+rrdc_update
 @RRD_GETOPT_LONG@
index a428370bca33a5a22af5066953139ae84281d48b..ad7e15c32eed98f5543cee9f38eae2471cc7d4f4 100644 (file)
--- a/src/rrd.h
+++ b/src/rrd.h
@@ -171,13 +171,7 @@ extern    "C" {
     time_t    rrd_last(
     int,
     char **);
-    int       rrd_lastupdate(
-    int argc,
-    char **argv,
-    time_t *last_update,
-    unsigned long *ds_cnt,
-    char ***ds_namv,
-    char ***last_ds);
+    int rrd_lastupdate(int argc, char **argv);
     time_t    rrd_first(
     int,
     char **);
@@ -198,6 +192,7 @@ extern    "C" {
     unsigned long *,
     char ***,
     rrd_value_t **);
+    int       rrd_cmd_flush (int argc, char **argv);
 
     void      rrd_freemem(
     void *mem);
@@ -217,20 +212,24 @@ extern    "C" {
     const char *_template,
     int argc,
     const char **argv);
-    int       rrd_fetch_r(
-    const char *filename,
-    const char *cf,
-    time_t *start,
-    time_t *end,
-    unsigned long *step,
-    unsigned long *ds_cnt,
-    char ***ds_namv,
-    rrd_value_t **data);
+    int rrd_fetch_r (
+            const char *filename,
+            const char *cf,
+            time_t *start,
+            time_t *end,
+            unsigned long *step,
+            unsigned long *ds_cnt,
+            char ***ds_namv,
+            rrd_value_t **data);
     int       rrd_dump_r(
     const char *filename,
     char *outname);
-    time_t    rrd_last_r(
-    const char *filename);
+    time_t    rrd_last_r (const char *filename);
+    int rrd_lastupdate_r (const char *filename,
+            time_t *ret_last_update,
+            unsigned long *ret_ds_count,
+            char ***ret_ds_names,
+            char ***ret_last_ds);
     time_t    rrd_first_r(
     const char *filename,
     int rraindex);
diff --git a/src/rrd_client.c b/src/rrd_client.c
new file mode 100644 (file)
index 0000000..f1253f8
--- /dev/null
@@ -0,0 +1,436 @@
+/**
+ * RRDTool - src/rrd_client.c
+ * Copyright (C) 2008 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at verplant.org>
+ **/
+
+#include "rrd.h"
+#include "rrd_client.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netdb.h>
+
+static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
+static int sd = -1;
+
+static ssize_t sread (void *buffer_void, size_t buffer_size) /* {{{ */
+{
+  char    *buffer;
+  size_t   buffer_used;
+  size_t   buffer_free;
+  ssize_t  status;
+
+  buffer       = (char *) buffer_void;
+  buffer_used  = 0;
+  buffer_free  = buffer_size;
+
+  while (buffer_free > 0)
+  {
+    status = read (sd, buffer + buffer_used, buffer_free);
+    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
+      continue;
+
+    if (status < 0)
+      return (-1);
+
+    if (status == 0)
+    {
+      close (sd);
+      sd = -1;
+      errno = EPROTO;
+      return (-1);
+    }
+
+    assert ((0 > status) || (buffer_free >= (size_t) status));
+
+    buffer_free = buffer_free - status;
+    buffer_used = buffer_used + status;
+
+    if (buffer[buffer_used - 1] == '\n')
+      break;
+  }
+
+  if (buffer[buffer_used - 1] != '\n')
+  {
+    errno = ENOBUFS;
+    return (-1);
+  }
+
+  buffer[buffer_used - 1] = 0;
+  return (buffer_used);
+} /* }}} ssize_t sread */
+
+static ssize_t swrite (const void *buf, size_t count) /* {{{ */
+{
+  const char *ptr;
+  size_t      nleft;
+  ssize_t     status;
+
+  ptr   = (const char *) buf;
+  nleft = count;
+
+  while (nleft > 0)
+  {
+    status = write (sd, (const void *) ptr, nleft);
+
+    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
+      continue;
+
+    if (status < 0)
+    {
+      close (sd);
+      sd = -1;
+      return (status);
+    }
+
+    nleft = nleft - status;
+    ptr   = ptr   + status;
+  }
+
+  return (0);
+} /* }}} ssize_t swrite */
+
+static int buffer_add_string (const char *str, /* {{{ */
+    char **buffer_ret, size_t *buffer_size_ret)
+{
+  char *buffer;
+  size_t buffer_size;
+  size_t buffer_pos;
+  size_t i;
+  int status;
+
+  buffer = *buffer_ret;
+  buffer_size = *buffer_size_ret;
+  buffer_pos = 0;
+
+  i = 0;
+  status = -1;
+  while (buffer_pos < buffer_size)
+  {
+    if (str[i] == 0)
+    {
+      buffer[buffer_pos] = ' ';
+      buffer_pos++;
+      status = 0;
+      break;
+    }
+    else if ((str[i] == ' ') || (str[i] == '\\'))
+    {
+      if (buffer_pos >= (buffer_size - 1))
+        break;
+      buffer[buffer_pos] = '\\';
+      buffer_pos++;
+      buffer[buffer_pos] = str[i];
+      buffer_pos++;
+    }
+    else
+    {
+      buffer[buffer_pos] = str[i];
+      buffer_pos++;
+    }
+    i++;
+  } /* while (buffer_pos < buffer_size) */
+
+  if (status != 0)
+    return (-1);
+
+  *buffer_ret = buffer + buffer_pos;
+  *buffer_size_ret = buffer_size - buffer_pos;
+
+  return (0);
+} /* }}} int buffer_add_string */
+
+static int buffer_add_value (const char *value, /* {{{ */
+    char **buffer_ret, size_t *buffer_size_ret)
+{
+  char temp[4096];
+
+  if (strncmp (value, "N:", 2) == 0)
+    snprintf (temp, sizeof (temp), "%lu:%s",
+        (unsigned long) time (NULL), value + 2);
+  else
+    strncpy (temp, value, sizeof (temp));
+  temp[sizeof (temp) - 1] = 0;
+
+  return (buffer_add_string (temp, buffer_ret, buffer_size_ret));
+} /* }}} int buffer_add_value */
+
+static int rrdc_connect_unix (const char *path) /* {{{ */
+{
+  struct sockaddr_un sa;
+  int status;
+
+  assert (path != NULL);
+
+  pthread_mutex_lock (&lock);
+
+  if (sd >= 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (0);
+  }
+
+  sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
+  if (sd < 0)
+  {
+    status = errno;
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+
+  memset (&sa, 0, sizeof (sa));
+  sa.sun_family = AF_UNIX;
+  strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
+
+  status = connect (sd, (struct sockaddr *) &sa, sizeof (sa));
+  if (status != 0)
+  {
+    status = errno;
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+
+  pthread_mutex_unlock (&lock);
+
+  return (0);
+} /* }}} int rrdc_connect_unix */
+
+int rrdc_connect (const char *addr) /* {{{ */
+{
+  struct addrinfo ai_hints;
+  struct addrinfo *ai_res;
+  struct addrinfo *ai_ptr;
+  int status;
+
+  if (addr == NULL)
+    addr = RRDCACHED_DEFAULT_ADDRESS;
+
+  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
+    return (rrdc_connect_unix (addr + strlen ("unix:")));
+  else if (addr[0] == '/')
+    return (rrdc_connect_unix (addr));
+
+  pthread_mutex_lock (&lock);
+
+  if (sd >= 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (0);
+  }
+
+  memset (&ai_hints, 0, sizeof (ai_hints));
+  ai_hints.ai_flags = 0;
+#ifdef AI_ADDRCONFIG
+  ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+  ai_hints.ai_family = AF_UNSPEC;
+  ai_hints.ai_socktype = SOCK_STREAM;
+
+  ai_res = NULL;
+  status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
+  if (status != 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+
+  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+  {
+    sd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+    if (sd < 0)
+    {
+      status = errno;
+      sd = -1;
+      continue;
+    }
+
+    status = connect (sd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    if (status != 0)
+    {
+      status = errno;
+      close (sd);
+      sd = -1;
+      continue;
+    }
+
+    assert (status == 0);
+    break;
+  } /* for (ai_ptr) */
+  pthread_mutex_unlock (&lock);
+
+  return (status);
+} /* }}} int rrdc_connect */
+
+int rrdc_disconnect (void) /* {{{ */
+{
+  pthread_mutex_lock (&lock);
+
+  if (sd < 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (0);
+  }
+
+  close (sd);
+  sd = -1;
+
+  pthread_mutex_unlock (&lock);
+
+  return (0);
+} /* }}} int rrdc_disconnect */
+
+int rrdc_update (const char *filename, int values_num, /* {{{ */
+               const char * const *values)
+{
+  char buffer[4096];
+  char *buffer_ptr;
+  size_t buffer_free;
+  size_t buffer_size;
+  int status;
+  int i;
+
+  memset (buffer, 0, sizeof (buffer));
+  buffer_ptr = &buffer[0];
+  buffer_free = sizeof (buffer);
+
+  status = buffer_add_string ("update", &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  for (i = 0; i < values_num; i++)
+  {
+    status = buffer_add_value (values[i], &buffer_ptr, &buffer_free);
+    if (status != 0)
+      return (ENOBUFS);
+  }
+
+  assert (buffer_free < sizeof (buffer));
+  buffer_size = sizeof (buffer) - buffer_free;
+  assert (buffer[buffer_size - 1] == ' ');
+  buffer[buffer_size - 1] = '\n';
+
+  pthread_mutex_lock (&lock);
+
+  if (sd < 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENOTCONN);
+  }
+
+  status = swrite (buffer, buffer_size);
+  if (status != 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+
+  status = sread (buffer, sizeof (buffer));
+  if (status < 0)
+  {
+    status = errno;
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+  else if (status == 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENODATA);
+  }
+
+  pthread_mutex_unlock (&lock);
+
+  status = atoi (buffer);
+  return (status);
+} /* }}} int rrdc_update */
+
+int rrdc_flush (const char *filename) /* {{{ */
+{
+  char buffer[4096];
+  char *buffer_ptr;
+  size_t buffer_free;
+  size_t buffer_size;
+  int status;
+
+  if (filename == NULL)
+    return (-1);
+
+  memset (buffer, 0, sizeof (buffer));
+  buffer_ptr = &buffer[0];
+  buffer_free = sizeof (buffer);
+
+  status = buffer_add_string ("flush", &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
+  if (status != 0)
+    return (ENOBUFS);
+
+  assert (buffer_free < sizeof (buffer));
+  buffer_size = sizeof (buffer) - buffer_free;
+  assert (buffer[buffer_size - 1] == ' ');
+  buffer[buffer_size - 1] = '\n';
+
+  pthread_mutex_lock (&lock);
+
+  if (sd < 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENOTCONN);
+  }
+
+  status = swrite (buffer, buffer_size);
+  if (status != 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+
+  status = sread (buffer, sizeof (buffer));
+  if (status < 0)
+  {
+    status = errno;
+    pthread_mutex_unlock (&lock);
+    return (status);
+  }
+  else if (status == 0)
+  {
+    pthread_mutex_unlock (&lock);
+    return (ENODATA);
+  }
+
+  pthread_mutex_unlock (&lock);
+
+  status = atoi (buffer);
+  return (status);
+} /* }}} int rrdc_flush */
+
+/*
+ * vim: set sw=2 sts=2 ts=8 et fdm=marker :
+ */
diff --git a/src/rrd_client.h b/src/rrd_client.h
new file mode 100644 (file)
index 0000000..92d4c07
--- /dev/null
@@ -0,0 +1,40 @@
+/**
+ * RRDTool - src/rrd_client.h
+ * Copyright (C) 2008 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at verplant.org>
+ **/
+
+#ifndef __RRD_CLIENT_H
+#define __RRD_CLIENT_H 1
+
+#ifndef RRDCACHED_DEFAULT_ADDRESS
+# define RRDCACHED_DEFAULT_ADDRESS "unix:/tmp/rrdcached.sock"
+#endif
+
+#define RRDCACHED_DEFAULT_PORT "42217"
+#define ENV_RRDCACHED_ADDRESS "RRDCACHED_ADDRESS"
+
+int rrdc_connect (const char *addr);
+int rrdc_disconnect (void);
+
+int rrdc_update (const char *filename, int values_num,
+        const char * const *values);
+
+int rrdc_flush (const char *filename);
+
+#endif /* __RRD_CLIENT_H */
diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c
new file mode 100644 (file)
index 0000000..bc299f8
--- /dev/null
@@ -0,0 +1,1754 @@
+/**
+ * RRDTool - src/rrd_daemon.c
+ * Copyright (C) 2008 Florian octo Forster
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Florian octo Forster <octo at verplant.org>
+ **/
+
+/*
+ * First tell the compiler to stick to the C99 and POSIX standards as close as
+ * possible.
+ */
+#ifndef __STRICT_ANSI__ /* {{{ */
+# define __STRICT_ANSI__
+#endif
+
+#ifndef _ISOC99_SOURCE
+# define _ISOC99_SOURCE
+#endif
+
+#ifdef _POSIX_C_SOURCE
+# undef _POSIX_C_SOURCE
+#endif
+#define _POSIX_C_SOURCE 200112L
+
+/* Single UNIX needed for strdup. */
+#ifdef _XOPEN_SOURCE
+# undef _XOPEN_SOURCE
+#endif
+#define _XOPEN_SOURCE 500
+
+#ifndef _REENTRANT
+# define _REENTRANT
+#endif
+
+#ifndef _THREAD_SAFE
+# define _THREAD_SAFE
+#endif
+
+#ifdef _GNU_SOURCE
+# undef _GNU_SOURCE
+#endif
+/* }}} */
+
+/*
+ * Now for some includes..
+ */
+#include "rrd.h" /* {{{ */
+#include "rrd_client.h"
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <strings.h>
+#include <stdint.h>
+#include <inttypes.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netdb.h>
+#include <poll.h>
+#include <syslog.h>
+#include <pthread.h>
+#include <errno.h>
+#include <assert.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include <glib-2.0/glib.h>
+/* }}} */
+
+#define RRDD_LOG(severity, ...) syslog ((severity), __VA_ARGS__)
+
+#ifndef __GNUC__
+# define __attribute__(x) /**/
+#endif
+
+/*
+ * Types
+ */
+struct listen_socket_s
+{
+  int fd;
+  char path[PATH_MAX + 1];
+};
+typedef struct listen_socket_s listen_socket_t;
+
+struct cache_item_s;
+typedef struct cache_item_s cache_item_t;
+struct cache_item_s
+{
+  char *file;
+  char **values;
+  int values_num;
+  time_t last_flush_time;
+#define CI_FLAGS_IN_TREE  0x01
+#define CI_FLAGS_IN_QUEUE 0x02
+  int flags;
+
+  cache_item_t *next;
+};
+
+struct callback_flush_data_s
+{
+  time_t now;
+  time_t abs_timeout;
+  char **keys;
+  size_t keys_num;
+};
+typedef struct callback_flush_data_s callback_flush_data_t;
+
+enum queue_side_e
+{
+  HEAD,
+  TAIL
+};
+typedef enum queue_side_e queue_side_t;
+
+/*
+ * Variables
+ */
+static listen_socket_t *listen_fds = NULL;
+static size_t listen_fds_num = 0;
+
+static int do_shutdown = 0;
+
+static pthread_t queue_thread;
+
+static pthread_t *connetion_threads = NULL;
+static pthread_mutex_t connetion_threads_lock = PTHREAD_MUTEX_INITIALIZER;
+static int connetion_threads_num = 0;
+
+/* Cache stuff */
+static GTree          *cache_tree = NULL;
+static cache_item_t   *cache_queue_head = NULL;
+static cache_item_t   *cache_queue_tail = NULL;
+static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t  cache_cond = PTHREAD_COND_INITIALIZER;
+
+static pthread_cond_t  flush_cond = PTHREAD_COND_INITIALIZER;
+
+static int config_write_interval = 300;
+static int config_flush_interval = 3600;
+static char *config_pid_file = NULL;
+static char *config_base_dir = NULL;
+
+static char **config_listen_address_list = NULL;
+static int config_listen_address_list_len = 0;
+
+static uint64_t stats_queue_length = 0;
+static uint64_t stats_updates_written = 0;
+static uint64_t stats_data_sets_written = 0;
+static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER;
+
+/* 
+ * Functions
+ */
+static void sig_int_handler (int s __attribute__((unused))) /* {{{ */
+{
+  do_shutdown++;
+} /* }}} void sig_int_handler */
+
+static void sig_term_handler (int s __attribute__((unused))) /* {{{ */
+{
+  do_shutdown++;
+} /* }}} void sig_term_handler */
+
+static int write_pidfile (void) /* {{{ */
+{
+  pid_t pid;
+  char *file;
+  FILE *fh;
+
+  pid = getpid ();
+  
+  file = (config_pid_file != NULL)
+    ? config_pid_file
+    : LOCALSTATEDIR "/run/rrdcached.pid";
+
+  fh = fopen (file, "w");
+  if (fh == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "write_pidfile: Opening `%s' failed.", file);
+    return (-1);
+  }
+
+  fprintf (fh, "%i\n", (int) pid);
+  fclose (fh);
+
+  return (0);
+} /* }}} int write_pidfile */
+
+static int remove_pidfile (void) /* {{{ */
+{
+  char *file;
+  int status;
+
+  file = (config_pid_file != NULL)
+    ? config_pid_file
+    : LOCALSTATEDIR "/run/rrdcached.pid";
+
+  status = unlink (file);
+  if (status == 0)
+    return (0);
+  return (errno);
+} /* }}} int remove_pidfile */
+
+static ssize_t sread (int fd, void *buffer_void, size_t buffer_size) /* {{{ */
+{
+  char    *buffer;
+  size_t   buffer_used;
+  size_t   buffer_free;
+  ssize_t  status;
+
+  buffer       = (char *) buffer_void;
+  buffer_used  = 0;
+  buffer_free  = buffer_size;
+
+  while (buffer_free > 0)
+  {
+    status = read (fd, buffer + buffer_used, buffer_free);
+    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
+      continue;
+
+    if (status < 0)
+      return (-1);
+
+    if (status == 0)
+      return (0);
+
+    assert ((0 > status) || (buffer_free >= (size_t) status));
+
+    buffer_free = buffer_free - status;
+    buffer_used = buffer_used + status;
+
+    if (buffer[buffer_used - 1] == '\n')
+      break;
+  }
+
+  assert (buffer_used > 0);
+
+  if (buffer[buffer_used - 1] != '\n')
+  {
+    errno = ENOBUFS;
+    return (-1);
+  }
+
+  buffer[buffer_used - 1] = 0;
+
+  /* Fix network line endings. */
+  if ((buffer_used > 1) && (buffer[buffer_used - 2] == '\r'))
+  {
+    buffer_used--;
+    buffer[buffer_used - 1] = 0;
+  }
+
+  return (buffer_used);
+} /* }}} ssize_t sread */
+
+static ssize_t swrite (int fd, const void *buf, size_t count) /* {{{ */
+{
+  const char *ptr;
+  size_t      nleft;
+  ssize_t     status;
+
+  ptr   = (const char *) buf;
+  nleft = count;
+
+  while (nleft > 0)
+  {
+    status = write (fd, (const void *) ptr, nleft);
+
+    if ((status < 0) && ((errno == EAGAIN) || (errno == EINTR)))
+      continue;
+
+    if (status < 0)
+      return (status);
+
+    nleft = nleft - status;
+    ptr   = ptr   + status;
+  }
+
+  return (0);
+} /* }}} ssize_t swrite */
+
+/*
+ * enqueue_cache_item:
+ * `cache_lock' must be acquired before calling this function!
+ */
+static int enqueue_cache_item (cache_item_t *ci, /* {{{ */
+    queue_side_t side)
+{
+  int did_insert = 0;
+
+  if (ci == NULL)
+    return (-1);
+
+  if (ci->values_num == 0)
+    return (0);
+
+  if (side == HEAD)
+  {
+    if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+    {
+      assert (ci->next == NULL);
+      ci->next = cache_queue_head;
+      cache_queue_head = ci;
+
+      if (cache_queue_tail == NULL)
+        cache_queue_tail = cache_queue_head;
+
+      did_insert = 1;
+    }
+    else if (cache_queue_head == ci)
+    {
+      /* do nothing */
+    }
+    else /* enqueued, but not first entry */
+    {
+      cache_item_t *prev;
+
+      /* find previous entry */
+      for (prev = cache_queue_head; prev != NULL; prev = prev->next)
+        if (prev->next == ci)
+          break;
+      assert (prev != NULL);
+
+      /* move to the front */
+      prev->next = ci->next;
+      ci->next = cache_queue_head;
+      cache_queue_head = ci;
+
+      /* check if we need to adapt the tail */
+      if (cache_queue_tail == ci)
+        cache_queue_tail = prev;
+    }
+  }
+  else /* (side == TAIL) */
+  {
+    /* We don't move values back in the list.. */
+    if ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
+      return (0);
+
+    assert (ci->next == NULL);
+
+    if (cache_queue_tail == NULL)
+      cache_queue_head = ci;
+    else
+      cache_queue_tail->next = ci;
+    cache_queue_tail = ci;
+
+    did_insert = 1;
+  }
+
+  ci->flags |= CI_FLAGS_IN_QUEUE;
+
+  if (did_insert)
+  {
+    pthread_mutex_lock (&stats_lock);
+    stats_queue_length++;
+    pthread_mutex_unlock (&stats_lock);
+  }
+
+  return (0);
+} /* }}} int enqueue_cache_item */
+
+/*
+ * tree_callback_flush:
+ * Called via `g_tree_foreach' in `queue_thread_main'. `cache_lock' is held
+ * while this is in progress.
+ */
+static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */
+    gpointer data)
+{
+  cache_item_t *ci;
+  callback_flush_data_t *cfd;
+
+  ci = (cache_item_t *) value;
+  cfd = (callback_flush_data_t *) data;
+
+  if ((ci->last_flush_time <= cfd->abs_timeout)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+      && (ci->values_num > 0))
+  {
+    enqueue_cache_item (ci, TAIL);
+  }
+  else if ((do_shutdown != 0)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+      && (ci->values_num > 0))
+  {
+    enqueue_cache_item (ci, TAIL);
+  }
+  else if (((cfd->now - ci->last_flush_time) >= config_flush_interval)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+      && (ci->values_num <= 0))
+  {
+    char **temp;
+
+    temp = (char **) realloc (cfd->keys,
+        sizeof (char *) * (cfd->keys_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "tree_callback_flush: realloc failed.");
+      return (FALSE);
+    }
+    cfd->keys = temp;
+    /* Make really sure this points to the _same_ place */
+    assert ((char *) key == ci->file);
+    cfd->keys[cfd->keys_num] = (char *) key;
+    cfd->keys_num++;
+  }
+
+  return (FALSE);
+} /* }}} gboolean tree_callback_flush */
+
+static int flush_old_values (int max_age)
+{
+  callback_flush_data_t cfd;
+  size_t k;
+
+  memset (&cfd, 0, sizeof (cfd));
+  /* Pass the current time as user data so that we don't need to call
+   * `time' for each node. */
+  cfd.now = time (NULL);
+  cfd.keys = NULL;
+  cfd.keys_num = 0;
+
+  if (max_age > 0)
+    cfd.abs_timeout = cfd.now - max_age;
+  else
+    cfd.abs_timeout = cfd.now + 1;
+
+  /* `tree_callback_flush' will return the keys of all values that haven't
+   * been touched in the last `config_flush_interval' seconds in `cfd'.
+   * The char*'s in this array point to the same memory as ci->file, so we
+   * don't need to free them separately. */
+  g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd);
+
+  for (k = 0; k < cfd.keys_num; k++)
+  {
+    cache_item_t *ci;
+
+    /* This must not fail. */
+    ci = (cache_item_t *) g_tree_lookup (cache_tree, cfd.keys[k]);
+    assert (ci != NULL);
+
+    /* If we end up here with values available, something's seriously
+     * messed up. */
+    assert (ci->values_num == 0);
+
+    /* Remove the node from the tree */
+    g_tree_remove (cache_tree, cfd.keys[k]);
+    cfd.keys[k] = NULL;
+
+    /* Now free and clean up `ci'. */
+    free (ci->file);
+    ci->file = NULL;
+    free (ci);
+    ci = NULL;
+  } /* for (k = 0; k < cfd.keys_num; k++) */
+
+  if (cfd.keys != NULL)
+  {
+    free (cfd.keys);
+    cfd.keys = NULL;
+  }
+
+  return (0);
+} /* int flush_old_values */
+
+static void *queue_thread_main (void *args __attribute__((unused))) /* {{{ */
+{
+  struct timeval now;
+  struct timespec next_flush;
+
+  gettimeofday (&now, NULL);
+  next_flush.tv_sec = now.tv_sec + config_flush_interval;
+  next_flush.tv_nsec = 1000 * now.tv_usec;
+
+  pthread_mutex_lock (&cache_lock);
+  while ((do_shutdown == 0) || (cache_queue_head != NULL))
+  {
+    cache_item_t *ci;
+    char *file;
+    char **values;
+    int values_num;
+    int status;
+    int i;
+
+    /* First, check if it's time to do the cache flush. */
+    gettimeofday (&now, NULL);
+    if ((now.tv_sec > next_flush.tv_sec)
+        || ((now.tv_sec == next_flush.tv_sec)
+          && ((1000 * now.tv_usec) > next_flush.tv_nsec)))
+    {
+      /* Flush all values that haven't been written in the last
+       * `config_write_interval' seconds. */
+      flush_old_values (config_write_interval);
+
+      /* Determine the time of the next cache flush. */
+      while (next_flush.tv_sec < now.tv_sec)
+        next_flush.tv_sec += config_flush_interval;
+    }
+
+    /* Now, check if there's something to store away. If not, wait until
+     * something comes in or it's time to do the cache flush. */
+    if (cache_queue_head == NULL)
+    {
+      status = pthread_cond_timedwait (&cache_cond, &cache_lock, &next_flush);
+      if ((status != 0) && (status != ETIMEDOUT))
+      {
+        RRDD_LOG (LOG_ERR, "queue_thread_main: "
+            "pthread_cond_timedwait returned %i.", status);
+      }
+    }
+
+    /* We're about to shut down, so lets flush the entire tree. */
+    if ((do_shutdown != 0) && (cache_queue_head == NULL))
+      flush_old_values (/* max age = */ -1);
+
+    /* Check if a value has arrived. This may be NULL if we timed out or there
+     * was an interrupt such as a signal. */
+    if (cache_queue_head == NULL)
+      continue;
+
+    ci = cache_queue_head;
+
+    /* copy the relevant parts */
+    file = strdup (ci->file);
+    if (file == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed.");
+      continue;
+    }
+
+    values = ci->values;
+    values_num = ci->values_num;
+
+    ci->values = NULL;
+    ci->values_num = 0;
+
+    ci->last_flush_time = time (NULL);
+    ci->flags &= ~(CI_FLAGS_IN_QUEUE);
+
+    cache_queue_head = ci->next;
+    if (cache_queue_head == NULL)
+      cache_queue_tail = NULL;
+    ci->next = NULL;
+
+    pthread_mutex_lock (&stats_lock);
+    assert (stats_queue_length > 0);
+    stats_queue_length--;
+    pthread_mutex_unlock (&stats_lock);
+
+    pthread_mutex_unlock (&cache_lock);
+
+    status = rrd_update_r (file, NULL, values_num, (void *) values);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_ERR, "queue_thread_main: "
+          "rrd_update_r failed with status %i.",
+          status);
+    }
+
+    free (file);
+    for (i = 0; i < values_num; i++)
+      free (values[i]);
+
+    if (status == 0)
+    {
+      pthread_mutex_lock (&stats_lock);
+      stats_updates_written++;
+      stats_data_sets_written += values_num;
+      pthread_mutex_unlock (&stats_lock);
+    }
+
+    pthread_mutex_lock (&cache_lock);
+    pthread_cond_broadcast (&flush_cond);
+
+    /* We're about to shut down, so lets flush the entire tree. */
+    if ((do_shutdown != 0) && (cache_queue_head == NULL))
+      flush_old_values (/* max age = */ -1);
+  } /* while ((do_shutdown == 0) || (cache_queue_head != NULL)) */
+  pthread_mutex_unlock (&cache_lock);
+
+  return (NULL);
+} /* }}} void *queue_thread_main */
+
+static int buffer_get_field (char **buffer_ret, /* {{{ */
+    size_t *buffer_size_ret, char **field_ret)
+{
+  char *buffer;
+  size_t buffer_pos;
+  size_t buffer_size;
+  char *field;
+  size_t field_size;
+  int status;
+
+  buffer = *buffer_ret;
+  buffer_pos = 0;
+  buffer_size = *buffer_size_ret;
+  field = *buffer_ret;
+  field_size = 0;
+
+  if (buffer_size <= 0)
+    return (-1);
+
+  /* This is ensured by `handle_request'. */
+  assert (buffer[buffer_size - 1] == ' ');
+
+  status = -1;
+  while (buffer_pos < buffer_size)
+  {
+    /* Check for end-of-field or end-of-buffer */
+    if (buffer[buffer_pos] == ' ')
+    {
+      field[field_size] = 0;
+      field_size++;
+      buffer_pos++;
+      status = 0;
+      break;
+    }
+    /* Handle escaped characters. */
+    else if (buffer[buffer_pos] == '\\')
+    {
+      if (buffer_pos >= (buffer_size - 1))
+        break;
+      buffer_pos++;
+      field[field_size] = buffer[buffer_pos];
+      field_size++;
+      buffer_pos++;
+    }
+    /* Normal operation */ 
+    else
+    {
+      field[field_size] = buffer[buffer_pos];
+      field_size++;
+      buffer_pos++;
+    }
+  } /* while (buffer_pos < buffer_size) */
+
+  if (status != 0)
+    return (status);
+
+  *buffer_ret = buffer + buffer_pos;
+  *buffer_size_ret = buffer_size - buffer_pos;
+  *field_ret = field;
+
+  return (0);
+} /* }}} int buffer_get_field */
+
+static int flush_file (const char *filename) /* {{{ */
+{
+  cache_item_t *ci;
+
+  pthread_mutex_lock (&cache_lock);
+
+  ci = (cache_item_t *) g_tree_lookup (cache_tree, filename);
+  if (ci == NULL)
+  {
+    pthread_mutex_unlock (&cache_lock);
+    return (ENOENT);
+  }
+
+  /* Enqueue at head */
+  enqueue_cache_item (ci, HEAD);
+  pthread_cond_signal (&cache_cond);
+
+  while ((ci->flags & CI_FLAGS_IN_QUEUE) != 0)
+  {
+    ci = NULL;
+
+    pthread_cond_wait (&flush_cond, &cache_lock);
+
+    ci = g_tree_lookup (cache_tree, filename);
+    if (ci == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "flush_file: Tree node went away "
+          "while waiting for flush.");
+      pthread_mutex_unlock (&cache_lock);
+      return (-1);
+    }
+  }
+
+  pthread_mutex_unlock (&cache_lock);
+  return (0);
+} /* }}} int flush_file */
+
+static int handle_request_help (int fd, /* {{{ */
+    char *buffer, size_t buffer_size)
+{
+  int status;
+  char **help_text;
+  size_t help_text_len;
+  char *command;
+  size_t i;
+
+  char *help_help[] =
+  {
+    "4 Command overview\n",
+    "FLUSH <filename>\n",
+    "HELP [<command>]\n",
+    "UPDATE <filename> <values> [<values> ...]\n",
+    "STATS\n"
+  };
+  size_t help_help_len = sizeof (help_help) / sizeof (help_help[0]);
+
+  char *help_flush[] =
+  {
+    "4 Help for FLUSH\n",
+    "Usage: FLUSH <filename>\n",
+    "\n",
+    "Adds the given filename to the head of the update queue and returns\n",
+    "after is has been dequeued.\n"
+  };
+  size_t help_flush_len = sizeof (help_flush) / sizeof (help_flush[0]);
+
+  char *help_update[] =
+  {
+    "9 Help for UPDATE\n",
+    "Usage: UPDATE <filename> <values> [<values> ...]\n"
+    "\n",
+    "Adds the given file to the internal cache if it is not yet known and\n",
+    "appends the given value(s) to the entry. See the rrdcached(1) manpage\n",
+    "for details.\n",
+    "\n",
+    "Each <values> has the following form:\n",
+    "  <values> = <time>:<value>[:<value>[...]]\n",
+    "See the rrdupdate(1) manpage for details.\n"
+  };
+  size_t help_update_len = sizeof (help_update) / sizeof (help_update[0]);
+
+  char *help_stats[] =
+  {
+    "4 Help for STATS\n",
+    "Usage: STATS\n",
+    "\n",
+    "Returns some performance counters, see the rrdcached(1) manpage for\n",
+    "a description of the values.\n"
+  };
+  size_t help_stats_len = sizeof (help_stats) / sizeof (help_stats[0]);
+
+  status = buffer_get_field (&buffer, &buffer_size, &command);
+  if (status != 0)
+  {
+    help_text = help_help;
+    help_text_len = help_help_len;
+  }
+  else
+  {
+    if (strcasecmp (command, "update") == 0)
+    {
+      help_text = help_update;
+      help_text_len = help_update_len;
+    }
+    else if (strcasecmp (command, "flush") == 0)
+    {
+      help_text = help_flush;
+      help_text_len = help_flush_len;
+    }
+    else if (strcasecmp (command, "stats") == 0)
+    {
+      help_text = help_stats;
+      help_text_len = help_stats_len;
+    }
+    else
+    {
+      help_text = help_help;
+      help_text_len = help_help_len;
+    }
+  }
+
+  for (i = 0; i < help_text_len; i++)
+  {
+    status = swrite (fd, help_text[i], strlen (help_text[i]));
+    if (status < 0)
+    {
+      status = errno;
+      RRDD_LOG (LOG_ERR, "handle_request_help: swrite returned an error.");
+      return (status);
+    }
+  }
+
+  return (0);
+} /* }}} int handle_request_help */
+
+static int handle_request_stats (int fd, /* {{{ */
+    char *buffer __attribute__((unused)),
+    size_t buffer_size __attribute__((unused)))
+{
+  int status;
+  char outbuf[4096];
+
+  uint64_t copy_queue_length;
+  uint64_t copy_updates_written;
+  uint64_t copy_data_sets_written;
+
+  uint64_t tree_nodes_number;
+  uint64_t tree_depth;
+
+  pthread_mutex_lock (&stats_lock);
+  copy_queue_length       = stats_queue_length;
+  copy_updates_written    = stats_updates_written;
+  copy_data_sets_written  = stats_data_sets_written;
+  pthread_mutex_unlock (&stats_lock);
+
+  pthread_mutex_lock (&cache_lock);
+  tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree);
+  tree_depth        = (uint64_t) g_tree_height (cache_tree);
+  pthread_mutex_unlock (&cache_lock);
+
+#define RRDD_STATS_SEND \
+  outbuf[sizeof (outbuf) - 1] = 0; \
+  status = swrite (fd, outbuf, strlen (outbuf)); \
+  if (status < 0) \
+  { \
+    status = errno; \
+    RRDD_LOG (LOG_INFO, "handle_request_stats: swrite returned an error."); \
+    return (status); \
+  }
+
+  strncpy (outbuf, "5 Statistics follow\n", sizeof (outbuf));
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "QueueLength: %"PRIu64"\n", copy_queue_length);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "UpdatesWritten: %"PRIu64"\n", copy_updates_written);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number);
+  RRDD_STATS_SEND;
+
+  snprintf (outbuf, sizeof (outbuf),
+      "TreeDepth: %"PRIu64"\n", tree_depth);
+  RRDD_STATS_SEND;
+
+  return (0);
+#undef RRDD_STATS_SEND
+} /* }}} int handle_request_stats */
+
+static int handle_request_flush (int fd, /* {{{ */
+    char *buffer, size_t buffer_size)
+{
+  char *file;
+  int status;
+  char result[4096];
+
+  status = buffer_get_field (&buffer, &buffer_size, &file);
+  if (status != 0)
+  {
+    strncpy (result, "-1 Usage: flush <filename>\n", sizeof (result));
+  }
+  else
+  {
+    status = flush_file (file);
+    if (status == 0)
+      snprintf (result, sizeof (result), "0 Successfully flushed %s.\n", file);
+    else if (status == ENOENT)
+      snprintf (result, sizeof (result), "-1 No such file: %s.\n", file);
+    else if (status < 0)
+      strncpy (result, "-1 Internal error.\n", sizeof (result));
+    else
+      snprintf (result, sizeof (result), "-1 Failed with status %i.\n", status);
+  }
+  result[sizeof (result) - 1] = 0;
+
+  status = swrite (fd, result, strlen (result));
+  if (status < 0)
+  {
+    status = errno;
+    RRDD_LOG (LOG_INFO, "handle_request_flush: swrite returned an error.");
+    return (status);
+  }
+
+  return (0);
+} /* }}} int handle_request_flush */
+
+static int handle_request_update (int fd, /* {{{ */
+    char *buffer, size_t buffer_size)
+{
+  char *file;
+  int values_num = 0;
+  int status;
+
+  time_t now;
+
+  cache_item_t *ci;
+  char answer[4096];
+
+#define RRDD_UPDATE_SEND \
+  answer[sizeof (answer) - 1] = 0; \
+  status = swrite (fd, answer, strlen (answer)); \
+  if (status < 0) \
+  { \
+    status = errno; \
+    RRDD_LOG (LOG_INFO, "handle_request_update: swrite returned an error."); \
+    return (status); \
+  }
+
+  now = time (NULL);
+
+  status = buffer_get_field (&buffer, &buffer_size, &file);
+  if (status != 0)
+  {
+    strncpy (answer, "-1 Usage: UPDATE <filename> <values> [<values> ...]\n",
+        sizeof (answer));
+    RRDD_UPDATE_SEND;
+    return (0);
+  }
+
+  pthread_mutex_lock (&cache_lock);
+
+  ci = g_tree_lookup (cache_tree, file);
+  if (ci == NULL) /* {{{ */
+  {
+    struct stat statbuf;
+
+    memset (&statbuf, 0, sizeof (statbuf));
+    status = stat (file, &statbuf);
+    if (status != 0)
+    {
+      pthread_mutex_unlock (&cache_lock);
+      RRDD_LOG (LOG_ERR, "handle_request_update: stat (%s) failed.", file);
+
+      status = errno;
+      if (status == ENOENT)
+        snprintf (answer, sizeof (answer), "-1 No such file: %s", file);
+      else
+        snprintf (answer, sizeof (answer), "-1 stat failed with error %i.\n",
+            status);
+      RRDD_UPDATE_SEND;
+      return (0);
+    }
+    if (!S_ISREG (statbuf.st_mode))
+    {
+      pthread_mutex_unlock (&cache_lock);
+
+      snprintf (answer, sizeof (answer), "-1 Not a regular file: %s", file);
+      RRDD_UPDATE_SEND;
+      return (0);
+    }
+
+    ci = (cache_item_t *) malloc (sizeof (cache_item_t));
+    if (ci == NULL)
+    {
+      pthread_mutex_unlock (&cache_lock);
+      RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed.");
+
+      strncpy (answer, "-1 malloc failed.\n", sizeof (answer));
+      RRDD_UPDATE_SEND;
+      return (0);
+    }
+    memset (ci, 0, sizeof (cache_item_t));
+
+    ci->file = strdup (file);
+    if (ci->file == NULL)
+    {
+      pthread_mutex_unlock (&cache_lock);
+      free (ci);
+      RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
+
+      strncpy (answer, "-1 strdup failed.\n", sizeof (answer));
+      RRDD_UPDATE_SEND;
+      return (0);
+    }
+
+    ci->values = NULL;
+    ci->values_num = 0;
+    ci->last_flush_time = now;
+    ci->flags = CI_FLAGS_IN_TREE;
+
+    g_tree_insert (cache_tree, (void *) ci->file, (void *) ci);
+  } /* }}} */
+  assert (ci != NULL);
+
+  while (buffer_size > 0)
+  {
+    char **temp;
+    char *value;
+
+    status = buffer_get_field (&buffer, &buffer_size, &value);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field.");
+      break;
+    }
+
+    temp = (char **) realloc (ci->values,
+        sizeof (char *) * (ci->values_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "handle_request_update: realloc failed.");
+      continue;
+    }
+    ci->values = temp;
+
+    ci->values[ci->values_num] = strdup (value);
+    if (ci->values[ci->values_num] == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed.");
+      continue;
+    }
+    ci->values_num++;
+
+    values_num++;
+  }
+
+  if (((now - ci->last_flush_time) >= config_write_interval)
+      && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0)
+      && (ci->values_num > 0))
+  {
+    enqueue_cache_item (ci, TAIL);
+    pthread_cond_signal (&cache_cond);
+  }
+
+  pthread_mutex_unlock (&cache_lock);
+
+  if (values_num < 1)
+  {
+    strncpy (answer, "-1 No values updated.\n", sizeof (answer));
+  }
+  else
+  {
+    snprintf (answer, sizeof (answer), "0 Enqueued %i value%s\n", values_num,
+        (values_num == 1) ? "" : "s");
+  }
+  RRDD_UPDATE_SEND;
+  return (0);
+#undef RRDD_UPDATE_SEND
+} /* }}} int handle_request_update */
+
+static int handle_request (int fd) /* {{{ */
+{
+  char buffer[4096];
+  size_t buffer_size;
+  char *buffer_ptr;
+  char *command;
+  int status;
+
+  status = (int) sread (fd, buffer, sizeof (buffer));
+  if (status == 0)
+  {
+    return (1);
+  }
+  else if (status < 0)
+  {
+    RRDD_LOG (LOG_ERR, "handle_request: sread failed.");
+    return (-1);
+  }
+  buffer_size = (size_t) status;
+  assert (buffer_size <= sizeof (buffer));
+  assert (buffer[buffer_size - 1] == 0);
+
+  /* Place the normal field separator at the end to simplify
+   * `buffer_get_field's work. */
+  buffer[buffer_size - 1] = ' ';
+
+  buffer_ptr = buffer;
+  command = NULL;
+  status = buffer_get_field (&buffer_ptr, &buffer_size, &command);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_INFO, "handle_request: Unable parse command.");
+    return (-1);
+  }
+
+  if (strcasecmp (command, "update") == 0)
+  {
+    return (handle_request_update (fd, buffer_ptr, buffer_size));
+  }
+  else if (strcasecmp (command, "flush") == 0)
+  {
+    return (handle_request_flush (fd, buffer_ptr, buffer_size));
+  }
+  else if (strcasecmp (command, "stats") == 0)
+  {
+    return (handle_request_stats (fd, buffer_ptr, buffer_size));
+  }
+  else if (strcasecmp (command, "help") == 0)
+  {
+    return (handle_request_help (fd, buffer_ptr, buffer_size));
+  }
+  else
+  {
+    char result[4096];
+
+    snprintf (result, sizeof (result), "-1 Unknown command: %s\n", command);
+    result[sizeof (result) - 1] = 0;
+
+    status = swrite (fd, result, strlen (result));
+    if (status < 0)
+    {
+      RRDD_LOG (LOG_ERR, "handle_request: swrite failed.");
+      return (-1);
+    }
+  }
+
+  return (0);
+} /* }}} int handle_request */
+
+static void *connection_thread_main (void *args /* {{{ */
+    __attribute__((unused)))
+{
+  pthread_t self;
+  int i;
+  int fd;
+  
+  fd = *((int *) args);
+
+  pthread_mutex_lock (&connetion_threads_lock);
+  {
+    pthread_t *temp;
+
+    temp = (pthread_t *) realloc (connetion_threads,
+        sizeof (pthread_t) * (connetion_threads_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "connection_thread_main: realloc failed.");
+    }
+    else
+    {
+      connetion_threads = temp;
+      connetion_threads[connetion_threads_num] = pthread_self ();
+      connetion_threads_num++;
+    }
+  }
+  pthread_mutex_unlock (&connetion_threads_lock);
+
+  while (do_shutdown == 0)
+  {
+    struct pollfd pollfd;
+    int status;
+
+    pollfd.fd = fd;
+    pollfd.events = POLLIN | POLLPRI;
+    pollfd.revents = 0;
+
+    status = poll (&pollfd, 1, /* timeout = */ 500);
+    if (status == 0) /* timeout */
+      continue;
+    else if (status < 0) /* error */
+    {
+      status = errno;
+      if (status == EINTR)
+        continue;
+      RRDD_LOG (LOG_ERR, "connection_thread_main: poll(2) failed.");
+      continue;
+    }
+
+    if ((pollfd.revents & POLLHUP) != 0) /* normal shutdown */
+    {
+      close (fd);
+      break;
+    }
+    else if ((pollfd.revents & (POLLIN | POLLPRI)) == 0)
+    {
+      RRDD_LOG (LOG_WARNING, "connection_thread_main: "
+          "poll(2) returned something unexpected: %#04hx",
+          pollfd.revents);
+      close (fd);
+      break;
+    }
+
+    status = handle_request (fd);
+    if (status != 0)
+    {
+      close (fd);
+      break;
+    }
+  }
+
+  self = pthread_self ();
+  /* Remove this thread from the connection threads list */
+  pthread_mutex_lock (&connetion_threads_lock);
+  /* Find out own index in the array */
+  for (i = 0; i < connetion_threads_num; i++)
+    if (pthread_equal (connetion_threads[i], self) != 0)
+      break;
+  assert (i < connetion_threads_num);
+
+  /* Move the trailing threads forward. */
+  if (i < (connetion_threads_num - 1))
+  {
+    memmove (connetion_threads + i,
+        connetion_threads + i + 1,
+        sizeof (pthread_t) * (connetion_threads_num - i - 1));
+  }
+
+  connetion_threads_num--;
+  pthread_mutex_unlock (&connetion_threads_lock);
+
+  free (args);
+  return (NULL);
+} /* }}} void *connection_thread_main */
+
+static int open_listen_socket_unix (const char *path) /* {{{ */
+{
+  int fd;
+  struct sockaddr_un sa;
+  listen_socket_t *temp;
+  int status;
+
+  temp = (listen_socket_t *) realloc (listen_fds,
+      sizeof (listen_fds[0]) * (listen_fds_num + 1));
+  if (temp == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: realloc failed.");
+    return (-1);
+  }
+  listen_fds = temp;
+  memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
+
+  fd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
+  if (fd < 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: socket(2) failed.");
+    return (-1);
+  }
+
+  memset (&sa, 0, sizeof (sa));
+  sa.sun_family = AF_UNIX;
+  strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
+
+  status = bind (fd, (struct sockaddr *) &sa, sizeof (sa));
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: bind(2) failed.");
+    close (fd);
+    unlink (path);
+    return (-1);
+  }
+
+  status = listen (fd, /* backlog = */ 10);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket_unix: listen(2) failed.");
+    close (fd);
+    unlink (path);
+    return (-1);
+  }
+  
+  listen_fds[listen_fds_num].fd = fd;
+  snprintf (listen_fds[listen_fds_num].path,
+      sizeof (listen_fds[listen_fds_num].path) - 1,
+      "unix:%s", path);
+  listen_fds_num++;
+
+  return (0);
+} /* }}} int open_listen_socket_unix */
+
+static int open_listen_socket (const char *addr) /* {{{ */
+{
+  struct addrinfo ai_hints;
+  struct addrinfo *ai_res;
+  struct addrinfo *ai_ptr;
+  int status;
+
+  assert (addr != NULL);
+
+  if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
+    return (open_listen_socket_unix (addr + strlen ("unix:")));
+  else if (addr[0] == '/')
+    return (open_listen_socket_unix (addr));
+
+  memset (&ai_hints, 0, sizeof (ai_hints));
+  ai_hints.ai_flags = 0;
+#ifdef AI_ADDRCONFIG
+  ai_hints.ai_flags |= AI_ADDRCONFIG;
+#endif
+  ai_hints.ai_family = AF_UNSPEC;
+  ai_hints.ai_socktype = SOCK_STREAM;
+
+  ai_res = NULL;
+  status = getaddrinfo (addr, RRDCACHED_DEFAULT_PORT, &ai_hints, &ai_res);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "open_listen_socket: getaddrinfo(%s) failed: "
+        "%s", addr, gai_strerror (status));
+    return (-1);
+  }
+
+  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
+  {
+    int fd;
+    listen_socket_t *temp;
+
+    temp = (listen_socket_t *) realloc (listen_fds,
+        sizeof (listen_fds[0]) * (listen_fds_num + 1));
+    if (temp == NULL)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: realloc failed.");
+      continue;
+    }
+    listen_fds = temp;
+    memset (listen_fds + listen_fds_num, 0, sizeof (listen_fds[0]));
+
+    fd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
+    if (fd < 0)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: socket(2) failed.");
+      continue;
+    }
+
+    status = bind (fd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: bind(2) failed.");
+      close (fd);
+      continue;
+    }
+
+    status = listen (fd, /* backlog = */ 10);
+    if (status != 0)
+    {
+      RRDD_LOG (LOG_ERR, "open_listen_socket: listen(2) failed.");
+      close (fd);
+      return (-1);
+    }
+
+    listen_fds[listen_fds_num].fd = fd;
+    strncpy (listen_fds[listen_fds_num].path, addr,
+        sizeof (listen_fds[listen_fds_num].path) - 1);
+    listen_fds_num++;
+  } /* for (ai_ptr) */
+
+  return (0);
+} /* }}} int open_listen_socket */
+
+static int close_listen_sockets (void) /* {{{ */
+{
+  size_t i;
+
+  for (i = 0; i < listen_fds_num; i++)
+  {
+    close (listen_fds[i].fd);
+    if (strncmp ("unix:", listen_fds[i].path, strlen ("unix:")) == 0)
+      unlink (listen_fds[i].path + strlen ("unix:"));
+  }
+
+  free (listen_fds);
+  listen_fds = NULL;
+  listen_fds_num = 0;
+
+  return (0);
+} /* }}} int close_listen_sockets */
+
+static void *listen_thread_main (void *args __attribute__((unused))) /* {{{ */
+{
+  struct pollfd *pollfds;
+  int pollfds_num;
+  int status;
+  int i;
+
+  for (i = 0; i < config_listen_address_list_len; i++)
+    open_listen_socket (config_listen_address_list[i]);
+
+  if (config_listen_address_list_len < 1)
+    open_listen_socket (RRDCACHED_DEFAULT_ADDRESS);
+
+  if (listen_fds_num < 1)
+  {
+    RRDD_LOG (LOG_ERR, "listen_thread_main: No listen sockets "
+        "could be opened. Sorry.");
+    return (NULL);
+  }
+
+  pollfds_num = listen_fds_num;
+  pollfds = (struct pollfd *) malloc (sizeof (*pollfds) * pollfds_num);
+  if (pollfds == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
+    return (NULL);
+  }
+  memset (pollfds, 0, sizeof (*pollfds) * pollfds_num);
+
+  while (do_shutdown == 0)
+  {
+    assert (pollfds_num == ((int) listen_fds_num));
+    for (i = 0; i < pollfds_num; i++)
+    {
+      pollfds[i].fd = listen_fds[i].fd;
+      pollfds[i].events = POLLIN | POLLPRI;
+      pollfds[i].revents = 0;
+    }
+
+    status = poll (pollfds, pollfds_num, /* timeout = */ -1);
+    if (status < 1)
+    {
+      status = errno;
+      if (status != EINTR)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: poll(2) failed.");
+      }
+      continue;
+    }
+
+    for (i = 0; i < pollfds_num; i++)
+    {
+      int *client_sd;
+      struct sockaddr_storage client_sa;
+      socklen_t client_sa_size;
+      pthread_t tid;
+      pthread_attr_t attr;
+
+      if (pollfds[i].revents == 0)
+        continue;
+
+      if ((pollfds[i].revents & (POLLIN | POLLPRI)) == 0)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: "
+            "poll(2) returned something unexpected for listen FD #%i.",
+            pollfds[i].fd);
+        continue;
+      }
+
+      client_sd = (int *) malloc (sizeof (int));
+      if (client_sd == NULL)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: malloc failed.");
+        continue;
+      }
+
+      client_sa_size = sizeof (client_sa);
+      *client_sd = accept (pollfds[i].fd,
+          (struct sockaddr *) &client_sa, &client_sa_size);
+      if (*client_sd < 0)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: accept(2) failed.");
+        continue;
+      }
+
+      pthread_attr_init (&attr);
+      pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+
+      status = pthread_create (&tid, &attr, connection_thread_main,
+          /* args = */ (void *) client_sd);
+      if (status != 0)
+      {
+        RRDD_LOG (LOG_ERR, "listen_thread_main: pthread_create failed.");
+        close (*client_sd);
+        free (client_sd);
+        continue;
+      }
+    } /* for (pollfds_num) */
+  } /* while (do_shutdown == 0) */
+
+  close_listen_sockets ();
+
+  pthread_mutex_lock (&connetion_threads_lock);
+  while (connetion_threads_num > 0)
+  {
+    pthread_t wait_for;
+
+    wait_for = connetion_threads[0];
+
+    pthread_mutex_unlock (&connetion_threads_lock);
+    pthread_join (wait_for, /* retval = */ NULL);
+    pthread_mutex_lock (&connetion_threads_lock);
+  }
+  pthread_mutex_unlock (&connetion_threads_lock);
+
+  return (NULL);
+} /* }}} void *listen_thread_main */
+
+static int daemonize (void) /* {{{ */
+{
+  pid_t child;
+  int status;
+  char *base_dir;
+
+  /* These structures are static, because `sigaction' behaves weird if the are
+   * overwritten.. */
+  static struct sigaction sa_int;
+  static struct sigaction sa_term;
+  static struct sigaction sa_pipe;
+
+  child = fork ();
+  if (child < 0)
+  {
+    fprintf (stderr, "daemonize: fork(2) failed.\n");
+    return (-1);
+  }
+  else if (child > 0)
+  {
+    return (1);
+  }
+
+  /* Change into the /tmp directory. */
+  base_dir = (config_base_dir != NULL)
+    ? config_base_dir
+    : "/tmp";
+  status = chdir (base_dir);
+  if (status != 0)
+  {
+    fprintf (stderr, "daemonize: chdir (%s) failed.\n", base_dir);
+    return (-1);
+  }
+
+  /* Become session leader */
+  setsid ();
+
+  /* Open the first three file descriptors to /dev/null */
+  close (2);
+  close (1);
+  close (0);
+
+  open ("/dev/null", O_RDWR);
+  dup (0);
+  dup (0);
+
+  /* Install signal handlers */
+  memset (&sa_int, 0, sizeof (sa_int));
+  sa_int.sa_handler = sig_int_handler;
+  sigaction (SIGINT, &sa_int, NULL);
+
+  memset (&sa_term, 0, sizeof (sa_term));
+  sa_term.sa_handler = sig_term_handler;
+  sigaction (SIGTERM, &sa_term, NULL);
+
+  memset (&sa_pipe, 0, sizeof (sa_pipe));
+  sa_pipe.sa_handler = SIG_IGN;
+  sigaction (SIGPIPE, &sa_pipe, NULL);
+
+  openlog ("rrdcached", LOG_PID, LOG_DAEMON);
+
+  cache_tree = g_tree_new ((GCompareFunc) strcmp);
+  if (cache_tree == NULL)
+  {
+    RRDD_LOG (LOG_ERR, "daemonize: g_tree_new failed.");
+    return (-1);
+  }
+
+  memset (&queue_thread, 0, sizeof (queue_thread));
+  status = pthread_create (&queue_thread, /* attr = */ NULL,
+      queue_thread_main, /* args = */ NULL);
+  if (status != 0)
+  {
+    RRDD_LOG (LOG_ERR, "daemonize: pthread_create failed.");
+    return (-1);
+  }
+
+  write_pidfile ();
+
+  return (0);
+} /* }}} int daemonize */
+
+static int cleanup (void) /* {{{ */
+{
+  do_shutdown++;
+
+  pthread_cond_signal (&cache_cond);
+  pthread_join (queue_thread, /* return = */ NULL);
+
+  remove_pidfile ();
+
+  closelog ();
+
+  return (0);
+} /* }}} int cleanup */
+
+static int read_options (int argc, char **argv) /* {{{ */
+{
+  int option;
+  int status = 0;
+
+  while ((option = getopt(argc, argv, "l:f:w:b:p:h?")) != -1)
+  {
+    switch (option)
+    {
+      case 'l':
+      {
+        char **temp;
+
+        temp = (char **) realloc (config_listen_address_list,
+            sizeof (char *) * (config_listen_address_list_len + 1));
+        if (temp == NULL)
+        {
+          fprintf (stderr, "read_options: realloc failed.\n");
+          return (2);
+        }
+        config_listen_address_list = temp;
+
+        temp[config_listen_address_list_len] = strdup (optarg);
+        if (temp[config_listen_address_list_len] == NULL)
+        {
+          fprintf (stderr, "read_options: strdup failed.\n");
+          return (2);
+        }
+        config_listen_address_list_len++;
+      }
+      break;
+
+      case 'f':
+      {
+        int temp;
+
+        temp = atoi (optarg);
+        if (temp > 0)
+          config_flush_interval = temp;
+        else
+        {
+          fprintf (stderr, "Invalid flush interval: %s\n", optarg);
+          status = 3;
+        }
+      }
+      break;
+
+      case 'w':
+      {
+        int temp;
+
+        temp = atoi (optarg);
+        if (temp > 0)
+          config_write_interval = temp;
+        else
+        {
+          fprintf (stderr, "Invalid write interval: %s\n", optarg);
+          status = 2;
+        }
+      }
+      break;
+
+      case 'b':
+      {
+        size_t len;
+
+        if (config_base_dir != NULL)
+          free (config_base_dir);
+        config_base_dir = strdup (optarg);
+        if (config_base_dir == NULL)
+        {
+          fprintf (stderr, "read_options: strdup failed.\n");
+          return (3);
+        }
+
+        len = strlen (config_base_dir);
+        while ((len > 0) && (config_base_dir[len - 1] == '/'))
+        {
+          config_base_dir[len - 1] = 0;
+          len--;
+        }
+
+        if (len < 1)
+        {
+          fprintf (stderr, "Invalid base directory: %s\n", optarg);
+          return (4);
+        }
+      }
+      break;
+
+      case 'p':
+      {
+        if (config_pid_file != NULL)
+          free (config_pid_file);
+        config_pid_file = strdup (optarg);
+        if (config_pid_file == NULL)
+        {
+          fprintf (stderr, "read_options: strdup failed.\n");
+          return (3);
+        }
+      }
+      break;
+
+      case 'h':
+      case '?':
+        printf ("RRDd %s  Copyright (C) 2008 Florian octo Forster\n"
+            "\n"
+            "Usage: rrdcached [options]\n"
+            "\n"
+            "Valid options are:\n"
+            "  -l <address>  Socket address to listen to.\n"
+            "  -w <seconds>  Interval in which to write data.\n"
+            "  -f <seconds>  Interval in which to flush dead data.\n"
+            "  -p <file>     Location of the PID-file.\n"
+            "  -b <dir>      Base directory to change to.\n"
+            "\n"
+            "For more information and a detailed description of all options "
+            "please refer\n"
+            "to the rrdcached(1) manual page.\n",
+            VERSION);
+        status = -1;
+        break;
+    } /* switch (option) */
+  } /* while (getopt) */
+
+  return (status);
+} /* }}} int read_options */
+
+int main (int argc, char **argv)
+{
+  int status;
+
+  status = read_options (argc, argv);
+  if (status != 0)
+  {
+    if (status < 0)
+      status = 0;
+    return (status);
+  }
+
+  status = daemonize ();
+  if (status == 1)
+  {
+    struct sigaction sigchld;
+
+    memset (&sigchld, 0, sizeof (sigchld));
+    sigchld.sa_handler = SIG_IGN;
+    sigaction (SIGCHLD, &sigchld, NULL);
+
+    return (0);
+  }
+  else if (status != 0)
+  {
+    fprintf (stderr, "daemonize failed, exiting.\n");
+    return (1);
+  }
+
+  listen_thread_main (NULL);
+
+  cleanup ();
+
+  return (0);
+} /* int main */
+
+/*
+ * vim: set sw=2 sts=2 ts=8 et fdm=marker :
+ */
index 093de989a015d13bfdb107b83d5e885dd6bb42e8..552c636c472e799601ac6657321302019605c367 100644 (file)
@@ -43,6 +43,7 @@
  *****************************************************************************/
 #include "rrd_tool.h"
 #include "rrd_rpncalc.h"
+#include "rrd_client.h"
 
 #if !(defined(NETWARE) || defined(WIN32))
 extern char *tzname[2];
@@ -441,6 +442,7 @@ int rrd_dump(
 {
     int       rc;
     int       opt_noheader = 0;
+    char     *opt_daemon = NULL;
 
     /* init rrd clean */
 
@@ -451,16 +453,28 @@ int rrd_dump(
         int       opt;
         int       option_index = 0;
         static struct option long_options[] = {
+            {"daemon", required_argument, 0, 'd'},
             {"no-header", no_argument, 0, 'n'},
             {0, 0, 0, 0}
         };
 
-        opt = getopt_long(argc, argv, "n", long_options, &option_index);
+        opt = getopt_long(argc, argv, "d:n", long_options, &option_index);
 
         if (opt == EOF)
             break;
 
         switch (opt) {
+        case 'd':
+            if (opt_daemon != NULL)
+                    free (opt_daemon);
+            opt_daemon = strdup (optarg);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error ("strdup failed.");
+                return (-1);
+            }
+            break;
+
         case 'n':
             opt_noheader = 1;
             break;
@@ -479,6 +493,44 @@ int rrd_dump(
         return (-1);
     }
 
+    if (opt_daemon == NULL)
+    {
+        char *temp;
+
+        temp = getenv (ENV_RRDCACHED_ADDRESS);
+        if (temp != NULL)
+        {
+            opt_daemon = strdup (temp);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error("strdup failed.");
+                return (-1);
+            }
+        }
+    }
+
+    if (opt_daemon != NULL)
+    {
+        int status;
+
+        status = rrdc_connect (opt_daemon);
+        if (status != 0)
+        {
+            rrd_set_error ("rrdc_connect failed with status %i.", status);
+            return (-1);
+        }
+
+        status = rrdc_flush (argv[optind]);
+        if (status != 0)
+        {
+            rrd_set_error ("rrdc_flush (%s) failed with status %i.",
+                    argv[optind], status);
+            return (-1);
+        }
+
+        rrdc_disconnect ();
+    } /* if (opt_daemon) */
+
     if ((argc - optind) == 2) {
         rc = rrd_dump_opt_r(argv[optind], argv[optind + 1], opt_noheader);
     } else {
index 5da64c5c374f0d42997df2b61de565d2816a6f40..563e76b3c05973509630084b7b812dddf0f7034c 100644 (file)
@@ -53,6 +53,7 @@
  *****************************************************************************/
 
 #include "rrd_tool.h"
+#include "rrd_client.h"
 
 #include "rrd_is_thread_safe.h"
 /*#define DEBUG*/
@@ -72,6 +73,8 @@ int rrd_fetch(
     long      step_tmp = 1;
     time_t    start_tmp = 0, end_tmp = 0;
     const char *cf;
+    char *opt_daemon = NULL;
+    int status;
 
     rrd_time_value_t start_tv, end_tv;
     char     *parsetime_error = NULL;
@@ -79,6 +82,7 @@ int rrd_fetch(
         {"resolution", required_argument, 0, 'r'},
         {"start", required_argument, 0, 's'},
         {"end", required_argument, 0, 'e'},
+        {"daemon", required_argument, 0, 'd'},
         {0, 0, 0, 0}
     };
 
@@ -93,7 +97,7 @@ int rrd_fetch(
         int       option_index = 0;
         int       opt;
 
-        opt = getopt_long(argc, argv, "r:s:e:", long_options, &option_index);
+        opt = getopt_long(argc, argv, "r:s:e:d:", long_options, &option_index);
 
         if (opt == EOF)
             break;
@@ -114,6 +118,18 @@ int rrd_fetch(
         case 'r':
             step_tmp = atol(optarg);
             break;
+
+        case 'd':
+            if (opt_daemon != NULL)
+                    free (opt_daemon);
+            opt_daemon = strdup (optarg);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error ("strdup failed.");
+                return (-1);
+            }
+            break;
+
         case '?':
             rrd_set_error("unknown option '-%c'", optopt);
             return (-1);
@@ -151,10 +167,47 @@ int rrd_fetch(
         return -1;
     }
 
+    if (opt_daemon == NULL)
+    {
+        char *temp;
+
+        temp = getenv (ENV_RRDCACHED_ADDRESS);
+        if (temp != NULL)
+        {
+            opt_daemon = strdup (temp);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error("strdup failed.");
+                return (-1);
+            }
+        }
+    }
+
+    if (opt_daemon != NULL)
+    {
+        status = rrdc_connect (opt_daemon);
+        if (status != 0)
+        {
+            rrd_set_error ("rrdc_connect failed with status %i.", status);
+            return (-1);
+        }
+
+        status = rrdc_flush (argv[optind]);
+        if (status != 0)
+        {
+            rrd_set_error ("rrdc_flush (%s) failed with status %i.",
+                    argv[optind], status);
+            return (-1);
+        }
+
+        rrdc_disconnect ();
+    } /* if (opt_daemon) */
+
     cf = argv[optind + 1];
 
-    if (rrd_fetch_r(argv[optind], cf, start, end, step, ds_cnt, ds_namv, data)
-        != 0)
+    status = rrd_fetch_r(argv[optind], cf, start, end, step,
+            ds_cnt, ds_namv, data);
+    if (status != 0)
         return (-1);
     return (0);
 }
@@ -179,7 +232,7 @@ int rrd_fetch_r(
 
     return (rrd_fetch_fn
             (filename, cf_idx, start, end, step, ds_cnt, ds_namv, data));
-}
+} /* int rrd_fetch_r */
 
 int rrd_fetch_fn(
     const char *filename,   /* name of the rrd */
index cb0627cc5febc5bf8bab5a7250c9524d48c2b773..9acf82e590c72ca4812391cd6d53491369a07022 100644 (file)
@@ -26,6 +26,7 @@
 #endif
 
 #include "rrd_graph.h"
+#include "rrd_client.h"
 
 /* some constant definitions */
 
@@ -305,6 +306,13 @@ int im_free(
 
     if (im == NULL)
         return 0;
+
+    if (im->use_rrdcached)
+    {
+        rrdc_disconnect ();
+        im->use_rrdcached = 0;
+    }
+
     for (i = 0; i < (unsigned) im->gdes_c; i++) {
         if (im->gdes[i].data_first) {
             /* careful here, because a single pointer can occur several times */
@@ -833,6 +841,36 @@ int data_fetch(
         if (!skip) {
             unsigned long ft_step = im->gdes[i].step;   /* ft_step will record what we got from fetch */
 
+            /* Flush the file if
+             * - a connection to the daemon has been established
+             * - this is the first occurrence of that RRD file
+             */
+            if (im->use_rrdcached)
+            {
+                int status;
+
+                status = 0;
+                for (ii = 0; ii < i; ii++)
+                {
+                    if (strcmp (im->gdes[i].rrd, im->gdes[ii].rrd) == 0)
+                    {
+                        status = 1;
+                        break;
+                    }
+                }
+
+                if (status == 0)
+                {
+                    status = rrdc_flush (im->gdes[i].rrd);
+                    if (status != 0)
+                    {
+                        rrd_set_error ("rrdc_flush (%s) failed with status %i.",
+                                im->gdes[i].rrd, status);
+                        return (-1);
+                    }
+                }
+            } /* if (im->use_rrdcached) */
+
             if ((rrd_fetch_fn(im->gdes[i].rrd,
                               im->gdes[i].cf,
                               &im->gdes[i].start,
@@ -3725,6 +3763,7 @@ void rrd_graph_init(
     im->grinfo_current = (rrd_info_t *) NULL;
     im->imgformat = IF_PNG;
     im->imginfo = NULL;
+    im->use_rrdcached = 0;
     im->lazy = 0;
     im->logarithmic = 0;
     im->maxval = DNAN;
@@ -3856,6 +3895,7 @@ void rrd_graph_options(
         { "watermark",          required_argument, 0, 'W'},
         { "alt-y-mrtg",         no_argument,       0, 1000},    /* this has no effect it is just here to save old apps from crashing when they use it */
         { "pango-markup",       no_argument,       0, 'P'},
+        { "daemon",             required_argument, 0, 'd'},
         {  0, 0, 0, 0}
 };
 /* *INDENT-ON* */
@@ -3870,7 +3910,7 @@ void rrd_graph_options(
         int       col_start, col_end;
 
         opt = getopt_long(argc, argv,
-                          "s:e:x:y:v:w:h:D:iu:l:rb:oc:n:m:t:f:a:I:zgjFYAMEX:L:S:T:NR:B:W:kP",
+                          "s:e:x:y:v:w:h:D:iu:l:rb:oc:n:m:t:f:a:I:zgjFYAMEX:L:S:T:NR:B:W:kPd:",
                           long_options, &option_index);
         if (opt == EOF)
             break;
@@ -4212,6 +4252,25 @@ void rrd_graph_options(
             strncpy(im->watermark, optarg, 100);
             im->watermark[99] = '\0';
             break;
+        case 'd':
+        {
+            int status;
+            if (im->use_rrdcached)
+            {
+                rrd_set_error ("You cannot specify --daemon "
+                        "more than once.");
+                return;
+            }
+            status = rrdc_connect (optarg);
+            if (status != 0)
+            {
+                rrd_set_error ("rrdc_connect(%s) failed with status %i.",
+                        optarg, status);
+                return;
+            }
+            im->use_rrdcached = 1;
+            break;
+        }
         case '?':
             if (optopt != 0)
                 rrd_set_error("unknown option '%c'", optopt);
@@ -4219,6 +4278,26 @@ void rrd_graph_options(
                 rrd_set_error("unknown option '%s'", argv[optind - 1]);
             return;
         }
+    } /* while (1) */
+
+    if (im->use_rrdcached == 0)
+    {
+        char *temp;
+
+        temp = getenv (ENV_RRDCACHED_ADDRESS);
+        if (temp != NULL)
+        {
+            int status;
+
+            status = rrdc_connect (temp);
+            if (status != 0)
+            {
+                rrd_set_error ("rrdc_connect(%s) failed with status %i.",
+                        temp, status);
+                return;
+            }
+            im->use_rrdcached = 1;
+        }
     }
     
     pango_cairo_context_set_font_options(pango_layout_get_context(im->layout), im->font_options);
index 62765086a69c5d7b4605d38a8037c275507071ce..cffce4b00570710674437aa0b2ec670988b3e7a3 100644 (file)
@@ -213,6 +213,7 @@ typedef struct image_desc_t {
     char     *imginfo;  /* construct an <IMG ... tag and return 
                            as first retval */
     enum gfx_if_en imgformat;   /* image format */
+    int       use_rrdcached;
     int       lazy;     /* only update the image if there is
                            reasonable probablility that the
                            existing one is out of date */
index 5641edaf198d0d13c215c6bae15f5150f1b4c147..50aab7202ed6e91a775ea89d6ee2a5aa51c28216 100644 (file)
@@ -6,6 +6,7 @@
 
 #include "rrd_tool.h"
 #include "rrd_rpncalc.h"
+#include "rrd_client.h"
 #include <stdarg.h>
 
 /* proto */
@@ -80,18 +81,92 @@ rrd_info_t *rrd_info(
     char **argv)
 {
     rrd_info_t *info;
+    char *opt_daemon = NULL;
 
-    if (argc < 2) {
-        rrd_set_error("please specify an rrd");
-        return NULL;
+    optind = 0;
+    opterr = 0;         /* initialize getopt */
+
+    while (42) {
+        int       opt;
+        int       option_index = 0;
+        static struct option long_options[] = {
+            {"daemon", required_argument, 0, 'd'},
+            {0, 0, 0, 0}
+        };
+
+        opt = getopt_long(argc, argv, "d:", long_options, &option_index);
+
+        if (opt == EOF)
+            break;
+
+        switch (opt) {
+        case 'd':
+            if (opt_daemon != NULL)
+                    free (opt_daemon);
+            opt_daemon = strdup (optarg);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error ("strdup failed.");
+                return (NULL);
+            }
+            break;
+
+        default:
+            rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>",
+                    argv[0]);
+            return (NULL);
+            break;
+        }
+    }                   /* while (42) */
+
+    if ((argc - optind) != 1) {
+        rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>",
+                argv[0]);
+        return (NULL);
     }
 
-    info = rrd_info_r(argv[1]);
+    if (opt_daemon == NULL)
+    {
+        char *temp;
 
-    return (info);
-}
+        temp = getenv (ENV_RRDCACHED_ADDRESS);
+        if (temp != NULL)
+        {
+            opt_daemon = strdup (temp);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error("strdup failed.");
+                return (NULL);
+            }
+        }
+    }
+
+    if (opt_daemon != NULL)
+    {
+        int status;
+
+        status = rrdc_connect (opt_daemon);
+        if (status != 0)
+        {
+            rrd_set_error ("rrdc_connect failed with status %i.", status);
+            return (NULL);
+        }
 
+        status = rrdc_flush (argv[optind]);
+        if (status != 0)
+        {
+            rrd_set_error ("rrdc_flush (%s) failed with status %i.",
+                    argv[optind], status);
+            return (NULL);
+        }
 
+        rrdc_disconnect ();
+    } /* if (opt_daemon) */
+
+    info = rrd_info_r(argv[optind]);
+
+    return (info);
+} /* rrd_info_t *rrd_info */
 
 rrd_info_t *rrd_info_r(
     char *filename)
index 2749febda26b12ed106f1143c119cb7d568e98cf..aa9b917385a6da4da6ff555ca41bbe47f424c34e 100644 (file)
@@ -7,19 +7,96 @@
  *****************************************************************************/
 
 #include "rrd_tool.h"
+#include "rrd_client.h"
 
 time_t rrd_last(
     int argc,
     char **argv)
 {
-    if (argc < 2) {
-        rrd_set_error("please specify an rrd");
+    char *opt_daemon = NULL;
+
+    optind = 0;
+    opterr = 0;         /* initialize getopt */
+
+    while (42) {
+        int       opt;
+        int       option_index = 0;
+        static struct option long_options[] = {
+            {"daemon", required_argument, 0, 'd'},
+            {0, 0, 0, 0}
+        };
+
+        opt = getopt_long(argc, argv, "d:", long_options, &option_index);
+
+        if (opt == EOF)
+            break;
+
+        switch (opt) {
+        case 'd':
+            if (opt_daemon != NULL)
+                    free (opt_daemon);
+            opt_daemon = strdup (optarg);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error ("strdup failed.");
+                return (-1);
+            }
+            break;
+
+        default:
+            rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>",
+                    argv[0]);
+            return (-1);
+            break;
+        }
+    }                   /* while (42) */
+
+    if ((argc - optind) != 1) {
+        rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>",
+                argv[0]);
         return (-1);
     }
 
-    return (rrd_last_r(argv[1]));
-}
+    if (opt_daemon == NULL)
+    {
+        char *temp;
 
+        temp = getenv (ENV_RRDCACHED_ADDRESS);
+        if (temp != NULL)
+        {
+            opt_daemon = strdup (temp);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error("strdup failed.");
+                return (-1);
+            }
+        }
+    }
+
+    if (opt_daemon != NULL)
+    {
+        int status;
+
+        status = rrdc_connect (opt_daemon);
+        if (status != 0)
+        {
+            rrd_set_error ("rrdc_connect failed with status %i.", status);
+            return (-1);
+        }
+
+        status = rrdc_flush (argv[optind]);
+        if (status != 0)
+        {
+            rrd_set_error ("rrdc_flush (%s) failed with status %i.",
+                    argv[optind], status);
+            return (-1);
+        }
+
+        rrdc_disconnect ();
+    } /* if (opt_daemon) */
+
+    return (rrd_last_r (argv[optind]));
+}
 
 time_t rrd_last_r(
     const char *filename)
index e2ad334a30532d5d27fd1d67f8bfd4cba70f3783..120cf17f5baf6136d085fa2d674b87e1a46310ee 100644 (file)
 /*****************************************************************************
  * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
+ *                Copyright by Florian Forster, 2008
  *****************************************************************************
  * rrd_lastupdate  Get the last datum entered for each DS
  *****************************************************************************/
 
 #include "rrd_tool.h"
 #include "rrd_rpncalc.h"
+#include "rrd_client.h"
 #include <stdarg.h>
 
-int rrd_lastupdate(
-    int argc,
-    char **argv,
-    time_t *last_update,
-    unsigned long *ds_cnt,
-    char ***ds_namv,
-    char ***last_ds)
+int rrd_lastupdate (int argc, char **argv)
+{
+    time_t    last_update;
+    char    **ds_names;
+    char    **last_ds;
+    unsigned long ds_count, i;
+    int status;
+
+    char *opt_daemon = NULL;
+
+    optind = 0;
+    opterr = 0;         /* initialize getopt */
+
+    while (42) {
+        int       opt;
+        int       option_index = 0;
+        static struct option long_options[] = {
+            {"daemon", required_argument, 0, 'd'},
+            {0, 0, 0, 0}
+        };
+
+        opt = getopt_long (argc, argv, "d:", long_options, &option_index);
+
+        if (opt == EOF)
+            break;
+
+        switch (opt) {
+        case 'd':
+            if (opt_daemon != NULL)
+                    free (opt_daemon);
+            opt_daemon = strdup (optarg);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error ("strdup failed.");
+                return (-1);
+            }
+            break;
+
+        default:
+            rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>",
+                    argv[0]);
+            return (-1);
+            break;
+        }
+    }                   /* while (42) */
+
+    if ((argc - optind) != 1) {
+        rrd_set_error ("Usage: rrdtool %s [--daemon <addr>] <file>",
+                argv[0]);
+        return (-1);
+    }
+
+    if (opt_daemon == NULL)
+    {
+        char *temp;
+
+        temp = getenv (ENV_RRDCACHED_ADDRESS);
+        if (temp != NULL)
+        {
+            opt_daemon = strdup (temp);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error("strdup failed.");
+                return (-1);
+            }
+        }
+    }
+
+    if (opt_daemon != NULL)
+    {
+        status = rrdc_connect (opt_daemon);
+        if (status != 0)
+        {
+            rrd_set_error ("rrdc_connect failed with status %i.", status);
+            return (-1);
+        }
+
+        status = rrdc_flush (argv[optind]);
+        if (status != 0)
+        {
+            rrd_set_error ("rrdc_flush (%s) failed with status %i.",
+                    argv[optind], status);
+            return (-1);
+        }
+
+        rrdc_disconnect ();
+    } /* if (opt_daemon) */
+
+    status = rrd_lastupdate_r (argv[optind],
+            &last_update, &ds_count, &ds_names, &last_ds);
+    if (status != 0)
+        return (status);
+
+    for (i = 0; i < ds_count; i++)
+        printf(" %s", ds_names[i]);
+    printf ("\n\n");
+
+    printf ("%10lu:", last_update);
+    for (i = 0; i < ds_count; i++) {
+        printf(" %s", last_ds[i]);
+        free(last_ds[i]);
+        free(ds_names[i]);
+    }
+    printf("\n");
+
+    free(last_ds);
+    free(ds_names);
+
+    return (0);
+} /* int rrd_lastupdate */
+
+int rrd_lastupdate_r(const char *filename,
+        time_t *ret_last_update,
+        unsigned long *ret_ds_count,
+        char ***ret_ds_names,
+        char ***ret_last_ds)
 {
     unsigned long i = 0;
-    char     *filename;
     rrd_t     rrd;
     rrd_file_t *rrd_file;
 
-    if (argc < 2) {
-        rrd_set_error("please specify an rrd");
-        goto err_out;
+    rrd_file = rrd_open(filename, &rrd, RRD_READONLY);
+    if (rrd_file == NULL) {
+        rrd_free(&rrd);
+        return (-1);
     }
-    filename = argv[1];
 
-    rrd_file = rrd_open(filename, &rrd, RRD_READONLY);
-    if (rrd_file == NULL)
-        goto err_free;
-
-    *last_update = rrd.live_head->last_up;
-    *ds_cnt = rrd.stat_head->ds_cnt;
-    if (((*ds_namv) =
-         (char **) malloc(rrd.stat_head->ds_cnt * sizeof(char *))) == NULL) {
-        rrd_set_error("malloc fetch ds_namv array");
-        goto err_close;
+    *ret_last_update = rrd.live_head->last_up;
+    *ret_ds_count = rrd.stat_head->ds_cnt;
+    *ret_ds_names = (char **) malloc (rrd.stat_head->ds_cnt * sizeof(char *));
+    if (*ret_ds_names == NULL) {
+        rrd_set_error ("malloc fetch ret_ds_names array");
+        rrd_close (rrd_file);
+        rrd_free (&rrd);
+        return (-1);
     }
+    memset (*ret_ds_names, 0, rrd.stat_head->ds_cnt * sizeof(char *));
 
-    if (((*last_ds) =
-         (char **) malloc(rrd.stat_head->ds_cnt * sizeof(char *))) == NULL) {
-        rrd_set_error("malloc fetch last_ds array");
-        goto err_free_ds_namv;
+    *ret_last_ds = (char **) malloc (rrd.stat_head->ds_cnt * sizeof(char *));
+    if (*ret_last_ds == NULL) {
+        rrd_set_error ("malloc fetch ret_last_ds array");
+        free (*ret_ds_names);
+        *ret_ds_names = NULL;
+        rrd_close (rrd_file);
+        rrd_free (&rrd);
+        return (-1);
     }
+    memset (*ret_last_ds, 0, rrd.stat_head->ds_cnt * sizeof(char *));
 
     for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
-        (*ds_namv)[i] = sprintf_alloc("%s", rrd.ds_def[i].ds_nam);
-        (*last_ds)[i] = sprintf_alloc("%s", rrd.pdp_prep[i].last_ds);
+        (*ret_ds_names)[i] = sprintf_alloc("%s", rrd.ds_def[i].ds_nam);
+        (*ret_last_ds)[i] = sprintf_alloc("%s", rrd.pdp_prep[i].last_ds);
+
+        if (((*ret_ds_names)[i] == NULL) || ((*ret_last_ds)[i] == NULL))
+            break;
+    }
+
+    /* Check if all names and values could be copied and free everything if
+     * not. */
+    if (i < rrd.stat_head->ds_cnt) {
+        rrd_set_error ("sprintf_alloc failed");
+        for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
+            if ((*ret_ds_names)[i] != NULL)
+            {
+                free ((*ret_ds_names)[i]);
+                (*ret_ds_names)[i] = NULL;
+            }
+            if ((*ret_last_ds)[i] != NULL)
+            {
+                free ((*ret_last_ds)[i]);
+                (*ret_last_ds)[i] = NULL;
+            }
+        }
+        free (*ret_ds_names);
+        *ret_ds_names = NULL;
+        free (*ret_last_ds);
+        *ret_last_ds = NULL;
+        rrd_close (rrd_file);
+        rrd_free (&rrd);
+        return (-1);
     }
 
     rrd_free(&rrd);
     rrd_close(rrd_file);
     return (0);
-
-  err_free_ds_namv:
-    free(*ds_namv);
-  err_close:
-    rrd_close(rrd_file);
-  err_free:
-    rrd_free(&rrd);
-  err_out:
-    return (-1);
-}
+} /* int rrd_lastupdate_r */
index d50e6081059cc14bed4d0b0e36ed8015188f886a..d054386074dcfe7915a370193d47a34c99ec785a 100644 (file)
@@ -55,7 +55,7 @@ void PrintUsage(
         N_
         ("Valid commands: create, update, updatev, graph, graphv,  dump, restore,\n"
          "\t\tlast, lastupdate, first, info, fetch, tune,\n"
-         "\t\tresize, xport\n\n");
+         "\t\tresize, xport, flush\n\n");
 
     const char *help_listremote =
         N_("Valid remote commands: quit, ls, cd, mkdir, pwd\n\n");
@@ -95,7 +95,8 @@ void PrintUsage(
     const char *help_update =
         N_("* update - update an RRD\n\n"
            "\trrdtool update filename\n"
-           "\t\t--template|-t ds-name:ds-name:...\n"
+           "\t\t[--template|-t ds-name:ds-name:...]\n"
+          "\t\t[--daemon <address>]\n"
            "\t\ttime|N:value[:value...]\n\n"
            "\t\tat-time@value[:value...]\n\n"
            "\t\t[ time:value[:value...] ..]\n\n");
@@ -104,7 +105,7 @@ void PrintUsage(
         N_("* updatev - a verbose version of update\n"
            "\treturns information about values, RRAs, and datasources updated\n\n"
            "\trrdtool updatev filename\n"
-           "\t\t--template|-t ds-name:ds-name:...\n"
+           "\t\t[--template|-t ds-name:ds-name:...]\n"
            "\t\ttime|N:value[:value...]\n\n"
            "\t\tat-time@value[:value...]\n\n"
            "\t\t[ time:value[:value...] ..]\n\n");
@@ -113,7 +114,13 @@ void PrintUsage(
         N_("* fetch - fetch data out of an RRD\n\n"
            "\trrdtool fetch filename.rrd CF\n"
            "\t\t[-r|--resolution resolution]\n"
-           "\t\t[-s|--start start] [-e|--end end]\n\n");
+           "\t\t[-s|--start start] [-e|--end end]\n"
+          "\t\t[--daemon <address>]\n\n");
+
+    const char *help_flush =
+        N_("* flush - flush cached data out to an RRD file\n\n"
+           "\trrdtool flush filename.rrd\n"
+          "\t\t[--daemon <address>]\n\n");
 
 /* break up very large strings (help_graph, help_tune) for ISO C89 compliance*/
 
@@ -132,7 +139,7 @@ void PrintUsage(
            "\t\t[-h|--height pixels] [-o|--logarithmic]\n"
            "\t\t[-u|--upper-limit value] [-z|--lazy]\n"
            "\t\t[-l|--lower-limit value] [-r|--rigid]\n"
-           "\t\t[-g|--no-legend]\n"
+           "\t\t[-g|--no-legend] [--daemon <address>]\n"
            "\t\t[-F|--force-rules-legend]\n" "\t\t[-j|--only-graph]\n");
     const char *help_graph2 =
         N_("\t\t[-n|--font FONTTAG:size:font]\n"
@@ -217,7 +224,7 @@ void PrintUsage(
         C_LASTUPDATE, C_FIRST, C_UPDATE, C_FETCH, C_GRAPH, C_GRAPHV,
         C_TUNE,
         C_RESIZE, C_XPORT, C_QUIT, C_LS, C_CD, C_MKDIR, C_PWD,
-        C_UPDATEV
+        C_UPDATEV, C_FLUSH
     };
     int       help_cmd = C_NONE;
 
@@ -242,6 +249,8 @@ void PrintUsage(
             help_cmd = C_UPDATEV;
         else if (!strcmp(cmd, "fetch"))
             help_cmd = C_FETCH;
+        else if (!strcmp(cmd, "flush"))
+            help_cmd = C_FLUSH;
         else if (!strcmp(cmd, "graph"))
             help_cmd = C_GRAPH;
         else if (!strcmp(cmd, "graphv"))
@@ -302,6 +311,9 @@ void PrintUsage(
     case C_FETCH:
         fputs(_(help_fetch), stdout);
         break;
+    case C_FLUSH:
+        fputs(_(help_flush), stdout);
+        break;
     case C_GRAPH:
         fputs(_(help_graph0), stdout);
         fputs(_(help_graph1), stdout);
@@ -647,26 +659,7 @@ int HandleInputLine(
     else if (strcmp("last", argv[1]) == 0)
         printf("%ld\n", rrd_last(argc - 1, &argv[1]));
     else if (strcmp("lastupdate", argv[1]) == 0) {
-        time_t    last_update;
-        char    **ds_namv;
-        char    **last_ds;
-        unsigned long ds_cnt, i;
-
-        if (rrd_lastupdate(argc - 1, &argv[1], &last_update,
-                           &ds_cnt, &ds_namv, &last_ds) == 0) {
-            for (i = 0; i < ds_cnt; i++)
-                printf(" %s", ds_namv[i]);
-            printf("\n\n");
-            printf("%10lu:", last_update);
-            for (i = 0; i < ds_cnt; i++) {
-                printf(" %s", last_ds[i]);
-                free(last_ds[i]);
-                free(ds_namv[i]);
-            }
-            printf("\n");
-            free(last_ds);
-            free(ds_namv);
-        }
+        rrd_lastupdate(argc - 1, &argv[1]);
     } else if (strcmp("first", argv[1]) == 0)
         printf("%ld\n", rrd_first(argc - 1, &argv[1]));
     else if (strcmp("update", argv[1]) == 0)
@@ -815,6 +808,8 @@ int HandleInputLine(
 
     } else if (strcmp("tune", argv[1]) == 0)
         rrd_tune(argc - 1, &argv[1]);
+    else if (strcmp("flush", argv[1]) == 0)
+        rrd_cmd_flush(argc - 1, &argv[1]);
     else {
         rrd_set_error("unknown function '%s'", argv[1]);
     }
index 858b82b3ddc760158960f4e34fa0733a2e547141..4cfe07e7b5352b3511efd3be9960c4f04bd353c3 100644 (file)
@@ -83,15 +83,14 @@ extern    "C" {
     int       rrd_create_fn(
     const char *file_name,
     rrd_t *rrd);
-    int       rrd_fetch_fn(
-    const char *filename,
-    enum cf_en cf_idx,
-    time_t *start,
-    time_t *end,
-    unsigned long *step,
-    unsigned long *ds_cnt,
-    char ***ds_namv,
-    rrd_value_t **data);
+    int rrd_fetch_fn (const char *filename,
+            enum cf_en cf_idx,
+            time_t *start,
+            time_t *end,
+            unsigned long *step,
+            unsigned long *ds_cnt,
+            char ***ds_namv,
+            rrd_value_t **data);
 
 #define RRD_READONLY    (1<<0)
 #define RRD_READWRITE   (1<<1)
index d7ee4ac798ff612db5feb8cca78901826b13600e..2b4a29300844208026abfca9331bc6222c78cb8a 100644 (file)
@@ -1,6 +1,6 @@
-
 /*****************************************************************************
  * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
+ *                Copyright by Florian Forster, 2008
  *****************************************************************************
  * rrd_update.c  RRD Update Function
  *****************************************************************************
@@ -23,6 +23,8 @@
 #include "rrd_is_thread_safe.h"
 #include "unused.h"
 
+#include "rrd_client.h"
+
 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
 /*
  * WIN32 does not have gettimeofday    and struct timeval. This is a quick and dirty
@@ -369,18 +371,20 @@ int rrd_update(
 {
     struct option long_options[] = {
         {"template", required_argument, 0, 't'},
+        {"daemon",   required_argument, 0, 'd'},
         {0, 0, 0, 0}
     };
     int       option_index = 0;
     int       opt;
     char     *tmplt = NULL;
     int       rc = -1;
+    char     *opt_daemon = NULL;
 
     optind = 0;
     opterr = 0;         /* initialize getopt */
 
     while (1) {
-        opt = getopt_long(argc, argv, "t:", long_options, &option_index);
+        opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
 
         if (opt == EOF)
             break;
@@ -390,6 +394,17 @@ int rrd_update(
             tmplt = strdup(optarg);
             break;
 
+        case 'd':
+            if (opt_daemon != NULL)
+                free (opt_daemon);
+            opt_daemon = strdup (optarg);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error("strdup failed.");
+                goto out;
+            }
+            break;
+
         case '?':
             rrd_set_error("unknown option '%s'", argv[optind - 1]);
             goto out;
@@ -402,10 +417,75 @@ int rrd_update(
         goto out;
     }
 
+    if ((tmplt != NULL) && (opt_daemon != NULL))
+    {
+        rrd_set_error("The caching opt_daemon cannot be used together with "
+                "templates yet.");
+        goto out;
+    }
+
+    if ((tmplt == NULL) && (opt_daemon == NULL))
+    {
+        char *temp;
+
+        temp = getenv (ENV_RRDCACHED_ADDRESS);
+        if (temp != NULL)
+        {
+            opt_daemon = strdup (temp);
+            if (opt_daemon == NULL)
+            {
+                rrd_set_error("strdup failed.");
+                goto out;
+            }
+        }
+    }
+
+    if (opt_daemon != NULL)
+    {
+        int status;
+
+        status = rrdc_connect (opt_daemon);
+        if (status != 0)
+        {
+            rrd_set_error("Unable to connect to opt_daemon: %s",
+                    (status < 0)
+                    ? "Internal error"
+                    : rrd_strerror (status));
+            goto out;
+        }
+
+        status = rrdc_update (/* file = */ argv[optind],
+                /* values_num = */ argc - optind - 1,
+                /* values = */ (void *) (argv + optind + 1));
+        if (status != 0)
+        {
+            rrd_set_error("Failed sending the values to the opt_daemon: %s",
+                    (status < 0)
+                    ? "Internal error"
+                    : rrd_strerror (status));
+        }
+        else
+        {
+            rc = 0;
+        }
+
+        rrdc_disconnect ();
+        goto out;
+    } /* if (opt_daemon != NULL) */
+
     rc = rrd_update_r(argv[optind], tmplt,
                       argc - optind - 1, (const char **) (argv + optind + 1));
   out:
-    free(tmplt);
+    if (tmplt != NULL)
+    {
+        free(tmplt);
+        tmplt = NULL;
+    }
+    if (opt_daemon != NULL)
+    {
+        free (opt_daemon);
+        opt_daemon = NULL;
+    }
     return rc;
 }
 
index 0dc64861304a6dec9e37bffc015eb7183faa6886..2d0233d824c476aca80756c8cb5e4c286c7ac620 100644 (file)
@@ -10,6 +10,7 @@
 #include "rrd_graph.h"
 #include "rrd_xport.h"
 #include "unused.h"
+#include "rrd_client.h"
 
 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
 #include <io.h>
@@ -53,7 +54,6 @@ int rrd_xport(
     char ***legend_v,   /* legend entries */
     rrd_value_t **data)
 {                       /* two dimensional array containing the data */
-
     image_desc_t im;
     time_t    start_tmp = 0, end_tmp = 0;
     rrd_time_value_t start_tv, end_tv;
@@ -64,6 +64,7 @@ int rrd_xport(
         {"maxrows", required_argument, 0, 'm'},
         {"step", required_argument, 0, 261},
         {"enumds", no_argument, 0, 262},    /* these are handled in the frontend ... */
+        {"daemon", required_argument, 0, 'd'},
         {0, 0, 0, 0}
     };
 
@@ -79,7 +80,7 @@ int rrd_xport(
         int       option_index = 0;
         int       opt;
 
-        opt = getopt_long(argc, argv, "s:e:m:", long_options, &option_index);
+        opt = getopt_long(argc, argv, "s:e:m:d:", long_options, &option_index);
 
         if (opt == EOF)
             break;
@@ -109,6 +110,26 @@ int rrd_xport(
                 return -1;
             }
             break;
+        case 'd':
+        {
+            int status;
+            if (im.use_rrdcached)
+            {
+                rrd_set_error ("You cannot specify --daemon "
+                        "more than once.");
+                return (-1);
+            }
+            status = rrdc_connect (optarg);
+            if (status != 0)
+            {
+                rrd_set_error ("rrdc_connect(%s) failed with status %i.",
+                        optarg, status);
+                return (-1);
+            }
+            im.use_rrdcached = 1;
+            break;
+        }
+
         case '?':
             rrd_set_error("unknown option '%s'", argv[optind - 1]);
             return -1;
@@ -147,6 +168,26 @@ int rrd_xport(
         return (-1);
     }
 
+    if (im.use_rrdcached == 0)
+    {
+        char *temp;
+
+        temp = getenv (ENV_RRDCACHED_ADDRESS);
+        if (temp != NULL)
+        {
+            int status;
+
+            status = rrdc_connect (temp);
+            if (status != 0)
+            {
+                rrd_set_error ("rrdc_connect(%s) failed with status %i.",
+                        temp, status);
+                return (-1);
+            }
+            im.use_rrdcached = 1;
+        }
+    }
+
     if (rrd_xport_fn(&im, start, end, step, col_cnt, legend_v, data) == -1) {
         im_free(&im);
         return -1;