2 * Copyright (c) 2004-2006 Voltaire, Inc. All rights reserved.
3 * Copyright (c) 2002-2005 Mellanox Technologies LTD. All rights reserved.
4 * Copyright (c) 1996-2003 Intel Corporation. All rights reserved.
6 * This software is available to you under a choice of one of two
7 * licenses. You may choose to be licensed under the terms of the GNU
8 * General Public License (GPL) Version 2, available from the file
9 * COPYING in the main directory of this source tree, or the
10 * OpenIB.org BSD license below:
12 * Redistribution and use in source and binary forms, with or
13 * without modification, are permitted provided that the following
16 * - Redistributions of source code must retain the above
17 * copyright notice, this list of conditions and the following
20 * - Redistributions in binary form must reproduce the above
21 * copyright notice, this list of conditions and the following
22 * disclaimer in the documentation and/or other materials
23 * provided with the distribution.
25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
38 * Implementation of Dispatcher abstraction.
44 #endif /* HAVE_CONFIG_H */
47 #include <complib/cl_dispatcher.h>
48 #include <complib/cl_thread.h>
49 #include <complib/cl_timer.h>
51 /* Sizing hints for the message pool: initial element count and grow size, passed to cl_qpool_init() by cl_disp_init() */
52 #define CL_DISP_INITIAL_MSG_COUNT 256
53 #define CL_DISP_MSG_GROW_SIZE 64
55 /* Sizing hints for the registrant pointer vector: initial count and grow size, passed to cl_ptr_vector_init() by cl_disp_init() */
56 #define CL_DISP_INITIAL_REG_COUNT 16
57 #define CL_DISP_REG_GROW_SIZE 16
59 /********************************************************************
63 This function takes messages off the FIFO and calls the receive callback (pfn_rcv_callback) of each message's destination registrant.
64 This function executes at passive level.
67 p_disp - Pointer to Dispatcher object
74 ********************************************************************/
/*
 * Worker routine that drains the dispatcher message FIFO.
 *
 * context is cast to a cl_dispatcher_t pointer. With the dispatcher lock
 * held, the routine loops while msg_fifo is non-empty: the head message is
 * removed and its time in the queue (cl_get_time_stamp() minus in_time) is
 * stored in last_msg_queue_time_us. The lock is then released and the
 * pfn_rcv_callback of the destination registrant is invoked, passing the
 * message data. Afterwards the destination ref_cnt is decremented; if the
 * message has a pfn_xmt_callback, that callback is invoked with the message
 * context and data and the source ref_cnt is decremented too. The lock is
 * re-acquired before the message is put back into msg_pool, and released
 * once the FIFO is empty.
 *
 * Also called directly by cl_disp_shutdown() to flush remaining messages.
 *
 * NOTE(review): this listing omits some lines of the function (braces and
 * the p_msg declaration among them); the description covers only the
 * statements that are visible.
 */
