]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - sys/netgraph/netflow/netflow.c
openssh: cherry-pick OpenSSL 1.1.1 compatibility
[FreeBSD/FreeBSD.git] / sys / netgraph / netflow / netflow.c
1 /*-
2  * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
3  *
4  * Copyright (c) 2010-2011 Alexander V. Chernikov <melifaro@ipfw.ru>
5  * Copyright (c) 2004-2005 Gleb Smirnoff <glebius@FreeBSD.org>
6  * Copyright (c) 2001-2003 Roman V. Palagin <romanp@unshadow.net>
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in the
16  *    documentation and/or other materials provided with the distribution.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
22  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  *
30  * $SourceForge: netflow.c,v 1.41 2004/09/05 11:41:10 glebius Exp $
31  */
32
33 #include <sys/cdefs.h>
34 __FBSDID("$FreeBSD$");
35
36 #include "opt_inet6.h"
37 #include "opt_route.h"
38 #include <sys/param.h>
39 #include <sys/bitstring.h>
40 #include <sys/systm.h>
41 #include <sys/counter.h>
42 #include <sys/kernel.h>
43 #include <sys/ktr.h>
44 #include <sys/limits.h>
45 #include <sys/mbuf.h>
46 #include <sys/syslog.h>
47 #include <sys/socket.h>
48 #include <vm/uma.h>
49
50 #include <net/if.h>
51 #include <net/if_dl.h>
52 #include <net/if_var.h>
53 #include <net/route.h>
54 #include <net/ethernet.h>
55 #include <netinet/in.h>
56 #include <netinet/in_systm.h>
57 #include <netinet/ip.h>
58 #include <netinet/ip6.h>
59 #include <netinet/tcp.h>
60 #include <netinet/udp.h>
61
62 #include <netgraph/ng_message.h>
63 #include <netgraph/netgraph.h>
64
65 #include <netgraph/netflow/netflow.h>
66 #include <netgraph/netflow/netflow_v9.h>
67 #include <netgraph/netflow/ng_netflow.h>
68
69 #define NBUCKETS        (65536)         /* must be power of 2 */
70
71 /* This hash is for TCP or UDP packets. */
72 #define FULL_HASH(addr1, addr2, port1, port2)   \
73         (((addr1 ^ (addr1 >> 16) ^              \
74         htons(addr2 ^ (addr2 >> 16))) ^         \
75         port1 ^ htons(port2)) &                 \
76         (NBUCKETS - 1))
77
78 /* This hash is for all other IP packets. */
79 #define ADDR_HASH(addr1, addr2)                 \
80         ((addr1 ^ (addr1 >> 16) ^               \
81         htons(addr2 ^ (addr2 >> 16))) &         \
82         (NBUCKETS - 1))
83
84 /* Macros to shorten logical constructions */
85 /* XXX: priv must exist in namespace */
86 #define INACTIVE(fle)   (time_uptime - fle->f.last > priv->nfinfo_inact_t)
87 #define AGED(fle)       (time_uptime - fle->f.first > priv->nfinfo_act_t)
88 #define ISFREE(fle)     (fle->f.packets == 0)
89
90 /*
91  * 4 is a magical number: statistically number of 4-packet flows is
92  * bigger than 5,6,7...-packet flows by an order of magnitude. Most UDP/ICMP
93  * scans are 1 packet (~ 90% of flow cache). TCP scans are 2-packet in case
94  * of reachable host and 4-packet otherwise.
95  */
96 #define SMALL(fle)      (fle->f.packets <= 4)
97
98 MALLOC_DEFINE(M_NETFLOW_HASH, "netflow_hash", "NetFlow hash");
99
100 static int export_add(item_p, struct flow_entry *);
101 static int export_send(priv_p, fib_export_p, item_p, int);
102
103 static int hash_insert(priv_p, struct flow_hash_entry *, struct flow_rec *,
104     int, uint8_t, uint8_t);
105 #ifdef INET6
106 static int hash6_insert(priv_p, struct flow_hash_entry *, struct flow6_rec *,
107     int, uint8_t, uint8_t);
108 #endif
109
110 static void expire_flow(priv_p, fib_export_p, struct flow_entry *, int);
111
112 /*
113  * Generate hash for a given flow record.
114  *
115  * FIB is not used here, because:
116  * most VRFS will carry public IPv4 addresses which are unique even
117  * without FIB private addresses can overlap, but this is worked out
118  * via flow_rec bcmp() containing fib id. In IPv6 world addresses are
119  * all globally unique (it's not fully true, there is FC00::/7 for example,
120  * but chances of address overlap are MUCH smaller)
121  */
122 static inline uint32_t
123 ip_hash(struct flow_rec *r)
124 {
125
126         switch (r->r_ip_p) {
127         case IPPROTO_TCP:
128         case IPPROTO_UDP:
129                 return FULL_HASH(r->r_src.s_addr, r->r_dst.s_addr,
130                     r->r_sport, r->r_dport);
131         default:
132                 return ADDR_HASH(r->r_src.s_addr, r->r_dst.s_addr);
133         }
134 }
135
136 #ifdef INET6
137 /* Generate hash for a given flow6 record. Use lower 4 octets from v6 addresses */
138 static inline uint32_t
139 ip6_hash(struct flow6_rec *r)
140 {
141
142         switch (r->r_ip_p) {
143         case IPPROTO_TCP:
144         case IPPROTO_UDP:
145                 return FULL_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
146                     r->dst.r_dst6.__u6_addr.__u6_addr32[3], r->r_sport,
147                     r->r_dport);
148         default:
149                 return ADDR_HASH(r->src.r_src6.__u6_addr.__u6_addr32[3],
150                     r->dst.r_dst6.__u6_addr.__u6_addr32[3]);
151         }
152 }
153
154 static inline int
155 ip6_masklen(struct in6_addr *saddr, struct rt_addrinfo *info)
156 {
157         const int nbits = sizeof(*saddr) * NBBY;
158         int mlen;
159
160         if (info->rti_addrs & RTA_NETMASK)
161                 bit_count((bitstr_t *)saddr, 0, nbits, &mlen);
162         else
163                 mlen = nbits;
164         return (mlen);
165 }
166 #endif
167
168 /*
169  * Detach export datagram from priv, if there is any.
170  * If there is no, allocate a new one.
171  */
172 static item_p
173 get_export_dgram(priv_p priv, fib_export_p fe)
174 {
175         item_p  item = NULL;
176
177         mtx_lock(&fe->export_mtx);
178         if (fe->exp.item != NULL) {
179                 item = fe->exp.item;
180                 fe->exp.item = NULL;
181         }
182         mtx_unlock(&fe->export_mtx);
183
184         if (item == NULL) {
185                 struct netflow_v5_export_dgram *dgram;
186                 struct mbuf *m;
187
188                 m = m_getcl(M_NOWAIT, MT_DATA, M_PKTHDR);
189                 if (m == NULL)
190                         return (NULL);
191                 item = ng_package_data(m, NG_NOFLAGS);
192                 if (item == NULL)
193                         return (NULL);
194                 dgram = mtod(m, struct netflow_v5_export_dgram *);
195                 dgram->header.count = 0;
196                 dgram->header.version = htons(NETFLOW_V5);
197                 dgram->header.pad = 0;
198         }
199
200         return (item);
201 }
202
203 /*
204  * Re-attach incomplete datagram back to priv.
205  * If there is already another one, then send incomplete. */
206 static void
207 return_export_dgram(priv_p priv, fib_export_p fe, item_p item, int flags)
208 {
209
210         /*
211          * It may happen on SMP, that some thread has already
212          * put its item there, in this case we bail out and
213          * send what we have to collector.
214          */
215         mtx_lock(&fe->export_mtx);
216         if (fe->exp.item == NULL) {
217                 fe->exp.item = item;
218                 mtx_unlock(&fe->export_mtx);
219         } else {
220                 mtx_unlock(&fe->export_mtx);
221                 export_send(priv, fe, item, flags);
222         }
223 }
224
225 /*
226  * The flow is over. Call export_add() and free it. If datagram is
227  * full, then call export_send().
228  */
229 static void
230 expire_flow(priv_p priv, fib_export_p fe, struct flow_entry *fle, int flags)
231 {
232         struct netflow_export_item exp;
233         uint16_t version = fle->f.version;
234
235         if ((priv->export != NULL) && (version == IPVERSION)) {
236                 exp.item = get_export_dgram(priv, fe);
237                 if (exp.item == NULL) {
238                         priv->nfinfo_export_failed++;
239                         if (priv->export9 != NULL)
240                                 priv->nfinfo_export9_failed++;
241                         /* fle definitely contains IPv4 flow. */
242                         uma_zfree_arg(priv->zone, fle, priv);
243                         return;
244                 }
245
246                 if (export_add(exp.item, fle) > 0)
247                         export_send(priv, fe, exp.item, flags);
248                 else
249                         return_export_dgram(priv, fe, exp.item, NG_QUEUE);
250         }
251
252         if (priv->export9 != NULL) {
253                 exp.item9 = get_export9_dgram(priv, fe, &exp.item9_opt);
254                 if (exp.item9 == NULL) {
255                         priv->nfinfo_export9_failed++;
256                         if (version == IPVERSION)
257                                 uma_zfree_arg(priv->zone, fle, priv);
258 #ifdef INET6
259                         else if (version == IP6VERSION)
260                                 uma_zfree_arg(priv->zone6, fle, priv);
261 #endif
262                         else
263                                 panic("ng_netflow: Unknown IP proto: %d",
264                                     version);
265                         return;
266                 }
267
268                 if (export9_add(exp.item9, exp.item9_opt, fle) > 0)
269                         export9_send(priv, fe, exp.item9, exp.item9_opt, flags);
270                 else
271                         return_export9_dgram(priv, fe, exp.item9,
272                             exp.item9_opt, NG_QUEUE);
273         }
274
275         if (version == IPVERSION)
276                 uma_zfree_arg(priv->zone, fle, priv);
277 #ifdef INET6
278         else if (version == IP6VERSION)
279                 uma_zfree_arg(priv->zone6, fle, priv);
280 #endif
281 }
282
283 /* Get a snapshot of node statistics */
284 void
285 ng_netflow_copyinfo(priv_p priv, struct ng_netflow_info *i)
286 {
287
288         i->nfinfo_bytes = counter_u64_fetch(priv->nfinfo_bytes);
289         i->nfinfo_packets = counter_u64_fetch(priv->nfinfo_packets);
290         i->nfinfo_bytes6 = counter_u64_fetch(priv->nfinfo_bytes6);
291         i->nfinfo_packets6 = counter_u64_fetch(priv->nfinfo_packets6);
292         i->nfinfo_sbytes = counter_u64_fetch(priv->nfinfo_sbytes);
293         i->nfinfo_spackets = counter_u64_fetch(priv->nfinfo_spackets);
294         i->nfinfo_sbytes6 = counter_u64_fetch(priv->nfinfo_sbytes6);
295         i->nfinfo_spackets6 = counter_u64_fetch(priv->nfinfo_spackets6);
296         i->nfinfo_act_exp = counter_u64_fetch(priv->nfinfo_act_exp);
297         i->nfinfo_inact_exp = counter_u64_fetch(priv->nfinfo_inact_exp);
298
299         i->nfinfo_used = uma_zone_get_cur(priv->zone);
300 #ifdef INET6
301         i->nfinfo_used6 = uma_zone_get_cur(priv->zone6);
302 #endif
303
304         i->nfinfo_alloc_failed = priv->nfinfo_alloc_failed;
305         i->nfinfo_export_failed = priv->nfinfo_export_failed;
306         i->nfinfo_export9_failed = priv->nfinfo_export9_failed;
307         i->nfinfo_realloc_mbuf = priv->nfinfo_realloc_mbuf;
308         i->nfinfo_alloc_fibs = priv->nfinfo_alloc_fibs;
309         i->nfinfo_inact_t = priv->nfinfo_inact_t;
310         i->nfinfo_act_t = priv->nfinfo_act_t;
311 }
312
313 /*
314  * Insert a record into defined slot.
315  *
316  * First we get for us a free flow entry, then fill in all
317  * possible fields in it.
318  *
319  * TODO: consider dropping hash mutex while filling in datagram,
320  * as this was done in previous version. Need to test & profile
321  * to be sure.
322  */
323 static int
324 hash_insert(priv_p priv, struct flow_hash_entry *hsh, struct flow_rec *r,
325         int plen, uint8_t flags, uint8_t tcp_flags)
326 {
327         struct flow_entry *fle;
328         struct sockaddr_in sin, sin_mask;
329         struct sockaddr_dl rt_gateway;
330         struct rt_addrinfo info;
331
332         mtx_assert(&hsh->mtx, MA_OWNED);
333
334         fle = uma_zalloc_arg(priv->zone, priv, M_NOWAIT);
335         if (fle == NULL) {
336                 priv->nfinfo_alloc_failed++;
337                 return (ENOMEM);
338         }
339
340         /*
341          * Now fle is totally ours. It is detached from all lists,
342          * we can safely edit it.
343          */
344         fle->f.version = IPVERSION;
345         bcopy(r, &fle->f.r, sizeof(struct flow_rec));
346         fle->f.bytes = plen;
347         fle->f.packets = 1;
348         fle->f.tcp_flags = tcp_flags;
349
350         fle->f.first = fle->f.last = time_uptime;
351
352         /*
353          * First we do route table lookup on destination address. So we can
354          * fill in out_ifx, dst_mask, nexthop, and dst_as in future releases.
355          */
356         if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
357                 bzero(&sin, sizeof(sin));
358                 sin.sin_len = sizeof(struct sockaddr_in);
359                 sin.sin_family = AF_INET;
360                 sin.sin_addr = fle->f.r.r_dst;
361
362                 rt_gateway.sdl_len = sizeof(rt_gateway);
363                 sin_mask.sin_len = sizeof(struct sockaddr_in);
364                 bzero(&info, sizeof(info));
365
366                 info.rti_info[RTAX_GATEWAY] = (struct sockaddr *)&rt_gateway;
367                 info.rti_info[RTAX_NETMASK] = (struct sockaddr *)&sin_mask;
368
369                 if (rib_lookup_info(r->fib, (struct sockaddr *)&sin, NHR_REF, 0,
370                     &info) == 0) {
371                         fle->f.fle_o_ifx = info.rti_ifp->if_index;
372
373                         if (info.rti_flags & RTF_GATEWAY &&
374                             rt_gateway.sdl_family == AF_INET)
375                                 fle->f.next_hop =
376                                     ((struct sockaddr_in *)&rt_gateway)->sin_addr;
377
378                         if (info.rti_addrs & RTA_NETMASK)
379                                 fle->f.dst_mask = bitcount32(sin_mask.sin_addr.s_addr);
380                         else if (info.rti_flags & RTF_HOST)
381                                 /* Give up. We can't determine mask :( */
382                                 fle->f.dst_mask = 32;
383
384                         rib_free_info(&info);
385                 }
386         }
387
388         /* Do route lookup on source address, to fill in src_mask. */
389         if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
390                 bzero(&sin, sizeof(sin));
391                 sin.sin_len = sizeof(struct sockaddr_in);
392                 sin.sin_family = AF_INET;
393                 sin.sin_addr = fle->f.r.r_src;
394
395                 sin_mask.sin_len = sizeof(struct sockaddr_in);
396                 bzero(&info, sizeof(info));
397
398                 info.rti_info[RTAX_NETMASK] = (struct sockaddr *)&sin_mask;
399
400                 if (rib_lookup_info(r->fib, (struct sockaddr *)&sin, 0, 0,
401                     &info) == 0) {
402                         if (info.rti_addrs & RTA_NETMASK)
403                                 fle->f.src_mask =
404                                     bitcount32(sin_mask.sin_addr.s_addr);
405                         else if (info.rti_flags & RTF_HOST)
406                                 /* Give up. We can't determine mask :( */
407                                 fle->f.src_mask = 32;
408                 }
409         }
410
411         /* Push new flow at the and of hash. */
412         TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
413
414         return (0);
415 }
416
417 #ifdef INET6
418 static int
419 hash6_insert(priv_p priv, struct flow_hash_entry *hsh6, struct flow6_rec *r,
420         int plen, uint8_t flags, uint8_t tcp_flags)
421 {
422         struct flow6_entry *fle6;
423         struct sockaddr_in6 sin6, sin6_mask;
424         struct sockaddr_dl rt_gateway;
425         struct rt_addrinfo info;
426
427         mtx_assert(&hsh6->mtx, MA_OWNED);
428
429         fle6 = uma_zalloc_arg(priv->zone6, priv, M_NOWAIT);
430         if (fle6 == NULL) {
431                 priv->nfinfo_alloc_failed++;
432                 return (ENOMEM);
433         }
434
435         /*
436          * Now fle is totally ours. It is detached from all lists,
437          * we can safely edit it.
438          */
439
440         fle6->f.version = IP6VERSION;
441         bcopy(r, &fle6->f.r, sizeof(struct flow6_rec));
442         fle6->f.bytes = plen;
443         fle6->f.packets = 1;
444         fle6->f.tcp_flags = tcp_flags;
445
446         fle6->f.first = fle6->f.last = time_uptime;
447
448         /*
449          * First we do route table lookup on destination address. So we can
450          * fill in out_ifx, dst_mask, nexthop, and dst_as in future releases.
451          */
452         if ((flags & NG_NETFLOW_CONF_NODSTLOOKUP) == 0) {
453                 bzero(&sin6, sizeof(struct sockaddr_in6));
454                 sin6.sin6_len = sizeof(struct sockaddr_in6);
455                 sin6.sin6_family = AF_INET6;
456                 sin6.sin6_addr = r->dst.r_dst6;
457
458                 rt_gateway.sdl_len = sizeof(rt_gateway);
459                 sin6_mask.sin6_len = sizeof(struct sockaddr_in6);
460                 bzero(&info, sizeof(info));
461
462                 info.rti_info[RTAX_GATEWAY] = (struct sockaddr *)&rt_gateway;
463                 info.rti_info[RTAX_NETMASK] = (struct sockaddr *)&sin6_mask;
464
465                 if (rib_lookup_info(r->fib, (struct sockaddr *)&sin6, NHR_REF,
466                     0, &info) == 0) {
467                         fle6->f.fle_o_ifx = info.rti_ifp->if_index;
468
469                         if (info.rti_flags & RTF_GATEWAY &&
470                             rt_gateway.sdl_family == AF_INET6)
471                                 fle6->f.n.next_hop6 =
472                                     ((struct sockaddr_in6 *)&rt_gateway)->sin6_addr;
473
474                         fle6->f.dst_mask =
475                             ip6_masklen(&sin6_mask.sin6_addr, &info);
476
477                         rib_free_info(&info);
478                 }
479         }
480
481         if ((flags & NG_NETFLOW_CONF_NOSRCLOOKUP) == 0) {
482                 /* Do route lookup on source address, to fill in src_mask. */
483                 bzero(&sin6, sizeof(struct sockaddr_in6));
484                 sin6.sin6_len = sizeof(struct sockaddr_in6);
485                 sin6.sin6_family = AF_INET6;
486                 sin6.sin6_addr = r->src.r_src6;
487
488                 sin6_mask.sin6_len = sizeof(struct sockaddr_in6);
489                 bzero(&info, sizeof(info));
490
491                 info.rti_info[RTAX_NETMASK] = (struct sockaddr *)&sin6_mask;
492
493                 if (rib_lookup_info(r->fib, (struct sockaddr *)&sin6, 0, 0,
494                     &info) == 0)
495                         fle6->f.src_mask =
496                             ip6_masklen(&sin6_mask.sin6_addr, &info);
497         }
498
499         /* Push new flow at the and of hash. */
500         TAILQ_INSERT_TAIL(&hsh6->head, (struct flow_entry *)fle6, fle_hash);
501
502         return (0);
503 }
504 #endif
505
506
507 /*
508  * Non-static functions called from ng_netflow.c
509  */
510
511 /* Allocate memory and set up flow cache */
512 void
513 ng_netflow_cache_init(priv_p priv)
514 {
515         struct flow_hash_entry *hsh;
516         int i;
517
518         /* Initialize cache UMA zone. */
519         priv->zone = uma_zcreate("NetFlow IPv4 cache",
520             sizeof(struct flow_entry), NULL, NULL, NULL, NULL,
521             UMA_ALIGN_CACHE, 0);
522         uma_zone_set_max(priv->zone, CACHESIZE);
523 #ifdef INET6    
524         priv->zone6 = uma_zcreate("NetFlow IPv6 cache",
525             sizeof(struct flow6_entry), NULL, NULL, NULL, NULL,
526             UMA_ALIGN_CACHE, 0);
527         uma_zone_set_max(priv->zone6, CACHESIZE);
528 #endif  
529
530         /* Allocate hash. */
531         priv->hash = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
532             M_NETFLOW_HASH, M_WAITOK | M_ZERO);
533
534         /* Initialize hash. */
535         for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++) {
536                 mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
537                 TAILQ_INIT(&hsh->head);
538         }
539
540 #ifdef INET6
541         /* Allocate hash. */
542         priv->hash6 = malloc(NBUCKETS * sizeof(struct flow_hash_entry),
543             M_NETFLOW_HASH, M_WAITOK | M_ZERO);
544
545         /* Initialize hash. */
546         for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++) {
547                 mtx_init(&hsh->mtx, "hash mutex", NULL, MTX_DEF);
548                 TAILQ_INIT(&hsh->head);
549         }
550 #endif
551
552         priv->nfinfo_bytes = counter_u64_alloc(M_WAITOK);
553         priv->nfinfo_packets = counter_u64_alloc(M_WAITOK);
554         priv->nfinfo_bytes6 = counter_u64_alloc(M_WAITOK);
555         priv->nfinfo_packets6 = counter_u64_alloc(M_WAITOK);
556         priv->nfinfo_sbytes = counter_u64_alloc(M_WAITOK);
557         priv->nfinfo_spackets = counter_u64_alloc(M_WAITOK);
558         priv->nfinfo_sbytes6 = counter_u64_alloc(M_WAITOK);
559         priv->nfinfo_spackets6 = counter_u64_alloc(M_WAITOK);
560         priv->nfinfo_act_exp = counter_u64_alloc(M_WAITOK);
561         priv->nfinfo_inact_exp = counter_u64_alloc(M_WAITOK);
562
563         ng_netflow_v9_cache_init(priv);
564         CTR0(KTR_NET, "ng_netflow startup()");
565 }
566
567 /* Initialize new FIB table for v5 and v9 */
568 int
569 ng_netflow_fib_init(priv_p priv, int fib)
570 {
571         fib_export_p    fe = priv_to_fib(priv, fib);
572
573         CTR1(KTR_NET, "ng_netflow(): fib init: %d", fib);
574
575         if (fe != NULL)
576                 return (0);
577
578         if ((fe = malloc(sizeof(struct fib_export), M_NETGRAPH,
579             M_NOWAIT | M_ZERO)) == NULL)
580                 return (ENOMEM);
581
582         mtx_init(&fe->export_mtx, "export dgram lock", NULL, MTX_DEF);
583         mtx_init(&fe->export9_mtx, "export9 dgram lock", NULL, MTX_DEF);
584         fe->fib = fib;
585         fe->domain_id = fib;
586
587         if (atomic_cmpset_ptr((volatile uintptr_t *)&priv->fib_data[fib],
588             (uintptr_t)NULL, (uintptr_t)fe) == 0) {
589                 /* FIB already set up by other ISR */
590                 CTR3(KTR_NET, "ng_netflow(): fib init: %d setup %p but got %p",
591                     fib, fe, priv_to_fib(priv, fib));
592                 mtx_destroy(&fe->export_mtx);
593                 mtx_destroy(&fe->export9_mtx);
594                 free(fe, M_NETGRAPH);
595         } else {
596                 /* Increase counter for statistics */
597                 CTR3(KTR_NET, "ng_netflow(): fib %d setup to %p (%p)",
598                     fib, fe, priv_to_fib(priv, fib));
599                 priv->nfinfo_alloc_fibs++;
600         }
601         
602         return (0);
603 }
604
605 /* Free all flow cache memory. Called from node close method. */
606 void
607 ng_netflow_cache_flush(priv_p priv)
608 {
609         struct flow_entry       *fle, *fle1;
610         struct flow_hash_entry  *hsh;
611         struct netflow_export_item exp;
612         fib_export_p fe;
613         int i;
614
615         bzero(&exp, sizeof(exp));
616
617         /*
618          * We are going to free probably billable data.
619          * Expire everything before freeing it.
620          * No locking is required since callout is already drained.
621          */
622         for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++)
623                 TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
624                         TAILQ_REMOVE(&hsh->head, fle, fle_hash);
625                         fe = priv_to_fib(priv, fle->f.r.fib);
626                         expire_flow(priv, fe, fle, NG_QUEUE);
627                 }
628 #ifdef INET6
629         for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++)
630                 TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
631                         TAILQ_REMOVE(&hsh->head, fle, fle_hash);
632                         fe = priv_to_fib(priv, fle->f.r.fib);
633                         expire_flow(priv, fe, fle, NG_QUEUE);
634                 }
635 #endif
636
637         uma_zdestroy(priv->zone);
638         /* Destroy hash mutexes. */
639         for (i = 0, hsh = priv->hash; i < NBUCKETS; i++, hsh++)
640                 mtx_destroy(&hsh->mtx);
641
642         /* Free hash memory. */
643         if (priv->hash != NULL)
644                 free(priv->hash, M_NETFLOW_HASH);
645 #ifdef INET6
646         uma_zdestroy(priv->zone6);
647         /* Destroy hash mutexes. */
648         for (i = 0, hsh = priv->hash6; i < NBUCKETS; i++, hsh++)
649                 mtx_destroy(&hsh->mtx);
650
651         /* Free hash memory. */
652         if (priv->hash6 != NULL)
653                 free(priv->hash6, M_NETFLOW_HASH);
654 #endif
655
656         for (i = 0; i < priv->maxfibs; i++) {
657                 if ((fe = priv_to_fib(priv, i)) == NULL)
658                         continue;
659
660                 if (fe->exp.item != NULL)
661                         export_send(priv, fe, fe->exp.item, NG_QUEUE);
662
663                 if (fe->exp.item9 != NULL)
664                         export9_send(priv, fe, fe->exp.item9,
665                             fe->exp.item9_opt, NG_QUEUE);
666
667                 mtx_destroy(&fe->export_mtx);
668                 mtx_destroy(&fe->export9_mtx);
669                 free(fe, M_NETGRAPH);
670         }
671
672         counter_u64_free(priv->nfinfo_bytes);
673         counter_u64_free(priv->nfinfo_packets);
674         counter_u64_free(priv->nfinfo_bytes6);
675         counter_u64_free(priv->nfinfo_packets6);
676         counter_u64_free(priv->nfinfo_sbytes);
677         counter_u64_free(priv->nfinfo_spackets);
678         counter_u64_free(priv->nfinfo_sbytes6);
679         counter_u64_free(priv->nfinfo_spackets6);
680         counter_u64_free(priv->nfinfo_act_exp);
681         counter_u64_free(priv->nfinfo_inact_exp);
682
683         ng_netflow_v9_cache_flush(priv);
684 }
685
686 /* Insert packet from into flow cache. */
687 int
688 ng_netflow_flow_add(priv_p priv, fib_export_p fe, struct ip *ip,
689     caddr_t upper_ptr, uint8_t upper_proto, uint8_t flags,
690     unsigned int src_if_index)
691 {
692         struct flow_entry       *fle, *fle1;
693         struct flow_hash_entry  *hsh;
694         struct flow_rec         r;
695         int                     hlen, plen;
696         int                     error = 0;
697         uint16_t                eproto;
698         uint8_t                 tcp_flags = 0;
699
700         bzero(&r, sizeof(r));
701
702         if (ip->ip_v != IPVERSION)
703                 return (EINVAL);
704
705         hlen = ip->ip_hl << 2;
706         if (hlen < sizeof(struct ip))
707                 return (EINVAL);
708
709         eproto = ETHERTYPE_IP;
710         /* Assume L4 template by default */
711         r.flow_type = NETFLOW_V9_FLOW_V4_L4;
712
713         r.r_src = ip->ip_src;
714         r.r_dst = ip->ip_dst;
715         r.fib = fe->fib;
716
717         plen = ntohs(ip->ip_len);
718
719         r.r_ip_p = ip->ip_p;
720         r.r_tos = ip->ip_tos;
721
722         r.r_i_ifx = src_if_index;
723
724         /*
725          * XXX NOTE: only first fragment of fragmented TCP, UDP and
726          * ICMP packet will be recorded with proper s_port and d_port.
727          * Following fragments will be recorded simply as IP packet with
728          * ip_proto = ip->ip_p and s_port, d_port set to zero.
729          * I know, it looks like bug. But I don't want to re-implement
730          * ip packet assebmling here. Anyway, (in)famous trafd works this way -
731          * and nobody complains yet :)
732          */
733         if ((ip->ip_off & htons(IP_OFFMASK)) == 0)
734                 switch(r.r_ip_p) {
735                 case IPPROTO_TCP:
736                     {
737                         struct tcphdr *tcp;
738
739                         tcp = (struct tcphdr *)((caddr_t )ip + hlen);
740                         r.r_sport = tcp->th_sport;
741                         r.r_dport = tcp->th_dport;
742                         tcp_flags = tcp->th_flags;
743                         break;
744                     }
745                 case IPPROTO_UDP:
746                         r.r_ports = *(uint32_t *)((caddr_t )ip + hlen);
747                         break;
748                 }
749
750         counter_u64_add(priv->nfinfo_packets, 1);
751         counter_u64_add(priv->nfinfo_bytes, plen);
752
753         /* Find hash slot. */
754         hsh = &priv->hash[ip_hash(&r)];
755
756         mtx_lock(&hsh->mtx);
757
758         /*
759          * Go through hash and find our entry. If we encounter an
760          * entry, that should be expired, purge it. We do a reverse
761          * search since most active entries are first, and most
762          * searches are done on most active entries.
763          */
764         TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
765                 if (bcmp(&r, &fle->f.r, sizeof(struct flow_rec)) == 0)
766                         break;
767                 if ((INACTIVE(fle) && SMALL(fle)) || AGED(fle)) {
768                         TAILQ_REMOVE(&hsh->head, fle, fle_hash);
769                         expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
770                             fle, NG_QUEUE);
771                         counter_u64_add(priv->nfinfo_act_exp, 1);
772                 }
773         }
774
775         if (fle) {                      /* An existent entry. */
776
777                 fle->f.bytes += plen;
778                 fle->f.packets ++;
779                 fle->f.tcp_flags |= tcp_flags;
780                 fle->f.last = time_uptime;
781
782                 /*
783                  * We have the following reasons to expire flow in active way:
784                  * - it hit active timeout
785                  * - a TCP connection closed
786                  * - it is going to overflow counter
787                  */
788                 if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle) ||
789                     (fle->f.bytes >= (CNTR_MAX - IF_MAXMTU)) ) {
790                         TAILQ_REMOVE(&hsh->head, fle, fle_hash);
791                         expire_flow(priv, priv_to_fib(priv, fle->f.r.fib),
792                             fle, NG_QUEUE);
793                         counter_u64_add(priv->nfinfo_act_exp, 1);
794                 } else {
795                         /*
796                          * It is the newest, move it to the tail,
797                          * if it isn't there already. Next search will
798                          * locate it quicker.
799                          */
800                         if (fle != TAILQ_LAST(&hsh->head, fhead)) {
801                                 TAILQ_REMOVE(&hsh->head, fle, fle_hash);
802                                 TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
803                         }
804                 }
805         } else                          /* A new flow entry. */
806                 error = hash_insert(priv, hsh, &r, plen, flags, tcp_flags);
807
808         mtx_unlock(&hsh->mtx);
809
810         return (error);
811 }
812
813 #ifdef INET6
814 /* Insert IPv6 packet from into flow cache. */
815 int
816 ng_netflow_flow6_add(priv_p priv, fib_export_p fe, struct ip6_hdr *ip6,
817     caddr_t upper_ptr, uint8_t upper_proto, uint8_t flags,
818     unsigned int src_if_index)
819 {
820         struct flow_entry       *fle = NULL, *fle1;
821         struct flow6_entry      *fle6;
822         struct flow_hash_entry  *hsh;
823         struct flow6_rec        r;
824         int                     plen;
825         int                     error = 0;
826         uint8_t                 tcp_flags = 0;
827
828         /* check version */
829         if ((ip6->ip6_vfc & IPV6_VERSION_MASK) != IPV6_VERSION)
830                 return (EINVAL);
831
832         bzero(&r, sizeof(r));
833
834         r.src.r_src6 = ip6->ip6_src;
835         r.dst.r_dst6 = ip6->ip6_dst;
836         r.fib = fe->fib;
837
838         /* Assume L4 template by default */
839         r.flow_type = NETFLOW_V9_FLOW_V6_L4;
840
841         plen = ntohs(ip6->ip6_plen) + sizeof(struct ip6_hdr);
842
843 #if 0
844         /* XXX: set DSCP/CoS value */
845         r.r_tos = ip->ip_tos;
846 #endif
847         if ((flags & NG_NETFLOW_IS_FRAG) == 0) {
848                 switch(upper_proto) {
849                 case IPPROTO_TCP:
850                     {
851                         struct tcphdr *tcp;
852
853                         tcp = (struct tcphdr *)upper_ptr;
854                         r.r_ports = *(uint32_t *)upper_ptr;
855                         tcp_flags = tcp->th_flags;
856                         break;
857                     }
858                 case IPPROTO_UDP:
859                 case IPPROTO_SCTP:
860                         r.r_ports = *(uint32_t *)upper_ptr;
861                         break;
862                 }
863         }       
864
865         r.r_ip_p = upper_proto;
866         r.r_i_ifx = src_if_index;
867  
868         counter_u64_add(priv->nfinfo_packets6, 1);
869         counter_u64_add(priv->nfinfo_bytes6, plen);
870
871         /* Find hash slot. */
872         hsh = &priv->hash6[ip6_hash(&r)];
873
874         mtx_lock(&hsh->mtx);
875
876         /*
877          * Go through hash and find our entry. If we encounter an
878          * entry, that should be expired, purge it. We do a reverse
879          * search since most active entries are first, and most
880          * searches are done on most active entries.
881          */
882         TAILQ_FOREACH_REVERSE_SAFE(fle, &hsh->head, fhead, fle_hash, fle1) {
883                 if (fle->f.version != IP6VERSION)
884                         continue;
885                 fle6 = (struct flow6_entry *)fle;
886                 if (bcmp(&r, &fle6->f.r, sizeof(struct flow6_rec)) == 0)
887                         break;
888                 if ((INACTIVE(fle6) && SMALL(fle6)) || AGED(fle6)) {
889                         TAILQ_REMOVE(&hsh->head, fle, fle_hash);
890                         expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle,
891                             NG_QUEUE);
892                         counter_u64_add(priv->nfinfo_act_exp, 1);
893                 }
894         }
895
896         if (fle != NULL) {                      /* An existent entry. */
897                 fle6 = (struct flow6_entry *)fle;
898
899                 fle6->f.bytes += plen;
900                 fle6->f.packets ++;
901                 fle6->f.tcp_flags |= tcp_flags;
902                 fle6->f.last = time_uptime;
903
904                 /*
905                  * We have the following reasons to expire flow in active way:
906                  * - it hit active timeout
907                  * - a TCP connection closed
908                  * - it is going to overflow counter
909                  */
910                 if (tcp_flags & TH_FIN || tcp_flags & TH_RST || AGED(fle6) ||
911                     (fle6->f.bytes >= (CNTR_MAX - IF_MAXMTU)) ) {
912                         TAILQ_REMOVE(&hsh->head, fle, fle_hash);
913                         expire_flow(priv, priv_to_fib(priv, fle->f.r.fib), fle,
914                             NG_QUEUE);
915                         counter_u64_add(priv->nfinfo_act_exp, 1);
916                 } else {
917                         /*
918                          * It is the newest, move it to the tail,
919                          * if it isn't there already. Next search will
920                          * locate it quicker.
921                          */
922                         if (fle != TAILQ_LAST(&hsh->head, fhead)) {
923                                 TAILQ_REMOVE(&hsh->head, fle, fle_hash);
924                                 TAILQ_INSERT_TAIL(&hsh->head, fle, fle_hash);
925                         }
926                 }
927         } else                          /* A new flow entry. */
928                 error = hash6_insert(priv, hsh, &r, plen, flags, tcp_flags);
929
930         mtx_unlock(&hsh->mtx);
931
932         return (error);
933 }
934 #endif
935
936 /*
937  * Return records from cache to userland.
938  *
939  * TODO: matching particular IP should be done in kernel, here.
940  */
941 int
942 ng_netflow_flow_show(priv_p priv, struct ngnf_show_header *req,
943 struct ngnf_show_header *resp)
944 {
945         struct flow_hash_entry  *hsh;
946         struct flow_entry       *fle;
947         struct flow_entry_data  *data = (struct flow_entry_data *)(resp + 1);
948 #ifdef INET6
949         struct flow6_entry_data *data6 = (struct flow6_entry_data *)(resp + 1);
950 #endif
951         int     i, max;
952
953         i = req->hash_id;
954         if (i > NBUCKETS-1)
955                 return (EINVAL);
956
957 #ifdef INET6
958         if (req->version == 6) {
959                 resp->version = 6;
960                 hsh = priv->hash6 + i;
961                 max = NREC6_AT_ONCE;
962         } else
963 #endif
964         if (req->version == 4) {
965                 resp->version = 4;
966                 hsh = priv->hash + i;
967                 max = NREC_AT_ONCE;
968         } else
969                 return (EINVAL);
970
971         /*
972          * We will transfer not more than NREC_AT_ONCE. More data
973          * will come in next message.
974          * We send current hash index and current record number in list 
975          * to userland, and userland should return it back to us. 
976          * Then, we will restart with new entry.
977          *
978          * The resulting cache snapshot can be inaccurate if flow expiration
979          * is taking place on hash item between userland data requests for 
980          * this hash item id.
981          */
982         resp->nentries = 0;
983         for (; i < NBUCKETS; hsh++, i++) {
984                 int list_id;
985
986                 if (mtx_trylock(&hsh->mtx) == 0) {
987                         /* 
988                          * Requested hash index is not available,
989                          * relay decision to skip or re-request data
990                          * to userland.
991                          */
992                         resp->hash_id = i;
993                         resp->list_id = 0;
994                         return (0);
995                 }
996
997                 list_id = 0;
998                 TAILQ_FOREACH(fle, &hsh->head, fle_hash) {
999                         if (hsh->mtx.mtx_lock & MTX_CONTESTED) {
1000                                 resp->hash_id = i;
1001                                 resp->list_id = list_id;
1002                                 mtx_unlock(&hsh->mtx);
1003                                 return (0);
1004                         }
1005
1006                         list_id++;
1007                         /* Search for particular record in list. */
1008                         if (req->list_id > 0) {
1009                                 if (list_id < req->list_id)
1010                                         continue;
1011
1012                                 /* Requested list position found. */
1013                                 req->list_id = 0;
1014                         }
1015 #ifdef INET6
1016                         if (req->version == 6) {
1017                                 struct flow6_entry *fle6;
1018
1019                                 fle6 = (struct flow6_entry *)fle;
1020                                 bcopy(&fle6->f, data6 + resp->nentries,
1021                                     sizeof(fle6->f));
1022                         } else
1023 #endif
1024                                 bcopy(&fle->f, data + resp->nentries,
1025                                     sizeof(fle->f));
1026                         resp->nentries++;
1027                         if (resp->nentries == max) {
1028                                 resp->hash_id = i;
1029                                 /* 
1030                                  * If it was the last item in list
1031                                  * we simply skip to next hash_id.
1032                                  */
1033                                 resp->list_id = list_id + 1;
1034                                 mtx_unlock(&hsh->mtx);
1035                                 return (0);
1036                         }
1037                 }
1038                 mtx_unlock(&hsh->mtx);
1039         }
1040
1041         resp->hash_id = resp->list_id = 0;
1042
1043         return (0);
1044 }
1045
1046 /* We have full datagram in privdata. Send it to export hook. */
1047 static int
1048 export_send(priv_p priv, fib_export_p fe, item_p item, int flags)
1049 {
1050         struct mbuf *m = NGI_M(item);
1051         struct netflow_v5_export_dgram *dgram = mtod(m,
1052                                         struct netflow_v5_export_dgram *);
1053         struct netflow_v5_header *header = &dgram->header;
1054         struct timespec ts;
1055         int error = 0;
1056
1057         /* Fill mbuf header. */
1058         m->m_len = m->m_pkthdr.len = sizeof(struct netflow_v5_record) *
1059            header->count + sizeof(struct netflow_v5_header);
1060
1061         /* Fill export header. */
1062         header->sys_uptime = htonl(MILLIUPTIME(time_uptime));
1063         getnanotime(&ts);
1064         header->unix_secs  = htonl(ts.tv_sec);
1065         header->unix_nsecs = htonl(ts.tv_nsec);
1066         header->engine_type = 0;
1067         header->engine_id = fe->domain_id;
1068         header->pad = 0;
1069         header->flow_seq = htonl(atomic_fetchadd_32(&fe->flow_seq,
1070             header->count));
1071         header->count = htons(header->count);
1072
1073         if (priv->export != NULL)
1074                 NG_FWD_ITEM_HOOK_FLAGS(error, item, priv->export, flags);
1075         else
1076                 NG_FREE_ITEM(item);
1077
1078         return (error);
1079 }
1080
1081
1082 /* Add export record to dgram. */
1083 static int
1084 export_add(item_p item, struct flow_entry *fle)
1085 {
1086         struct netflow_v5_export_dgram *dgram = mtod(NGI_M(item),
1087                                         struct netflow_v5_export_dgram *);
1088         struct netflow_v5_header *header = &dgram->header;
1089         struct netflow_v5_record *rec;
1090
1091         rec = &dgram->r[header->count];
1092         header->count ++;
1093
1094         KASSERT(header->count <= NETFLOW_V5_MAX_RECORDS,
1095             ("ng_netflow: export too big"));
1096
1097         /* Fill in export record. */
1098         rec->src_addr = fle->f.r.r_src.s_addr;
1099         rec->dst_addr = fle->f.r.r_dst.s_addr;
1100         rec->next_hop = fle->f.next_hop.s_addr;
1101         rec->i_ifx    = htons(fle->f.fle_i_ifx);
1102         rec->o_ifx    = htons(fle->f.fle_o_ifx);
1103         rec->packets  = htonl(fle->f.packets);
1104         rec->octets   = htonl(fle->f.bytes);
1105         rec->first    = htonl(MILLIUPTIME(fle->f.first));
1106         rec->last     = htonl(MILLIUPTIME(fle->f.last));
1107         rec->s_port   = fle->f.r.r_sport;
1108         rec->d_port   = fle->f.r.r_dport;
1109         rec->flags    = fle->f.tcp_flags;
1110         rec->prot     = fle->f.r.r_ip_p;
1111         rec->tos      = fle->f.r.r_tos;
1112         rec->dst_mask = fle->f.dst_mask;
1113         rec->src_mask = fle->f.src_mask;
1114         rec->pad1     = 0;
1115         rec->pad2     = 0;
1116
1117         /* Not supported fields. */
1118         rec->src_as = rec->dst_as = 0;
1119
1120         if (header->count == NETFLOW_V5_MAX_RECORDS)
1121                 return (1); /* end of datagram */
1122         else
1123                 return (0);     
1124 }
1125
1126 /* Periodic flow expiry run. */
1127 void
1128 ng_netflow_expire(void *arg)
1129 {
1130         struct flow_entry       *fle, *fle1;
1131         struct flow_hash_entry  *hsh;
1132         priv_p                  priv = (priv_p )arg;
1133         int                     used, i;
1134
1135         /*
1136          * Going through all the cache.
1137          */
1138         used = uma_zone_get_cur(priv->zone);
1139         for (hsh = priv->hash, i = 0; i < NBUCKETS; hsh++, i++) {
1140                 /*
1141                  * Skip entries, that are already being worked on.
1142                  */
1143                 if (mtx_trylock(&hsh->mtx) == 0)
1144                         continue;
1145
1146                 TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
1147                         /*
1148                          * Interrupt thread wants this entry!
1149                          * Quick! Quick! Bail out!
1150                          */
1151                         if (hsh->mtx.mtx_lock & MTX_CONTESTED)
1152                                 break;
1153
1154                         /*
1155                          * Don't expire aggressively while hash collision
1156                          * ratio is predicted small.
1157                          */
1158                         if (used <= (NBUCKETS*2) && !INACTIVE(fle))
1159                                 break;
1160
1161                         if ((INACTIVE(fle) && (SMALL(fle) ||
1162                             (used > (NBUCKETS*2)))) || AGED(fle)) {
1163                                 TAILQ_REMOVE(&hsh->head, fle, fle_hash);
1164                                 expire_flow(priv, priv_to_fib(priv,
1165                                     fle->f.r.fib), fle, NG_NOFLAGS);
1166                                 used--;
1167                                 counter_u64_add(priv->nfinfo_inact_exp, 1);
1168                         }
1169                 }
1170                 mtx_unlock(&hsh->mtx);
1171         }
1172
1173 #ifdef INET6
1174         used = uma_zone_get_cur(priv->zone6);
1175         for (hsh = priv->hash6, i = 0; i < NBUCKETS; hsh++, i++) {
1176                 struct flow6_entry      *fle6;
1177
1178                 /*
1179                  * Skip entries, that are already being worked on.
1180                  */
1181                 if (mtx_trylock(&hsh->mtx) == 0)
1182                         continue;
1183
1184                 TAILQ_FOREACH_SAFE(fle, &hsh->head, fle_hash, fle1) {
1185                         fle6 = (struct flow6_entry *)fle;
1186                         /*
1187                          * Interrupt thread wants this entry!
1188                          * Quick! Quick! Bail out!
1189                          */
1190                         if (hsh->mtx.mtx_lock & MTX_CONTESTED)
1191                                 break;
1192
1193                         /*
1194                          * Don't expire aggressively while hash collision
1195                          * ratio is predicted small.
1196                          */
1197                         if (used <= (NBUCKETS*2) && !INACTIVE(fle6))
1198                                 break;
1199
1200                         if ((INACTIVE(fle6) && (SMALL(fle6) ||
1201                             (used > (NBUCKETS*2)))) || AGED(fle6)) {
1202                                 TAILQ_REMOVE(&hsh->head, fle, fle_hash);
1203                                 expire_flow(priv, priv_to_fib(priv,
1204                                     fle->f.r.fib), fle, NG_NOFLAGS);
1205                                 used--;
1206                                 counter_u64_add(priv->nfinfo_inact_exp, 1);
1207                         }
1208                 }
1209                 mtx_unlock(&hsh->mtx);
1210         }
1211 #endif
1212
1213         /* Schedule next expire. */
1214         callout_reset(&priv->exp_callout, (1*hz), &ng_netflow_expire,
1215             (void *)priv);
1216 }