e7c227b8564338c1d74ab2f29c8bd033d459944d
1 /**
2 * RRDTool - src/rrd_client.c
3 * Copyright (C) 2008 Florian octo Forster
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to
7 * deal in the Software without restriction, including without limitation the
8 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9 * sell copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 * IN THE SOFTWARE.
22 *
23 * Authors:
24 * Florian octo Forster <octo at verplant.org>
25 * Sebastian tokkee Harl <sh at tokkee.org>
26 **/
28 #include "rrd.h"
29 #include "rrd_tool.h"
30 #include "rrd_client.h"
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <strings.h>
36 #include <errno.h>
37 #include <assert.h>
38 #include <pthread.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
41 #include <sys/un.h>
42 #include <netdb.h>
43 #include <limits.h>
45 #ifndef ENODATA
46 #define ENODATA ENOENT
47 #endif
49 struct rrdc_response_s
50 {
51 int status;
52 char *message;
53 char **lines;
54 size_t lines_num;
55 };
56 typedef struct rrdc_response_s rrdc_response_t;
58 static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
59 static int sd = -1;
60 static FILE *sh = NULL;
61 static char *sd_path = NULL; /* cache the path for sd */
63 /* get_path: Return a path name appropriate to be sent to the daemon.
64 *
65 * When talking to a local daemon (thru a UNIX socket), relative path names
66 * are resolved to absolute path names to allow for transparent integration
67 * into existing solutions (as requested by Tobi). Else, absolute path names
68 * are not allowed, since path name translation is done by the server.
69 *
70 * One must hold `lock' when calling this function. */
71 static const char *get_path (const char *path, char *resolved_path) /* {{{ */
72 {
73 const char *ret = path;
74 int is_unix = 0;
76 if ((path == NULL) || (resolved_path == NULL) || (sd_path == NULL))
77 return (NULL);
79 if ((*sd_path == '/')
80 || (strncmp ("unix:", sd_path, strlen ("unix:")) == 0))
81 is_unix = 1;
83 if (is_unix)
84 {
85 ret = realpath(path, resolved_path);
86 if (ret == NULL)
87 rrd_set_error("realpath(%s): %s", path, rrd_strerror(errno));
88 return ret;
89 }
90 else
91 {
92 if (*path == '/') /* not absolute path */
93 {
94 rrd_set_error ("absolute path names not allowed when talking "
95 "to a remote daemon");
96 return NULL;
97 }
98 }
100 return path;
101 } /* }}} char *get_path */
103 /* One must hold `lock' when calling `close_connection'. */
104 static void close_connection (void) /* {{{ */
105 {
106 if (sh != NULL)
107 {
108 fclose (sh);
109 sh = NULL;
110 sd = -1;
111 }
112 else if (sd >= 0)
113 {
114 close (sd);
115 sd = -1;
116 }
118 if (sd_path != NULL)
119 free (sd_path);
120 sd_path = NULL;
121 } /* }}} void close_connection */
123 static int buffer_add_string (const char *str, /* {{{ */
124 char **buffer_ret, size_t *buffer_size_ret)
125 {
126 char *buffer;
127 size_t buffer_size;
128 size_t buffer_pos;
129 size_t i;
130 int status;
132 buffer = *buffer_ret;
133 buffer_size = *buffer_size_ret;
134 buffer_pos = 0;
136 i = 0;
137 status = -1;
138 while (buffer_pos < buffer_size)
139 {
140 if (str[i] == 0)
141 {
142 buffer[buffer_pos] = ' ';
143 buffer_pos++;
144 status = 0;
145 break;
146 }
147 else if ((str[i] == ' ') || (str[i] == '\\'))
148 {
149 if (buffer_pos >= (buffer_size - 1))
150 break;
151 buffer[buffer_pos] = '\\';
152 buffer_pos++;
153 buffer[buffer_pos] = str[i];
154 buffer_pos++;
155 }
156 else
157 {
158 buffer[buffer_pos] = str[i];
159 buffer_pos++;
160 }
161 i++;
162 } /* while (buffer_pos < buffer_size) */
164 if (status != 0)
165 return (-1);
167 *buffer_ret = buffer + buffer_pos;
168 *buffer_size_ret = buffer_size - buffer_pos;
170 return (0);
171 } /* }}} int buffer_add_string */
173 static int buffer_add_value (const char *value, /* {{{ */
174 char **buffer_ret, size_t *buffer_size_ret)
175 {
176 char temp[4096];
178 if (strncmp (value, "N:", 2) == 0)
179 snprintf (temp, sizeof (temp), "%lu:%s",
180 (unsigned long) time (NULL), value + 2);
181 else
182 strncpy (temp, value, sizeof (temp));
183 temp[sizeof (temp) - 1] = 0;
185 return (buffer_add_string (temp, buffer_ret, buffer_size_ret));
186 } /* }}} int buffer_add_value */
188 /* Remove trailing newline (NL) and carriage return (CR) characters. Similar to
189 * the Perl function `chomp'. Returns the number of characters that have been
190 * removed. */
191 static int chomp (char *str) /* {{{ */
192 {
193 size_t len;
194 int removed;
196 if (str == NULL)
197 return (-1);
199 len = strlen (str);
200 removed = 0;
201 while ((len > 0) && ((str[len - 1] == '\n') || (str[len - 1] == '\r')))
202 {
203 str[len - 1] = 0;
204 len--;
205 removed++;
206 }
208 return (removed);
209 } /* }}} int chomp */
211 static void response_free (rrdc_response_t *res) /* {{{ */
212 {
213 if (res == NULL)
214 return;
216 if (res->lines != NULL)
217 {
218 size_t i;
220 for (i = 0; i < res->lines_num; i++)
221 if (res->lines[i] != NULL)
222 free (res->lines[i]);
223 free (res->lines);
224 }
226 free (res);
227 } /* }}} void response_free */
229 static int response_read (rrdc_response_t **ret_response) /* {{{ */
230 {
231 rrdc_response_t *ret = NULL;
232 int status = 0;
234 char buffer[4096];
235 char *buffer_ptr;
237 size_t i;
239 #define DIE(code) do { status = code; goto err_out; } while(0)
241 if (sh == NULL)
242 DIE(-1);
244 ret = (rrdc_response_t *) malloc (sizeof (rrdc_response_t));
245 if (ret == NULL)
246 DIE(-2);
247 memset (ret, 0, sizeof (*ret));
248 ret->lines = NULL;
249 ret->lines_num = 0;
251 buffer_ptr = fgets (buffer, sizeof (buffer), sh);
252 if (buffer_ptr == NULL)
253 DIE(-3);
255 chomp (buffer);
257 ret->status = strtol (buffer, &ret->message, 0);
258 if (buffer == ret->message)
259 DIE(-4);
261 /* Skip leading whitespace of the status message */
262 ret->message += strspn (ret->message, " \t");
264 if (ret->status <= 0)
265 {
266 if (ret->status < 0)
267 rrd_set_error("rrdcached: %s", ret->message);
268 goto out;
269 }
271 ret->lines = (char **) malloc (sizeof (char *) * ret->status);
272 if (ret->lines == NULL)
273 DIE(-5);
275 memset (ret->lines, 0, sizeof (char *) * ret->status);
276 ret->lines_num = (size_t) ret->status;
278 for (i = 0; i < ret->lines_num; i++)
279 {
280 buffer_ptr = fgets (buffer, sizeof (buffer), sh);
281 if (buffer_ptr == NULL)
282 DIE(-6);
284 chomp (buffer);
286 ret->lines[i] = strdup (buffer);
287 if (ret->lines[i] == NULL)
288 DIE(-7);
289 }
291 out:
292 *ret_response = ret;
293 fflush(sh);
294 return (status);
296 err_out:
297 response_free(ret);
298 close_connection();
299 return (status);
301 #undef DIE
303 } /* }}} rrdc_response_t *response_read */
305 static int request (const char *buffer, size_t buffer_size, /* {{{ */
306 rrdc_response_t **ret_response)
307 {
308 int status;
309 rrdc_response_t *res;
311 if (sh == NULL)
312 return (ENOTCONN);
314 status = (int) fwrite (buffer, buffer_size, /* nmemb = */ 1, sh);
315 if (status != 1)
316 {
317 close_connection ();
318 rrd_set_error("request: socket error (%d) while talking to rrdcached",
319 status);
320 return (-1);
321 }
322 fflush (sh);
324 res = NULL;
325 status = response_read (&res);
327 if (status != 0)
328 {
329 if (status < 0)
330 rrd_set_error("request: internal error while talking to rrdcached");
331 return (status);
332 }
334 *ret_response = res;
335 return (0);
336 } /* }}} int request */
338 /* determine whether we are connected to the specified daemon_addr if
339 * NULL, return whether we are connected at all
340 */
341 int rrdc_is_connected(const char *daemon_addr) /* {{{ */
342 {
343 if (sd < 0)
344 return 0;
345 else if (daemon_addr == NULL)
346 {
347 /* here we have to handle the case i.e.
348 * UPDATE --daemon ...; UPDATEV (no --daemon) ...
349 * In other words: we have a cached connection,
350 * but it is not specified in the current command.
351 * Daemon is only implied in this case if set in ENV
352 */
353 char *addr = getenv(ENV_RRDCACHED_ADDRESS);
354 if (addr != NULL && strcmp(addr,"") != 0)
355 return 1;
356 else
357 return 0;
358 }
359 else if (strcmp(daemon_addr, sd_path) == 0)
360 return 1;
361 else
362 return 0;
364 } /* }}} int rrdc_is_connected */
366 static int rrdc_connect_unix (const char *path) /* {{{ */
367 {
368 struct sockaddr_un sa;
369 int status;
371 assert (path != NULL);
372 assert (sd == -1);
374 sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
375 if (sd < 0)
376 {
377 status = errno;
378 return (status);
379 }
381 memset (&sa, 0, sizeof (sa));
382 sa.sun_family = AF_UNIX;
383 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
385 status = connect (sd, (struct sockaddr *) &sa, sizeof (sa));
386 if (status != 0)
387 {
388 status = errno;
389 close_connection ();
390 return (status);
391 }
393 sh = fdopen (sd, "r+");
394 if (sh == NULL)
395 {
396 status = errno;
397 close_connection ();
398 return (status);
399 }
401 return (0);
402 } /* }}} int rrdc_connect_unix */
404 static int rrdc_connect_network (const char *addr_orig) /* {{{ */
405 {
406 struct addrinfo ai_hints;
407 struct addrinfo *ai_res;
408 struct addrinfo *ai_ptr;
409 char addr_copy[NI_MAXHOST];
410 char *addr;
411 char *port;
413 assert (addr_orig != NULL);
414 assert (sd == -1);
416 strncpy(addr_copy, addr_orig, sizeof(addr_copy));
417 addr_copy[sizeof(addr_copy) - 1] = '\0';
418 addr = addr_copy;
420 int status;
421 memset (&ai_hints, 0, sizeof (ai_hints));
422 ai_hints.ai_flags = 0;
423 #ifdef AI_ADDRCONFIG
424 ai_hints.ai_flags |= AI_ADDRCONFIG;
425 #endif
426 ai_hints.ai_family = AF_UNSPEC;
427 ai_hints.ai_socktype = SOCK_STREAM;
429 port = NULL;
430 if (*addr == '[') /* IPv6+port format */
431 {
432 /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
433 addr++;
435 port = strchr (addr, ']');
436 if (port == NULL)
437 {
438 rrd_set_error("malformed address: %s", addr_orig);
439 return (-1);
440 }
441 *port = 0;
442 port++;
444 if (*port == ':')
445 port++;
446 else if (*port == 0)
447 port = NULL;
448 else
449 {
450 rrd_set_error("garbage after address: %s", port);
451 return (-1);
452 }
453 } /* if (*addr == '[') */
454 else
455 {
456 port = rindex(addr, ':');
457 if (port != NULL)
458 {
459 *port = 0;
460 port++;
461 }
462 }
464 ai_res = NULL;
465 status = getaddrinfo (addr,
466 port == NULL ? RRDCACHED_DEFAULT_PORT : port,
467 &ai_hints, &ai_res);
468 if (status != 0)
469 {
470 rrd_set_error ("failed to resolve address `%s' (port %s): %s",
471 addr, port == NULL ? RRDCACHED_DEFAULT_PORT : port,
472 gai_strerror (status));
473 return (-1);
474 }
476 for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
477 {
478 sd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
479 if (sd < 0)
480 {
481 status = errno;
482 sd = -1;
483 continue;
484 }
486 status = connect (sd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
487 if (status != 0)
488 {
489 status = errno;
490 close_connection();
491 continue;
492 }
494 sh = fdopen (sd, "r+");
495 if (sh == NULL)
496 {
497 status = errno;
498 close_connection ();
499 continue;
500 }
502 assert (status == 0);
503 break;
504 } /* for (ai_ptr) */
506 freeaddrinfo(ai_res);
508 return (status);
509 } /* }}} int rrdc_connect_network */
511 int rrdc_connect (const char *addr) /* {{{ */
512 {
513 int status = 0;
515 if (addr == NULL) {
516 addr = getenv (ENV_RRDCACHED_ADDRESS);
517 }
519 if (addr == NULL || strcmp(addr,"") == 0 ) {
520 addr = NULL;
521 return 0;
522 }
524 pthread_mutex_lock(&lock);
526 if (sd >= 0 && sd_path != NULL && strcmp(addr, sd_path) == 0)
527 {
528 /* connection to the same daemon; use cached connection */
529 pthread_mutex_unlock (&lock);
530 return (0);
531 }
532 else
533 {
534 close_connection();
535 }
537 rrd_clear_error ();
538 if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
539 status = rrdc_connect_unix (addr + strlen ("unix:"));
540 else if (addr[0] == '/')
541 status = rrdc_connect_unix (addr);
542 else
543 status = rrdc_connect_network(addr);
545 if (status == 0 && sd >= 0)
546 sd_path = strdup(addr);
547 else
548 {
549 char *err = rrd_test_error () ? rrd_get_error () : "Internal error";
550 /* err points the string that gets written to by rrd_set_error(), thus we
551 * cannot pass it to that function */
552 err = strdup (err);
553 rrd_set_error("Unable to connect to rrdcached: %s",
554 (status < 0)
555 ? (err ? err : "Internal error")
556 : rrd_strerror (status));
557 if (err != NULL)
558 free (err);
559 }
561 pthread_mutex_unlock (&lock);
562 return (status);
563 } /* }}} int rrdc_connect */
565 int rrdc_disconnect (void) /* {{{ */
566 {
567 pthread_mutex_lock (&lock);
569 close_connection();
571 pthread_mutex_unlock (&lock);
573 return (0);
574 } /* }}} int rrdc_disconnect */
576 int rrdc_update (const char *filename, int values_num, /* {{{ */
577 const char * const *values)
578 {
579 char buffer[4096];
580 char *buffer_ptr;
581 size_t buffer_free;
582 size_t buffer_size;
583 rrdc_response_t *res;
584 int status;
585 int i;
586 char file_path[PATH_MAX];
588 memset (buffer, 0, sizeof (buffer));
589 buffer_ptr = &buffer[0];
590 buffer_free = sizeof (buffer);
592 status = buffer_add_string ("update", &buffer_ptr, &buffer_free);
593 if (status != 0)
594 return (ENOBUFS);
596 pthread_mutex_lock (&lock);
597 filename = get_path (filename, file_path);
598 if (filename == NULL)
599 {
600 pthread_mutex_unlock (&lock);
601 return (-1);
602 }
604 status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
605 if (status != 0)
606 {
607 pthread_mutex_unlock (&lock);
608 return (ENOBUFS);
609 }
611 for (i = 0; i < values_num; i++)
612 {
613 status = buffer_add_value (values[i], &buffer_ptr, &buffer_free);
614 if (status != 0)
615 {
616 pthread_mutex_unlock (&lock);
617 return (ENOBUFS);
618 }
619 }
621 assert (buffer_free < sizeof (buffer));
622 buffer_size = sizeof (buffer) - buffer_free;
623 assert (buffer[buffer_size - 1] == ' ');
624 buffer[buffer_size - 1] = '\n';
626 res = NULL;
627 status = request (buffer, buffer_size, &res);
628 pthread_mutex_unlock (&lock);
630 if (status != 0)
631 return (status);
633 status = res->status;
634 response_free (res);
636 return (status);
637 } /* }}} int rrdc_update */
639 int rrdc_flush (const char *filename) /* {{{ */
640 {
641 char buffer[4096];
642 char *buffer_ptr;
643 size_t buffer_free;
644 size_t buffer_size;
645 rrdc_response_t *res;
646 int status;
647 char file_path[PATH_MAX];
649 if (filename == NULL)
650 return (-1);
652 memset (buffer, 0, sizeof (buffer));
653 buffer_ptr = &buffer[0];
654 buffer_free = sizeof (buffer);
656 status = buffer_add_string ("flush", &buffer_ptr, &buffer_free);
657 if (status != 0)
658 return (ENOBUFS);
660 pthread_mutex_lock (&lock);
661 filename = get_path (filename, file_path);
662 if (filename == NULL)
663 {
664 pthread_mutex_unlock (&lock);
665 return (-1);
666 }
668 status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
669 if (status != 0)
670 {
671 pthread_mutex_unlock (&lock);
672 return (ENOBUFS);
673 }
675 assert (buffer_free < sizeof (buffer));
676 buffer_size = sizeof (buffer) - buffer_free;
677 assert (buffer[buffer_size - 1] == ' ');
678 buffer[buffer_size - 1] = '\n';
680 res = NULL;
681 status = request (buffer, buffer_size, &res);
682 pthread_mutex_unlock (&lock);
684 if (status != 0)
685 return (status);
687 status = res->status;
688 response_free (res);
690 return (status);
691 } /* }}} int rrdc_flush */
694 /* convenience function; if there is a daemon specified, or if we can
695 * detect one from the environment, then flush the file. Otherwise, no-op
696 */
697 int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename) /* {{{ */
698 {
699 int status = 0;
701 rrdc_connect(opt_daemon);
703 if (rrdc_is_connected(opt_daemon))
704 {
705 rrd_clear_error();
706 status = rrdc_flush (filename);
708 if (status != 0 && !rrd_test_error())
709 {
710 if (status > 0)
711 {
712 rrd_set_error("rrdc_flush (%s) failed: %s",
713 filename, rrd_strerror(status));
714 }
715 else if (status < 0)
716 {
717 rrd_set_error("rrdc_flush (%s) failed with status %i.",
718 filename, status);
719 }
720 }
721 } /* if (rrdc_is_connected(..)) */
723 return status;
724 } /* }}} int rrdc_flush_if_daemon */
727 int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
728 {
729 rrdc_stats_t *head;
730 rrdc_stats_t *tail;
732 rrdc_response_t *res;
734 int status;
735 size_t i;
737 /* Protocol example: {{{
738 * -> STATS
739 * <- 5 Statistics follow
740 * <- QueueLength: 0
741 * <- UpdatesWritten: 0
742 * <- DataSetsWritten: 0
743 * <- TreeNodesNumber: 0
744 * <- TreeDepth: 0
745 * }}} */
747 res = NULL;
748 pthread_mutex_lock (&lock);
749 status = request ("STATS\n", strlen ("STATS\n"), &res);
750 pthread_mutex_unlock (&lock);
752 if (status != 0)
753 return (status);
755 if (res->status <= 0)
756 {
757 response_free (res);
758 return (EIO);
759 }
761 head = NULL;
762 tail = NULL;
763 for (i = 0; i < res->lines_num; i++)
764 {
765 char *key;
766 char *value;
767 char *endptr;
768 rrdc_stats_t *s;
770 key = res->lines[i];
771 value = strchr (key, ':');
772 if (value == NULL)
773 continue;
774 *value = 0;
775 value++;
777 while ((value[0] == ' ') || (value[0] == '\t'))
778 value++;
780 s = (rrdc_stats_t *) malloc (sizeof (rrdc_stats_t));
781 if (s == NULL)
782 continue;
783 memset (s, 0, sizeof (*s));
785 s->name = strdup (key);
787 endptr = NULL;
788 if ((strcmp ("QueueLength", key) == 0)
789 || (strcmp ("TreeDepth", key) == 0)
790 || (strcmp ("TreeNodesNumber", key) == 0))
791 {
792 s->type = RRDC_STATS_TYPE_GAUGE;
793 s->value.gauge = strtod (value, &endptr);
794 }
795 else if ((strcmp ("DataSetsWritten", key) == 0)
796 || (strcmp ("FlushesReceived", key) == 0)
797 || (strcmp ("JournalBytes", key) == 0)
798 || (strcmp ("JournalRotate", key) == 0)
799 || (strcmp ("UpdatesReceived", key) == 0)
800 || (strcmp ("UpdatesWritten", key) == 0))
801 {
802 s->type = RRDC_STATS_TYPE_COUNTER;
803 s->value.counter = (uint64_t) strtoll (value, &endptr, /* base = */ 0);
804 }
805 else
806 {
807 free (s);
808 continue;
809 }
811 /* Conversion failed */
812 if (endptr == value)
813 {
814 free (s);
815 continue;
816 }
818 if (head == NULL)
819 {
820 head = s;
821 tail = s;
822 s->next = NULL;
823 }
824 else
825 {
826 tail->next = s;
827 tail = s;
828 }
829 } /* for (i = 0; i < res->lines_num; i++) */
831 response_free (res);
833 if (head == NULL)
834 return (EPROTO);
836 *ret_stats = head;
837 return (0);
838 } /* }}} int rrdc_stats_get */
840 void rrdc_stats_free (rrdc_stats_t *ret_stats) /* {{{ */
841 {
842 rrdc_stats_t *this;
844 this = ret_stats;
845 while (this != NULL)
846 {
847 rrdc_stats_t *next;
849 next = this->next;
851 if (this->name != NULL)
852 {
853 free ((char *)this->name);
854 this->name = NULL;
855 }
856 free (this);
858 this = next;
859 } /* while (this != NULL) */
860 } /* }}} void rrdc_stats_free */
862 /*
863 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
864 */