Code

email plugin: Unlock mutexes before submitting the data.
[collectd.git] / src / email.c
index 401ac928108ec6db4e7b8b1067de20c2dbea27ef..64b82e465c1af5ea9df81f0eb59fbfdf50f7478e 100644 (file)
@@ -84,6 +84,9 @@
 #define MAX_CONNS 5
 #define MAX_CONNS_LIMIT 16384
 
+#define log_err(...) syslog (LOG_ERR, MODULE_NAME": "__VA_ARGS__)
+#define log_warn(...) syslog (LOG_WARNING, MODULE_NAME": "__VA_ARGS__)
+
 /*
  * Private data structures
  */
@@ -115,7 +118,8 @@ typedef struct conn {
 
        /* buffer to read data to */
        char *buffer;
-       int  idx; /* current position in buffer */
+       int  idx; /* current write position in buffer */
+       int  length; /* length of the current line, i.e. index of '\0' */
 
        struct conn *next;
 } conn_t;
@@ -286,7 +290,7 @@ static void type_list_incr (type_list_t *list, char *name, int incr)
 
 /* Read a single character from the socket. If an error occurs or end-of-file
  * is reached return '\0'. */
-char read_char (conn_t *src)
+static char read_char (conn_t *src)
 {
        char ret = '\0';
 
@@ -296,7 +300,7 @@ char read_char (conn_t *src)
        FD_SET (src->socket, &fdset);
 
        if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) {
-               syslog (LOG_ERR, "select() failed: %s", strerror (errno));
+               log_err ("select() failed: %s", strerror (errno));
                return '\0';
        }
 
@@ -308,7 +312,7 @@ char read_char (conn_t *src)
                errno = 0;
                if (0 > (len = read (src->socket, (void *)&ret, 1))) {
                        if (EINTR != errno) {
-                               syslog (LOG_ERR, "read() failed: %s", strerror (errno));
+                               log_err ("read() failed: %s", strerror (errno));
                                return '\0';
                        }
                }
@@ -317,13 +321,12 @@ char read_char (conn_t *src)
                        return '\0';
        } while (EINTR == errno);
        return ret;
-} /* char read_char (conn_t *) */
+} /* static char read_char (conn_t *) */
 
 /* Read a single line (terminated by '\n') from the the socket.
  *
  * The return value is zero terminated and does not contain any newline
- * characters. In case that no complete line is available (non-blocking mode
- * should be enabled) an empty string is returned.
+ * characters.
  *
  * If an error occurs or end-of-file is reached return NULL.
  *
@@ -331,12 +334,18 @@ char read_char (conn_t *src)
  * characters of the input stream, the line will will be ignored! By
  * definition we should not get any longer input lines, thus this is
  * acceptable in this case ;-) */
-char *read_line (conn_t *src)
+static char *read_line (conn_t *src)
 {
-       int  i = 0;
-       char *ret;
+       int i = 0;
 
-       assert (BUFSIZE > src->idx);
+       assert ((BUFSIZE >= src->idx) && (src->idx >= 0));
+       assert ((src->idx > src->length) || (src->length == 0));
+
+       if (src->length > 0) { /* remove old line */
+               src->idx -= (src->length + 1);
+               memmove (src->buffer, src->buffer + src->length + 1, src->idx);
+               src->length = 0;
+       }
 
        for (i = 0; i < src->idx; ++i) {
                if ('\n' == src->buffer[i])
@@ -352,7 +361,7 @@ char *read_line (conn_t *src)
                FD_SET (src->socket, &fdset);
 
                if (-1 == select (src->socket + 1, &fdset, NULL, NULL, NULL)) {
-                       syslog (LOG_ERR, "select() failed: %s", strerror (errno));
+                       log_err ("select() failed: %s", strerror (errno));
                        return NULL;
                }
 
@@ -364,7 +373,7 @@ char *read_line (conn_t *src)
                                                        (void *)(src->buffer + src->idx),
                                                        BUFSIZE - src->idx))) {
                                if (EINTR != errno) {
-                                       syslog (LOG_ERR, "read() failed: %s", strerror (errno));
+                                       log_err ("read() failed: %s", strerror (errno));
                                        return NULL;
                                }
                        }
