]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/sys/opencrypto/dpkt.py
Update to bmake-201802222
[FreeBSD/FreeBSD.git] / tests / sys / opencrypto / dpkt.py
1 # $FreeBSD$
2 # $Id: dpkt.py 114 2005-09-11 15:15:12Z dugsong $
3
4 """fast, simple packet creation / parsing, with definitions for the
5 basic TCP/IP protocols.
6 """
7
8 __author__ = 'Dug Song <dugsong@monkey.org>'
9 __copyright__ = 'Copyright (c) 2004 Dug Song'
10 __license__ = 'BSD'
11 __url__ = 'http://monkey.org/~dugsong/dpkt/'
12 __version__ = '1.2'
13
14 try:
15     from itertools import izip as _it_izip
16 except ImportError:
17     _it_izip = zip
18     
19 from struct import calcsize as _st_calcsize, \
20      pack as _st_pack, unpack as _st_unpack, error as _st_error
21 from re import compile as _re_compile
22
23 intchr = _re_compile(r"(?P<int>[0-9]+)(?P<chr>.)")
24
25 class MetaPacket(type):
26     def __new__(cls, clsname, clsbases, clsdict):
27         if '__hdr__' in clsdict:
28             st = clsdict['__hdr__']
29             clsdict['__hdr_fields__'] = [ x[0] for x in st ]
30             clsdict['__hdr_fmt__'] = clsdict.get('__byte_order__', '>') + \
31                 ''.join([ x[1] for x in st ])
32             clsdict['__hdr_len__'] = _st_calcsize(clsdict['__hdr_fmt__'])
33             clsdict['__hdr_defaults__'] = \
34                 dict(zip(clsdict['__hdr_fields__'], [ x[2] for x in st ]))
35             clsdict['__slots__'] = clsdict['__hdr_fields__']
36         return type.__new__(cls, clsname, clsbases, clsdict)
37                         
38 class Packet(object):
39     """Packet class
40
41     __hdr__ should be defined as a list of (name, structfmt, default) tuples
42     __byte_order__ can be set to override the default ('>')
43     """
44     __metaclass__ = MetaPacket
45     data = ''
46     
47     def __init__(self, *args, **kwargs):
48         """Packet constructor with ([buf], [field=val,...]) prototype.
49
50         Arguments:
51
52         buf -- packet buffer to unpack
53
54         Optional keyword arguments correspond to packet field names.
55         """
56         if args:
57             self.unpack(args[0])
58         else:
59             for k in self.__hdr_fields__:
60                 setattr(self, k, self.__hdr_defaults__[k])
61             for k, v in kwargs.iteritems():
62                 setattr(self, k, v)
63
64     def __len__(self):
65         return self.__hdr_len__ + len(self.data)
66
67     def __repr__(self):
68         l = [ '%s=%r' % (k, getattr(self, k))
69               for k in self.__hdr_defaults__
70               if getattr(self, k) != self.__hdr_defaults__[k] ]
71         if self.data:
72             l.append('data=%r' % self.data)
73         return '%s(%s)' % (self.__class__.__name__, ', '.join(l))
74
75     def __str__(self):
76         return self.pack_hdr() + str(self.data)
77     
78     def pack_hdr(self):
79         """Return packed header string."""
80         try:
81             return _st_pack(self.__hdr_fmt__,
82                             *[ getattr(self, k) for k in self.__hdr_fields__ ])
83         except _st_error:
84             vals = []
85             for k in self.__hdr_fields__:
86                 v = getattr(self, k)
87                 if isinstance(v, tuple):
88                     vals.extend(v)
89                 else:
90                     vals.append(v)
91             return _st_pack(self.__hdr_fmt__, *vals)
92     
93     def unpack(self, buf):
94         """Unpack packet header fields from buf, and set self.data."""
95
96         res = list(_st_unpack(self.__hdr_fmt__, buf[:self.__hdr_len__]))
97         for e, k in enumerate(self.__slots__):
98             sfmt = self.__hdr__[e][1]
99             mat = intchr.match(sfmt)
100             if mat and mat.group('chr') != 's':
101                 cnt = int(mat.group('int'))
102                 setattr(self, k, list(res[:cnt]))
103                 del res[:cnt]
104             else:
105                 if sfmt[-1] == 's':
106                     i = res[0].find('\x00')
107                     if i != -1:
108                         res[0] = res[0][:i]
109                 setattr(self, k, res[0])
110                 del res[0]
111         assert len(res) == 0
112         self.data = buf[self.__hdr_len__:]
113
114 # XXX - ''.join([(len(`chr(x)`)==3) and chr(x) or '.' for x in range(256)])
115 __vis_filter = """................................ !"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[.]^_`abcdefghijklmnopqrstuvwxyz{|}~................................................................................................................................."""
116
117 def hexdump(buf, length=16):
118     """Return a hexdump output string of the given buffer."""
119     n = 0
120     res = []
121     while buf:
122         line, buf = buf[:length], buf[length:]
123         hexa = ' '.join(['%02x' % ord(x) for x in line])
124         line = line.translate(__vis_filter)
125         res.append('  %04d:  %-*s %s' % (n, length * 3, hexa, line))
126         n += length
127     return '\n'.join(res)
128
129 def in_cksum_add(s, buf):
130     """in_cksum_add(cksum, buf) -> cksum
131
132     Return accumulated Internet checksum.
133     """
134     nleft = len(buf)
135     i = 0
136     while nleft > 1:
137         s += ord(buf[i]) * 256 + ord(buf[i+1])
138         i += 2
139         nleft -= 2
140     if nleft:
141         s += ord(buf[i]) * 256
142     return s
143
144 def in_cksum_done(s):
145     """Fold and return Internet checksum."""
146     while (s >> 16):
147         s = (s >> 16) + (s & 0xffff)
148     return (~s & 0xffff)
149
150 def in_cksum(buf):
151     """Return computed Internet checksum."""
152     return in_cksum_done(in_cksum_add(0, buf))
153
154 try:
155     import psyco
156     psyco.bind(in_cksum)
157     psyco.bind(Packet)
158 except ImportError:
159     pass
160