]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/sys/kern/sonewconn_overflow.py
Mark the repository as being converted to Git.
[FreeBSD/FreeBSD.git] / tests / sys / kern / sonewconn_overflow.py
1 #!/usr/bin/env python
2 #-
3 # SPDX-License-Identifier: BSD-2-Clause
4 #
5 # Copyright (c) 2020 Netflix, Inc.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions
9 # are met:
10 # 1. Redistributions of source code must retain the above copyright
11 #    notice, this list of conditions and the following disclaimer.
12 # 2. Redistributions in binary form must reproduce the above copyright
13 #    notice, this list of conditions and the following disclaimer in the
14 #    documentation and/or other materials provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 # SUCH DAMAGE.
27 #
28 # $FreeBSD$
29 #
30
31 import socket
32 import os
33 import sys
34 from subprocess import check_output
35 from time import sleep
36
37 V4HOST = '127.0.0.1'
38 V6HOST = '::1'
39 TCPPORT = 65432
40 UNIXSOCK = '/tmp/testsock'
41 TYPE = socket.SOCK_STREAM
42
43 class GenericTest(object):
44     def __init__(self):
45         raise NotImplementedError("Subclass must override the __init__ method")
46     def setup(self, af, addr):
47         self.sockets = []
48         self.ls = None
49         self.ls = socket.socket(af, TYPE)
50         self.ls.bind(addr)
51         self.ls.listen(2)
52         self.af = af
53         self.addr = addr
54     def doTest(self, cnt):
55         rv = 0
56         for i in range(0, cnt):
57             try:
58                 s = socket.socket(self.af, TYPE)
59                 s.connect(self.addr)
60             except:
61                 continue
62             self.sockets.append(s)
63             rv += 1
64         return rv
65     def __del__(self):
66         for s in self.sockets:
67             s.close()
68         if self.ls is not None:
69             self.ls.close()
70
71 class IPv4Test(GenericTest):
72     def __init__(self):
73         super(IPv4Test, self).setup(socket.AF_INET, (V4HOST, TCPPORT))
74
75 class IPv6Test(GenericTest):
76     def __init__(self):
77         super(IPv6Test, self).setup(socket.AF_INET6, (V6HOST, TCPPORT))
78
79 class UnixTest(GenericTest):
80     def __init__(self):
81         super(UnixTest, self).setup(socket.AF_UNIX, UNIXSOCK)
82     def __del__(self):
83         super(UnixTest, self).__del__()
84         os.remove(UNIXSOCK)
85
86 class LogChecker():
87     def __init__(self):
88         # Clear the dmesg buffer to prevent rotating causes issues
89         os.system('/sbin/dmesg -c > /dev/null')
90         # Figure out how big the dmesg buffer is.
91         self.dmesgOff = len(check_output("/sbin/dmesg"))
92
93     def checkForMsg(self, expected):
94         newOff = self.dmesgOff
95         for i in range(0, 3):
96             dmesg = check_output("/sbin/dmesg")
97             newOff = len(dmesg)
98             if newOff >= self.dmesgOff:
99                 dmesg = dmesg[self.dmesgOff:]
100             for line in dmesg.splitlines():
101                 try:
102                     if str(line).find(expected) >= 0:
103                         self.dmesgOff = newOff
104                         return True
105                 except:
106                     pass
107             sleep(0.5)
108         self.dmesgOff = newOff
109         return False
110
111 def main():
112     ip4 = IPv4Test()
113     ip6 = IPv6Test()
114     lcl = UnixTest()
115     lc = LogChecker()
116     failure = False
117
118     STDLOGMSG = "Listen queue overflow: 4 already in queue awaiting acceptance (1 occurrences)"
119
120     V4LOGMSG = "(%s:%d (proto 6)): %s" % (V4HOST, TCPPORT, STDLOGMSG)
121     ip4.doTest(5)
122     if not lc.checkForMsg(V4LOGMSG):
123         failure = True
124         sys.stderr.write("IPv4 log message not seen\n")
125     else:
126         ip4.doTest(1)
127         if lc.checkForMsg(V4LOGMSG):
128             failure = True
129             sys.stderr.write("Subsequent IPv4 log message not suppressed\n")
130
131     V6LOGMSG = "([%s]:%d (proto 6)): %s" % (V6HOST, TCPPORT, STDLOGMSG)
132     ip6.doTest(5)
133     if not lc.checkForMsg(V6LOGMSG):
134         failure = True
135         sys.stderr.write("IPv6 log message not seen\n")
136     else:
137         ip6.doTest(1)
138         if lc.checkForMsg(V6LOGMSG):
139             failure = True
140             sys.stderr.write("Subsequent IPv6 log message not suppressed\n")
141
142     UNIXLOGMSG = "(local:%s): %s" % (UNIXSOCK, STDLOGMSG)
143     lcl.doTest(5)
144     if not lc.checkForMsg(UNIXLOGMSG):
145         failure = True
146         sys.stderr.write("Unix socket log message not seen\n")
147     else:
148         lcl.doTest(1)
149         if lc.checkForMsg(UNIXLOGMSG):
150             failure = True
151             sys.stderr.write("Subsequent Unix socket log message not suppressed\n")
152
153     if failure:
154         sys.exit(1)
155     sys.exit(0)
156
157 if __name__ == '__main__':
158     main()