@@ -381,9 +390,7 @@ char *read_line (conn_t *src)
                }
 
                if (i == src->idx) {
-                       ret = (char *)smalloc (1);
-
-                       ret[0] = '\0';
+                       src->length = 0;
 
                        if (BUFSIZE == src->idx) { /* no space left in buffer */
                                while ('\n' != read_char (src))
@@ -391,22 +398,15 @@ char *read_line (conn_t *src)
 
                                src->idx = 0;
                        }
-                       return ret;
+                       return read_line (src);
                }
        }
 
-       ret = (char *)smalloc (i + 1);
-       memcpy (ret, src->buffer, i + 1);
-       ret[i] = '\0';
-
-       src->idx -= (i + 1);
+       src->buffer[i] = '\0';
+       src->length    = i;
 
-       if (0 == src->idx)
-               src->buffer[0] = '\0';
-       else
-               memmove (src->buffer, src->buffer + i + 1, src->idx);
-       return ret;
-} /* char *read_line (conn_t *) */
+       return src->buffer;
+} /* static char *read_line (conn_t *) */
 
 static void *collect (void *arg)
 {
@@ -438,19 +438,20 @@ static void *collect (void *arg)
 
                connection->buffer = buffer;
                connection->idx    = 0;
+               connection->length = 0;
 
                { /* put the socket in non-blocking mode */
                        int flags = 0;
 
                        errno = 0;
                        if (-1 == fcntl (connection->socket, F_GETFL, &flags)) {
-                               syslog (LOG_ERR, "fcntl() failed: %s", strerror (errno));
+                               log_err ("fcntl() failed: %s", strerror (errno));
                                loop = 0;
                        }
 
                        errno = 0;
                        if (-1 == fcntl (connection->socket, F_SETFL, flags | O_NONBLOCK)) {
-                               syslog (LOG_ERR, "fcntl() failed: %s", strerror (errno));
+                               log_err ("fcntl() failed: %s", strerror (errno));
                                loop = 0;
                        }
                }
@@ -463,14 +464,8 @@ static void *collect (void *arg)
                                break;
                        }
 
-                       if ('\0' == line[0]) {
-                               free (line);
-                               continue;
-                       }
-
                        if (':' != line[1]) {
-                               syslog (LOG_ERR, "email: syntax error in line '%s'", line);
-                               free (line);
+                               log_err ("syntax error in line '%s'", line);
                                continue;
                        }
 
@@ -481,8 +476,7 @@ static void *collect (void *arg)
                                int  bytes = 0;
 
                                if (NULL == tmp) {
-                                       syslog (LOG_ERR, "email: syntax error in line '%s'", line);
-                                       free (line);
+                                       log_err ("syntax error in line '%s'", line);
                                        continue;
                                }
 
@@ -514,10 +508,8 @@ static void *collect (void *arg)
                                } while (NULL != (type = strtok_r (NULL, ",", &ptr)));
                        }
                        else {
-                               syslog (LOG_ERR, "email: unknown type '%c'", line[0]);
+                               log_err ("unknown type '%c'", line[0]);
                        }
-
-                       free (line);
                } /* while (loop) */
 
                close (connection->socket);
@@ -533,7 +525,7 @@ static void *collect (void *arg)
 
        free (buffer);
        pthread_exit ((void *)0);
-} /* void *collect (void *) */
+} /* static void *collect (void *) */
 
 static void *open_connection (void *arg)
 {
@@ -543,7 +535,7 @@ static void *open_connection (void *arg)
        errno = 0;
        if (-1 == (connector_socket = socket (PF_UNIX, SOCK_STREAM, 0))) {
                disabled = 1;
-               syslog (LOG_ERR, "socket() failed: %s", strerror (errno));
+               log_err ("socket() failed: %s", strerror (errno));
                pthread_exit ((void *)1);
        }
 
@@ -558,14 +550,14 @@ static void *open_connection (void *arg)
                                offsetof (struct sockaddr_un, sun_path)
                                        + strlen(addr.sun_path))) {
                disabled = 1;
-               syslog (LOG_ERR, "bind() failed: %s", strerror (errno));
+               log_err ("bind() failed: %s", strerror (errno));
                pthread_exit ((void *)1);
        }
 
        errno = 0;
        if (-1 == listen (connector_socket, 5)) {
                disabled = 1;
-               syslog (LOG_ERR, "listen() failed: %s", strerror (errno));
+               log_err ("listen() failed: %s", strerror (errno));
                pthread_exit ((void *)1);
        }
 
