]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/sys/opencrypto/cryptotest.py
Fix `KAT(CCM)?Parser` file descriptor leaks
[FreeBSD/FreeBSD.git] / tests / sys / opencrypto / cryptotest.py
1 #!/usr/local/bin/python2
2 #
3 # Copyright (c) 2014 The FreeBSD Foundation
4 # All rights reserved.
5 #
6 # This software was developed by John-Mark Gurney under
7 # the sponsorship from the FreeBSD Foundation.
8 # Redistribution and use in source and binary forms, with or without
9 # modification, are permitted provided that the following conditions
10 # are met:
11 # 1.  Redistributions of source code must retain the above copyright
12 #     notice, this list of conditions and the following disclaimer.
13 # 2.  Redistributions in binary form must reproduce the above copyright
14 #     notice, this list of conditions and the following disclaimer in the
15 #     documentation and/or other materials provided with the distribution.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27 # SUCH DAMAGE.
28 #
29 # $FreeBSD$
30 #
31
32 from __future__ import print_function
33
34 import binascii
35 import errno
36 import cryptodev
37 import itertools
38 import os
39 import struct
40 import unittest
41 from cryptodev import *
42 from glob import iglob
43
# Root directory of the NIST known-answer-test vector files.
katdir = '/usr/local/share/nist-kat'

def katg(base, glob):
    """Return an iterator over the vector files under katdir/base matching glob.

    An AssertionError is raised when katdir itself does not exist; when only
    the `base` sub-directory is absent the calling test is skipped via
    unittest.SkipTest.
    """
    assert os.path.exists(katdir), "Please 'pkg install nist-kat'"
    vector_dir = os.path.join(katdir, base)
    if os.path.exists(vector_dir):
        return iglob(os.path.join(vector_dir, glob))
    raise unittest.SkipTest("Missing %s test vectors" % (base))
