Code

libcollectdclient: Introduce a "parser" callback.
authorFlorian Forster <octo@collectd.org>
Tue, 2 May 2017 08:49:12 +0000 (10:49 +0200)
committerFlorian Forster <octo@collectd.org>
Tue, 2 May 2017 08:49:12 +0000 (10:49 +0200)
This will allow the network plugin to enqueue received network packets and
parse them using a thread pool.

src/libcollectdclient/collectd/server.h
src/libcollectdclient/server.c

index 90c1c0b70b172da2c95c8387f4215ae3b299ad7c..48b5874418ec8c2dded749f6a171717e45486760 100644 (file)
 
 #include "collectd/lcc_features.h"
 
+#include "collectd/network.h"       /* for lcc_security_level_t */
+#include "collectd/network_parse.h" /* for lcc_network_parse_options_t */
 #include "collectd/types.h"
-#include "collectd/network.h" /* for lcc_security_level_t */
 
 #include <stdint.h>
 
 LCC_BEGIN_DECLS
 
+/* lcc_network_parser_t is a callback that parses received network packets. It
+ * is expected to call lcc_network_parse_options_t.writer with each
+ * lcc_value_list_t it parses that has the required security level. */
+typedef int (*lcc_network_parser_t)(void *payload, size_t payload_size,
+                                    lcc_network_parse_options_t opts);
+
 /* lcc_listener_t holds parameters for running a collectd server. */
 typedef struct {
   /* conn is a UDP socket for the server to listen on. */
@@ -48,6 +55,10 @@ typedef struct {
    * LCC_DEFAULT_PORT. */
   char *service;
 
+  /* parser is the callback used to parse incoming network packets. Defaults to
+   * lcc_network_parse() if set to NULL. */
+  lcc_network_parser_t parser;
+
   /* writer is the callback used to send incoming lcc_value_list_t to. */
   lcc_value_list_writer_t writer;
 
index 4a414bfaeb07d30d0555720ac008b5ba28e88a7e..68e4a9ea1aee2f505930207ad65429bd68a33c03 100644 (file)
@@ -189,6 +189,9 @@ int lcc_listen_and_write(lcc_listener_t srv) {
     /* TODO(octo): this should be a define. */
     srv.buffer_size = 1452;
 
+  if (srv.parser == NULL)
+    srv.parser = lcc_network_parse;
+
   int ret = 0;
   while (42) {
     char buffer[srv.buffer_size];
@@ -200,13 +203,12 @@ int lcc_listen_and_write(lcc_listener_t srv) {
       break;
     }
 
-    /* TODO(octo): implement parse(). */
-    (void)lcc_network_parse(buffer, (size_t)len,
-                            (lcc_network_parse_options_t){
-                                .writer = srv.writer,
-                                .password_lookup = srv.password_lookup,
-                                .security_level = srv.security_level,
-                            });
+    (void)srv.parser(buffer, (size_t)len,
+                     (lcc_network_parse_options_t){
+                         .writer = srv.writer,
+                         .password_lookup = srv.password_lookup,
+                         .security_level = srv.security_level,
+                     });
   }
 
   if (close_socket) {