]> CyberLeo.Net >> Repos - FreeBSD/releng/9.2.git/blob - contrib/ofed/management/opensm/complib/cl_dispatcher.c
- Copy stable/9 to releng/9.2 as part of the 9.2-RELEASE cycle.
[FreeBSD/releng/9.2.git] / contrib / ofed / management / opensm / complib / cl_dispatcher.c
1 /*
2  * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved.
3  * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved.
4  * Copyright (c) 1996-2003 Intel Corporation. All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the
10  * OpenIB.org BSD license below:
11  *
12  *     Redistribution and use in source and binary forms, with or
13  *     without modification, are permitted provided that the following
14  *     conditions are met:
15  *
16  *      - Redistributions of source code must retain the above
17  *        copyright notice, this list of conditions and the following
18  *        disclaimer.
19  *
20  *      - Redistributions in binary form must reproduce the above
21  *        copyright notice, this list of conditions and the following
22  *        disclaimer in the documentation and/or other materials
23  *        provided with the distribution.
24  *
25  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32  * SOFTWARE.
33  *
34  */
35
36 /*
37  * Abstract:
38  *    Implementation of Dispatcher abstraction.
39  *
40  */
41
42 #if HAVE_CONFIG_H
43 #  include <config.h>
44 #endif                          /* HAVE_CONFIG_H */
45
46 #include <stdlib.h>
47 #include <complib/cl_dispatcher.h>
48 #include <complib/cl_thread.h>
49 #include <complib/cl_timer.h>
50
51 /* give some guidance when we build our cl_pool of messages */
52 #define CL_DISP_INITIAL_MSG_COUNT   256
53 #define CL_DISP_MSG_GROW_SIZE       64
54
55 /* give some guidance when we build our cl_pool of registration elements */
56 #define CL_DISP_INITIAL_REG_COUNT   16
57 #define CL_DISP_REG_GROW_SIZE       16
58
59 /********************************************************************
60    __cl_disp_worker
61
62    Description:
63    This function takes messages off the FIFO and calls Processmsg()
64    This function executes as passive level.
65
66    Inputs:
67    p_disp - Pointer to Dispatcher object
68
69    Outputs:
70    None
71
72    Returns:
73    None
74 ********************************************************************/
75 void __cl_disp_worker(IN void *context)
76 {
77         cl_disp_msg_t *p_msg;
78         cl_dispatcher_t *p_disp = (cl_dispatcher_t *) context;
79
80         cl_spinlock_acquire(&p_disp->lock);
81
82         /* Process the FIFO until we drain it dry. */
83         while (cl_qlist_count(&p_disp->msg_fifo)) {
84                 /* Pop the message at the head from the FIFO. */
85                 p_msg =
86                     (cl_disp_msg_t *) cl_qlist_remove_head(&p_disp->msg_fifo);
87
88                 /* we track the tim ethe last message spent in the queue */
89                 p_disp->last_msg_queue_time_us =
90                     cl_get_time_stamp() - p_msg->in_time;
91
92                 /*
93                  * Release the spinlock while the message is processed.
94                  * The user's callback may reenter the dispatcher
95                  * and cause the lock to be reaquired.
96                  */
97                 cl_spinlock_release(&p_disp->lock);
98                 p_msg->p_dest_reg->pfn_rcv_callback((void *)p_msg->p_dest_reg->
99                                                     context,
100                                                     (void *)p_msg->p_data);
101
102                 cl_atomic_dec(&p_msg->p_dest_reg->ref_cnt);
103
104                 /* The client has seen the data.  Notify the sender as appropriate. */
105                 if (p_msg->pfn_xmt_callback) {
106                         p_msg->pfn_xmt_callback((void *)p_msg->context,
107                                                 (void *)p_msg->p_data);
108                         cl_atomic_dec(&p_msg->p_src_reg->ref_cnt);
109                 }
110
111                 /* Grab the lock for the next iteration through the list. */
112                 cl_spinlock_acquire(&p_disp->lock);
113
114                 /* Return this message to the pool. */
115                 cl_qpool_put(&p_disp->msg_pool, (cl_pool_item_t *) p_msg);
116         }
117
118         cl_spinlock_release(&p_disp->lock);
119 }
120
121 /********************************************************************
122  ********************************************************************/
123 void cl_disp_construct(IN cl_dispatcher_t * const p_disp)
124 {
125         CL_ASSERT(p_disp);
126
127         cl_qlist_init(&p_disp->reg_list);
128         cl_ptr_vector_construct(&p_disp->reg_vec);
129         cl_qlist_init(&p_disp->msg_fifo);
130         cl_spinlock_construct(&p_disp->lock);
131         cl_qpool_construct(&p_disp->msg_pool);
132 }
133
134 /********************************************************************
135  ********************************************************************/
136 void cl_disp_shutdown(IN cl_dispatcher_t * const p_disp)
137 {
138         CL_ASSERT(p_disp);
139
140         /* Stop the thread pool. */
141         cl_thread_pool_destroy(&p_disp->worker_threads);
142
143         /* Process all outstanding callbacks. */
144         __cl_disp_worker(p_disp);
145
146         /* Free all registration info. */
147         while (!cl_is_qlist_empty(&p_disp->reg_list))
148                 free(cl_qlist_remove_head(&p_disp->reg_list));
149 }
150
151 /********************************************************************
152  ********************************************************************/
153 void cl_disp_destroy(IN cl_dispatcher_t * const p_disp)
154 {
155         CL_ASSERT(p_disp);
156
157         cl_spinlock_destroy(&p_disp->lock);
158         /* Destroy the message pool */
159         cl_qpool_destroy(&p_disp->msg_pool);
160         /* Destroy the pointer vector of registrants. */
161         cl_ptr_vector_destroy(&p_disp->reg_vec);
162 }
163
164 /********************************************************************
165  ********************************************************************/
166 cl_status_t
167 cl_disp_init(IN cl_dispatcher_t * const p_disp,
168              IN const uint32_t thread_count, IN const char *const name)
169 {
170         cl_status_t status;
171
172         CL_ASSERT(p_disp);
173
174         cl_disp_construct(p_disp);
175
176         status = cl_spinlock_init(&p_disp->lock);
177         if (status != CL_SUCCESS) {
178                 cl_disp_destroy(p_disp);
179                 return (status);
180         }
181
182         /* Specify no upper limit to the number of messages in the pool */
183         status = cl_qpool_init(&p_disp->msg_pool, CL_DISP_INITIAL_MSG_COUNT,
184                                0, CL_DISP_MSG_GROW_SIZE, sizeof(cl_disp_msg_t),
185                                NULL, NULL, NULL);
186         if (status != CL_SUCCESS) {
187                 cl_disp_destroy(p_disp);
188                 return (status);
189         }
190
191         status = cl_ptr_vector_init(&p_disp->reg_vec, CL_DISP_INITIAL_REG_COUNT,
192                                     CL_DISP_REG_GROW_SIZE);
193         if (status != CL_SUCCESS) {
194                 cl_disp_destroy(p_disp);
195                 return (status);
196         }
197
198         status = cl_thread_pool_init(&p_disp->worker_threads, thread_count,
199                                      __cl_disp_worker, p_disp, name);
200         if (status != CL_SUCCESS)
201                 cl_disp_destroy(p_disp);
202
203         return (status);
204 }
205
206 /********************************************************************
207  ********************************************************************/
208 cl_disp_reg_handle_t
209 cl_disp_register(IN cl_dispatcher_t * const p_disp,
210                  IN const cl_disp_msgid_t msg_id,
211                  IN cl_pfn_msgrcv_cb_t pfn_callback OPTIONAL,
212                  IN const void *const context OPTIONAL)
213 {
214         cl_disp_reg_info_t *p_reg;
215         cl_status_t status;
216
217         CL_ASSERT(p_disp);
218
219         /* Check that the requested registrant ID is available. */
220         cl_spinlock_acquire(&p_disp->lock);
221         if ((msg_id != CL_DISP_MSGID_NONE) &&
222             (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec)) &&
223             (cl_ptr_vector_get(&p_disp->reg_vec, msg_id))) {
224                 cl_spinlock_release(&p_disp->lock);
225                 return (NULL);
226         }
227
228         /* Get a registration info from the pool. */
229         p_reg = (cl_disp_reg_info_t *) malloc(sizeof(cl_disp_reg_info_t));
230         if (!p_reg) {
231                 cl_spinlock_release(&p_disp->lock);
232                 return (NULL);
233         } else {
234                 memset(p_reg, 0, sizeof(cl_disp_reg_info_t));
235         }
236
237         p_reg->p_disp = p_disp;
238         p_reg->ref_cnt = 0;
239         p_reg->pfn_rcv_callback = pfn_callback;
240         p_reg->context = context;
241         p_reg->msg_id = msg_id;
242
243         /* Insert the registration in the list. */
244         cl_qlist_insert_tail(&p_disp->reg_list, (cl_list_item_t *) p_reg);
245
246         /* Set the array entry to the registrant. */
247         /* The ptr_vector grow automatically as necessary. */
248         if (msg_id != CL_DISP_MSGID_NONE) {
249                 status = cl_ptr_vector_set(&p_disp->reg_vec, msg_id, p_reg);
250                 if (status != CL_SUCCESS) {
251                         free(p_reg);
252                         cl_spinlock_release(&p_disp->lock);
253                         return (NULL);
254                 }
255         }
256
257         cl_spinlock_release(&p_disp->lock);
258
259         return (p_reg);
260 }
261
262 /********************************************************************
263  ********************************************************************/
264 void cl_disp_unregister(IN const cl_disp_reg_handle_t handle)
265 {
266         cl_disp_reg_info_t *p_reg;
267         cl_dispatcher_t *p_disp;
268
269         if (handle == CL_DISP_INVALID_HANDLE)
270                 return;
271
272         p_reg = (cl_disp_reg_info_t *) handle;
273         p_disp = p_reg->p_disp;
274         CL_ASSERT(p_disp);
275
276         cl_spinlock_acquire(&p_disp->lock);
277         /*
278          * Clear the registrant vector entry.  This will cause any further
279          * post calls to fail.
280          */
281         if (p_reg->msg_id != CL_DISP_MSGID_NONE) {
282                 CL_ASSERT(p_reg->msg_id <
283                           cl_ptr_vector_get_size(&p_disp->reg_vec));
284                 cl_ptr_vector_set(&p_disp->reg_vec, p_reg->msg_id, NULL);
285         }
286         cl_spinlock_release(&p_disp->lock);
287
288         while (p_reg->ref_cnt > 0)
289                 cl_thread_suspend(1);
290
291         cl_spinlock_acquire(&p_disp->lock);
292         /* Remove the registrant from the list. */
293         cl_qlist_remove_item(&p_disp->reg_list, (cl_list_item_t *) p_reg);
294         /* Return the registration info to the pool */
295         free(p_reg);
296
297         cl_spinlock_release(&p_disp->lock);
298 }
299
300 /********************************************************************
301  ********************************************************************/
302 cl_status_t
303 cl_disp_post(IN const cl_disp_reg_handle_t handle,
304              IN const cl_disp_msgid_t msg_id,
305              IN const void *const p_data,
306              IN cl_pfn_msgdone_cb_t pfn_callback OPTIONAL,
307              IN const void *const context OPTIONAL)
308 {
309         cl_disp_reg_info_t *p_src_reg = (cl_disp_reg_info_t *) handle;
310         cl_disp_reg_info_t *p_dest_reg;
311         cl_dispatcher_t *p_disp;
312         cl_disp_msg_t *p_msg;
313
314         p_disp = handle->p_disp;
315         CL_ASSERT(p_disp);
316         CL_ASSERT(msg_id != CL_DISP_MSGID_NONE);
317
318         cl_spinlock_acquire(&p_disp->lock);
319         /* Check that the recipient exists. */
320         p_dest_reg = cl_ptr_vector_get(&p_disp->reg_vec, msg_id);
321         if (!p_dest_reg) {
322                 cl_spinlock_release(&p_disp->lock);
323                 return (CL_NOT_FOUND);
324         }
325
326         /* Get a free message from the pool. */
327         p_msg = (cl_disp_msg_t *) cl_qpool_get(&p_disp->msg_pool);
328         if (!p_msg) {
329                 cl_spinlock_release(&p_disp->lock);
330                 return (CL_INSUFFICIENT_MEMORY);
331         }
332
333         /* Initialize the message */
334         p_msg->p_src_reg = p_src_reg;
335         p_msg->p_dest_reg = p_dest_reg;
336         p_msg->p_data = p_data;
337         p_msg->pfn_xmt_callback = pfn_callback;
338         p_msg->context = context;
339         p_msg->in_time = cl_get_time_stamp();
340
341         /*
342          * Increment the sender's reference count if they request a completion
343          * notification.
344          */
345         if (pfn_callback)
346                 cl_atomic_inc(&p_src_reg->ref_cnt);
347
348         /* Increment the recipient's reference count. */
349         cl_atomic_inc(&p_dest_reg->ref_cnt);
350
351         /* Queue the message in the FIFO. */
352         cl_qlist_insert_tail(&p_disp->msg_fifo, (cl_list_item_t *) p_msg);
353         cl_spinlock_release(&p_disp->lock);
354
355         /* Signal the thread pool that there is work to be done. */
356         cl_thread_pool_signal(&p_disp->worker_threads);
357         return (CL_SUCCESS);
358 }
359
360 void
361 cl_disp_get_queue_status(IN const cl_disp_reg_handle_t handle,
362                          OUT uint32_t * p_num_queued_msgs,
363                          OUT uint64_t * p_last_msg_queue_time_ms)
364 {
365         cl_dispatcher_t *p_disp = ((cl_disp_reg_info_t *) handle)->p_disp;
366
367         cl_spinlock_acquire(&p_disp->lock);
368
369         if (p_last_msg_queue_time_ms)
370                 *p_last_msg_queue_time_ms =
371                     p_disp->last_msg_queue_time_us / 1000;
372
373         if (p_num_queued_msgs)
374                 *p_num_queued_msgs = cl_qlist_count(&p_disp->msg_fifo);
375
376         cl_spinlock_release(&p_disp->lock);
377 }