1f61080cdff9c799f1f2299d91e4d7461bbfe055
1 /**
2 * collectd - src/amqp.c
3 * Copyright (C) 2009 Sebastien Pahl
4 * Copyright (C) 2010 Florian Forster
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a
7 * copy of this software and associated documentation files (the "Software"),
8 * to deal in the Software without restriction, including without limitation
9 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10 * and/or sell copies of the Software, and to permit persons to whom the
11 * Software is furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in
14 * all copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
23 *
24 * Authors:
25 * Sebastien Pahl <sebastien.pahl at dotcloud.com>
26 * Florian Forster <octo at verplant.org>
27 **/
29 #include <stdint.h>
30 #include <stdlib.h>
31 #include <unistd.h>
32 #include <strings.h>
33 #include <pthread.h>
35 #include "collectd.h"
36 #include "common.h"
37 #include "plugin.h"
38 #include "utils_format_json.h"
40 #include <amqp.h>
41 #include <amqp_framing.h>
/* Values for the "delivery mode" property of a published message. They are
 * defined locally because this file does not rely on the client library
 * headers to provide named constants for them. */
#define AMQP_DM_VOLATILE   1
#define AMQP_DM_PERSISTENT 2
48 /*
49 * Global variables
50 */
51 static int port = 5672;
52 static char *host = NULL;
53 static char *vhost = NULL;
54 static char *user = NULL;
55 static char *password = NULL;
56 static char *exchange = NULL;
57 static char *routingkey = NULL;
58 static uint8_t delivery_mode = AMQP_DM_VOLATILE;
59 static _Bool store_rates = 0;
61 static amqp_connection_state_t amqp_conn = NULL;
62 static pthread_mutex_t amqp_conn_lock = PTHREAD_MUTEX_INITIALIZER;
64 static const char *config_keys[] =
65 {
66 "Host",
67 "Port",
68 "VHost",
69 "User",
70 "Password",
71 "Exchange",
72 "RoutingKey",
73 "Persistent",
74 "StoreRates"
75 };
76 static int config_keys_num = STATIC_ARRAY_SIZE(config_keys);
78 /*
79 * Functions
80 */
/*
 * Replaces the string stored in `*var' with a newly allocated copy of
 * `value'.
 *
 * The copy is made before the old string is released, so that a failed
 * allocation leaves the previous setting untouched instead of resetting it
 * to NULL.
 *
 * Returns 0 on success and 1 if `value' is NULL or the copy could not be
 * allocated.
 */
