2 * Copyright (c) 2005 Michael Bushkov <bushman@rsu.ru>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 #include <sys/cdefs.h>
29 #include <sys/types.h>
30 #include <sys/event.h>
31 #include <sys/socket.h>
45 #include "mp_ws_query.h"
46 #include "singletons.h"
/*
 * Forward declarations of the file-local (static) handlers for multipart
 * write sessions. Every handler takes the struct query_state being
 * processed; all are declared to return int except
 * on_mp_write_session_destroy, which returns void.
 *
 * The prototype of on_mp_write_session_request_read1 is commented out here.
 * NOTE(review): presumably it is declared in mp_ws_query.h because it is
 * the externally visible entry point - confirm.
 */
48 static int on_mp_write_session_abandon_notification(struct query_state *);
49 static int on_mp_write_session_close_notification(struct query_state *);
50 static void on_mp_write_session_destroy(struct query_state *);
51 static int on_mp_write_session_mapper(struct query_state *);
52 /* int on_mp_write_session_request_read1(struct query_state *); */
53 static int on_mp_write_session_request_read2(struct query_state *);
54 static int on_mp_write_session_request_process(struct query_state *);
55 static int on_mp_write_session_response_write1(struct query_state *);
56 static int on_mp_write_session_write_request_read1(struct query_state *);
57 static int on_mp_write_session_write_request_read2(struct query_state *);
58 static int on_mp_write_session_write_request_process(struct query_state *);
59 static int on_mp_write_session_write_response_write1(struct query_state *);
62 * This function is used as the query_state's destroy_func to make the
63 * proper cleanup in case of errors.
/*
 * Cleanup handler for a multipart write-session query state; it is the
 * function installed as qstate->destroy_func by
 * on_mp_write_session_request_process.
 *
 * Finalizes both the request and the response comm elements. If a session
 * is still attached (qstate->mdata != NULL), the session is abandoned while
 * the configuration entry is locked with CELT_MULTIPART.
 *
 * NOTE(review): this excerpt omits some original lines (the embedded line
 * numbers skip), e.g. the return type, the braces and the tail of the
 * configuration_unlock_entry() call - confirm against the full file.
 */
