Code

Imported upstream version 1.4.8
[pkg-rrdtool.git] / src / rrd_client.c
1 /**
2  * RRDTool - src/rrd_client.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a copy
6  * of this software and associated documentation files (the "Software"), to
7  * deal in the Software without restriction, including without limitation the
8  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9  * sell copies of the Software, and to permit persons to whom the Software is
10  * furnished to do so, subject to the following conditions:
11  * 
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  * 
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21  * IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at verplant.org>
25  *   Sebastian tokkee Harl <sh at tokkee.org>
26  **/
28 #include "rrd.h"
29 #include "rrd_tool.h"
30 #include "rrd_client.h"
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <strings.h>
36 #include <errno.h>
37 #include <assert.h>
38 #include <pthread.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
41 #include <sys/un.h>
42 #include <netdb.h>
43 #include <limits.h>
45 #ifndef ENODATA
46 #define ENODATA ENOENT
47 #endif
49 struct rrdc_response_s
50 {
51   int status;
52   char *message;
53   char **lines;
54   size_t lines_num;
55 };
56 typedef struct rrdc_response_s rrdc_response_t;
58 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
59 static int sd = -1;
60 static FILE *sh = NULL;
61 static char *sd_path = NULL; /* cache the path for sd */
63 /* get_path: Return a path name appropriate to be sent to the daemon.
64  *
65  * When talking to a local daemon (thru a UNIX socket), relative path names
66  * are resolved to absolute path names to allow for transparent integration
67  * into existing solutions (as requested by Tobi). Else, absolute path names
68  * are not allowed, since path name translation is done by the server.
69  *
70  * One must hold `lock' when calling this function. */
71 static const char *get_path (const char *path, char *resolved_path) /* {{{ */
72 {
73   const char *ret = path;
74   int is_unix = 0;
76   if ((path == NULL) || (resolved_path == NULL) || (sd_path == NULL))
77     return (NULL);
79   if ((*sd_path == '/')
80       || (strncmp ("unix:", sd_path, strlen ("unix:")) == 0))
81     is_unix = 1;
83   if (is_unix)
84   {
85     ret = realpath(path, resolved_path);
86     if (ret == NULL)
87       rrd_set_error("realpath(%s): %s", path, rrd_strerror(errno));
88     return ret;
89   }
90   else
91   {
92     if (*path == '/') /* not absolute path */
93     {
94       rrd_set_error ("absolute path names not allowed when talking "
95           "to a remote daemon");
96       return NULL;
97     }
98   }
100   return path;
101 } /* }}} char *get_path */
103 /* One must hold `lock' when calling `close_connection'. */
104 static void close_connection (void) /* {{{ */
106   if (sh != NULL)
107   {
108     fclose (sh);
109     sh = NULL;
110     sd = -1;
111   }
112   else if (sd >= 0)
113   {
114     close (sd);
115     sd = -1;
116   }
118   if (sd_path != NULL)
119     free (sd_path);
120   sd_path = NULL;
121 } /* }}} void close_connection */
123 static int buffer_add_string (const char *str, /* {{{ */
124     char **buffer_ret, size_t *buffer_size_ret)
126   char *buffer;
127   size_t buffer_size;
128   size_t buffer_pos;
129   size_t i;
130   int status;
132   buffer = *buffer_ret;
133   buffer_size = *buffer_size_ret;
134   buffer_pos = 0;
136   i = 0;
137   status = -1;
138   while (buffer_pos < buffer_size)
139   {
140     if (str[i] == 0)
141     {
142       buffer[buffer_pos] = ' ';
143       buffer_pos++;
144       status = 0;
145       break;
146     }
147     else if ((str[i] == ' ') || (str[i] == '\\'))
148     {
149       if (buffer_pos >= (buffer_size - 1))
150         break;
151       buffer[buffer_pos] = '\\';
152       buffer_pos++;
153       buffer[buffer_pos] = str[i];
154       buffer_pos++;
155     }
156     else
157     {
158       buffer[buffer_pos] = str[i];
159       buffer_pos++;
160     }
161     i++;
162   } /* while (buffer_pos < buffer_size) */
164   if (status != 0)
165     return (-1);
167   *buffer_ret = buffer + buffer_pos;
168   *buffer_size_ret = buffer_size - buffer_pos;
170   return (0);
171 } /* }}} int buffer_add_string */
173 static int buffer_add_value (const char *value, /* {{{ */
174     char **buffer_ret, size_t *buffer_size_ret)
176   char temp[4096];
178   if (strncmp (value, "N:", 2) == 0)
179     snprintf (temp, sizeof (temp), "%lu:%s",
180         (unsigned long) time (NULL), value + 2);
181   else
182     strncpy (temp, value, sizeof (temp));
183   temp[sizeof (temp) - 1] = 0;
185   return (buffer_add_string (temp, buffer_ret, buffer_size_ret));
186 } /* }}} int buffer_add_value */
188 /* Remove trailing newline (NL) and carriage return (CR) characters. Similar to
189  * the Perl function `chomp'. Returns the number of characters that have been
190  * removed. */
191 static int chomp (char *str) /* {{{ */
193   size_t len;
194   int removed;
196   if (str == NULL)
197     return (-1);
199   len = strlen (str);
200   removed = 0;
201   while ((len > 0) && ((str[len - 1] == '\n') || (str[len - 1] == '\r')))
202   {
203     str[len - 1] = 0;
204     len--;
205     removed++;
206   }
208   return (removed);
209 } /* }}} int chomp */
211 static void response_free (rrdc_response_t *res) /* {{{ */
213   if (res == NULL)
214     return;
216   if (res->lines != NULL)
217   {
218     size_t i;
220     for (i = 0; i < res->lines_num; i++)
221       if (res->lines[i] != NULL)
222         free (res->lines[i]);
223     free (res->lines);
224   }
226   free (res);
227 } /* }}} void response_free */
229 static int response_read (rrdc_response_t **ret_response) /* {{{ */
231   rrdc_response_t *ret = NULL;
232   int status = 0;
234   char buffer[4096];
235   char *buffer_ptr;
237   size_t i;
239 #define DIE(code) do { status = code; goto err_out; } while(0)
241   if (sh == NULL)
242     DIE(-1);
244   ret = (rrdc_response_t *) malloc (sizeof (rrdc_response_t));
245   if (ret == NULL)
246     DIE(-2);
247   memset (ret, 0, sizeof (*ret));
248   ret->lines = NULL;
249   ret->lines_num = 0;
251   buffer_ptr = fgets (buffer, sizeof (buffer), sh);
252   if (buffer_ptr == NULL)
253     DIE(-3);
255   chomp (buffer);
257   ret->status = strtol (buffer, &ret->message, 0);
258   if (buffer == ret->message)
259     DIE(-4);
261   /* Skip leading whitespace of the status message */
262   ret->message += strspn (ret->message, " \t");
264   if (ret->status <= 0)
265   {
266     if (ret->status < 0)
267       rrd_set_error("rrdcached: %s", ret->message);
268     goto out;
269   }
271   ret->lines = (char **) malloc (sizeof (char *) * ret->status);
272   if (ret->lines == NULL)
273     DIE(-5);
275   memset (ret->lines, 0, sizeof (char *) * ret->status);
276   ret->lines_num = (size_t) ret->status;
278   for (i = 0; i < ret->lines_num; i++)
279   {
280     buffer_ptr = fgets (buffer, sizeof (buffer), sh);
281     if (buffer_ptr == NULL)
282       DIE(-6);
284     chomp (buffer);
286     ret->lines[i] = strdup (buffer);
287     if (ret->lines[i] == NULL)
288       DIE(-7);
289   }
291 out:
292   *ret_response = ret;
293   fflush(sh);
294   return (status);
296 err_out:
297   response_free(ret);
298   close_connection();
299   return (status);
301 #undef DIE
303 } /* }}} rrdc_response_t *response_read */
305 static int request (const char *buffer, size_t buffer_size, /* {{{ */
306     rrdc_response_t **ret_response)
308   int status;
309   rrdc_response_t *res;
311   if (sh == NULL)
312     return (ENOTCONN);
314   status = (int) fwrite (buffer, buffer_size, /* nmemb = */ 1, sh);
315   if (status != 1)
316   {
317     close_connection ();
318     rrd_set_error("request: socket error (%d) while talking to rrdcached",
319                   status);
320     return (-1);
321   }
322   fflush (sh);
324   res = NULL;
325   status = response_read (&res);
327   if (status != 0)
328   {
329     if (status < 0)
330       rrd_set_error("request: internal error while talking to rrdcached");
331     return (status);
332   }
334   *ret_response = res;
335   return (0);
336 } /* }}} int request */
338 /* determine whether we are connected to the specified daemon_addr if
339  * NULL, return whether we are connected at all
340  */
341 int rrdc_is_connected(const char *daemon_addr) /* {{{ */
343   if (sd < 0)
344     return 0;
345   else if (daemon_addr == NULL)
346   {
347     /* here we have to handle the case i.e.
348      *   UPDATE --daemon ...; UPDATEV (no --daemon) ...
349      * In other words: we have a cached connection,
350      * but it is not specified in the current command.
351      * Daemon is only implied in this case if set in ENV
352      */
353     char *addr = getenv(ENV_RRDCACHED_ADDRESS);
354     if (addr != NULL && strcmp(addr,"") != 0)
355       return 1;
356     else
357       return 0;
358   }
359   else if (strcmp(daemon_addr, sd_path) == 0)
360     return 1;
361   else
362     return 0;
364 } /* }}} int rrdc_is_connected */
366 static int rrdc_connect_unix (const char *path) /* {{{ */
368   struct sockaddr_un sa;
369   int status;
371   assert (path != NULL);
372   assert (sd == -1);
374   sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
375   if (sd < 0)
376   {
377     status = errno;
378     return (status);
379   }
381   memset (&sa, 0, sizeof (sa));
382   sa.sun_family = AF_UNIX;
383   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
385   status = connect (sd, (struct sockaddr *) &sa, sizeof (sa));
386   if (status != 0)
387   {
388     status = errno;
389     close_connection ();
390     return (status);
391   }
393   sh = fdopen (sd, "r+");
394   if (sh == NULL)
395   {
396     status = errno;
397     close_connection ();
398     return (status);
399   }
401   return (0);
402 } /* }}} int rrdc_connect_unix */
404 static int rrdc_connect_network (const char *addr_orig) /* {{{ */
406   struct addrinfo ai_hints;
407   struct addrinfo *ai_res;
408   struct addrinfo *ai_ptr;
409   char addr_copy[NI_MAXHOST];
410   char *addr;
411   char *port;
413   assert (addr_orig != NULL);
414   assert (sd == -1);
416   strncpy(addr_copy, addr_orig, sizeof(addr_copy));
417   addr_copy[sizeof(addr_copy) - 1] = '\0';
418   addr = addr_copy;
420   int status;
421   memset (&ai_hints, 0, sizeof (ai_hints));
422   ai_hints.ai_flags = 0;
423 #ifdef AI_ADDRCONFIG
424   ai_hints.ai_flags |= AI_ADDRCONFIG;
425 #endif
426   ai_hints.ai_family = AF_UNSPEC;
427   ai_hints.ai_socktype = SOCK_STREAM;
429   port = NULL;
430   if (*addr == '[') /* IPv6+port format */
431   {
432     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
433     addr++;
435     port = strchr (addr, ']');
436     if (port == NULL)
437     {
438       rrd_set_error("malformed address: %s", addr_orig);
439       return (-1);
440     }
441     *port = 0;
442     port++;
444     if (*port == ':')
445       port++;
446     else if (*port == 0)
447       port = NULL;
448     else
449     {
450       rrd_set_error("garbage after address: %s", port);
451       return (-1);
452     }
453   } /* if (*addr == '[') */
454   else
455   {
456     port = rindex(addr, ':');
457     if (port != NULL)
458     {
459       *port = 0;
460       port++;
461     }
462   }
464   ai_res = NULL;
465   status = getaddrinfo (addr,
466                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
467                         &ai_hints, &ai_res);
468   if (status != 0)
469   {
470     rrd_set_error ("failed to resolve address `%s' (port %s): %s",
471         addr, port == NULL ? RRDCACHED_DEFAULT_PORT : port,
472         gai_strerror (status));
473     return (-1);
474   }
476   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
477   {
478     sd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
479     if (sd < 0)
480     {
481       status = errno;
482       sd = -1;
483       continue;
484     }
486     status = connect (sd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
487     if (status != 0)
488     {
489       status = errno;
490       close_connection();
491       continue;
492     }
494     sh = fdopen (sd, "r+");
495     if (sh == NULL)
496     {
497       status = errno;
498       close_connection ();
499       continue;
500     }
502     assert (status == 0);
503     break;
504   } /* for (ai_ptr) */
506   freeaddrinfo(ai_res);
508   return (status);
509 } /* }}} int rrdc_connect_network */
511 int rrdc_connect (const char *addr) /* {{{ */
513   int status = 0;
515   if (addr == NULL) {
516     addr = getenv (ENV_RRDCACHED_ADDRESS);
517   }
519   if (addr == NULL || strcmp(addr,"") == 0 ) {
520     addr = NULL;
521     return 0;   
522   }
524   pthread_mutex_lock(&lock);
526   if (sd >= 0 && sd_path != NULL && strcmp(addr, sd_path) == 0)
527   {
528     /* connection to the same daemon; use cached connection */
529     pthread_mutex_unlock (&lock);
530     return (0);
531   }
532   else
533   {
534     close_connection();
535   }
537   rrd_clear_error ();
538   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
539     status = rrdc_connect_unix (addr + strlen ("unix:"));
540   else if (addr[0] == '/')
541     status = rrdc_connect_unix (addr);
542   else
543     status = rrdc_connect_network(addr);
545   if (status == 0 && sd >= 0)
546     sd_path = strdup(addr);
547   else
548   {
549     char *err = rrd_test_error () ? rrd_get_error () : "Internal error";
550     /* err points the string that gets written to by rrd_set_error(), thus we
551      * cannot pass it to that function */
552     err = strdup (err);
553     rrd_set_error("Unable to connect to rrdcached: %s",
554                   (status < 0)
555                   ? (err ? err : "Internal error")
556                   : rrd_strerror (status));
557     if (err != NULL)
558       free (err);
559   }
561   pthread_mutex_unlock (&lock);
562   return (status);
563 } /* }}} int rrdc_connect */
565 int rrdc_disconnect (void) /* {{{ */
567   pthread_mutex_lock (&lock);
569   close_connection();
571   pthread_mutex_unlock (&lock);
573   return (0);
574 } /* }}} int rrdc_disconnect */
576 int rrdc_update (const char *filename, int values_num, /* {{{ */
577                 const char * const *values)
579   char buffer[4096];
580   char *buffer_ptr;
581   size_t buffer_free;
582   size_t buffer_size;
583   rrdc_response_t *res;
584   int status;
585   int i;
586   char file_path[PATH_MAX];
588   memset (buffer, 0, sizeof (buffer));
589   buffer_ptr = &buffer[0];
590   buffer_free = sizeof (buffer);
592   status = buffer_add_string ("update", &buffer_ptr, &buffer_free);
593   if (status != 0)
594     return (ENOBUFS);
596   pthread_mutex_lock (&lock);
597   filename = get_path (filename, file_path);
598   if (filename == NULL)
599   {
600     pthread_mutex_unlock (&lock);
601     return (-1);
602   }
604   status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
605   if (status != 0)
606   {
607     pthread_mutex_unlock (&lock);
608     return (ENOBUFS);
609   }
611   for (i = 0; i < values_num; i++)
612   {
613     status = buffer_add_value (values[i], &buffer_ptr, &buffer_free);
614     if (status != 0)
615     {
616       pthread_mutex_unlock (&lock);
617       return (ENOBUFS);
618     }
619   }
621   assert (buffer_free < sizeof (buffer));
622   buffer_size = sizeof (buffer) - buffer_free;
623   assert (buffer[buffer_size - 1] == ' ');
624   buffer[buffer_size - 1] = '\n';
626   res = NULL;
627   status = request (buffer, buffer_size, &res);
628   pthread_mutex_unlock (&lock);
630   if (status != 0)
631     return (status);
633   status = res->status;
634   response_free (res);
636   return (status);
637 } /* }}} int rrdc_update */
639 int rrdc_flush (const char *filename) /* {{{ */
641   char buffer[4096];
642   char *buffer_ptr;
643   size_t buffer_free;
644   size_t buffer_size;
645   rrdc_response_t *res;
646   int status;
647   char file_path[PATH_MAX];
649   if (filename == NULL)
650     return (-1);
652   memset (buffer, 0, sizeof (buffer));
653   buffer_ptr = &buffer[0];
654   buffer_free = sizeof (buffer);
656   status = buffer_add_string ("flush", &buffer_ptr, &buffer_free);
657   if (status != 0)
658     return (ENOBUFS);
660   pthread_mutex_lock (&lock);
661   filename = get_path (filename, file_path);
662   if (filename == NULL)
663   {
664     pthread_mutex_unlock (&lock);
665     return (-1);
666   }
668   status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
669   if (status != 0)
670   {
671     pthread_mutex_unlock (&lock);
672     return (ENOBUFS);
673   }
675   assert (buffer_free < sizeof (buffer));
676   buffer_size = sizeof (buffer) - buffer_free;
677   assert (buffer[buffer_size - 1] == ' ');
678   buffer[buffer_size - 1] = '\n';
680   res = NULL;
681   status = request (buffer, buffer_size, &res);
682   pthread_mutex_unlock (&lock);
684   if (status != 0)
685     return (status);
687   status = res->status;
688   response_free (res);
690   return (status);
691 } /* }}} int rrdc_flush */
694 /* convenience function; if there is a daemon specified, or if we can
695  * detect one from the environment, then flush the file.  Otherwise, no-op
696  */
697 int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename) /* {{{ */
699   int status = 0;
701   rrdc_connect(opt_daemon);
703   if (rrdc_is_connected(opt_daemon))
704   {
705     rrd_clear_error();
706     status = rrdc_flush (filename);
708     if (status != 0 && !rrd_test_error())
709     {
710       if (status > 0)
711       {
712         rrd_set_error("rrdc_flush (%s) failed: %s",
713                       filename, rrd_strerror(status));
714       }
715       else if (status < 0)
716       {
717         rrd_set_error("rrdc_flush (%s) failed with status %i.",
718                       filename, status);
719       }
720     }
721   } /* if (rrdc_is_connected(..)) */
723   return status;
724 } /* }}} int rrdc_flush_if_daemon */
727 int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
729   rrdc_stats_t *head;
730   rrdc_stats_t *tail;
732   rrdc_response_t *res;
734   int status;
735   size_t i;
737   /* Protocol example: {{{
738    * ->  STATS
739    * <-  5 Statistics follow
740    * <-  QueueLength: 0
741    * <-  UpdatesWritten: 0
742    * <-  DataSetsWritten: 0
743    * <-  TreeNodesNumber: 0
744    * <-  TreeDepth: 0
745    * }}} */
747   res = NULL;
748   pthread_mutex_lock (&lock);
749   status = request ("STATS\n", strlen ("STATS\n"), &res);
750   pthread_mutex_unlock (&lock);
752   if (status != 0)
753     return (status);
755   if (res->status <= 0)
756   {
757     response_free (res);
758     return (EIO);
759   }
761   head = NULL;
762   tail = NULL;
763   for (i = 0; i < res->lines_num; i++)
764   {
765     char *key;
766     char *value;
767     char *endptr;
768     rrdc_stats_t *s;
770     key = res->lines[i];
771     value = strchr (key, ':');
772     if (value == NULL)
773       continue;
774     *value = 0;
775     value++;
777     while ((value[0] == ' ') || (value[0] == '\t'))
778       value++;
780     s = (rrdc_stats_t *) malloc (sizeof (rrdc_stats_t));
781     if (s == NULL)
782       continue;
783     memset (s, 0, sizeof (*s));
785     s->name = strdup (key);
787     endptr = NULL;
788     if ((strcmp ("QueueLength", key) == 0)
789         || (strcmp ("TreeDepth", key) == 0)
790         || (strcmp ("TreeNodesNumber", key) == 0))
791     {
792       s->type = RRDC_STATS_TYPE_GAUGE;
793       s->value.gauge = strtod (value, &endptr);
794     }
795     else if ((strcmp ("DataSetsWritten", key) == 0)
796         || (strcmp ("FlushesReceived", key) == 0)
797         || (strcmp ("JournalBytes", key) == 0)
798         || (strcmp ("JournalRotate", key) == 0)
799         || (strcmp ("UpdatesReceived", key) == 0)
800         || (strcmp ("UpdatesWritten", key) == 0))
801     {
802       s->type = RRDC_STATS_TYPE_COUNTER;
803       s->value.counter = (uint64_t) strtoll (value, &endptr, /* base = */ 0);
804     }
805     else
806     {
807       free (s);
808       continue;
809     }
811     /* Conversion failed */
812     if (endptr == value)
813     {
814       free (s);
815       continue;
816     }
818     if (head == NULL)
819     {
820       head = s;
821       tail = s;
822       s->next = NULL;
823     }
824     else
825     {
826       tail->next = s;
827       tail = s;
828     }
829   } /* for (i = 0; i < res->lines_num; i++) */
831   response_free (res);
833   if (head == NULL)
834 #ifdef EPROTO
835     return (EPROTO);
836 #else
837     return (EINVAL);
838 #endif
840   *ret_stats = head;
841   return (0);
842 } /* }}} int rrdc_stats_get */
844 void rrdc_stats_free (rrdc_stats_t *ret_stats) /* {{{ */
846   rrdc_stats_t *this;
848   this = ret_stats;
849   while (this != NULL)
850   {
851     rrdc_stats_t *next;
853     next = this->next;
855     if (this->name != NULL)
856     {
857       free ((char *)this->name);
858       this->name = NULL;
859     }
860     free (this);
862     this = next;
863   } /* while (this != NULL) */
864 } /* }}} void rrdc_stats_free */
866 /*
867  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
868  */