18dd0dde44be11f85277a9310353b65f3fe4b8e4
1 /**
2 * RRDTool - src/rrd_client.c
3 * Copyright (C) 2008 Florian octo Forster
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a copy
6 * of this software and associated documentation files (the "Software"), to
7 * deal in the Software without restriction, including without limitation the
8 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9 * sell copies of the Software, and to permit persons to whom the Software is
10 * furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be included in
13 * all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 * IN THE SOFTWARE.
22 *
23 * Authors:
24 * Florian octo Forster <octo at verplant.org>
25 * Sebastian tokkee Harl <sh at tokkee.org>
26 **/
28 #include "rrd.h"
29 #include "rrd_tool.h"
30 #include "rrd_client.h"
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <strings.h>
36 #include <errno.h>
37 #include <assert.h>
38 #include <pthread.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
41 #include <sys/un.h>
42 #include <netdb.h>
43 #include <limits.h>
45 #ifndef ENODATA
46 #define ENODATA ENOENT
47 #endif
/* One parsed reply from the daemon: a status line, optionally followed by
 * `lines_num' payload lines. */
typedef struct rrdc_response_s
{
  int status;        /* numeric code that starts the status line */
  char *message;     /* text that follows the code on the status line */
  char **lines;      /* payload lines, or NULL when there are none */
  size_t lines_num;  /* number of entries in `lines' */
} rrdc_response_t;
/* State of the single cached daemon connection. close_connection() and
 * get_path() require `lock' to be held by their caller. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

static int   sd      = -1;   /* socket descriptor, -1 when there is none */
static FILE *sh      = NULL; /* stdio stream wrapped around `sd' */
static char *sd_path = NULL; /* cache the path for sd */
63 /* get_path: Return a path name appropriate to be sent to the daemon.
64 *
65 * When talking to a local daemon (thru a UNIX socket), relative path names
66 * are resolved to absolute path names to allow for transparent integration
67 * into existing solutions (as requested by Tobi). Else, absolute path names
68 * are not allowed, since path name translation is done by the server.
69 *
70 * One must hold `lock' when calling this function. */
71 static const char *get_path (const char *path, char *resolved_path) /* {{{ */
72 {
73 const char *ret = path;
74 int is_unix = 0;
76 if ((*sd_path == '/')
77 || (strncmp ("unix:", sd_path, strlen ("unix:")) == 0))
78 is_unix = 1;
80 if (is_unix)
81 {
82 ret = realpath(path, resolved_path);
83 if (ret == NULL)
84 rrd_set_error("realpath(%s): %s", path, rrd_strerror(errno));
85 return ret;
86 }
87 else
88 {
89 if (*path == '/') /* not absolute path */
90 {
91 rrd_set_error ("absolute path names not allowed when talking "
92 "to a remote daemon");
93 return NULL;
94 }
95 }
97 return path;
98 } /* }}} char *get_path */
100 /* One must hold `lock' when calling `close_connection'. */
101 static void close_connection (void) /* {{{ */
102 {
103 if (sh != NULL)
104 {
105 fclose (sh);
106 sh = NULL;
107 sd = -1;
108 }
109 else if (sd >= 0)
110 {
111 close (sd);
112 sd = -1;
113 }
115 if (sd_path != NULL)
116 free (sd_path);
117 sd_path = NULL;
118 } /* }}} void close_connection */
/* Append one protocol field to a command buffer.
 *
 * Copies `str' to `*buffer_ret', prefixing every space and backslash with a
 * backslash, and terminates the field with a single space. No NUL byte is
 * written.
 *
 * On success returns 0, advances `*buffer_ret' past the field and reduces
 * `*buffer_size_ret' by the number of bytes written. If the field (including
 * the trailing space) does not fit, returns -1 and leaves both untouched; the
 * buffer may then hold a partially written field. */
