512c33496fc072f66fa6f5c4e699c886358eae61
1 /*
2 **
3 ** collectd-amqp
4 ** Copyright (c) <2009> <sebastien.pahl@dotcloud.com>
5 **
6 ** Permission is hereby granted, free of charge, to any person
7 ** obtaining a copy of this software and associated documentation
8 ** files (the "Software"), to deal in the Software without
9 ** restriction, including without limitation the rights to use,
10 ** copy, modify, merge, publish, distribute, sublicense, and/or sell
11 ** copies of the Software, and to permit persons to whom the
12 ** Software is furnished to do so, subject to the following
13 ** conditions:
14 **
15 ** The above copyright notice and this permission notice shall be
16 ** included in all copies or substantial portions of the Software.
17 **
18 ** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
19 ** EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
20 ** OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
21 ** NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
22 ** HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
23 ** WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24 ** FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
25 ** OTHER DEALINGS IN THE SOFTWARE.
26 **
27 */
29 #include <stdint.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <strings.h>
34 #include <collectd.h>
35 #include <common.h>
36 #include <plugin.h>
37 #include <utils_format_json.h>
39 #include <amqp.h>
40 #include <amqp_framing.h>
42 #define PLUGIN_NAME "amqp"
44 static int port;
45 static char *host = NULL;
46 static char *vhost = NULL;
47 static char *user = NULL;
48 static char *password = NULL;
49 static char *exchange = NULL;
50 static char *routingkey = NULL;
52 static const char *config_keys[] =
53 {
54 "Host",
55 "Port",
56 "VHost",
57 "User",
58 "Password",
59 "Exchange",
60 "RoutingKey"
61 };
63 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
65 static void config_free(char *var)
66 {
67 if (var != NULL)
68 free(var);
69 }
71 static int config_set(char **var, const char *value)
72 {
73 config_free(*var);
74 if ((*var = strdup(value)) == NULL)
75 return (1);
76 return (0);
77 }
79 static int config(const char *key, const char *value)
80 {
81 if (strcasecmp(key, "host") == 0)
82 return (config_set(&host, value));
83 else if(strcasecmp(key, "port") == 0)
84 {
85 port = atoi(value);
86 return (0);
87 }
88 else if (strcasecmp(key, "vhost") == 0)
89 return (config_set(&vhost, value));
90 else if (strcasecmp(key, "user") == 0)
91 return (config_set(&user, value));
92 else if (strcasecmp(key, "password") == 0)
93 return (config_set(&password, value));
94 else if (strcasecmp(key, "exchange") == 0)
95 return (config_set(&exchange, value));
96 else if (strcasecmp(key, "routingkey") == 0)
97 return (config_set(&routingkey, value));
98 return (-1);
99 }
101 static int amqp_write(const data_set_t *ds, const value_list_t *vl, user_data_t *user_data)
102 {
103 int error;
104 int sockfd;
105 size_t bfree;
106 size_t bfill;
107 char buffer[4096];
108 amqp_rpc_reply_t reply;
109 amqp_connection_state_t conn;
110 amqp_basic_properties_t props;
112 conn = amqp_new_connection();
113 if ((sockfd = amqp_open_socket(host, port)) < 0)
114 {
115 amqp_destroy_connection(conn);
116 return (1);
117 }
118 amqp_set_sockfd(conn, sockfd);
119 reply = amqp_login(conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, user, password);
120 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
121 {
122 amqp_destroy_connection(conn);
123 close(sockfd);
124 return (1);
125 }
126 amqp_channel_open(conn, 1);
127 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
128 {
129 amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
130 amqp_destroy_connection(conn);
131 close(sockfd);
132 return (1);
133 }
134 error = 0;
135 memset(buffer, 0, sizeof(buffer));
136 bfree = sizeof(buffer);
137 bfill = 0;
138 format_json_initialize(buffer, &bfill, &bfree);
139 format_json_value_list(buffer, &bfill, &bfree, ds, vl);
140 format_json_finalize(buffer, &bfill, &bfree);
141 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
142 props.content_type = amqp_cstring_bytes("application/json");
143 props.delivery_mode = 2; // persistent delivery mode
144 error = amqp_basic_publish(conn,
145 1,
146 amqp_cstring_bytes(exchange),
147 amqp_cstring_bytes(routingkey),
148 0,
149 0,
150 &props,
151 amqp_cstring_bytes(buffer));
152 reply = amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
153 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
154 error = 1;
155 reply = amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
156 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
157 error = 1;
158 amqp_destroy_connection(conn);
159 if (close(sockfd) < 0)
160 error = 1;
161 return (error);
162 }
164 static int shutdown(void)
165 {
166 config_free(host);
167 config_free(vhost);
168 config_free(user);
169 config_free(password);
170 config_free(exchange);
171 config_free(routingkey);
172 return (0);
173 }
175 void module_register(void)
176 {
177 plugin_register_config(PLUGIN_NAME, config, config_keys, config_keys_num);
178 plugin_register_write(PLUGIN_NAME, amqp_write, NULL);
179 plugin_register_shutdown(PLUGIN_NAME, shutdown);
180 }