@@ -576,20 +568,20 @@ static void *open_connection (void *arg)
                if (NULL != (grp = getgrnam (sock_group))) {
                        errno = 0;
                        if (0 != chown (SOCK_PATH, (uid_t)-1, grp->gr_gid)) {
-                               syslog (LOG_WARNING, "chown() failed: %s", strerror (errno));
+                               log_warn ("chown() failed: %s", strerror (errno));
                        }
                }
                else {
-                       syslog (LOG_WARNING, "getgrnam() failed: %s", strerror (errno));
+                       log_warn ("getgrnam() failed: %s", strerror (errno));
                }
        }
        else {
-               syslog (LOG_WARNING, "not running as root");
+               log_warn ("not running as root");
        }
 
        errno = 0;
        if (0 != chmod (SOCK_PATH, sock_perms)) {
-               syslog (LOG_WARNING, "chmod() failed: %s", strerror (errno));
+               log_warn ("chmod() failed: %s", strerror (errno));
        }
 
        { /* initialize collector threads */
@@ -615,8 +607,7 @@ static void *open_connection (void *arg)
 
                        if (0 != (err = pthread_create (&collectors[i]->thread, &ptattr,
                                                        collect, collectors[i]))) {
-                               syslog (LOG_ERR, "pthread_create() failed: %s",
-                                               strerror (err));
+                               log_err ("pthread_create() failed: %s", strerror (err));
                        }
                }
 
@@ -643,7 +634,7 @@ static void *open_connection (void *arg)
                        if (-1 == (remote = accept (connector_socket, NULL, NULL))) {
                                if (EINTR != errno) {
                                        disabled = 1;
-                                       syslog (LOG_ERR, "accept() failed: %s", strerror (errno));
+                                       log_err ("accept() failed: %s", strerror (errno));
                                        pthread_exit ((void *)1);
                                }
                        }
@@ -670,7 +661,7 @@ static void *open_connection (void *arg)
                pthread_cond_signal (&conn_available);
        }
        pthread_exit ((void *)0);
-} /* void *open_connection (void *) */
+} /* static void *open_connection (void *) */
 #endif /* EMAIL_HAVE_READ */
 
 static void email_init (void)
@@ -681,7 +672,7 @@ static void email_init (void)
        if (0 != (err = pthread_create (&connector, NULL,
                                open_connection, NULL))) {
                disabled = 1;
-               syslog (LOG_ERR, "pthread_create() failed: %s", strerror (err));
+               log_err ("pthread_create() failed: %s", strerror (err));
                return;
        }
 #endif /* EMAIL_HAVE_READ */
@@ -696,15 +687,15 @@ static void email_shutdown (void)
        if (disabled)
                return;
 
-       close (connector_socket);
        pthread_kill (connector, SIGTERM);
+       close (connector_socket);
 
        /* don't allow any more connections to be processed */
        pthread_mutex_lock (&conns_mutex);
 
        for (i = 0; i < max_conns; ++i) {
-               close (collectors[i]->socket);
                pthread_kill (collectors[i]->thread, SIGTERM);
+               close (collectors[i]->socket);
        }
 
        pthread_mutex_unlock (&conns_mutex);
@@ -765,9 +756,6 @@ static void type_submit (char *plugin, char *inst, int value)
        char buf[BUFSIZE] = "";
        int  len          = 0;
 
-       if (0 == value)
-               return;
-
        len = snprintf (buf, BUFSIZE, "%u:%i", (unsigned int)curtime, value);
        if ((len < 0) || (len >= BUFSIZE))
                return;
@@ -781,58 +769,123 @@ static void score_submit (double value)
        char buf[BUFSIZE] = "";
        int  len          = 0;
 