static int buffer_add_string (const char *str, /* {{{ */
    char **buffer_ret, size_t *buffer_size_ret)
{
  char *const out = *buffer_ret;
  const size_t capacity = *buffer_size_ret;
  size_t used = 0;
  const char *in;

  for (in = str; used < capacity; in++)
  {
    const char c = *in;

    if (c == '\0')
    {
      /* End of input: emit the field separator and report success. */
      out[used++] = ' ';
      *buffer_ret = out + used;
      *buffer_size_ret = capacity - used;
      return (0);
    }

    if ((c == ' ') || (c == '\\'))
    {
      /* An escaped character needs two bytes. */
      if ((used + 1) >= capacity)
        break;
      out[used++] = '\\';
    }

    out[used++] = c;
  }

  /* Ran out of room before the terminating space could be written. */
  return (-1);
} /* }}} int buffer_add_string */
/* Append one update value to a command buffer.
 *
 * A value that starts with "N:" has the "N" replaced by the current time in
 * seconds as returned by time(); any other value is used as given. Values
 * that do not fit the 4096 byte scratch buffer are cut off. The result is
 * appended with buffer_add_string(), whose return value is passed through. */
static int buffer_add_value (const char *value, /* {{{ */
    char **buffer_ret, size_t *buffer_size_ret)
{
  char temp[4096];
  const int uses_now = (strncmp (value, "N:", 2) == 0);

  if (!uses_now)
  {
    strncpy (temp, value, sizeof (temp) - 1);
  }
  else
  {
    const unsigned long now = (unsigned long) time (NULL);

    snprintf (temp, sizeof (temp), "%lu:%s", now, value + 2);
  }
  /* strncpy() does not terminate the string when the source is too long. */
  temp[sizeof (temp) - 1] = '\0';

  return (buffer_add_string (temp, buffer_ret, buffer_size_ret));
} /* }}} int buffer_add_value */
/* Strip every trailing newline (NL) and carriage return (CR) from `str', in
 * place, much like Perl's `chomp'. Returns how many characters were removed,
 * or -1 if `str' is NULL. */