66 on_mp_write_session_destroy(struct query_state *qstate)
69 TRACE_IN(on_mp_write_session_destroy);
70 finalize_comm_element(&qstate->request);
71 finalize_comm_element(&qstate->response);
/* mdata carries the write session handle (see the cast below). */
73 if (qstate->mdata != NULL) {
74 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
75 abandon_cache_mp_write_session(
76 (cache_mp_write_session)qstate->mdata);
77 configuration_unlock_entry(qstate->config_entry,
80 TRACE_OUT(on_mp_write_session_destroy);
84 * The functions below are used to process multipart write session initiation
86 * - on_mp_write_session_request_read1 and on_mp_write_session_request_read2
87 * read the request itself
88 * - on_mp_write_session_request_process processes it
89 * - on_mp_write_session_response_write1 sends the response
/*
 * First read step of a session initiation request: obtains the length of
 * the configuration entry name and allocates a buffer for the name.
 *
 * If qstate->kevent_watermark is 0 it is set to sizeof(size_t). The visible
 * code then initializes qstate->request as CET_MP_WRITE_SESSION_REQUEST,
 * reads entry_length through qstate->read_func, logs an error when the read
 * does not return exactly sizeof(size_t) or when BUFSIZE_INVALID() rejects
 * the length, allocates entry_length + 1 zeroed bytes, and hands over to
 * on_mp_write_session_request_read2 with the watermark set to entry_length.
 *
 * NOTE(review): lines are missing from this excerpt (the embedded numbering
 * skips), so whether the read happens in the same call that arms the
 * watermark, and what the error paths return, is not visible here - confirm
 * against the full file.
 */
92 on_mp_write_session_request_read1(struct query_state *qstate)
94 struct cache_mp_write_session_request *c_mp_ws_request;
97 TRACE_IN(on_mp_write_session_request_read1);
98 if (qstate->kevent_watermark == 0)
99 qstate->kevent_watermark = sizeof(size_t);
101 init_comm_element(&qstate->request,
102 CET_MP_WRITE_SESSION_REQUEST);
103 c_mp_ws_request = get_cache_mp_write_session_request(
106 result = qstate->read_func(qstate,
107 &c_mp_ws_request->entry_length, sizeof(size_t));
109 if (result != sizeof(size_t)) {
110 LOG_ERR_3("on_mp_write_session_request_read1",
112 TRACE_OUT(on_mp_write_session_request_read1);
/* The length comes from the peer, so it is validated before allocating. */
116 if (BUFSIZE_INVALID(c_mp_ws_request->entry_length)) {
117 LOG_ERR_3("on_mp_write_session_request_read1",
118 "invalid entry_length value");
119 TRACE_OUT(on_mp_write_session_request_read1);
/*
 * One extra zeroed byte is allocated; presumably this keeps the entry name
 * NUL-terminated after read2 fills entry_length bytes - confirm.
 * NOTE(review): allocation failure is only caught by assert(), which is
 * compiled out when NDEBUG is defined.
 */
123 c_mp_ws_request->entry = calloc(1,
124 c_mp_ws_request->entry_length + 1);
125 assert(c_mp_ws_request->entry != NULL);
127 qstate->kevent_watermark = c_mp_ws_request->entry_length;
128 qstate->process_func = on_mp_write_session_request_read2;
130 TRACE_OUT(on_mp_write_session_request_read1);
/*
 * Second read step of a session initiation request: reads entry_length
 * bytes of the configuration entry name into c_mp_ws_request->entry (the
 * buffer allocated in on_mp_write_session_request_read1).
 *
 * A negative result, or a byte count different from
 * qstate->kevent_watermark, is logged as an error. Otherwise the watermark
 * is reset to 0 and processing continues with
 * on_mp_write_session_request_process.
 *
 * NOTE(review): the return statements and the log message text are missing
 * from this excerpt - confirm against the full file.
 */
135 on_mp_write_session_request_read2(struct query_state *qstate)
137 struct cache_mp_write_session_request *c_mp_ws_request;
140 TRACE_IN(on_mp_write_session_request_read2);
141 c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request);
143 result = qstate->read_func(qstate, c_mp_ws_request->entry,
144 c_mp_ws_request->entry_length);
/* The sign is checked first so the cast to size_t cannot hide an error. */
146 if (result < 0 || (size_t)result != qstate->kevent_watermark) {
147 LOG_ERR_3("on_mp_write_session_request_read2",
149 TRACE_OUT(on_mp_write_session_request_read2);
153 qstate->kevent_watermark = 0;
154 qstate->process_func = on_mp_write_session_request_process;
156 TRACE_OUT(on_mp_write_session_request_read2);
/*
 * Processes a session initiation request and prepares the response.
 *
 * Steps visible in this excerpt:
 * - initializes qstate->response as CET_MP_WRITE_SESSION_RESPONSE;
 * - looks up the configuration entry named by the request; error_code is
 *   set to ENOENT when it is not found, EACCES when the entry is disabled
 *   and EOPNOTSUPP when the entry performs actual lookups by itself;
 * - when NS_NSCD_EID_CHECKING is defined, sets EPERM if check_query_eids()
 *   returns non-zero;
 * - builds the decorated cache entry name (qstate->eid_str followed by the
 *   configured entry name), looks the cache entry up and registers a new
 *   one when the lookup yields INVALID_CACHE_ENTRY;
 * - opens a write session while the configuration entry is locked with
 *   CELT_MULTIPART; error_code is set to -1 for an invalid session;
 * - assigns on_mp_write_session_destroy to qstate->destroy_func and copies
 *   mp_query_timeout into qstate->timeout when that timeout is non-zero;
 * - switches to on_mp_write_session_response_write1 with an EVFILT_WRITE
 *   filter and a watermark of sizeof(int).
 *
 * NOTE(review): lines are missing from this excerpt (the embedded numbering
 * skips), so how the error branches bypass the later steps, where the
 * session handle is stored, and the return value are not visible - confirm
 * against the full file.
 */
161 on_mp_write_session_request_process(struct query_state *qstate)
163 struct cache_mp_write_session_request *c_mp_ws_request;
164 struct cache_mp_write_session_response *c_mp_ws_response;
165 cache_mp_write_session ws;
167 char *dec_cache_entry_name;
169 TRACE_IN(on_mp_write_session_request_process);
170 init_comm_element(&qstate->response, CET_MP_WRITE_SESSION_RESPONSE);
171 c_mp_ws_response = get_cache_mp_write_session_response(
173 c_mp_ws_request = get_cache_mp_write_session_request(&qstate->request);
175 qstate->config_entry = configuration_find_entry(
176 s_configuration, c_mp_ws_request->entry);
177 if (qstate->config_entry == NULL) {
178 c_mp_ws_response->error_code = ENOENT;
180 LOG_ERR_2("write_session_request",
181 "can't find configuration entry '%s'. "
182 "aborting request", c_mp_ws_request->entry);
186 if (qstate->config_entry->enabled == 0) {
187 c_mp_ws_response->error_code = EACCES;
189 LOG_ERR_2("write_session_request",
190 "configuration entry '%s' is disabled",
191 c_mp_ws_request->entry);
/* Entries that perform lookups themselves are not writable by clients. */
195 if (qstate->config_entry->perform_actual_lookups != 0) {
196 c_mp_ws_response->error_code = EOPNOTSUPP;
198 LOG_ERR_2("write_session_request",
199 "entry '%s' performs lookups by itself: "
200 "can't write to it", c_mp_ws_request->entry);
203 #ifdef NS_NSCD_EID_CHECKING
204 if (check_query_eids(qstate) != 0) {
205 c_mp_ws_response->error_code = EPERM;
212 * All multipart entries are separated by their name decorations.
213 * For one configuration entry there will be a lot of multipart
214 * cache entries - each with its own decorated name.
/*
 * NOTE(review): the return value of asprintf() is not checked; only the
 * resulting pointer is asserted.
 */
216 asprintf(&dec_cache_entry_name, "%s%s", qstate->eid_str,
217 qstate->config_entry->mp_cache_params.cep.entry_name);
218 assert(dec_cache_entry_name != NULL);
/* The cache lookup is done under the configuration read lock. */
220 configuration_lock_rdlock(s_configuration);
221 c_entry = find_cache_entry(s_cache,
222 dec_cache_entry_name);
223 configuration_unlock(s_configuration);
225 if (c_entry == INVALID_CACHE_ENTRY)
226 c_entry = register_new_mp_cache_entry(qstate,
227 dec_cache_entry_name);
/* The decorated name is only needed for the lookup/registration above. */
229 free(dec_cache_entry_name);
231 assert(c_entry != NULL);
232 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
233 ws = open_cache_mp_write_session(c_entry);
234 if (ws == INVALID_CACHE_MP_WRITE_SESSION)
235 c_mp_ws_response->error_code = -1;
238 qstate->destroy_func = on_mp_write_session_destroy;
/* A zero mp_query_timeout leaves qstate->timeout untouched. */
240 if ((qstate->config_entry->mp_query_timeout.tv_sec != 0) ||
241 (qstate->config_entry->mp_query_timeout.tv_usec != 0))
242 memcpy(&qstate->timeout,
243 &qstate->config_entry->mp_query_timeout,
244 sizeof(struct timeval));
246 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
249 qstate->process_func = on_mp_write_session_response_write1;
250 qstate->kevent_watermark = sizeof(int);
251 qstate->kevent_filter = EVFILT_WRITE;
253 TRACE_OUT(on_mp_write_session_request_process);
/*
 * Sends the session initiation response: writes the response's error_code
 * to the peer through qstate->write_func.
 *
 * A result other than sizeof(int) is logged as an error. When error_code is
 * 0 the state machine switches to on_mp_write_session_mapper with an
 * EVFILT_READ filter and a watermark of sizeof(int). The visible code also
 * resets the watermark to 0 and process_func to NULL.
 *
 * NOTE(review): the lines between the two groups of assignments are missing
 * from this excerpt; presumably the reset is the else branch taken for a
 * non-zero error_code - confirm against the full file.
 */
258 on_mp_write_session_response_write1(struct query_state *qstate)
260 struct cache_mp_write_session_response *c_mp_ws_response;
263 TRACE_IN(on_mp_write_session_response_write1);
264 c_mp_ws_response = get_cache_mp_write_session_response(
266 result = qstate->write_func(qstate, &c_mp_ws_response->error_code,
268 if (result != sizeof(int)) {
269 LOG_ERR_3("on_mp_write_session_response_write1",
271 TRACE_OUT(on_mp_write_session_response_write1);
275 if (c_mp_ws_response->error_code == 0) {
276 qstate->kevent_watermark = sizeof(int);
277 qstate->process_func = on_mp_write_session_mapper;
278 qstate->kevent_filter = EVFILT_READ;
280 qstate->kevent_watermark = 0;
281 qstate->process_func = NULL;
283 TRACE_OUT(on_mp_write_session_response_write1);
288 * The mapper function is used to avoid opening a new connection for each
289 * session write or read request. After processing a request, it does not close
290 * the connection, but waits for the next request.
/*
 * Dispatcher for an established write session: reads the type of the next
 * element sent by the peer and selects the handler for it.
 *
 * If qstate->kevent_watermark is 0 it is set to sizeof(int). The visible
 * code reads an int element type through qstate->read_func and logs an
 * error when the read does not return exactly sizeof(int). Dispatch:
 * - CET_MP_WRITE_SESSION_WRITE_REQUEST: watermark sizeof(size_t), next
 *   handler on_mp_write_session_write_request_read1;
 * - CET_MP_WRITE_SESSION_ABANDON_NOTIFICATION: watermark 0, next handler
 *   on_mp_write_session_abandon_notification;
 * - CET_MP_WRITE_SESSION_CLOSE_NOTIFICATION: watermark 0, next handler
 *   on_mp_write_session_close_notification;
 * - the statements after the last case clear the watermark and
 *   process_func and log "unknown element type".
 *
 * NOTE(review): the switch header, break/default lines, else branch and
 * return statements are missing from this excerpt; presumably the last
 * group is the default case - confirm against the full file.
 */
293 on_mp_write_session_mapper(struct query_state *qstate)
298 TRACE_IN(on_mp_write_session_mapper);
299 if (qstate->kevent_watermark == 0) {
300 qstate->kevent_watermark = sizeof(int);
302 result = qstate->read_func(qstate, &elem_type, sizeof(int));
303 if (result != sizeof(int)) {
304 LOG_ERR_3("on_mp_write_session_mapper",
306 TRACE_OUT(on_mp_write_session_mapper);
311 case CET_MP_WRITE_SESSION_WRITE_REQUEST:
312 qstate->kevent_watermark = sizeof(size_t);
313 qstate->process_func =
314 on_mp_write_session_write_request_read1;
316 case CET_MP_WRITE_SESSION_ABANDON_NOTIFICATION:
317 qstate->kevent_watermark = 0;
318 qstate->process_func =
319 on_mp_write_session_abandon_notification;
321 case CET_MP_WRITE_SESSION_CLOSE_NOTIFICATION:
322 qstate->kevent_watermark = 0;
323 qstate->process_func =
324 on_mp_write_session_close_notification;
327 qstate->kevent_watermark = 0;
328 qstate->process_func = NULL;
329 LOG_ERR_2("on_mp_write_session_mapper",
330 "unknown element type");
331 TRACE_OUT(on_mp_write_session_mapper);
335 TRACE_OUT(on_mp_write_session_mapper);
340 * The functions below are used to process multipart write sessions write
342 * - on_mp_write_session_write_request_read1 and
343 * on_mp_write_session_write_request_read2 read the request itself
344 * - on_mp_write_session_write_request_process processes it
345 * - on_mp_write_session_write_response_write1 sends the response
/*
 * First read step of a session write request: obtains the size of the data
 * to be written and allocates a buffer for it.
 *
 * Initializes qstate->request as CET_MP_WRITE_SESSION_WRITE_REQUEST, reads
 * data_size through qstate->read_func, logs an error when the read does not
 * return exactly sizeof(size_t) or when BUFSIZE_INVALID() rejects the size,
 * allocates data_size zeroed bytes, and hands over to
 * on_mp_write_session_write_request_read2 with the watermark set to
 * data_size.
 *
 * NOTE(review): the return statements, the log message text and part of
 * the read_func call are missing from this excerpt - confirm against the
 * full file.
 */
348 on_mp_write_session_write_request_read1(struct query_state *qstate)
350 struct cache_mp_write_session_write_request *write_request;
353 TRACE_IN(on_mp_write_session_write_request_read1);
354 init_comm_element(&qstate->request,
355 CET_MP_WRITE_SESSION_WRITE_REQUEST);
356 write_request = get_cache_mp_write_session_write_request(
359 result = qstate->read_func(qstate, &write_request->data_size,
362 if (result != sizeof(size_t)) {
363 LOG_ERR_3("on_mp_write_session_write_request_read1",
365 TRACE_OUT(on_mp_write_session_write_request_read1);
/* The size comes from the peer, so it is validated before allocating. */
369 if (BUFSIZE_INVALID(write_request->data_size)) {
370 LOG_ERR_3("on_mp_write_session_write_request_read1",
371 "invalid data_size value");
372 TRACE_OUT(on_mp_write_session_write_request_read1);
/*
 * Exactly data_size bytes are allocated here (no extra trailing byte).
 * NOTE(review): allocation failure is only caught by assert(), which is
 * compiled out when NDEBUG is defined.
 */
376 write_request->data = calloc(1, write_request->data_size);
377 assert(write_request->data != NULL);
379 qstate->kevent_watermark = write_request->data_size;
380 qstate->process_func = on_mp_write_session_write_request_read2;
381 TRACE_OUT(on_mp_write_session_write_request_read1);
/*
 * Second read step of a session write request: reads data_size bytes of
 * payload into write_request->data (the buffer allocated in
 * on_mp_write_session_write_request_read1).
 *
 * A negative result, or a byte count different from
 * qstate->kevent_watermark, is logged as an error. Otherwise the watermark
 * is reset to 0 and processing continues with
 * on_mp_write_session_write_request_process.
 *
 * NOTE(review): the return statements and the log message text are missing
 * from this excerpt - confirm against the full file.
 */
386 on_mp_write_session_write_request_read2(struct query_state *qstate)
388 struct cache_mp_write_session_write_request *write_request;
391 TRACE_IN(on_mp_write_session_write_request_read2);
392 write_request = get_cache_mp_write_session_write_request(
395 result = qstate->read_func(qstate, write_request->data,
396 write_request->data_size);
/* The sign is checked first so the cast to size_t cannot hide an error. */
398 if (result < 0 || (size_t)result != qstate->kevent_watermark) {
399 LOG_ERR_3("on_mp_write_session_write_request_read2",
401 TRACE_OUT(on_mp_write_session_write_request_read2);
405 qstate->kevent_watermark = 0;
406 qstate->process_func = on_mp_write_session_write_request_process;
407 TRACE_OUT(on_mp_write_session_write_request_read2);
/*
 * Processes a session write request: passes the received data to the write
 * session and prepares the response.
 *
 * Initializes qstate->response as CET_MP_WRITE_SESSION_WRITE_RESPONSE, then
 * calls cache_mp_write() on the session stored in qstate->mdata while the
 * configuration entry is locked with CELT_MULTIPART. The value returned by
 * cache_mp_write() becomes the response's error_code. Finally switches to
 * on_mp_write_session_write_response_write1 with an EVFILT_WRITE filter and
 * a watermark of sizeof(int).
 *
 * NOTE(review): one argument line of the cache_mp_write() call and the
 * return statement are missing from this excerpt - confirm against the
 * full file.
 */
412 on_mp_write_session_write_request_process(struct query_state *qstate)
414 struct cache_mp_write_session_write_request *write_request;
415 struct cache_mp_write_session_write_response *write_response;
417 TRACE_IN(on_mp_write_session_write_request_process);
418 init_comm_element(&qstate->response,
419 CET_MP_WRITE_SESSION_WRITE_RESPONSE);
420 write_response = get_cache_mp_write_session_write_response(
422 write_request = get_cache_mp_write_session_write_request(
425 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
426 write_response->error_code = cache_mp_write(
427 (cache_mp_write_session)qstate->mdata,
429 write_request->data_size);
430 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
432 qstate->kevent_watermark = sizeof(int);
433 qstate->process_func = on_mp_write_session_write_response_write1;
434 qstate->kevent_filter = EVFILT_WRITE;
436 TRACE_OUT(on_mp_write_session_write_request_process);
/*
 * Sends the response to a session write request: writes the response's
 * error_code to the peer through qstate->write_func.
 *
 * A result other than sizeof(int) is logged as an error. When error_code is
 * 0 the request and response comm elements are finalized and the state
 * machine returns to on_mp_write_session_mapper with an EVFILT_READ filter
 * and a watermark of sizeof(int), so the next element of the session can be
 * read. The visible code also resets the watermark to 0 and clears
 * process_func.
 *
 * process_func is cleared with NULL (not the integer 0) to match the other
 * handlers in this file.
 *
 * NOTE(review): the lines between the two groups of assignments, the
 * return statements and the log message text are missing from this
 * excerpt; presumably the reset is the else branch taken for a non-zero
 * error_code - confirm against the full file.
 */
441 on_mp_write_session_write_response_write1(struct query_state *qstate)
443 struct cache_mp_write_session_write_response *write_response;
446 TRACE_IN(on_mp_write_session_write_response_write1);
447 write_response = get_cache_mp_write_session_write_response(
449 result = qstate->write_func(qstate, &write_response->error_code,
451 if (result != sizeof(int)) {
452 LOG_ERR_3("on_mp_write_session_write_response_write1",
454 TRACE_OUT(on_mp_write_session_write_response_write1);
458 if (write_response->error_code == 0) {
/* The comm elements are released so the next request can re-initialize them. */
459 finalize_comm_element(&qstate->request);
460 finalize_comm_element(&qstate->response);
462 qstate->kevent_watermark = sizeof(int);
463 qstate->process_func = on_mp_write_session_mapper;
464 qstate->kevent_filter = EVFILT_READ;
466 qstate->kevent_watermark = 0;
467 qstate->process_func = NULL;
470 TRACE_OUT(on_mp_write_session_write_response_write1);
475 * Handles abandon notifications. Destroys the session by calling the
476 * abandon_cache_mp_write_session.
/*
 * Handles an abandon notification from the peer.
 *
 * Calls abandon_cache_mp_write_session() on the session stored in
 * qstate->mdata while the configuration entry is locked with
 * CELT_MULTIPART, then replaces qstate->mdata with
 * INVALID_CACHE_MP_WRITE_SESSION. The watermark is reset to 0 and
 * process_func is cleared.
 *
 * NOTE(review): the return statement is missing from this excerpt -
 * confirm against the full file.
 */
479 on_mp_write_session_abandon_notification(struct query_state *qstate)
481 TRACE_IN(on_mp_write_session_abandon_notification);
482 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
483 abandon_cache_mp_write_session((cache_mp_write_session)qstate->mdata);
484 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
/*
 * Presumably prevents on_mp_write_session_destroy from abandoning the
 * session a second time - confirm that the invalid handle equals NULL.
 */
485 qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION;
487 qstate->kevent_watermark = 0;
488 qstate->process_func = NULL;
489 TRACE_OUT(on_mp_write_session_abandon_notification);
494 * Handles close notifications. Commits the session by calling
495 * the close_cache_mp_write_session.
/*
 * Handles a close notification from the peer.
 *
 * Calls close_cache_mp_write_session() on the session stored in
 * qstate->mdata while the configuration entry is locked with
 * CELT_MULTIPART, then replaces qstate->mdata with
 * INVALID_CACHE_MP_WRITE_SESSION. The watermark is reset to 0 and
 * process_func is cleared.
 *
 * NOTE(review): the return statement is missing from this excerpt -
 * confirm against the full file.
 */
498 on_mp_write_session_close_notification(struct query_state *qstate)
500 TRACE_IN(on_mp_write_session_close_notification);
501 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
502 close_cache_mp_write_session((cache_mp_write_session)qstate->mdata);
503 configuration_unlock_entry(qstate->config_entry, CELT_MULTIPART);
/*
 * Presumably prevents on_mp_write_session_destroy from abandoning the
 * already closed session - confirm that the invalid handle equals NULL.
 */
504 qstate->mdata = INVALID_CACHE_MP_WRITE_SESSION;
506 qstate->kevent_watermark = 0;
507 qstate->process_func = NULL;
508 TRACE_OUT(on_mp_write_session_close_notification);
/*
 * Registers a multipart cache entry under the decorated name
 * dec_cache_entry_name, using the mp_cache_params of qstate->config_entry.
 *
 * The whole operation runs while the configuration entry is locked with
 * CELT_MULTIPART:
 * - under the configuration write lock, the entry_name inside
 *   mp_cache_params is temporarily replaced by dec_cache_entry_name,
 *   register_cache_entry() is called, and the original name is restored;
 * - under the configuration read lock, the new entry is looked up with
 *   find_cache_entry();
 * - configuration_entry_add_mp_cache_entry() is called for the
 *   configuration entry.
 *
 * NOTE(review): the local declarations, the remaining arguments of the
 * last two calls and the end of the function are missing from this
 * excerpt; presumably c_entry is returned - confirm against the full file.
 */
512 cache_entry register_new_mp_cache_entry(struct query_state *qstate,
513 const char *dec_cache_entry_name)
518 TRACE_IN(register_new_mp_cache_entry);
519 c_entry = INVALID_CACHE_ENTRY;
520 configuration_lock_entry(qstate->config_entry, CELT_MULTIPART);
522 configuration_lock_wrlock(s_configuration);
/*
 * const is cast away only to park the pointer in the params structure for
 * the duration of the register_cache_entry() call; the saved name is put
 * back immediately afterwards.
 */
523 en_bkp = qstate->config_entry->mp_cache_params.cep.entry_name;
524 qstate->config_entry->mp_cache_params.cep.entry_name =
525 (char *)dec_cache_entry_name;
526 register_cache_entry(s_cache, (struct cache_entry_params *)
527 &qstate->config_entry->mp_cache_params);
528 qstate->config_entry->mp_cache_params.cep.entry_name = en_bkp;
529 configuration_unlock(s_configuration);
531 configuration_lock_rdlock(s_configuration);
532 c_entry = find_cache_entry(s_cache,
533 dec_cache_entry_name);
534 configuration_unlock(s_configuration);
536 configuration_entry_add_mp_cache_entry(qstate->config_entry,
539 configuration_unlock_entry(qstate->config_entry,
542 TRACE_OUT(register_new_mp_cache_entry);