-       if (0.0 == value)
-               return;
-
        len = snprintf (buf, BUFSIZE, "%u:%.2f", (unsigned int)curtime, value);
        if ((len < 0) || (len >= BUFSIZE))
                return;
 
        plugin_submit ("email_spam_score", NULL, buf);
        return;
+} /* static void score_submit (double) */
+
+/* Copy list l1 to list l2. l2 may partly exist already, but it is assumed
+ * that neither the order nor the name of any element of either list is
+ * changed and no elements are deleted. The values of l1 are reset to zero
+ * after they have been copied to l2. */
+static void copy_type_list (type_list_t *l1, type_list_t *l2)
+{
+       type_t *ptr1;
+       type_t *ptr2;
+
+       type_t *last = NULL;
+
+       for (ptr1 = l1->head, ptr2 = l2->head; NULL != ptr1;
+                       ptr1 = ptr1->next, last = ptr2, ptr2 = ptr2->next) {
+               if (NULL == ptr2) {
+                       ptr2 = (type_t *)smalloc (sizeof (type_t));
+                       ptr2->name = NULL;
+                       ptr2->next = NULL;
+
+                       if (NULL == last) {
+                               l2->head = ptr2;
+                       }
+                       else {
+                               last->next = ptr2;
+                       }
+
+                       l2->tail = ptr2;
+               }
+
+               if (NULL == ptr2->name) {
+                       ptr2->name = sstrdup (ptr1->name);
+               }
+
+               ptr2->value = ptr1->value;
+               ptr1->value = 0;
+       }
+       return;
 }
 
 static void email_read (void)
 {
        type_t *ptr;
 
+       double sc;
+
+       static type_list_t *cnt;
+       static type_list_t *sz;
+       static type_list_t *chk;
+
        if (disabled)
                return;
 
-       pthread_mutex_lock (&count_mutex);
+       if (NULL == cnt) {
+               cnt = (type_list_t *)smalloc (sizeof (type_list_t));
+               cnt->head = NULL;
+       }
 
-       for (ptr = count.head; NULL != ptr; ptr = ptr->next) {
-               type_submit ("email_count", ptr->name, ptr->value);
-               ptr->value = 0;
+       if (NULL == sz) {
+               sz = (type_list_t *)smalloc (sizeof (type_list_t));
+               sz->head = NULL;
+       }
+
+       if (NULL == chk) {
+               chk = (type_list_t *)smalloc (sizeof (type_list_t));
+               chk->head = NULL;
        }
 
+       /* email count */
+       pthread_mutex_lock (&count_mutex);
+
+       copy_type_list (&count, cnt);
+
        pthread_mutex_unlock (&count_mutex);
 
+       for (ptr = cnt->head; NULL != ptr; ptr = ptr->next) {
+               type_submit ("email_count", ptr->name, ptr->value);
+       }
+
+       /* email size */
        pthread_mutex_lock (&size_mutex);
 
-       for (ptr = size.head; NULL != ptr; ptr = ptr->next) {
-               type_submit ("email_size", ptr->name, ptr->value);
-               ptr->value = 0;
-       }
+       copy_type_list (&size, sz);
 
        pthread_mutex_unlock (&size_mutex);
 
+       for (ptr = sz->head; NULL != ptr; ptr = ptr->next) {
+               type_submit ("email_size", ptr->name, ptr->value);
+       }
+
+       /* spam score */
        pthread_mutex_lock (&score_mutex);
 
-       score_submit (score);
+       sc = score;
        score = 0.0;
        score_count = 0;
 
        pthread_mutex_unlock (&score_mutex);
 
+       score_submit (sc);
+
+       /* spam checks */
        pthread_mutex_lock (&check_mutex);
 
-       for (ptr = check.head; NULL != ptr; ptr = ptr->next) {
-               type_submit ("email_spam_check", ptr->name, ptr->value);
-               ptr->value = 0;
-       }
+       copy_type_list (&check, chk);
 
        pthread_mutex_unlock (&check_mutex);
+
+       for (ptr = chk->head; NULL != ptr; ptr = ptr->next) {
+               type_submit ("email_spam_check", ptr->name, ptr->value);
+       }
        return;
 } /* static void read (void) */
 #else /* if !EMAIL_HAVE_READ */