static int config_set(char **var, const char *value)
{
    char *copy;

    if (value == NULL)
        return (1);

    copy = strdup(value);
    if (copy == NULL)
        return (1);

    sfree(*var);
    *var = copy;
    return (0);
} /* int config_set */
89 static int config(const char *key, const char *value)
90 {
91 if (strcasecmp(key, "host") == 0)
92 return (config_set(&host, value));
93 else if(strcasecmp(key, "port") == 0)
94 {
95 int tmp;
97 tmp = service_name_to_port_number (value);
98 if (tmp <= 0)
99 {
100 ERROR ("AMQP plugin: Cannot parse `%s' as a "
101 "service name (port number).", value);
102 return (1);
103 }
105 port = tmp;
106 return (0);
107 }
108 else if (strcasecmp(key, "vhost") == 0)
109 return (config_set(&vhost, value));
110 else if (strcasecmp(key, "user") == 0)
111 return (config_set(&user, value));
112 else if (strcasecmp(key, "password") == 0)
113 return (config_set(&password, value));
114 else if (strcasecmp(key, "exchange") == 0)
115 return (config_set(&exchange, value));
116 else if (strcasecmp(key, "routingkey") == 0)
117 return (config_set(&routingkey, value));
118 else if (strcasecmp ("Persistent", key) == 0)
119 {
120 if (IS_TRUE (value))
121 delivery_mode = AMQP_DM_PERSISTENT;
122 else
123 delivery_mode = AMQP_DM_VOLATILE;
124 return (0);
125 }
126 else if (strcasecmp ("StoreRates", key) == 0)
127 {
128 if (IS_TRUE (value))
129 store_rates = 1;
130 else
131 store_rates = 0;
132 return (0);
133 }
134 return (-1);
135 } /* int config */
137 static int amqp_connect (void)
138 {
139 amqp_rpc_reply_t reply;
140 int sockfd;
141 int status;
143 if (amqp_conn != NULL)
144 return (0);
146 amqp_conn = amqp_new_connection ();
147 if (amqp_conn == NULL)
148 {
149 ERROR ("amqp plugin: amqp_new_connection failed.");
150 return (ENOMEM);
151 }
153 sockfd = amqp_open_socket (host, port);
154 if (sockfd < 0)
155 {
156 char errbuf[1024];
157 status = (-1) * sockfd;
158 ERROR ("amqp plugin: amqp_open_socket failed: %s",
159 sstrerror (status, errbuf, sizeof (errbuf)));
160 amqp_destroy_connection(amqp_conn);
161 amqp_conn = NULL;
162 return (status);
163 }
165 amqp_set_sockfd (amqp_conn, sockfd);
167 reply = amqp_login(amqp_conn, vhost,
168 /* channel max = */ 0,
169 /* frame max = */ 131072,
170 /* heartbeat = */ 0,
171 /* authentication: */ AMQP_SASL_METHOD_PLAIN, user, password);
172 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
173 {
174 ERROR ("amqp plugin: amqp_login (vhost = %s, user = %s) failed.",
175 vhost, user);
176 amqp_destroy_connection(amqp_conn);
177 close(sockfd);
178 amqp_conn = NULL;
179 return (1);
180 }
182 amqp_channel_open (amqp_conn, /* channel = */ 1);
183 /* FIXME: Is checking "reply.reply_type" really correct here? How does
184 * it get set? --octo */
185 if (reply.reply_type != AMQP_RESPONSE_NORMAL)
186 {
187 ERROR ("amqp plugin: amqp_channel_open failed.");
188 amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
189 amqp_destroy_connection(amqp_conn);
190 close(sockfd);
191 amqp_conn = NULL;
192 return (1);
193 }
195 INFO ("amqp plugin: Successfully opened connection to vhost \"%s\" "
196 "on %s:%i.", vhost, host, port);
197 return (0);
198 } /* int amqp_connect */
200 static int amqp_write_locked (const char *buffer)
201 {
202 amqp_basic_properties_t props;
203 int status;
205 status = amqp_connect ();
206 if (status != 0)
207 return (status);
209 memset (&props, 0, sizeof (props));
210 props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
211 props.content_type = amqp_cstring_bytes("application/json");
212 props.delivery_mode = delivery_mode;
214 status = amqp_basic_publish(amqp_conn,
215 /* channel = */ 1,
216 amqp_cstring_bytes(exchange),
217 amqp_cstring_bytes(routingkey),
218 /* mandatory = */ 0,
219 /* immediate = */ 0,
220 &props,
221 amqp_cstring_bytes(buffer));
222 if (status != 0)
223 {
224 int sockfd;
226 ERROR ("amqp plugin: amqp_basic_publish failed with status %i.",
227 status);
229 sockfd = amqp_get_sockfd (amqp_conn);
230 amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
231 amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
232 amqp_destroy_connection (amqp_conn);
233 close (sockfd);
234 amqp_conn = NULL;
235 }
237 return (status);
238 } /* int amqp_write_locked */
240 static int amqp_write (const data_set_t *ds, const value_list_t *vl,
241 __attribute__((unused)) user_data_t *user_data)
242 {
243 char buffer[4096];
244 size_t bfree;
245 size_t bfill;
246 int status;
248 if ((ds == NULL) || (vl == NULL))
249 return (EINVAL);
251 memset (buffer, 0, sizeof (buffer));
252 bfree = sizeof (buffer);
253 bfill = 0;
255 format_json_initialize (buffer, &bfill, &bfree);
256 format_json_value_list (buffer, &bfill, &bfree, ds, vl, store_rates);
257 format_json_finalize (buffer, &bfill, &bfree);
259 pthread_mutex_lock (&amqp_conn_lock);
260 status = amqp_write_locked (buffer);
261 pthread_mutex_unlock (&amqp_conn_lock);
263 return (status);
264 } /* int amqp_write */
266 static int shutdown (void)
267 {
268 pthread_mutex_lock (&amqp_conn_lock);
269 if (amqp_conn != NULL)
270 {
271 int sockfd;
273 sockfd = amqp_get_sockfd (amqp_conn);
274 amqp_channel_close (amqp_conn, 1, AMQP_REPLY_SUCCESS);
275 amqp_connection_close (amqp_conn, AMQP_REPLY_SUCCESS);
276 amqp_destroy_connection (amqp_conn);
277 close(sockfd);
278 amqp_conn = NULL;
279 }
280 pthread_mutex_unlock (&amqp_conn_lock);
282 sfree(host);
283 sfree(vhost);
284 sfree(user);
285 sfree(password);
286 sfree(exchange);
287 sfree(routingkey);
289 return (0);
290 } /* int shutdown */
292 void module_register (void)
293 {
294 plugin_register_config ("amqp", config, config_keys, config_keys_num);
295 plugin_register_write ("amqp", amqp_write, NULL);
296 plugin_register_shutdown ("amqp", shutdown);
297 } /* void module_register */
299 /* vim: set sw=4 sts=4 et : */