75 void __cl_disp_worker(IN void *context)
78 cl_dispatcher_t *p_disp = (cl_dispatcher_t *) context;
80 cl_spinlock_acquire(&p_disp->lock);
82 /* Process the FIFO until we drain it dry. */
83 while (cl_qlist_count(&p_disp->msg_fifo)) {
84 /* Pop the message at the head from the FIFO. */
86 (cl_disp_msg_t *) cl_qlist_remove_head(&p_disp->msg_fifo);
88 /* Record how long this message spent in the queue. */
89 p_disp->last_msg_queue_time_us =
90 cl_get_time_stamp() - p_msg->in_time;
93 * Release the spinlock while the message is processed.
94 * The user's callback may reenter the dispatcher
95 * and cause the lock to be reaquired.
97 cl_spinlock_release(&p_disp->lock);
98 p_msg->p_dest_reg->pfn_rcv_callback((void *)p_msg->p_dest_reg->
100 (void *)p_msg->p_data);
102 cl_atomic_dec(&p_msg->p_dest_reg->ref_cnt);
104 /* The client has seen the data. Notify the sender as appropriate. */
105 if (p_msg->pfn_xmt_callback) {
106 p_msg->pfn_xmt_callback((void *)p_msg->context,
107 (void *)p_msg->p_data);
108 cl_atomic_dec(&p_msg->p_src_reg->ref_cnt);
111 /* Grab the lock for the next iteration through the list. */
112 cl_spinlock_acquire(&p_disp->lock);
114 /* Return this message to the pool. */
115 cl_qpool_put(&p_disp->msg_pool, (cl_pool_item_t *) p_msg);
118 cl_spinlock_release(&p_disp->lock);
121 /********************************************************************
122 ********************************************************************/
/*
 * Put a dispatcher object into its constructed (not yet initialized) state.
 *
 * p_disp - dispatcher to construct.
 *
 * Calls cl_qlist_init() on reg_list and msg_fifo and the construct routines
 * of the registrant pointer vector, the lock and the message pool.
 * cl_disp_init() calls this first, before any of the init routines.
 *
 * NOTE(review): no call touching worker_threads is visible in this listing.
 */
123 void cl_disp_construct(IN cl_dispatcher_t * const p_disp)
127 cl_qlist_init(&p_disp->reg_list);
128 cl_ptr_vector_construct(&p_disp->reg_vec);
129 cl_qlist_init(&p_disp->msg_fifo);
130 cl_spinlock_construct(&p_disp->lock);
131 cl_qpool_construct(&p_disp->msg_pool);
134 /********************************************************************
135 ********************************************************************/
/*
 * Stop the dispatcher and release all registration records.
 *
 * p_disp - dispatcher to shut down.
 *
 * Order of operations: the worker thread pool is destroyed, then
 * __cl_disp_worker() is called directly from this function to process any
 * messages still in the FIFO, and finally every entry remaining in reg_list
 * is removed and freed.
 */
136 void cl_disp_shutdown(IN cl_dispatcher_t * const p_disp)
140 /* Stop the thread pool. */
141 cl_thread_pool_destroy(&p_disp->worker_threads);
143 /* Process all outstanding callbacks. */
144 __cl_disp_worker(p_disp);
146 /* Free all registration info. */
147 while (!cl_is_qlist_empty(&p_disp->reg_list))
148 free(cl_qlist_remove_head(&p_disp->reg_list));
151 /********************************************************************
152 ********************************************************************/
/*
 * Destroy the sub-objects owned by a dispatcher.
 *
 * p_disp - dispatcher to destroy.
 *
 * Destroys the lock, the message pool and the registrant pointer vector, in
 * that order. cl_disp_init() calls this on each of its failure paths.
 *
 * NOTE(review): the visible lines do not stop the worker threads or free
 * reg_list entries; presumably callers run cl_disp_shutdown() first --
 * confirm against the callers.
 */
153 void cl_disp_destroy(IN cl_dispatcher_t * const p_disp)
157 cl_spinlock_destroy(&p_disp->lock);
158 /* Destroy the message pool */
159 cl_qpool_destroy(&p_disp->msg_pool);
160 /* Destroy the pointer vector of registrants. */
161 cl_ptr_vector_destroy(&p_disp->reg_vec);
164 /********************************************************************
165 ********************************************************************/
/*
 * Initialize a dispatcher object.
 *
 * p_disp       - dispatcher to initialize.
 * thread_count - forwarded to cl_thread_pool_init().
 * name         - forwarded to cl_thread_pool_init().
 *
 * After cl_disp_construct(), the lock, the message pool, the registrant
 * pointer vector and the worker thread pool are initialized in that order.
 * The status of each step is compared against CL_SUCCESS and
 * cl_disp_destroy() is called on p_disp when a step fails.
 *
 * NOTE(review): the return-type line and the return statements are missing
 * from this listing; presumably the status of the failing step (or of the
 * thread pool init) is returned -- confirm.
 */
167 cl_disp_init(IN cl_dispatcher_t * const p_disp,
168 IN const uint32_t thread_count, IN const char *const name)
174 cl_disp_construct(p_disp);
176 status = cl_spinlock_init(&p_disp->lock);
177 if (status != CL_SUCCESS) {
178 cl_disp_destroy(p_disp);
182 /* Specify no upper limit to the number of messages in the pool */
183 status = cl_qpool_init(&p_disp->msg_pool, CL_DISP_INITIAL_MSG_COUNT,
184 0, CL_DISP_MSG_GROW_SIZE, sizeof(cl_disp_msg_t),
186 if (status != CL_SUCCESS) {
187 cl_disp_destroy(p_disp);
191 status = cl_ptr_vector_init(&p_disp->reg_vec, CL_DISP_INITIAL_REG_COUNT,
192 CL_DISP_REG_GROW_SIZE);
193 if (status != CL_SUCCESS) {
194 cl_disp_destroy(p_disp);
/* The thread pool is handed __cl_disp_worker as its routine and p_disp as the context. */
198 status = cl_thread_pool_init(&p_disp->worker_threads, thread_count,
199 __cl_disp_worker, p_disp, name);
200 if (status != CL_SUCCESS)
201 cl_disp_destroy(p_disp);
206 /********************************************************************
207 ********************************************************************/
/*
 * Register a message recipient with the dispatcher.
 *
 * p_disp       - dispatcher to register with.
 * msg_id       - message ID to claim, or CL_DISP_MSGID_NONE for a
 *                registrant that is not entered into reg_vec.
 * pfn_callback - stored as the registrant pfn_rcv_callback (optional).
 * context      - stored in the registration record (optional).
 *
 * All work is done with the dispatcher lock held. If msg_id is not
 * CL_DISP_MSGID_NONE, lies below the current size of reg_vec and its slot
 * is already non-NULL, the lock is released without registering. Otherwise
 * a cl_disp_reg_info_t is allocated with malloc(), zeroed, filled in and
 * appended to reg_list; when msg_id is not CL_DISP_MSGID_NONE it is also
 * stored in reg_vec at index msg_id.
 *
 * NOTE(review): the return-type line, the return statements and the
 * allocation-failure test are missing from this listing.
 * NOTE(review): the record is appended to reg_list before
 * cl_ptr_vector_set() is attempted, and the cleanup on the failure path is
 * not fully visible -- confirm that the record is taken off reg_list before
 * it is released.
 */
209 cl_disp_register(IN cl_dispatcher_t * const p_disp,
210 IN const cl_disp_msgid_t msg_id,
211 IN cl_pfn_msgrcv_cb_t pfn_callback OPTIONAL,
212 IN const void *const context OPTIONAL)
214 cl_disp_reg_info_t *p_reg;
219 /* Check that the requested registrant ID is available. */
220 cl_spinlock_acquire(&p_disp->lock);
221 if ((msg_id != CL_DISP_MSGID_NONE) &&
222 (msg_id < cl_ptr_vector_get_size(&p_disp->reg_vec)) &&
223 (cl_ptr_vector_get(&p_disp->reg_vec, msg_id))) {
224 cl_spinlock_release(&p_disp->lock);
228 /* Allocate a registration info record from the heap. */
229 p_reg = (cl_disp_reg_info_t *) malloc(sizeof(cl_disp_reg_info_t));
231 cl_spinlock_release(&p_disp->lock);
234 memset(p_reg, 0, sizeof(cl_disp_reg_info_t));
237 p_reg->p_disp = p_disp;
239 p_reg->pfn_rcv_callback = pfn_callback;
240 p_reg->context = context;
241 p_reg->msg_id = msg_id;
243 /* Insert the registration in the list. */
244 cl_qlist_insert_tail(&p_disp->reg_list, (cl_list_item_t *) p_reg);
246 /* Set the array entry to the registrant. */
247 /* The ptr_vector grows automatically as necessary. */
248 if (msg_id != CL_DISP_MSGID_NONE) {
249 status = cl_ptr_vector_set(&p_disp->reg_vec, msg_id, p_reg);
250 if (status != CL_SUCCESS) {
252 cl_spinlock_release(&p_disp->lock);
257 cl_spinlock_release(&p_disp->lock);
262 /********************************************************************
263 ********************************************************************/
/*
 * Remove a registration from its dispatcher.
 *
 * handle - registration handle; it is first compared against
 *          CL_DISP_INVALID_HANDLE (the statement taken in that case is not
 *          visible in this listing; presumably an early return -- confirm).
 *
 * Steps, as far as visible:
 *   1. With the dispatcher lock held, the reg_vec slot for the registrant
 *      msg_id is set to NULL (skipped for CL_DISP_MSGID_NONE).
 *   2. With the lock released, ref_cnt is polled and cl_thread_suspend(1)
 *      is called between checks until ref_cnt is no longer positive. Both
 *      cl_disp_post() and __cl_disp_worker() adjust ref_cnt.
 *   3. With the lock held again, the record is removed from reg_list.
 *
 * NOTE(review): the statement that releases the record itself is missing
 * from this listing.
 */
264 void cl_disp_unregister(IN const cl_disp_reg_handle_t handle)
266 cl_disp_reg_info_t *p_reg;
267 cl_dispatcher_t *p_disp;
269 if (handle == CL_DISP_INVALID_HANDLE)
272 p_reg = (cl_disp_reg_info_t *) handle;
273 p_disp = p_reg->p_disp;
276 cl_spinlock_acquire(&p_disp->lock);
278 * Clear the registrant vector entry. This will cause any further
279 * post calls to fail.
281 if (p_reg->msg_id != CL_DISP_MSGID_NONE) {
282 CL_ASSERT(p_reg->msg_id <
283 cl_ptr_vector_get_size(&p_disp->reg_vec));
284 cl_ptr_vector_set(&p_disp->reg_vec, p_reg->msg_id, NULL);
286 cl_spinlock_release(&p_disp->lock);
288 while (p_reg->ref_cnt > 0)
289 cl_thread_suspend(1);
291 cl_spinlock_acquire(&p_disp->lock);
292 /* Remove the registrant from the list. */
293 cl_qlist_remove_item(&p_disp->reg_list, (cl_list_item_t *) p_reg);
294 /* Return the registration info to the pool */
297 cl_spinlock_release(&p_disp->lock);
300 /********************************************************************
301 ********************************************************************/
/*
 * Queue a message for the registrant of msg_id and signal the worker pool.
 *
 * handle       - registration of the sender; the dispatcher is taken from
 *                handle->p_disp.
 * msg_id       - ID of the recipient; asserted not to be
 *                CL_DISP_MSGID_NONE.
 * p_data       - stored in the message; __cl_disp_worker() passes it to
 *                the callbacks.
 * pfn_callback - stored as the message pfn_xmt_callback (optional).
 * context      - stored in the message (optional).
 *
 * With the dispatcher lock held, the recipient is looked up in reg_vec and
 * a message is taken from msg_pool. The message is filled in and stamped
 * with cl_get_time_stamp(), reference counts are incremented, and the
 * message is appended to msg_fifo. After the lock is released the worker
 * thread pool is signalled.
 *
 * Two error exits are visible: CL_NOT_FOUND after the recipient lookup and
 * CL_INSUFFICIENT_MEMORY after the pool request. The conditions guarding
 * them and the success return are missing from this listing; presumably
 * they test for a NULL recipient and a NULL message -- confirm.
 *
 * NOTE(review): cl_ptr_vector_get() is called here with no visible check
 * that msg_id is below cl_ptr_vector_get_size(), whereas cl_disp_register()
 * makes that check first -- confirm an out-of-range msg_id cannot reach the
 * lookup.
 * NOTE(review): the condition under which the sender ref_cnt is incremented
 * is missing from this listing; the neighbouring comment text suggests it
 * depends on a completion callback being requested.
 */
303 cl_disp_post(IN const cl_disp_reg_handle_t handle,
304 IN const cl_disp_msgid_t msg_id,
305 IN const void *const p_data,
306 IN cl_pfn_msgdone_cb_t pfn_callback OPTIONAL,
307 IN const void *const context OPTIONAL)
309 cl_disp_reg_info_t *p_src_reg = (cl_disp_reg_info_t *) handle;
310 cl_disp_reg_info_t *p_dest_reg;
311 cl_dispatcher_t *p_disp;
312 cl_disp_msg_t *p_msg;
314 p_disp = handle->p_disp;
316 CL_ASSERT(msg_id != CL_DISP_MSGID_NONE);
318 cl_spinlock_acquire(&p_disp->lock);
319 /* Check that the recipient exists. */
320 p_dest_reg = cl_ptr_vector_get(&p_disp->reg_vec, msg_id);
322 cl_spinlock_release(&p_disp->lock);
323 return (CL_NOT_FOUND);
326 /* Get a free message from the pool. */
327 p_msg = (cl_disp_msg_t *) cl_qpool_get(&p_disp->msg_pool);
329 cl_spinlock_release(&p_disp->lock);
330 return (CL_INSUFFICIENT_MEMORY);
333 /* Initialize the message */
334 p_msg->p_src_reg = p_src_reg;
335 p_msg->p_dest_reg = p_dest_reg;
336 p_msg->p_data = p_data;
337 p_msg->pfn_xmt_callback = pfn_callback;
338 p_msg->context = context;
339 p_msg->in_time = cl_get_time_stamp();
342 * Increment the sender's reference count if they request a completion
346 cl_atomic_inc(&p_src_reg->ref_cnt);
348 /* Increment the recipient's reference count. */
349 cl_atomic_inc(&p_dest_reg->ref_cnt);
351 /* Queue the message in the FIFO. */
352 cl_qlist_insert_tail(&p_disp->msg_fifo, (cl_list_item_t *) p_msg);
353 cl_spinlock_release(&p_disp->lock);
355 /* Signal the thread pool that there is work to be done. */
356 cl_thread_pool_signal(&p_disp->worker_threads);
/*
 * Report queue statistics for the dispatcher behind a registration handle.
 *
 * handle                   - registration handle; its p_disp member selects
 *                            the dispatcher.
 * p_num_queued_msgs        - if non-NULL, receives the current element
 *                            count of msg_fifo.
 * p_last_msg_queue_time_ms - if non-NULL, receives last_msg_queue_time_us
 *                            divided by 1000 (integer division).
 *
 * Both values are read with the dispatcher lock held. The time value is the
 * one recorded by __cl_disp_worker() for the most recently dequeued message.
 */
361 cl_disp_get_queue_status(IN const cl_disp_reg_handle_t handle,
362 OUT uint32_t * p_num_queued_msgs,
363 OUT uint64_t * p_last_msg_queue_time_ms)
365 cl_dispatcher_t *p_disp = ((cl_disp_reg_info_t *) handle)->p_disp;
367 cl_spinlock_acquire(&p_disp->lock);
369 if (p_last_msg_queue_time_ms)
370 *p_last_msg_queue_time_ms =
371 p_disp->last_msg_queue_time_us / 1000;
373 if (p_num_queued_msgs)
374 *p_num_queued_msgs = cl_qlist_count(&p_disp->msg_fifo);
376 cl_spinlock_release(&p_disp->lock);