static int chomp (char *str) /* {{{ */
{
  char *end;
  int removed = 0;

  if (str == NULL)
    return (-1);

  /* Walk backwards from the terminating NUL while the last character is an
   * end-of-line character. */
  for (end = str + strlen (str); end > str; end--)
  {
    const char last = end[-1];

    if ((last != '\n') && (last != '\r'))
      break;

    end[-1] = '\0';
    removed++;
  }

  return (removed);
} /* }}} int chomp */
208 static void response_free (rrdc_response_t *res) /* {{{ */
209 {
210 if (res == NULL)
211 return;
213 if (res->lines != NULL)
214 {
215 size_t i;
217 for (i = 0; i < res->lines_num; i++)
218 if (res->lines[i] != NULL)
219 free (res->lines[i]);
220 free (res->lines);
221 }
223 free (res);
224 } /* }}} void response_free */
226 static int response_read (rrdc_response_t **ret_response) /* {{{ */
227 {
228 rrdc_response_t *ret;
230 char buffer[4096];
231 char *buffer_ptr;
233 size_t i;
235 if (sh == NULL)
236 return (-1);
238 ret = (rrdc_response_t *) malloc (sizeof (rrdc_response_t));
239 if (ret == NULL)
240 return (-2);
241 memset (ret, 0, sizeof (*ret));
242 ret->lines = NULL;
243 ret->lines_num = 0;
245 buffer_ptr = fgets (buffer, sizeof (buffer), sh);
246 if (buffer_ptr == NULL) {
247 close_connection();
248 return (-3);
249 }
250 chomp (buffer);
252 ret->status = strtol (buffer, &ret->message, 0);
253 if (buffer == ret->message)
254 {
255 response_free (ret);
256 close_connection();
257 return (-4);
258 }
259 /* Skip leading whitespace of the status message */
260 ret->message += strspn (ret->message, " \t");
262 if (ret->status <= 0)
263 {
264 if (ret->status < 0)
265 rrd_set_error("rrdcached: %s", ret->message);
266 *ret_response = ret;
267 return (0);
268 }
270 ret->lines = (char **) malloc (sizeof (char *) * ret->status);
271 if (ret->lines == NULL)
272 {
273 response_free (ret);
274 close_connection();
275 return (-5);
276 }
277 memset (ret->lines, 0, sizeof (char *) * ret->status);
278 ret->lines_num = (size_t) ret->status;
280 for (i = 0; i < ret->lines_num; i++)
281 {
282 buffer_ptr = fgets (buffer, sizeof (buffer), sh);
283 if (buffer_ptr == NULL)
284 {
285 response_free (ret);
286 close_connection();
287 return (-6);
288 }
289 chomp (buffer);
291 ret->lines[i] = strdup (buffer);
292 if (ret->lines[i] == NULL)
293 {
294 response_free (ret);
295 close_connection();
296 return (-7);
297 }
298 }
300 *ret_response = ret;
301 return (0);
302 } /* }}} rrdc_response_t *response_read */
304 static int request (const char *buffer, size_t buffer_size, /* {{{ */
305 rrdc_response_t **ret_response)
306 {
307 int status;
308 rrdc_response_t *res;
310 if (sh == NULL)
311 return (ENOTCONN);
313 status = (int) fwrite (buffer, buffer_size, /* nmemb = */ 1, sh);
314 if (status != 1)
315 {
316 close_connection ();
317 rrd_set_error("request: socket error (%d) while talking to rrdcached",
318 status);
319 return (-1);
320 }
321 fflush (sh);
323 res = NULL;
324 status = response_read (&res);
326 if (status != 0)
327 {
328 if (status < 0)
329 rrd_set_error("request: internal error while talking to rrdcached");
330 return (status);
331 }
333 *ret_response = res;
334 return (0);
335 } /* }}} int request */
337 /* determine whether we are connected to the specified daemon_addr if
338 * NULL, return whether we are connected at all
339 */
340 int rrdc_is_connected(const char *daemon_addr) /* {{{ */
341 {
342 if (sd < 0)
343 return 0;
344 else if (daemon_addr == NULL)
345 {
346 /* here we have to handle the case i.e.
347 * UPDATE --daemon ...; UPDATEV (no --daemon) ...
348 * In other words: we have a cached connection,
349 * but it is not specified in the current command.
350 * Daemon is only implied in this case if set in ENV
351 */
352 if (getenv(ENV_RRDCACHED_ADDRESS) != NULL)
353 return 1;
354 else
355 return 0;
356 }
357 else if (strcmp(daemon_addr, sd_path) == 0)
358 return 1;
359 else
360 return 0;
362 } /* }}} int rrdc_is_connected */
364 static int rrdc_connect_unix (const char *path) /* {{{ */
365 {
366 struct sockaddr_un sa;
367 int status;
369 assert (path != NULL);
370 assert (sd == -1);
372 sd = socket (PF_UNIX, SOCK_STREAM, /* protocol = */ 0);
373 if (sd < 0)
374 {
375 status = errno;
376 return (status);
377 }
379 memset (&sa, 0, sizeof (sa));
380 sa.sun_family = AF_UNIX;
381 strncpy (sa.sun_path, path, sizeof (sa.sun_path) - 1);
383 status = connect (sd, (struct sockaddr *) &sa, sizeof (sa));
384 if (status != 0)
385 {
386 status = errno;
387 close_connection ();
388 return (status);
389 }
391 sh = fdopen (sd, "r+");
392 if (sh == NULL)
393 {
394 status = errno;
395 close_connection ();
396 return (status);
397 }
399 return (0);
400 } /* }}} int rrdc_connect_unix */
/* Connect to a daemon over the network.
 *
 * `addr_orig' is "host", "host:port", "[ipv6-address]" or
 * "[ipv6-address]:port". Without a port, RRDCACHED_DEFAULT_PORT is used.
 * Every address returned by getaddrinfo() is tried in turn until one
 * connection succeeds.
 *
 * Returns 0 on success, with `sd' and `sh' referring to the connection.
 * Returns -1 with the RRD error message set if the address is malformed or
 * cannot be resolved. Otherwise returns the errno value of the last failed
 * socket(), connect() or fdopen() call.
 *
 * The caller must hold `lock' and must not have an open connection. */
