2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 #include <sys/cdefs.h>
29 __FBSDID("$FreeBSD$");
31 #include <sys/socket.h>
33 #include <sys/types.h>
34 #include <sys/event.h>
46 #include "mp_rs_query.h"
47 #include "mp_ws_query.h"
48 #include "singletons.h"
/*
 * Forward declarations of the file-local handlers. Each of them is installed
 * further down as a query_state process_func (or, for
 * on_mp_read_session_destroy, as the destroy_func).
 */
50 static int on_mp_read_session_close_notification(struct query_state *);
51 static void on_mp_read_session_destroy(struct query_state *);
52 static int on_mp_read_session_mapper(struct query_state *);
/*
 * NOTE(review): the read1 prototype below is commented out; presumably the
 * function is declared in mp_rs_query.h instead -- confirm.
 */
53 /* int on_mp_read_session_request_read1(struct query_state *); */
54 static int on_mp_read_session_request_read2(struct query_state *);
55 static int on_mp_read_session_request_process(struct query_state *);
56 static int on_mp_read_session_response_write1(struct query_state *);
57 static int on_mp_read_session_read_request_process(struct query_state *);
58 static int on_mp_read_session_read_response_write1(struct query_state *);
59 static int on_mp_read_session_read_response_write2(struct query_state *);
62 * This function is used as the query_state's destroy_func to perform the
63 * proper cleanup in case of errors.
/*
 * Finalizes the request and response comm elements of qstate and, if a read
 * session is still attached through qstate->mdata, closes it while the
 * configuration entry is locked.
 */
66 on_mp_read_session_destroy(struct query_state *qstate)
68 TRACE_IN(on_mp_read_session_destroy);
69 finalize_comm_element(&qstate->request);
70 finalize_comm_element(&qstate->response);
/*
 * qstate->mdata is treated as a cache_mp_read_session here. The close
 * notification handler resets it to NULL after closing the session, so a
 * non-NULL value means the session has not been closed by that handler.
 */
