Code

18dd0dde44be11f85277a9310353b65f3fe4b8e4
[rrdtool-all.git] / program / src / rrd_client.c
1 /**
2  * RRDTool - src/rrd_client.c
3  * Copyright (C) 2008 Florian octo Forster
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a copy
6  * of this software and associated documentation files (the "Software"), to
7  * deal in the Software without restriction, including without limitation the
8  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9  * sell copies of the Software, and to permit persons to whom the Software is
10  * furnished to do so, subject to the following conditions:
11  * 
12  * The above copyright notice and this permission notice shall be included in
13  * all copies or substantial portions of the Software.
14  * 
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21  * IN THE SOFTWARE.
22  *
23  * Authors:
24  *   Florian octo Forster <octo at verplant.org>
25  *   Sebastian tokkee Harl <sh at tokkee.org>
26  **/
28 #include "rrd.h"
29 #include "rrd_tool.h"
30 #include "rrd_client.h"
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <strings.h>
36 #include <errno.h>
37 #include <assert.h>
38 #include <pthread.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
41 #include <sys/un.h>
42 #include <netdb.h>
43 #include <limits.h>
45 #ifndef ENODATA
46 #define ENODATA ENOENT
47 #endif
49 struct rrdc_response_s
50 {
51   int status;
52   char *message;
53   char **lines;
54   size_t lines_num;
55 };
56 typedef struct rrdc_response_s rrdc_response_t;
58 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
59 static int sd = -1;
60 static FILE *sh = NULL;
61 static char *sd_path = NULL; /* cache the path for sd */
63 /* get_path: Return a path name appropriate to be sent to the daemon.
64  *
65  * When talking to a local daemon (thru a UNIX socket), relative path names
66  * are resolved to absolute path names to allow for transparent integration
67  * into existing solutions (as requested by Tobi). Else, absolute path names
68  * are not allowed, since path name translation is done by the server.
69  *
70  * One must hold `lock' when calling this function. */
71 static const char *get_path (const char *path, char *resolved_path) /* {{{ */
72 {
73   const char *ret = path;
74   int is_unix = 0;
76   if ((*sd_path == '/')
77       || (strncmp ("unix:", sd_path, strlen ("unix:")) == 0))
78     is_unix = 1;
80   if (is_unix)
81   {
82     ret = realpath(path, resolved_path);
83     if (ret == NULL)
84       rrd_set_error("realpath(%s): %s", path, rrd_strerror(errno));
85     return ret;
86   }
87   else
88   {
89     if (*path == '/') /* not absolute path */
90     {
91       rrd_set_error ("absolute path names not allowed when talking "
92           "to a remote daemon");
93       return NULL;
94     }
95   }
97   return path;
98 } /* }}} char *get_path */
100 /* One must hold `lock' when calling `close_connection'. */
101 static void close_connection (void) /* {{{ */
103   if (sh != NULL)
104   {
105     fclose (sh);
106     sh = NULL;
107     sd = -1;
108   }
109   else if (sd >= 0)
110   {
111     close (sd);
112     sd = -1;
113   }
115   if (sd_path != NULL)
116     free (sd_path);
117   sd_path = NULL;
118 } /* }}} void close_connection */
120 static int buffer_add_string (const char *str, /* {{{ */
121     char **buffer_ret, size_t *buffer_size_ret)
123   char *buffer;
124   size_t buffer_size;
125   size_t buffer_pos;
126   size_t i;
127   int status;
129   buffer = *buffer_ret;
130   buffer_size = *buffer_size_ret;
131   buffer_pos = 0;
133   i = 0;
134   status = -1;
135   while (buffer_pos < buffer_size)
136   {
137     if (str[i] == 0)
138     {
139       buffer[buffer_pos] = ' ';
140       buffer_pos++;
141       status = 0;
142       break;
143     }
144     else if ((str[i] == ' ') || (str[i] == '\\'))
145     {
146       if (buffer_pos >= (buffer_size - 1))
147         break;
148       buffer[buffer_pos] = '\\';
149       buffer_pos++;
150       buffer[buffer_pos] = str[i];
151       buffer_pos++;
152     }
153     else
154     {
155       buffer[buffer_pos] = str[i];
156       buffer_pos++;
157     }
158     i++;
159   } /* while (buffer_pos < buffer_size) */
161   if (status != 0)
162     return (-1);
164   *buffer_ret = buffer + buffer_pos;
165   *buffer_size_ret = buffer_size - buffer_pos;
167   return (0);
168 } /* }}} int buffer_add_string */
170 static int buffer_add_value (const char *value, /* {{{ */
171     char **buffer_ret, size_t *buffer_size_ret)
173   char temp[4096];
175   if (strncmp (value, "N:", 2) == 0)
176     snprintf (temp, sizeof (temp), "%lu:%s",
177         (unsigned long) time (NULL), value + 2);
178   else
179     strncpy (temp, value, sizeof (temp));
180   temp[sizeof (temp) - 1] = 0;
182   return (buffer_add_string (temp, buffer_ret, buffer_size_ret));
183 } /* }}} int buffer_add_value */
185 /* Remove trailing newline (NL) and carriage return (CR) characters. Similar to
186  * the Perl function `chomp'. Returns the number of characters that have been
187  * removed. */
188 static int chomp (char *str) /* {{{ */
190   size_t len;
191   int removed;
193   if (str == NULL)
194     return (-1);
196   len = strlen (str);
197   removed = 0;
198   while ((len > 0) && ((str[len - 1] == '\n') || (str[len - 1] == '\r')))
199   {
200     str[len - 1] = 0;
201     len--;
202     removed++;
203   }
205   return (removed);
206 } /* }}} int chomp */
208 static void response_free (rrdc_response_t *res) /* {{{ */
210   if (res == NULL)
211     return;
213   if (res->lines != NULL)
214   {
215     size_t i;
217     for (i = 0; i < res->lines_num; i++)
218       if (res->lines[i] != NULL)
219         free (res->lines[i]);
220     free (res->lines);
221   }
223   free (res);
224 } /* }}} void response_free */
226 static int response_read (rrdc_response_t **ret_response) /* {{{ */
228   rrdc_response_t *ret;
230   char buffer[4096];
231   char *buffer_ptr;
233   size_t i;
235   if (sh == NULL)
236     return (-1);
238   ret = (rrdc_response_t *) malloc (sizeof (rrdc_response_t));
239   if (ret == NULL)
240     return (-2);
241   memset (ret, 0, sizeof (*ret));
242   ret->lines = NULL;
243   ret->lines_num = 0;
245   buffer_ptr = fgets (buffer, sizeof (buffer), sh);
246   if (buffer_ptr == NULL) {
247     close_connection();
248     return (-3);
249   }
250   chomp (buffer);
252   ret->status = strtol (buffer, &ret->message, 0);
253   if (buffer == ret->message)
254   {
255     response_free (ret);
256     close_connection();
257     return (-4);
258   }
259   /* Skip leading whitespace of the status message */
260   ret->message += strspn (ret->message, " \t");
262   if (ret->status <= 0)
263   {
264     if (ret->status < 0)
265       rrd_set_error("rrdcached: %s", ret->message);
266     *ret_response = ret;
267     return (0);
268   }
270   ret->lines = (char **) malloc (sizeof (char *) * ret->status);
271   if (ret->lines == NULL)
272   {
273     response_free (ret);
274     close_connection();
275     return (-5);
276   }
277   memset (ret->lines, 0, sizeof (char *) * ret->status);
278   ret->lines_num = (size_t) ret->status;
280   for (i = 0; i < ret->lines_num; i++)
281   {
282     buffer_ptr = fgets (buffer, sizeof (buffer), sh);
283     if (buffer_ptr == NULL)
284     {
285       response_free (ret);
286       close_connection();
287       return (-6);
288     }
289     chomp (buffer);
291     ret->lines[i] = strdup (buffer);
292     if (ret->lines[i] == NULL)
293     {
294       response_free (ret);
295       close_connection();
296       return (-7);
297     }
298   }
300   *ret_response = ret;
301   return (0);
302 } /* }}} rrdc_response_t *response_read */
304 static int request (const char *buffer, size_t buffer_size, /* {{{ */
305     rrdc_response_t **ret_response)
307   int status;
308   rrdc_response_t *res;
310   if (sh == NULL)
311     return (ENOTCONN);
313   status = (int) fwrite (buffer, buffer_size, /* nmemb = */ 1, sh);
314   if (status != 1)
315   {
316     close_connection ();
317     rrd_set_error("request: socket error (%d) while talking to rrdcached",
318                   status);
319     return (-1);
320   }
321   fflush (sh);
323   res = NULL;
324   status = response_read (&res);
326   if (status != 0)
327   {
328     if (status < 0)
329       rrd_set_error("request: internal error while talking to rrdcached");
330     return (status);
331   }
333   *ret_response = res;
334   return (0);
335 } /* }}} int request */
337 /* determine whether we are connected to the specified daemon_addr if
338  * NULL, return whether we are connected at all
339  */
340 int rrdc_is_connected(const char *daemon_addr) /* {{{ */
342   if (sd < 0)
343     return 0;
344   else if (daemon_addr == NULL)
345   {
346     /* here we have to handle the case i.e.
347      *   UPDATE --daemon ...; UPDATEV (no --daemon) ...
348      * In other words: we have a cached connection,
349      * but it is not specified in the current command.
350      * Daemon is only implied in this case if set in ENV
351      */
352     if (getenv(ENV_RRDCACHED_ADDRESS) != NULL)
353       return 1;
354     else
355       return 0;
356   }
357   else if (strcmp(daemon_addr, sd_path) == 0)
358     return 1;
359   else
360     return 0;
362 } /* }}} int rrdc_is_connected */
364 static int rrdc_connect_unix (const char *path) /* {{{ */
366   struct sockaddr_un sa;
367   int status;
369   assert (path != NULL);
370   assert (sd == -1);
372   sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
373   if (sd < 0)
374   {
375     status = errno;
376     return (status);
377   }
379   memset (&sa, 0, sizeof (sa));
380   sa.sun_family = AF_UNIX;
381   strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
383   status = connect (sd, (struct sockaddr *) &sa, sizeof (sa));
384   if (status != 0)
385   {
386     status = errno;
387     close_connection ();
388     return (status);
389   }
391   sh = fdopen (sd, "r+");
392   if (sh == NULL)
393   {
394     status = errno;
395     close_connection ();
396     return (status);
397   }
399   return (0);
400 } /* }}} int rrdc_connect_unix */
402 static int rrdc_connect_network (const char *addr_orig) /* {{{ */
404   struct addrinfo ai_hints;
405   struct addrinfo *ai_res;
406   struct addrinfo *ai_ptr;
407   char addr_copy[NI_MAXHOST];
408   char *addr;
409   char *port;
411   assert (addr_orig != NULL);
412   assert (sd == -1);
414   strncpy(addr_copy, addr_orig, sizeof(addr_copy));
415   addr_copy[sizeof(addr_copy) - 1] = '\0';
416   addr = addr_copy;
418   int status;
419   memset (&ai_hints, 0, sizeof (ai_hints));
420   ai_hints.ai_flags = 0;
421 #ifdef AI_ADDRCONFIG
422   ai_hints.ai_flags |= AI_ADDRCONFIG;
423 #endif
424   ai_hints.ai_family = AF_UNSPEC;
425   ai_hints.ai_socktype = SOCK_STREAM;
427   port = NULL;
428   if (*addr == '[') /* IPv6+port format */
429   {
430     /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
431     addr++;
433     port = strchr (addr, ']');
434     if (port == NULL)
435     {
436       rrd_set_error("malformed address: %s", addr_orig);
437       return (-1);
438     }
439     *port = 0;
440     port++;
442     if (*port == ':')
443       port++;
444     else if (*port == 0)
445       port = NULL;
446     else
447     {
448       rrd_set_error("garbage after address: %s", port);
449       return (-1);
450     }
451   } /* if (*addr == '[') */
452   else
453   {
454     port = rindex(addr, ':');
455     if (port != NULL)
456     {
457       *port = 0;
458       port++;
459     }
460   }
462   ai_res = NULL;
463   status = getaddrinfo (addr,
464                         port == NULL ? RRDCACHED_DEFAULT_PORT : port,
465                         &ai_hints, &ai_res);
466   if (status != 0)
467   {
468     rrd_set_error ("failed to resolve address `%s' (port %s): %s",
469         addr, port == NULL ? RRDCACHED_DEFAULT_PORT : port,
470         gai_strerror (status));
471     return (-1);
472   }
474   for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
475   {
476     sd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
477     if (sd < 0)
478     {
479       status = errno;
480       sd = -1;
481       continue;
482     }
484     status = connect (sd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
485     if (status != 0)
486     {
487       status = errno;
488       close_connection();
489       continue;
490     }
492     sh = fdopen (sd, "r+");
493     if (sh == NULL)
494     {
495       status = errno;
496       close_connection ();
497       continue;
498     }
500     assert (status == 0);
501     break;
502   } /* for (ai_ptr) */
504   return (status);
505 } /* }}} int rrdc_connect_network */
507 int rrdc_connect (const char *addr) /* {{{ */
509   int status = 0;
511   if (addr == NULL)
512     addr = getenv (ENV_RRDCACHED_ADDRESS);
514   if (addr == NULL)
515     return 0;
517   pthread_mutex_lock(&lock);
519   if (sd >= 0 && sd_path != NULL && strcmp(addr, sd_path) == 0)
520   {
521     /* connection to the same daemon; use cached connection */
522     pthread_mutex_unlock (&lock);
523     return (0);
524   }
525   else
526   {
527     close_connection();
528   }
530   rrd_clear_error ();
531   if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
532     status = rrdc_connect_unix (addr + strlen ("unix:"));
533   else if (addr[0] == '/')
534     status = rrdc_connect_unix (addr);
535   else
536     status = rrdc_connect_network(addr);
538   if (status == 0 && sd >= 0)
539     sd_path = strdup(addr);
540   else
541   {
542     char *err = rrd_test_error () ? rrd_get_error () : "Internal error";
543     /* err points the string that gets written to by rrd_set_error(), thus we
544      * cannot pass it to that function */
545     err = strdup (err);
546     rrd_set_error("Unable to connect to rrdcached: %s",
547                   (status < 0)
548                   ? (err ? err : "Internal error")
549                   : rrd_strerror (status));
550     if (err != NULL)
551       free (err);
552   }
554   pthread_mutex_unlock (&lock);
555   return (status);
556 } /* }}} int rrdc_connect */
558 int rrdc_disconnect (void) /* {{{ */
560   pthread_mutex_lock (&lock);
562   close_connection();
564   pthread_mutex_unlock (&lock);
566   return (0);
567 } /* }}} int rrdc_disconnect */
569 int rrdc_update (const char *filename, int values_num, /* {{{ */
570                 const char * const *values)
572   char buffer[4096];
573   char *buffer_ptr;
574   size_t buffer_free;
575   size_t buffer_size;
576   rrdc_response_t *res;
577   int status;
578   int i;
579   char file_path[PATH_MAX];
581   memset (buffer, 0, sizeof (buffer));
582   buffer_ptr = &buffer[0];
583   buffer_free = sizeof (buffer);
585   status = buffer_add_string ("update", &buffer_ptr, &buffer_free);
586   if (status != 0)
587     return (ENOBUFS);
589   pthread_mutex_lock (&lock);
590   filename = get_path (filename, file_path);
591   if (filename == NULL)
592   {
593     pthread_mutex_unlock (&lock);
594     return (-1);
595   }
597   status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
598   if (status != 0)
599   {
600     pthread_mutex_unlock (&lock);
601     return (ENOBUFS);
602   }
604   for (i = 0; i < values_num; i++)
605   {
606     status = buffer_add_value (values[i], &buffer_ptr, &buffer_free);
607     if (status != 0)
608     {
609       pthread_mutex_unlock (&lock);
610       return (ENOBUFS);
611     }
612   }
614   assert (buffer_free < sizeof (buffer));
615   buffer_size = sizeof (buffer) - buffer_free;
616   assert (buffer[buffer_size - 1] == ' ');
617   buffer[buffer_size - 1] = '\n';
619   res = NULL;
620   status = request (buffer, buffer_size, &res);
621   pthread_mutex_unlock (&lock);
623   if (status != 0)
624     return (status);
626   status = res->status;
627   response_free (res);
629   return (status);
630 } /* }}} int rrdc_update */
632 int rrdc_flush (const char *filename) /* {{{ */
634   char buffer[4096];
635   char *buffer_ptr;
636   size_t buffer_free;
637   size_t buffer_size;
638   rrdc_response_t *res;
639   int status;
640   char file_path[PATH_MAX];
642   if (filename == NULL)
643     return (-1);
645   memset (buffer, 0, sizeof (buffer));
646   buffer_ptr = &buffer[0];
647   buffer_free = sizeof (buffer);
649   status = buffer_add_string ("flush", &buffer_ptr, &buffer_free);
650   if (status != 0)
651     return (ENOBUFS);
653   pthread_mutex_lock (&lock);
654   filename = get_path (filename, file_path);
655   if (filename == NULL)
656   {
657     pthread_mutex_unlock (&lock);
658     return (-1);
659   }
661   status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
662   if (status != 0)
663   {
664     pthread_mutex_unlock (&lock);
665     return (ENOBUFS);
666   }
668   assert (buffer_free < sizeof (buffer));
669   buffer_size = sizeof (buffer) - buffer_free;
670   assert (buffer[buffer_size - 1] == ' ');
671   buffer[buffer_size - 1] = '\n';
673   res = NULL;
674   status = request (buffer, buffer_size, &res);
675   pthread_mutex_unlock (&lock);
677   if (status != 0)
678     return (status);
680   status = res->status;
681   response_free (res);
683   return (status);
684 } /* }}} int rrdc_flush */
687 /* convenience function; if there is a daemon specified, or if we can
688  * detect one from the environment, then flush the file.  Otherwise, no-op
689  */
690 int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename) /* {{{ */
692   int status = 0;
694   rrdc_connect(opt_daemon);
696   if (rrdc_is_connected(opt_daemon))
697   {
698     rrd_clear_error();
699     status = rrdc_flush (filename);
701     if (status != 0 && !rrd_test_error())
702     {
703       if (status > 0)
704       {
705         rrd_set_error("rrdc_flush (%s) failed: %s",
706                       filename, rrd_strerror(status));
707       }
708       else if (status < 0)
709       {
710         rrd_set_error("rrdc_flush (%s) failed with status %i.",
711                       filename, status);
712       }
713     }
714   } /* if (rrdc_is_connected(..)) */
716   return status;
717 } /* }}} int rrdc_flush_if_daemon */
720 int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
722   rrdc_stats_t *head;
723   rrdc_stats_t *tail;
725   rrdc_response_t *res;
727   int status;
728   size_t i;
730   /* Protocol example: {{{
731    * ->  STATS
732    * <-  5 Statistics follow
733    * <-  QueueLength: 0
734    * <-  UpdatesWritten: 0
735    * <-  DataSetsWritten: 0
736    * <-  TreeNodesNumber: 0
737    * <-  TreeDepth: 0
738    * }}} */
740   res = NULL;
741   pthread_mutex_lock (&lock);
742   status = request ("STATS\n", strlen ("STATS\n"), &res);
743   pthread_mutex_unlock (&lock);
745   if (status != 0)
746     return (status);
748   if (res->status <= 0)
749   {
750     response_free (res);
751     return (EIO);
752   }
754   head = NULL;
755   tail = NULL;
756   for (i = 0; i < res->lines_num; i++)
757   {
758     char *key;
759     char *value;
760     char *endptr;
761     rrdc_stats_t *s;
763     key = res->lines[i];
764     value = strchr (key, ':');
765     if (value == NULL)
766       continue;
767     *value = 0;
768     value++;
770     while ((value[0] == ' ') || (value[0] == '\t'))
771       value++;
773     s = (rrdc_stats_t *) malloc (sizeof (rrdc_stats_t));
774     if (s == NULL)
775       continue;
776     memset (s, 0, sizeof (*s));
778     s->name = strdup (key);
780     endptr = NULL;
781     if ((strcmp ("QueueLength", key) == 0)
782         || (strcmp ("TreeDepth", key) == 0)
783         || (strcmp ("TreeNodesNumber", key) == 0))
784     {
785       s->type = RRDC_STATS_TYPE_GAUGE;
786       s->value.gauge = strtod (value, &endptr);
787     }
788     else if ((strcmp ("DataSetsWritten", key) == 0)
789         || (strcmp ("FlushesReceived", key) == 0)
790         || (strcmp ("JournalBytes", key) == 0)
791         || (strcmp ("JournalRotate", key) == 0)
792         || (strcmp ("UpdatesReceived", key) == 0)
793         || (strcmp ("UpdatesWritten", key) == 0))
794     {
795       s->type = RRDC_STATS_TYPE_COUNTER;
796       s->value.counter = (uint64_t) strtoll (value, &endptr, /* base = */ 0);
797     }
798     else
799     {
800       free (s);
801       continue;
802     }
804     /* Conversion failed */
805     if (endptr == value)
806     {
807       free (s);
808       continue;
809     }
811     if (head == NULL)
812     {
813       head = s;
814       tail = s;
815       s->next = NULL;
816     }
817     else
818     {
819       tail->next = s;
820       tail = s;
821     }
822   } /* for (i = 0; i < res->lines_num; i++) */
824   response_free (res);
826   if (head == NULL)
827     return (EPROTO);
829   *ret_stats = head;
830   return (0);
831 } /* }}} int rrdc_stats_get */
833 void rrdc_stats_free (rrdc_stats_t *ret_stats) /* {{{ */
835   rrdc_stats_t *this;
837   this = ret_stats;
838   while (this != NULL)
839   {
840     rrdc_stats_t *next;
842     next = this->next;
844     if (this->name != NULL)
845     {
846       free ((char *)this->name);
847       this->name = NULL;
848     }
849     free (this);
851     this = next;
852   } /* while (this != NULL) */
853 } /* }}} void rrdc_stats_free */
855 /*
856  * vim: set sw=2 sts=2 ts=8 et fdm=marker :
857  */