static int rrdc_connect_network (const char *addr_orig) /* {{{ */
{
  struct addrinfo ai_hints;
  struct addrinfo *ai_res;
  struct addrinfo *ai_ptr;
  char addr_copy[NI_MAXHOST];
  char *addr;
  char *port;
  int status;

  assert (addr_orig != NULL);
  assert (sd == -1);

  /* Work on a private copy: host and port are split in place below. */
  strncpy(addr_copy, addr_orig, sizeof(addr_copy));
  addr_copy[sizeof(addr_copy) - 1] = '\0';
  addr = addr_copy;

  memset (&ai_hints, 0, sizeof (ai_hints));
  ai_hints.ai_flags = 0;
#ifdef AI_ADDRCONFIG
  ai_hints.ai_flags |= AI_ADDRCONFIG;
#endif
  ai_hints.ai_family = AF_UNSPEC;
  ai_hints.ai_socktype = SOCK_STREAM;

  port = NULL;
  if (*addr == '[') /* IPv6+port format */
  {
    /* `addr' is something like "[2001:780:104:2:211:24ff:feab:26f8]:12345" */
    addr++;

    port = strchr (addr, ']');
    if (port == NULL)
    {
      rrd_set_error("malformed address: %s", addr_orig);
      return (-1);
    }
    *port = 0;
    port++;

    if (*port == ':')
      port++;
    else if (*port == 0)
      port = NULL;
    else
    {
      rrd_set_error("garbage after address: %s", port);
      return (-1);
    }
  } /* if (*addr == '[') */
  else
  {
    /* Everything after the last colon is the port. */
    port = strrchr (addr, ':');
    if (port != NULL)
    {
      *port = 0;
      port++;
    }
  }

  ai_res = NULL;
  status = getaddrinfo (addr,
      port == NULL ? RRDCACHED_DEFAULT_PORT : port,
      &ai_hints, &ai_res);
  if (status != 0)
  {
    rrd_set_error ("failed to resolve address `%s' (port %s): %s",
        addr, port == NULL ? RRDCACHED_DEFAULT_PORT : port,
        gai_strerror (status));
    return (-1);
  }

  for (ai_ptr = ai_res; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next)
  {
    sd = socket (ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol);
    if (sd < 0)
    {
      status = errno;
      sd = -1;
      continue;
    }

    status = connect (sd, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
    if (status != 0)
    {
      status = errno;
      close_connection();
      continue;
    }

    sh = fdopen (sd, "r+");
    if (sh == NULL)
    {
      status = errno;
      close_connection ();
      continue;
    }

    assert (status == 0);
    break;
  } /* for (ai_ptr) */

  /* The address list is owned by us and was never released before. */
  freeaddrinfo (ai_res);

  return (status);
} /* }}} int rrdc_connect_network */
507 int rrdc_connect (const char *addr) /* {{{ */
508 {
509 int status = 0;
511 if (addr == NULL)
512 addr = getenv (ENV_RRDCACHED_ADDRESS);
514 if (addr == NULL)
515 return 0;
517 pthread_mutex_lock(&lock);
519 if (sd >= 0 && sd_path != NULL && strcmp(addr, sd_path) == 0)
520 {
521 /* connection to the same daemon; use cached connection */
522 pthread_mutex_unlock (&lock);
523 return (0);
524 }
525 else
526 {
527 close_connection();
528 }
530 rrd_clear_error ();
531 if (strncmp ("unix:", addr, strlen ("unix:")) == 0)
532 status = rrdc_connect_unix (addr + strlen ("unix:"));
533 else if (addr[0] == '/')
534 status = rrdc_connect_unix (addr);
535 else
536 status = rrdc_connect_network(addr);
538 if (status == 0 && sd >= 0)
539 sd_path = strdup(addr);
540 else
541 {
542 char *err = rrd_test_error () ? rrd_get_error () : "Internal error";
543 /* err points the string that gets written to by rrd_set_error(), thus we
544 * cannot pass it to that function */
545 err = strdup (err);
546 rrd_set_error("Unable to connect to rrdcached: %s",
547 (status < 0)
548 ? (err ? err : "Internal error")
549 : rrd_strerror (status));
550 if (err != NULL)
551 free (err);
552 }
554 pthread_mutex_unlock (&lock);
555 return (status);
556 } /* }}} int rrdc_connect */
558 int rrdc_disconnect (void) /* {{{ */
559 {
560 pthread_mutex_lock (&lock);
562 close_connection();
564 pthread_mutex_unlock (&lock);
566 return (0);
567 } /* }}} int rrdc_disconnect */
569 int rrdc_update (const char *filename, int values_num, /* {{{ */
570 const char * const *values)
571 {
572 char buffer[4096];
573 char *buffer_ptr;
574 size_t buffer_free;
575 size_t buffer_size;
576 rrdc_response_t *res;
577 int status;
578 int i;
579 char file_path[PATH_MAX];
581 memset (buffer, 0, sizeof (buffer));
582 buffer_ptr = &buffer[0];
583 buffer_free = sizeof (buffer);
585 status = buffer_add_string ("update", &buffer_ptr, &buffer_free);
586 if (status != 0)
587 return (ENOBUFS);
589 pthread_mutex_lock (&lock);
590 filename = get_path (filename, file_path);
591 if (filename == NULL)
592 {
593 pthread_mutex_unlock (&lock);
594 return (-1);
595 }
597 status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
598 if (status != 0)
599 {
600 pthread_mutex_unlock (&lock);
601 return (ENOBUFS);
602 }
604 for (i = 0; i < values_num; i++)
605 {
606 status = buffer_add_value (values[i], &buffer_ptr, &buffer_free);
607 if (status != 0)
608 {
609 pthread_mutex_unlock (&lock);
610 return (ENOBUFS);
611 }
612 }
614 assert (buffer_free < sizeof (buffer));
615 buffer_size = sizeof (buffer) - buffer_free;
616 assert (buffer[buffer_size - 1] == ' ');
617 buffer[buffer_size - 1] = '\n';
619 res = NULL;
620 status = request (buffer, buffer_size, &res);
621 pthread_mutex_unlock (&lock);
623 if (status != 0)
624 return (status);
626 status = res->status;
627 response_free (res);
629 return (status);
630 } /* }}} int rrdc_update */
632 int rrdc_flush (const char *filename) /* {{{ */
633 {
634 char buffer[4096];
635 char *buffer_ptr;
636 size_t buffer_free;
637 size_t buffer_size;
638 rrdc_response_t *res;
639 int status;
640 char file_path[PATH_MAX];
642 if (filename == NULL)
643 return (-1);
645 memset (buffer, 0, sizeof (buffer));
646 buffer_ptr = &buffer[0];
647 buffer_free = sizeof (buffer);
649 status = buffer_add_string ("flush", &buffer_ptr, &buffer_free);
650 if (status != 0)
651 return (ENOBUFS);
653 pthread_mutex_lock (&lock);
654 filename = get_path (filename, file_path);
655 if (filename == NULL)
656 {
657 pthread_mutex_unlock (&lock);
658 return (-1);
659 }
661 status = buffer_add_string (filename, &buffer_ptr, &buffer_free);
662 if (status != 0)
663 {
664 pthread_mutex_unlock (&lock);
665 return (ENOBUFS);
666 }
668 assert (buffer_free < sizeof (buffer));
669 buffer_size = sizeof (buffer) - buffer_free;
670 assert (buffer[buffer_size - 1] == ' ');
671 buffer[buffer_size - 1] = '\n';
673 res = NULL;
674 status = request (buffer, buffer_size, &res);
675 pthread_mutex_unlock (&lock);
677 if (status != 0)
678 return (status);
680 status = res->status;
681 response_free (res);
683 return (status);
684 } /* }}} int rrdc_flush */
/* Convenience wrapper: connect to `opt_daemon' (or to the daemon named in the
 * environment when it is NULL) and, if that leaves us connected, flush
 * `filename'. Without a daemon this is a no-op that returns 0.
 *
 * Returns the status of rrdc_flush(). If the flush fails without an RRD
 * error message having been set, a generic one is set here. */
int rrdc_flush_if_daemon (const char *opt_daemon, const char *filename) /* {{{ */
{
  int rc;

  rrdc_connect(opt_daemon);

  if (!rrdc_is_connected(opt_daemon))
    return 0;

  rrd_clear_error();
  rc = rrdc_flush (filename);

  /* Nothing to add on success, or when a message is already in place. */
  if ((rc == 0) || rrd_test_error())
    return rc;

  if (rc > 0)
    rrd_set_error("rrdc_flush (%s) failed: %s",
        filename, rrd_strerror(rc));
  else
    rrd_set_error("rrdc_flush (%s) failed with status %i.",
        filename, rc);

  return rc;
} /* }}} int rrdc_flush_if_daemon */
720 int rrdc_stats_get (rrdc_stats_t **ret_stats) /* {{{ */
721 {
722 rrdc_stats_t *head;
723 rrdc_stats_t *tail;
725 rrdc_response_t *res;
727 int status;
728 size_t i;
730 /* Protocol example: {{{
731 * -> STATS
732 * <- 5 Statistics follow
733 * <- QueueLength: 0
734 * <- UpdatesWritten: 0
735 * <- DataSetsWritten: 0
736 * <- TreeNodesNumber: 0
737 * <- TreeDepth: 0
738 * }}} */
740 res = NULL;
741 pthread_mutex_lock (&lock);
742 status = request ("STATS\n", strlen ("STATS\n"), &res);
743 pthread_mutex_unlock (&lock);
745 if (status != 0)
746 return (status);
748 if (res->status <= 0)
749 {
750 response_free (res);
751 return (EIO);
752 }
754 head = NULL;
755 tail = NULL;
756 for (i = 0; i < res->lines_num; i++)
757 {
758 char *key;
759 char *value;
760 char *endptr;
761 rrdc_stats_t *s;
763 key = res->lines[i];
764 value = strchr (key, ':');
765 if (value == NULL)
766 continue;
767 *value = 0;
768 value++;
770 while ((value[0] == ' ') || (value[0] == '\t'))
771 value++;
773 s = (rrdc_stats_t *) malloc (sizeof (rrdc_stats_t));
774 if (s == NULL)
775 continue;
776 memset (s, 0, sizeof (*s));
778 s->name = strdup (key);
780 endptr = NULL;
781 if ((strcmp ("QueueLength", key) == 0)
782 || (strcmp ("TreeDepth", key) == 0)
783 || (strcmp ("TreeNodesNumber", key) == 0))
784 {
785 s->type = RRDC_STATS_TYPE_GAUGE;
786 s->value.gauge = strtod (value, &endptr);
787 }
788 else if ((strcmp ("DataSetsWritten", key) == 0)
789 || (strcmp ("FlushesReceived", key) == 0)
790 || (strcmp ("JournalBytes", key) == 0)
791 || (strcmp ("JournalRotate", key) == 0)
792 || (strcmp ("UpdatesReceived", key) == 0)
793 || (strcmp ("UpdatesWritten", key) == 0))
794 {
795 s->type = RRDC_STATS_TYPE_COUNTER;
796 s->value.counter = (uint64_t) strtoll (value, &endptr, /* base = */ 0);
797 }
798 else
799 {
800 free (s);
801 continue;
802 }
804 /* Conversion failed */
805 if (endptr == value)
806 {
807 free (s);
808 continue;
809 }
811 if (head == NULL)
812 {
813 head = s;
814 tail = s;
815 s->next = NULL;
816 }
817 else
818 {
819 tail->next = s;
820 tail = s;
821 }
822 } /* for (i = 0; i < res->lines_num; i++) */
824 response_free (res);
826 if (head == NULL)
827 return (EPROTO);
829 *ret_stats = head;
830 return (0);
831 } /* }}} int rrdc_stats_get */
833 void rrdc_stats_free (rrdc_stats_t *ret_stats) /* {{{ */
834 {
835 rrdc_stats_t *this;
837 this = ret_stats;
838 while (this != NULL)
839 {
840 rrdc_stats_t *next;
842 next = this->next;
844 if (this->name != NULL)
845 {
846 free ((char *)this->name);
847 this->name = NULL;
848 }
849 free (this);
851 this = next;
852 } /* while (this != NULL) */
853 } /* }}} void rrdc_stats_free */
855 /*
856 * vim: set sw=2 sts=2 ts=8 et fdm=marker :
857 */