51
# Driver names each test family may run on; the test methods consult these
# lists in their unittest.skipIf decorators.
aesmodules = ['cryptosoft0', 'aesni0', 'ccr0', 'ccp0']
desmodules = ['cryptosoft0']
shamodules = ['cryptosoft0', 'aesni0', 'ccr0', 'ccp0']
55
56 def GenTestCase(cname):
57     try:
58         crid = cryptodev.Crypto.findcrid(cname)
59     except IOError:
60         return None
61
62     class GendCryptoTestCase(unittest.TestCase):
63         ###############
64         ##### AES #####
65         ###############
66         @unittest.skipIf(cname not in aesmodules, 'skipping AES-XTS on %s' % (cname))
67         def test_xts(self):
68             for i in katg('XTSTestVectors/format tweak value input - data unit seq no', '*.rsp'):
69                 self.runXTS(i, cryptodev.CRYPTO_AES_XTS)
70
71         @unittest.skipIf(cname not in aesmodules, 'skipping AES-CBC on %s' % (cname))
72         def test_cbc(self):
73             for i in katg('KAT_AES', 'CBC[GKV]*.rsp'):
74                 self.runCBC(i)
75
76         @unittest.skipIf(cname not in aesmodules, 'skipping AES-CCM on %s' % (cname))
77         def test_ccm(self):
78             for i in katg('ccmtestvectors', 'V*.rsp'):
79                 self.runCCMEncrypt(i)
80
81             for i in katg('ccmtestvectors', 'D*.rsp'):
82                 self.runCCMDecrypt(i)
83
84         @unittest.skipIf(cname not in aesmodules, 'skipping AES-GCM on %s' % (cname))
85         def test_gcm(self):
86             for i in katg('gcmtestvectors', 'gcmEncrypt*'):
87                 self.runGCM(i, 'ENCRYPT')
88
89             for i in katg('gcmtestvectors', 'gcmDecrypt*'):
90                 self.runGCM(i, 'DECRYPT')
91
92         _gmacsizes = { 32: cryptodev.CRYPTO_AES_256_NIST_GMAC,
93             24: cryptodev.CRYPTO_AES_192_NIST_GMAC,
94             16: cryptodev.CRYPTO_AES_128_NIST_GMAC,
95         }
96         def runGCM(self, fname, mode):
97             curfun = None
98             if mode == 'ENCRYPT':
99                 swapptct = False
100                 curfun = Crypto.encrypt
101             elif mode == 'DECRYPT':
102                 swapptct = True
103                 curfun = Crypto.decrypt
104             else:
105                 raise RuntimeError('unknown mode: %r' % repr(mode))
106
107             columns = [ 'Count', 'Key', 'IV', 'CT', 'AAD', 'Tag', 'PT', ]
108             with cryptodev.KATParser(fname, columns) as parser:
109                 self.runGCMWithParser(parser, mode)
110
111         def runGCMWithParser(self, parser, mode):
112             for _, lines in next(parser):
113                 for data in lines:
114                     curcnt = int(data['Count'])
115                     cipherkey = binascii.unhexlify(data['Key'])
116                     iv = binascii.unhexlify(data['IV'])
117                     aad = binascii.unhexlify(data['AAD'])
118                     tag = binascii.unhexlify(data['Tag'])
119                     if 'FAIL' not in data:
120                         pt = binascii.unhexlify(data['PT'])
121                     ct = binascii.unhexlify(data['CT'])
122
123                     if len(iv) != 12:
124                         # XXX - isn't supported
125                         continue
126
127                     try:
128                         c = Crypto(cryptodev.CRYPTO_AES_NIST_GCM_16,
129                             cipherkey,
130                             mac=self._gmacsizes[len(cipherkey)],
131                             mackey=cipherkey, crid=crid,
132                             maclen=16)
133                     except EnvironmentError as e:
134                         # Can't test algorithms the driver does not support.
135                         if e.errno != errno.EOPNOTSUPP:
136                             raise
137                         continue
138
139                     if mode == 'ENCRYPT':
140                         try:
141                             rct, rtag = c.encrypt(pt, iv, aad)
142                         except EnvironmentError as e:
143                             # Can't test inputs the driver does not support.
144                             if e.errno != errno.EINVAL:
145                                 raise
146                             continue
147                         rtag = rtag[:len(tag)]
148                         data['rct'] = binascii.hexlify(rct)
149                         data['rtag'] = binascii.hexlify(rtag)
150                         self.assertEqual(rct, ct, repr(data))
151                         self.assertEqual(rtag, tag, repr(data))
152                     else:
153                         if len(tag) != 16:
154                             continue
155                         args = (ct, iv, aad, tag)
156                         if 'FAIL' in data:
157                             self.assertRaises(IOError,
158                                 c.decrypt, *args)
159                         else:
160                             try:
161                                 rpt, rtag = c.decrypt(*args)
162                             except EnvironmentError as e:
163                                 # Can't test inputs the driver does not support.
164                                 if e.errno != errno.EINVAL:
165                                     raise
166                                 continue
167                             data['rpt'] = binascii.hexlify(rpt)
168                             data['rtag'] = binascii.hexlify(rtag)
169                             self.assertEqual(rpt, pt,
170                                 repr(data))
171
172         def runCBC(self, fname):
173             columns = [ 'COUNT', 'KEY', 'IV', 'PLAINTEXT', 'CIPHERTEXT', ]
174             with cryptodev.KATParser(fname, columns) as parser:
175                 self.runCBCWithParser(parser)
176
177         def runCBCWithParser(self, parser):
178             curfun = None
179             for mode, lines in next(parser):
180                 if mode == 'ENCRYPT':
181                     swapptct = False
182                     curfun = Crypto.encrypt
183                 elif mode == 'DECRYPT':
184                     swapptct = True
185                     curfun = Crypto.decrypt
186                 else:
187                     raise RuntimeError('unknown mode: %r' % repr(mode))
188
189                 for data in lines:
190                     curcnt = int(data['COUNT'])
191                     cipherkey = binascii.unhexlify(data['KEY'])
192                     iv = binascii.unhexlify(data['IV'])
193                     pt = binascii.unhexlify(data['PLAINTEXT'])
194                     ct = binascii.unhexlify(data['CIPHERTEXT'])
195
196                     if swapptct:
197                         pt, ct = ct, pt
198                     # run the fun
199                     c = Crypto(cryptodev.CRYPTO_AES_CBC, cipherkey, crid=crid)
200                     r = curfun(c, pt, iv)
201                     self.assertEqual(r, ct)
202
203         def runXTS(self, fname, meth):
204             columns = [ 'COUNT', 'DataUnitLen', 'Key', 'DataUnitSeqNumber', 'PT',
205                         'CT']
206             with cryptodev.KATParser(fname, columns) as parser:
207                 self.runXTSWithParser(parser, meth)
208
209         def runXTSWithParser(self, parser, meth):
210             curfun = None
211             for mode, lines in next(parser):
212                 if mode == 'ENCRYPT':
213                     swapptct = False
214                     curfun = Crypto.encrypt
215                 elif mode == 'DECRYPT':
216                     swapptct = True
217                     curfun = Crypto.decrypt
218                 else:
219                     raise RuntimeError('unknown mode: %r' % repr(mode))
220
221                 for data in lines:
222                     curcnt = int(data['COUNT'])
223                     nbits = int(data['DataUnitLen'])
224                     cipherkey = binascii.unhexlify(data['Key'])
225                     iv = struct.pack('QQ', int(data['DataUnitSeqNumber']), 0)
226                     pt = binascii.unhexlify(data['PT'])
227                     ct = binascii.unhexlify(data['CT'])
228
229                     if nbits % 128 != 0:
230                         # XXX - mark as skipped
231                         continue
232                     if swapptct:
233                         pt, ct = ct, pt
234                     # run the fun
235                     try:
236                         c = Crypto(meth, cipherkey, crid=crid)
237                         r = curfun(c, pt, iv)
238                     except EnvironmentError as e:
239                         # Can't test hashes the driver does not support.
240                         if e.errno != errno.EOPNOTSUPP:
241                             raise
242                         continue
243                     self.assertEqual(r, ct)
244
245         def runCCMEncrypt(self, fname):
246             with cryptodev.KATCCMParser(fname) as parser:
247                 self.runCCMEncryptWithParser(parser)
248
249         def runCCMEncryptWithParser(self, parser):
250             for data in next(parser):
251                 Nlen = int(data['Nlen'])
252                 if Nlen != 12:
253                     # OCF only supports 12 byte IVs
254                     continue
255                 key = binascii.unhexlify(data['Key'])
256                 nonce = binascii.unhexlify(data['Nonce'])
257                 Alen = int(data['Alen'])
258                 if Alen != 0:
259                     aad = binascii.unhexlify(data['Adata'])
260                 else:
261                     aad = None
262                 payload = binascii.unhexlify(data['Payload'])
263                 ct = binascii.unhexlify(data['CT'])
264
265                 try:
266                     c = Crypto(crid=crid,
267                         cipher=cryptodev.CRYPTO_AES_CCM_16,
268                         key=key,
269                         mac=cryptodev.CRYPTO_AES_CCM_CBC_MAC,
270                         mackey=key, maclen=16)
271                     r, tag = Crypto.encrypt(c, payload,
272                         nonce, aad)
273                 except EnvironmentError as e:
274                     if e.errno != errno.EOPNOTSUPP:
275                         raise
276                     continue
277
278                 out = r + tag
279                 self.assertEqual(out, ct,
280                     "Count " + data['Count'] + " Actual: " + \
281                     repr(binascii.hexlify(out)) + " Expected: " + \
282                     repr(data) + " on " + cname)
283
284         def runCCMDecrypt(self, fname):
285             with cryptodev.KATCCMParser(fname) as parser:
286                 self.runCCMDecryptWithParser(parser)
287
288         def runCCMDecryptWithParser(self, parser):
289             # XXX: Note that all of the current CCM
290             # decryption test vectors use IV and tag sizes
291             # that aren't supported by OCF none of the
292             # tests are actually ran.
293             for data in next(parser):
294                 Nlen = int(data['Nlen'])
295                 if Nlen != 12:
296                     # OCF only supports 12 byte IVs
297                     continue
298                 Tlen = int(data['Tlen'])
299                 if Tlen != 16:
300                     # OCF only supports 16 byte tags
301                     continue
302                 key = binascii.unhexlify(data['Key'])
303                 nonce = binascii.unhexlify(data['Nonce'])
304                 Alen = int(data['Alen'])
305                 if Alen != 0:
306                     aad = binascii.unhexlify(data['Adata'])
307                 else:
308                     aad = None
309                 ct = binascii.unhexlify(data['CT'])
310                 tag = ct[-16:]
311                 ct = ct[:-16]
312
313                 try:
314                     c = Crypto(crid=crid,
315                         cipher=cryptodev.CRYPTO_AES_CCM_16,
316                         key=key,
317                         mac=cryptodev.CRYPTO_AES_CCM_CBC_MAC,
318                         mackey=key, maclen=16)
319                 except EnvironmentError as e:
320                     if e.errno != errno.EOPNOTSUPP:
321                         raise
322                     continue
323
324                 if data['Result'] == 'Fail':
325                     self.assertRaises(IOError,
326                         c.decrypt, payload, nonce, aad, tag)
327                 else:
328                     r = Crypto.decrypt(c, payload, nonce,
329                         aad, tag)
330
331                     payload = binascii.unhexlify(data['Payload'])
332                     plen = int(data('Plen'))
333                     payload = payload[:plen]
334                     self.assertEqual(r, payload,
335                         "Count " + data['Count'] + \
336                         " Actual: " + repr(binascii.hexlify(r)) + \
337                         " Expected: " + repr(data) + \
338                         " on " + cname)
339
340         ###############
341         ##### DES #####
342         ###############
343         @unittest.skipIf(cname not in desmodules, 'skipping DES on %s' % (cname))
344         def test_tdes(self):
345             for i in katg('KAT_TDES', 'TCBC[a-z]*.rsp'):
346                 self.runTDES(i)
347
348         def runTDES(self, fname):
349             columns = [ 'COUNT', 'KEYs', 'IV', 'PLAINTEXT', 'CIPHERTEXT', ]
350             with cryptodev.KATParser(fname, columns) as parser:
351                 self.runTDESWithParser(parser)
352
353         def runTDESWithParser(self, parser):
354             curfun = None
355             for mode, lines in next(parser):
356                 if mode == 'ENCRYPT':
357                     swapptct = False
358                     curfun = Crypto.encrypt
359                 elif mode == 'DECRYPT':
360                     swapptct = True
361                     curfun = Crypto.decrypt
362                 else:
363                     raise RuntimeError('unknown mode: %r' % repr(mode))
364
365                 for data in lines:
366                     curcnt = int(data['COUNT'])
367                     key = data['KEYs'] * 3
368                     cipherkey = binascii.unhexlify(key)
369                     iv = binascii.unhexlify(data['IV'])
370                     pt = binascii.unhexlify(data['PLAINTEXT'])
371                     ct = binascii.unhexlify(data['CIPHERTEXT'])
372
373                     if swapptct:
374                         pt, ct = ct, pt
375                     # run the fun
376                     c = Crypto(cryptodev.CRYPTO_3DES_CBC, cipherkey, crid=crid)
377                     r = curfun(c, pt, iv)
378                     self.assertEqual(r, ct)
379
380         ###############
381         ##### SHA #####
382         ###############
383         @unittest.skipIf(cname not in shamodules, 'skipping SHA on %s' % str(cname))
384         def test_sha(self):
385             for i in katg('shabytetestvectors', 'SHA*Msg.rsp'):
386                 self.runSHA(i)
387
388         def runSHA(self, fname):
389             # Skip SHA512_(224|256) tests
390             if fname.find('SHA512_') != -1:
391                 return
392             columns = [ 'Len', 'Msg', 'MD' ]
393             with cryptodev.KATParser(fname, columns) as parser:
394                 self.runSHAWithParser(parser)
395
396         def runSHAWithParser(self, parser):
397             for hashlength, lines in next(parser):
398                 # E.g., hashlength will be "L=20" (bytes)
399                 hashlen = int(hashlength.split("=")[1])
400
401                 if hashlen == 20:
402                     alg = cryptodev.CRYPTO_SHA1
403                 elif hashlen == 28:
404                     alg = cryptodev.CRYPTO_SHA2_224
405                 elif hashlen == 32:
406                     alg = cryptodev.CRYPTO_SHA2_256
407                 elif hashlen == 48:
408                     alg = cryptodev.CRYPTO_SHA2_384
409                 elif hashlen == 64:
410                     alg = cryptodev.CRYPTO_SHA2_512
411                 else:
412                     # Skip unsupported hashes
413                     # Slurp remaining input in section
414                     for data in lines:
415                         continue
416                     continue
417
418                 for data in lines:
419                     msg = binascii.unhexlify(data['Msg'])
420                     msg = msg[:int(data['Len'])]
421                     md = binascii.unhexlify(data['MD'])
422
423                     try:
424                         c = Crypto(mac=alg, crid=crid,
425                             maclen=hashlen)
426                     except EnvironmentError as e:
427                         # Can't test hashes the driver does not support.
428                         if e.errno != errno.EOPNOTSUPP:
429                             raise
430                         continue
431
432                     _, r = c.encrypt(msg, iv="")
433
434                     self.assertEqual(r, md, "Actual: " + \
435                         repr(binascii.hexlify(r)) + " Expected: " + repr(data) + " on " + cname)
436
437         @unittest.skipIf(cname not in shamodules, 'skipping SHA-HMAC on %s' % str(cname))
438         def test_sha1hmac(self):
439             for i in katg('hmactestvectors', 'HMAC.rsp'):
440                 self.runSHA1HMAC(i)
441
442         def runSHA1HMAC(self, fname):
443             columns = [ 'Count', 'Klen', 'Tlen', 'Key', 'Msg', 'Mac' ]
444             with cryptodev.KATParser(fname, columns) as parser:
445                 self.runSHA1HMACWithParser(parser)
446
447         def runSHA1HMACWithParser(self, parser):
448             for hashlength, lines in next(parser):
449                 # E.g., hashlength will be "L=20" (bytes)
450                 hashlen = int(hashlength.split("=")[1])
451
452                 blocksize = None
453                 if hashlen == 20:
454                     alg = cryptodev.CRYPTO_SHA1_HMAC
455                     blocksize = 64
456                 elif hashlen == 28:
457                     alg = cryptodev.CRYPTO_SHA2_224_HMAC
458                     blocksize = 64
459                 elif hashlen == 32:
460                     alg = cryptodev.CRYPTO_SHA2_256_HMAC
461                     blocksize = 64
462                 elif hashlen == 48:
463                     alg = cryptodev.CRYPTO_SHA2_384_HMAC
464                     blocksize = 128
465                 elif hashlen == 64:
466                     alg = cryptodev.CRYPTO_SHA2_512_HMAC
467                     blocksize = 128
468                 else:
469                     # Skip unsupported hashes
470                     # Slurp remaining input in section
471                     for data in lines:
472                         continue
473                     continue
474
475                 for data in lines:
476                     key = binascii.unhexlify(data['Key'])
477                     msg = binascii.unhexlify(data['Msg'])
478                     mac = binascii.unhexlify(data['Mac'])
479                     tlen = int(data['Tlen'])
480
481                     if len(key) > blocksize:
482                         continue
483
484                     try:
485                         c = Crypto(mac=alg, mackey=key,
486                             crid=crid, maclen=hashlen)
487                     except EnvironmentError as e:
488                         # Can't test hashes the driver does not support.
489                         if e.errno != errno.EOPNOTSUPP:
490                             raise
491                         continue
492
493                     _, r = c.encrypt(msg, iv="")
494
495                     self.assertEqual(r[:tlen], mac, "Actual: " + \
496                         repr(binascii.hexlify(r)) + " Expected: " + repr(data))
497
498     return GendCryptoTestCase
499
# Build one TestCase class per crypto driver name.  GenTestCase() returns
# None instead of a class when Crypto.findcrid() raises IOError for the
# given name.
cryptosoft = GenTestCase('cryptosoft0')
aesni = GenTestCase('aesni0')
ccr = GenTestCase('ccr0')
ccp = GenTestCase('ccp0')

# Run the tests when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()