72 if (qstate->mdata != NULL) {
73 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
74 close_cache_mp_read_session(
75 (cache_mp_read_session)qstate->mdata);
76 configuration_unlock_entry(qstate->config_entry,
79 TRACE_OUT(on_mp_read_session_destroy);
83 * The functions below are used to process multipart read session initiation
85 * - on_mp_read_session_request_read1 and on_mp_read_session_request_read2 read
87 * - on_mp_read_session_request_process processes it
88 * - on_mp_read_session_response_write1 sends the response
/*
 * First stage of reading a read-session request: reads the size_t length of
 * the entry name, validates it with BUFSIZE_INVALID, allocates the name
 * buffer and hands control to on_mp_read_session_request_read2.
 *
 * The values returned on the early-exit paths are not visible in this view.
 */
91 on_mp_read_session_request_read1(struct query_state *qstate)
93 struct cache_mp_read_session_request *c_mp_rs_request;
96 TRACE_IN(on_mp_read_session_request_read1);
/*
 * A zero watermark is replaced by sizeof(size_t), the size of the length
 * field read below.
 */
97 if (qstate->kevent_watermark == 0)
98 qstate->kevent_watermark = sizeof(size_t);
100 init_comm_element(&qstate->request,
101 CET_MP_READ_SESSION_REQUEST);
102 c_mp_rs_request = get_cache_mp_read_session_request(
105 result = qstate->read_func(qstate,
106 &c_mp_rs_request->entry_length, sizeof(size_t));
/* A short read of the length field takes the early-exit path. */
108 if (result != sizeof(size_t)) {
109 TRACE_OUT(on_mp_read_session_request_read1);
/* Reject lengths that BUFSIZE_INVALID flags before allocating. */
113 if (BUFSIZE_INVALID(c_mp_rs_request->entry_length)) {
114 TRACE_OUT(on_mp_read_session_request_read1);
/*
 * The buffer is zero-filled and one byte longer than entry_length, so it
 * stays NUL-terminated once read2 has stored entry_length bytes into it.
 * NOTE(review): allocation failure is only caught by assert(), which is
 * compiled out when NDEBUG is defined.
 */
118 c_mp_rs_request->entry = (char *)calloc(1,
119 c_mp_rs_request->entry_length + 1);
120 assert(c_mp_rs_request->entry != NULL);
/* Wait for the whole entry name before read2 runs. */
122 qstate->kevent_watermark = c_mp_rs_request->entry_length;
123 qstate->process_func = on_mp_read_session_request_read2;
125 TRACE_OUT(on_mp_read_session_request_read1);
/*
 * Second stage of reading a read-session request: reads entry_length bytes
 * of the entry name into the buffer allocated by read1 and then hands
 * control to on_mp_read_session_request_process.
 */
130 on_mp_read_session_request_read2(struct query_state *qstate)
132 struct cache_mp_read_session_request *c_mp_rs_request;
135 TRACE_IN(on_mp_read_session_request_read2);
136 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
138 result = qstate->read_func(qstate, c_mp_rs_request->entry,
139 c_mp_rs_request->entry_length);
/*
 * The watermark was set to entry_length by read1, so this compares the
 * number of bytes read against the expected name length.
 */
141 if (result != qstate->kevent_watermark) {
142 LOG_ERR_3("on_mp_read_session_request_read2",
144 TRACE_OUT(on_mp_read_session_request_read2);
/* Nothing more to read: clear the watermark and move on to processing. */
148 qstate->kevent_watermark = 0;
149 qstate->process_func = on_mp_read_session_request_process;
150 TRACE_OUT(on_mp_read_session_request_read2);
/*
 * Processes a read-session request: resolves the configuration entry named
 * in the request, locates (or registers) the matching cache entry and tries
 * to open a multipart read session on it. When that fails and the entry has
 * perform_actual_lookups set, a multipart agent is used to fill the cache
 * through a write session and the read session is opened again.
 *
 * The outcome is stored in the response's error_code (ENOENT, EACCES, EPERM
 * or -1 on the failure paths visible here) and the response is then
 * scheduled to be written by on_mp_read_session_response_write1.
 */
155 on_mp_read_session_request_process(struct query_state *qstate)
157 struct cache_mp_read_session_request *c_mp_rs_request;
158 struct cache_mp_read_session_response *c_mp_rs_response;
159 cache_mp_read_session rs;
161 char *dec_cache_entry_name;
165 cache_mp_write_session ws;
166 struct agent *lookup_agent;
167 struct multipart_agent *mp_agent;
171 TRACE_IN(on_mp_read_session_request_process);
172 init_comm_element(&qstate->response, CET_MP_READ_SESSION_RESPONSE);
173 c_mp_rs_response = get_cache_mp_read_session_response(
175 c_mp_rs_request = get_cache_mp_read_session_request(&qstate->request);
/* Look up the configuration entry by the name received in the request. */
177 qstate->config_entry = configuration_find_entry(
178 s_configuration, c_mp_rs_request->entry);
/* Unknown entry name: report ENOENT. */
179 if (qstate->config_entry == NULL) {
180 c_mp_rs_response->error_code = ENOENT;
182 LOG_ERR_2("read_session_request",
183 "can't find configuration entry '%s'."
184 " aborting request", c_mp_rs_request->entry);
/* Entry exists but is disabled: report EACCES. */
188 if (qstate->config_entry->enabled == 0) {
189 c_mp_rs_response->error_code = EACCES;
191 LOG_ERR_2("read_session_request",
192 "configuration entry '%s' is disabled",
193 c_mp_rs_request->entry);
/*
 * Build the cache entry name. With perform_actual_lookups set it is a plain
 * copy of mp_cache_params.entry_name; the asprintf() further down builds it
 * as eid_str followed by entry_name (presumably the alternative branch; the
 * branch keyword is not visible in this view).
 */
197 if (qstate->config_entry->perform_actual_lookups != 0)
198 dec_cache_entry_name = strdup(
199 qstate->config_entry->mp_cache_params.entry_name);
/* With NS_NSCD_EID_CHECKING, a failed check_query_eids() yields EPERM. */
201 #ifdef NS_NSCD_EID_CHECKING
202 if (check_query_eids(qstate) != 0) {
203 c_mp_rs_response->error_code = EPERM;
208 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
209 qstate->config_entry->mp_cache_params.entry_name);
/*
 * NOTE(review): a failed strdup()/asprintf() is only caught by this
 * assert(), which is compiled out when NDEBUG is defined.
 */
212 assert(dec_cache_entry_name != NULL);
214 configuration_lock_rdlock(s_configuration);
215 c_entry = find_cache_entry(s_cache, dec_cache_entry_name);
216 configuration_unlock(s_configuration);
/*
 * No cache entry yet and actual lookups are enabled: register a new one.
 * NOTE(review): this test uses INVALID_CACHE while the test after free()
 * uses INVALID_CACHE_ENTRY -- confirm the two sentinels are equivalent.
 */
218 if ((c_entry == INVALID_CACHE) &&
219 (qstate->config_entry->perform_actual_lookups != 0))
220 c_entry = register_new_mp_cache_entry(qstate,
221 dec_cache_entry_name);
/* The name is not used again in the lines visible below. */
223 free(dec_cache_entry_name);
225 if (c_entry != INVALID_CACHE_ENTRY) {
/* First attempt to open a read session, under the entry lock. */
226 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
227 rs = open_cache_mp_read_session(c_entry);
228 configuration_unlock_entry(qstate->config_entry,
/*
 * No read session could be opened and actual lookups are enabled: look for a
 * multipart agent registered under the requested entry name.
 */
231 if ((rs == INVALID_CACHE_MP_READ_SESSION) &&
232 (qstate->config_entry->perform_actual_lookups != 0)) {
233 lookup_agent = find_agent(s_agent_table,
234 c_mp_rs_request->entry, MULTIPART_AGENT);
236 if ((lookup_agent != NULL) &&
237 (lookup_agent->type == MULTIPART_AGENT)) {
238 mp_agent = (struct multipart_agent *)
/* Agent-private state returned by the agent's init hook. */
240 mdata = mp_agent->mp_init_func();
243 * Multipart agents read the whole snapshot
244 * of the data at one time.
246 configuration_lock_entry(qstate->config_entry,
248 ws = open_cache_mp_write_session(c_entry);
249 configuration_unlock_entry(qstate->config_entry,
254 res = mp_agent->mp_lookup_func(&buffer,
258 if ((res & NS_TERMINATE) &&
260 configuration_lock_entry(
261 qstate->config_entry,
/*
 * The looked-up buffer is stored through the write session; the write
 * session is abandoned, apparently when cache_mp_write() reports a failure
 * (the rest of the condition is not visible in this view).
 */
263 if (cache_mp_write(ws, buffer,
265 abandon_cache_mp_write_session(ws);
268 configuration_unlock_entry(
269 qstate->config_entry,
275 configuration_lock_entry(
276 qstate->config_entry,
278 close_cache_mp_write_session(ws);
279 configuration_unlock_entry(
280 qstate->config_entry,
286 } while ((res & NS_TERMINATE) &&
/* Second attempt to open a read session, after the agent has run. */
290 configuration_lock_entry(qstate->config_entry,
292 rs = open_cache_mp_read_session(c_entry);
293 configuration_unlock_entry(qstate->config_entry,
/* Still no read session: report a generic failure (-1). */
298 if (rs == INVALID_CACHE_MP_READ_SESSION)
299 c_mp_rs_response->error_code = -1;
/* Install the cleanup handler for this query. */
302 qstate->destroy_func = on_mp_read_session_destroy;
304 configuration_lock_entry(qstate->config_entry,
/* A non-zero mp_query_timeout from the entry overrides qstate->timeout. */
306 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
307 (qstate->config_entry->mp_query_timeout.tv_usec != 0))
308 memcpy(&qstate->timeout,
309 &qstate->config_entry->mp_query_timeout,
310 sizeof(struct timeval));
311 configuration_unlock_entry(qstate->config_entry,
/*
 * Presumably the branch taken when no valid cache entry was found (the
 * branch keyword is not visible in this view): report a generic failure.
 */
315 c_mp_rs_response->error_code = -1;
/* Schedule the int-sized error code to be written back. */
318 qstate->process_func = on_mp_read_session_response_write1;
319 qstate->kevent_watermark = sizeof(int);
320 qstate->kevent_filter = EVFILT_WRITE;
322 TRACE_OUT(on_mp_read_session_request_process);
/*
 * Writes the error code of the read-session response. When the code is 0
 * the query switches back to reading and is handed to the mapper.
 */
327 on_mp_read_session_response_write1(struct query_state *qstate)
329 struct cache_mp_read_session_response *c_mp_rs_response;
332 TRACE_IN(on_mp_read_session_response_write1);
333 c_mp_rs_response = get_cache_mp_read_session_response(
335 result = qstate->write_func(qstate, &c_mp_rs_response->error_code,
/* A short write of the error code is logged as an error. */
338 if (result != sizeof(int)) {
339 LOG_ERR_3("on_mp_read_session_response_write1",
341 TRACE_OUT(on_mp_read_session_response_write1);
/*
 * Session opened successfully: wait for an int-sized element type and let
 * the mapper dispatch it.
 */
345 if (c_mp_rs_response->error_code == 0) {
346 qstate->kevent_watermark = sizeof(int);
347 qstate->process_func = on_mp_read_session_mapper;
348 qstate->kevent_filter = EVFILT_READ;
/*
 * The next two assignments clear the watermark and process_func; presumably
 * they form the failure branch (the branch keyword is not visible in this
 * view).
 */
350 qstate->kevent_watermark = 0;
351 qstate->process_func = NULL;
353 TRACE_OUT(on_mp_read_session_response_write1);
358 * The mapper function is used to avoid opening a separate connection for
359 * each session read or write request. After processing a request, it does
360 * not close the connection, but waits for the next request.
/*
 * Reads an int-sized element type and selects the handler for it:
 * CET_MP_READ_SESSION_READ_REQUEST goes to
 * on_mp_read_session_read_request_process,
 * CET_MP_READ_SESSION_CLOSE_NOTIFICATION goes to
 * on_mp_read_session_close_notification, and anything else is logged as an
 * unknown element type with process_func cleared.
 */
363 on_mp_read_session_mapper(struct query_state *qstate)
368 TRACE_IN(on_mp_read_session_mapper);
/* A zero watermark is replaced by the size of the element type field. */
369 if (qstate->kevent_watermark == 0) {
370 qstate->kevent_watermark = sizeof(int);
372 result = qstate->read_func(qstate, &elem_type, sizeof(int));
/* A short read of the element type is logged as an error. */
373 if (result != sizeof(int)) {
374 LOG_ERR_3("on_mp_read_session_mapper",
376 TRACE_OUT(on_mp_read_session_mapper);
381 case CET_MP_READ_SESSION_READ_REQUEST:
382 qstate->kevent_watermark = 0;
383 qstate->process_func =
384 on_mp_read_session_read_request_process;
386 case CET_MP_READ_SESSION_CLOSE_NOTIFICATION:
387 qstate->kevent_watermark = 0;
388 qstate->process_func =
389 on_mp_read_session_close_notification;
/* Remaining lines handle an element type matching neither case above. */
392 qstate->kevent_watermark = 0;
393 qstate->process_func = NULL;
394 LOG_ERR_3("on_mp_read_session_mapper",
395 "unknown element type");
396 TRACE_OUT(on_mp_read_session_mapper);
400 TRACE_OUT(on_mp_read_session_mapper);
405 * The functions below are used to process multipart read sessions read
406 * requests. User doesn't have to pass any kind of data, besides the
407 * request identifier itself. So we don't need any XXX_read functions and
408 * start with the XXX_process function.
409 * - on_mp_read_session_read_request_process processes it
410 * - on_mp_read_session_read_response_write1 and
411 * on_mp_read_session_read_response_write2 send the response
/*
 * Processes a read request on an open read session: calls cache_mp_read()
 * once with a NULL buffer to obtain data_size, allocates the data buffer,
 * calls cache_mp_read() again, and then schedules the response to be written
 * by on_mp_read_session_read_response_write1.
 */
414 on_mp_read_session_read_request_process(struct query_state *qstate)
416 struct cache_mp_read_session_read_response *read_response;
/*
 * NOTE(review): the trace macros in this function name
 * on_mp_read_session_response_process, which differs from the name of the
 * enclosing function -- confirm whether that is intended.
 */
418 TRACE_IN(on_mp_read_session_response_process);
419 init_comm_element(&qstate->response, CET_MP_READ_SESSION_READ_RESPONSE);
420 read_response = get_cache_mp_read_session_read_response(
/* Both cache_mp_read() calls are made with the entry lock held. */
423 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
/* Size query: NULL buffer, only data_size is filled in. */
424 read_response->error_code = cache_mp_read(
425 (cache_mp_read_session)qstate->mdata, NULL,
426 &read_response->data_size);
428 if (read_response->error_code == 0) {
/*
 * NOTE(review): the assert() below tests read_response, not the freshly
 * allocated read_response->data, so a failed malloc() looks unchecked --
 * confirm and consider asserting on read_response->data instead.
 */
429 read_response->data = (char *)malloc(read_response->data_size);
430 assert(read_response != NULL);
/* Second call, presumably reading into read_response->data. */
431 read_response->error_code = cache_mp_read(
432 (cache_mp_read_session)qstate->mdata,
434 &read_response->data_size);
436 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
/*
 * On success the first write covers error_code plus data_size; the
 * sizeof(int) watermark below covers error_code alone (presumably the
 * failure branch; the branch keyword is not visible in this view).
 */
438 if (read_response->error_code == 0)
439 qstate->kevent_watermark = sizeof(size_t) + sizeof(int);
441 qstate->kevent_watermark = sizeof(int);
442 qstate->process_func = on_mp_read_session_read_response_write1;
443 qstate->kevent_filter = EVFILT_WRITE;
445 TRACE_OUT(on_mp_read_session_response_process);
/*
 * Writes the header of a read response: always the error code and, when the
 * error code is 0, the data size as well. On success with data pending,
 * on_mp_read_session_read_response_write2 is scheduled to write the data.
 */
450 on_mp_read_session_read_response_write1(struct query_state *qstate)
452 struct cache_mp_read_session_read_response *read_response;
455 TRACE_IN(on_mp_read_session_read_response_write1);
456 read_response = get_cache_mp_read_session_read_response(
459 result = qstate->write_func(qstate, &read_response->error_code,
461 if (read_response->error_code == 0) {
/* The byte counts of both writes are summed and compared as one total. */
462 result += qstate->write_func(qstate, &read_response->data_size,
/*
 * NOTE(review): TRACE_OUT is emitted before LOG_ERR_3 on this error path,
 * unlike the other error paths in this file -- confirm the intended order.
 */
464 if (result != qstate->kevent_watermark) {
465 TRACE_OUT(on_mp_read_session_read_response_write1);
466 LOG_ERR_3("on_mp_read_session_read_response_write1",
/* Next, wait until data_size bytes can be written, then run write2. */
471 qstate->kevent_watermark = read_response->data_size;
472 qstate->process_func = on_mp_read_session_read_response_write2;
/*
 * Remaining lines: presumably the branch for a non-zero error code (the
 * branch keyword is not visible in this view); only the error code was
 * written, and process_func is cleared afterwards.
 */
474 if (result != qstate->kevent_watermark) {
475 LOG_ERR_3("on_mp_read_session_read_response_write1",
477 TRACE_OUT(on_mp_read_session_read_response_write1);
481 qstate->kevent_watermark = 0;
482 qstate->process_func = NULL;
485 TRACE_OUT(on_mp_read_session_read_response_write1);
/*
 * Writes the data part of a read response, then finalizes the request and
 * response comm elements and returns the query to the mapper to wait for
 * the next element type.
 */
490 on_mp_read_session_read_response_write2(struct query_state *qstate)
492 struct cache_mp_read_session_read_response *read_response;
495 TRACE_IN(on_mp_read_session_read_response_write2);
496 read_response = get_cache_mp_read_session_read_response(
498 result = qstate->write_func(qstate, read_response->data,
499 read_response->data_size);
/* The watermark was set to data_size by write1; a mismatch is logged. */
500 if (result != qstate->kevent_watermark) {
501 LOG_ERR_3("on_mp_read_session_read_response_write2",
503 TRACE_OUT(on_mp_read_session_read_response_write2);
/* This request/response pair is complete. */
507 finalize_comm_element(&qstate->request);
508 finalize_comm_element(&qstate->response);
/* Switch back to reading: the mapper expects an int-sized element type. */
510 qstate->kevent_watermark = sizeof(int);
511 qstate->process_func = on_mp_read_session_mapper;
512 qstate->kevent_filter = EVFILT_READ;
514 TRACE_OUT(on_mp_read_session_read_response_write2);
519 * Handles session close notification by calling close_cache_mp_read_session
/*
 * Closes the read session stored in qstate->mdata while the configuration
 * entry is locked, then clears mdata, the watermark and process_func.
 */
523 on_mp_read_session_close_notification(struct query_state *qstate)
526 TRACE_IN(on_mp_read_session_close_notification);
527 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
528 close_cache_mp_read_session((cache_mp_read_session)qstate->mdata);
529 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
/*
 * Clearing mdata makes on_mp_read_session_destroy skip its own close of the
 * session, since that handler only closes when mdata is non-NULL.
 */
530 qstate->mdata = NULL;
531 qstate->kevent_watermark = 0;
532 qstate->process_func = NULL;
533 TRACE_OUT(on_mp_read_session_close_notification);