]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/sys/opencrypto/cryptotest.py
THIS BRANCH IS OBSOLETE, PLEASE READ:
[FreeBSD/FreeBSD.git] / tests / sys / opencrypto / cryptotest.py
1 #!/usr/local/bin/python3
2 #
3 # Copyright (c) 2014 The FreeBSD Foundation
4 # All rights reserved.
5 # Copyright 2019 Enji Cooper
6 #
7 # This software was developed by John-Mark Gurney under
8 # the sponsorship from the FreeBSD Foundation.
9 # Redistribution and use in source and binary forms, with or without
10 # modification, are permitted provided that the following conditions
11 # are met:
12 # 1.  Redistributions of source code must retain the above copyright
13 #     notice, this list of conditions and the following disclaimer.
14 # 2.  Redistributions in binary form must reproduce the above copyright
15 #     notice, this list of conditions and the following disclaimer in the
16 #     documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
19 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 # SUCH DAMAGE.
29 #
30 # $FreeBSD$
31 #
32
33
34
35 import binascii
36 import errno
37 import cryptodev
38 import itertools
39 import os
40 import struct
41 import unittest
42 from cryptodev import *
43 from glob import iglob
44
# Root directory that katg() searches for known-answer-test vector files.
katdir = '/usr/local/share/nist-kat'
46
def katg(base, glob):
    """Return an iterator over the vector files matching *glob* in katdir/base.

    Raises AssertionError when katdir itself does not exist, and
    unittest.SkipTest when the *base* subdirectory does not exist.
    """
    # Raise explicitly instead of using `assert`: assert statements are
    # removed under `python -O`, which would let a missing katdir fall
    # through to the SkipTest below instead of being reported.
    if not os.path.exists(katdir):
        raise AssertionError("Please 'pkg install nist-kat'")
    vectordir = os.path.join(katdir, base)
    if not os.path.exists(vectordir):
        raise unittest.SkipTest("Missing %s test vectors" % (base))
    return iglob(os.path.join(vectordir, glob))
52
# Driver names for which the AES tests (XTS, CBC, CCM, GCM) are not skipped.
aesmodules = [
    'cryptosoft0',
    'aesni0',
    'armv8crypto0',
    'ccr0',
    'ccp0',
    'safexcel0',
    'qat0',
]
# Driver names for which the SHA and SHA-HMAC tests are not skipped.
shamodules = [
    'cryptosoft0',
    'aesni0',
    'armv8crypto0',
    'ccr0',
    'ccp0',
    'ossl0',
    'safexcel0',
    'qat0',
]
55
def GenTestCase(cname):
    """Build a unittest.TestCase class bound to the crypto driver *cname*.

    The driver name is resolved to a crid with cryptodev.Crypto.findcrid(),
    and every Crypto session opened by the returned class passes that crid.

    Returns None when findcrid() raises IOError.
    """
    try:
        crid = cryptodev.Crypto.findcrid(cname)
    except IOError:
        return None

    class GendCryptoTestCase(unittest.TestCase):
        """Known-answer tests run against the driver captured in `crid`."""

        ###############
        ##### AES #####
        ###############
        @unittest.skipIf(cname not in aesmodules, 'skipping AES-XTS on %s' % (cname))
        def test_xts(self):
            """Run every XTS response file through runXTS()."""
            for i in katg('XTSTestVectors/format tweak value input - data unit seq no', '*.rsp'):
                self.runXTS(i, cryptodev.CRYPTO_AES_XTS)

        @unittest.skipIf(cname not in aesmodules, 'skipping AES-CBC on %s' % (cname))
        def test_cbc(self):
            """Run every matching CBC response file through runCBC()."""
            for i in katg('KAT_AES', 'CBC[GKV]*.rsp'):
                self.runCBC(i)

        @unittest.skipIf(cname not in aesmodules, 'skipping AES-CCM on %s' % (cname))
        def test_ccm(self):
            """Run the CCM 'V*' files as encryptions and 'D*' files as decryptions."""
            for i in katg('ccmtestvectors', 'V*.rsp'):
                self.runCCMEncrypt(i)

            for i in katg('ccmtestvectors', 'D*.rsp'):
                self.runCCMDecrypt(i)

        @unittest.skipIf(cname not in aesmodules, 'skipping AES-GCM on %s' % (cname))
        def test_gcm(self):
            """Run the GCM encrypt files, then the GCM decrypt files."""
            for i in katg('gcmtestvectors', 'gcmEncrypt*'):
                self.runGCM(i, 'ENCRYPT')

            for i in katg('gcmtestvectors', 'gcmDecrypt*'):
                self.runGCM(i, 'DECRYPT')

        def runGCM(self, fname, mode):
            """Parse the GCM vector file *fname* and check it in *mode*.

            *mode* must be 'ENCRYPT' or 'DECRYPT'; anything else raises
            RuntimeError.
            """
            if mode not in ('ENCRYPT', 'DECRYPT'):
                raise RuntimeError('unknown mode: %r' % repr(mode))

            columns = [ 'Count', 'Key', 'IV', 'CT', 'AAD', 'Tag', 'PT', ]
            with cryptodev.KATParser(fname, columns) as parser:
                self.runGCMWithParser(parser, mode)

        def runGCMWithParser(self, parser, mode):
            """Check each GCM vector produced by *parser*.

            Vectors whose IV is not 12 bytes are skipped, as are (in decrypt
            mode) vectors whose tag is not 16 bytes.  Sessions or requests
            the driver rejects with EOPNOTSUPP / EINVAL are skipped too.
            """
            for _, lines in next(parser):
                for data in lines:
                    curcnt = int(data['Count'])
                    cipherkey = binascii.unhexlify(data['Key'])
                    iv = binascii.unhexlify(data['IV'])
                    aad = binascii.unhexlify(data['AAD'])
                    tag = binascii.unhexlify(data['Tag'])
                    # Vectors marked FAIL are only used on the decrypt
                    # path below, which does not read pt for them.
                    if 'FAIL' not in data:
                        pt = binascii.unhexlify(data['PT'])
                    ct = binascii.unhexlify(data['CT'])

                    if len(iv) != 12:
                        # XXX - isn't supported
                        continue

                    try:
                        c = Crypto(cryptodev.CRYPTO_AES_NIST_GCM_16,
                            cipherkey, crid=crid,
                            maclen=16)
                    except EnvironmentError as e:
                        # Can't test algorithms the driver does not support.
                        if e.errno != errno.EOPNOTSUPP:
                            raise
                        continue

                    if mode == 'ENCRYPT':
                        try:
                            rct, rtag = c.encrypt(pt, iv, aad)
                        except EnvironmentError as e:
                            # Can't test inputs the driver does not support.
                            if e.errno != errno.EINVAL:
                                raise
                            continue
                        # Compare only as many tag bytes as the vector has.
                        rtag = rtag[:len(tag)]
                        # Record the results so they show up in the
                        # repr(data) failure message.
                        data['rct'] = binascii.hexlify(rct)
                        data['rtag'] = binascii.hexlify(rtag)
                        self.assertEqual(rct, ct, repr(data))
                        self.assertEqual(rtag, tag, repr(data))
                    else:
                        if len(tag) != 16:
                            continue
                        args = (ct, iv, aad, tag)
                        if 'FAIL' in data:
                            self.assertRaises(IOError,
                                c.decrypt, *args)
                        else:
                            try:
                                rpt, rtag = c.decrypt(*args)
                            except EnvironmentError as e:
                                # Can't test inputs the driver does not support.
                                if e.errno != errno.EINVAL:
                                    raise
                                continue
                            data['rpt'] = binascii.hexlify(rpt)
                            data['rtag'] = binascii.hexlify(rtag)
                            self.assertEqual(rpt, pt,
                                repr(data))

        def runCBC(self, fname):
            """Parse the CBC vector file *fname* and check every vector."""
            columns = [ 'COUNT', 'KEY', 'IV', 'PLAINTEXT', 'CIPHERTEXT', ]
            with cryptodev.KATParser(fname, columns) as parser:
                self.runCBCWithParser(parser)

        def runCBCWithParser(self, parser):
            """Check each CBC vector produced by *parser*.

            Each section is labelled 'ENCRYPT' or 'DECRYPT'; any other label
            raises RuntimeError.
            """
            curfun = None
            for mode, lines in next(parser):
                if mode == 'ENCRYPT':
                    swapptct = False
                    curfun = Crypto.encrypt
                elif mode == 'DECRYPT':
                    swapptct = True
                    curfun = Crypto.decrypt
                else:
                    raise RuntimeError('unknown mode: %r' % repr(mode))

                for data in lines:
                    curcnt = int(data['COUNT'])
                    cipherkey = binascii.unhexlify(data['KEY'])
                    iv = binascii.unhexlify(data['IV'])
                    pt = binascii.unhexlify(data['PLAINTEXT'])
                    ct = binascii.unhexlify(data['CIPHERTEXT'])

                    # For decryption the ciphertext is the input and the
                    # plaintext is the expected output.
                    if swapptct:
                        pt, ct = ct, pt
                    # run the fun
                    c = Crypto(cryptodev.CRYPTO_AES_CBC, cipherkey, crid=crid)
                    r = curfun(c, pt, iv)
                    self.assertEqual(r, ct)

        def runXTS(self, fname, meth):
            """Parse the XTS vector file *fname* and check it with cipher *meth*."""
            columns = [ 'COUNT', 'DataUnitLen', 'Key', 'DataUnitSeqNumber', 'PT',
                        'CT']
            with cryptodev.KATParser(fname, columns) as parser:
                self.runXTSWithParser(parser, meth)

        def runXTSWithParser(self, parser, meth):
            """Check each XTS vector produced by *parser* using cipher *meth*.

            Vectors whose DataUnitLen is not a multiple of 128 are skipped,
            as are vectors the driver rejects with EOPNOTSUPP.
            """
            curfun = None
            for mode, lines in next(parser):
                if mode == 'ENCRYPT':
                    swapptct = False
                    curfun = Crypto.encrypt
                elif mode == 'DECRYPT':
                    swapptct = True
                    curfun = Crypto.decrypt
                else:
                    raise RuntimeError('unknown mode: %r' % repr(mode))

                for data in lines:
                    curcnt = int(data['COUNT'])
                    nbits = int(data['DataUnitLen'])
                    cipherkey = binascii.unhexlify(data['Key'])
                    # The IV is the sequence number followed by a zero,
                    # packed as two native 64-bit unsigned integers.
                    iv = struct.pack('QQ', int(data['DataUnitSeqNumber']), 0)
                    pt = binascii.unhexlify(data['PT'])
                    ct = binascii.unhexlify(data['CT'])

                    if nbits % 128 != 0:
                        # XXX - mark as skipped
                        continue
                    if swapptct:
                        pt, ct = ct, pt
                    # run the fun
                    try:
                        c = Crypto(meth, cipherkey, crid=crid)
                        r = curfun(c, pt, iv)
                    except EnvironmentError as e:
                        # Can't test algorithms the driver does not support.
                        if e.errno != errno.EOPNOTSUPP:
                            raise
                        continue
                    self.assertEqual(r, ct)

        def runCCMEncrypt(self, fname):
            """Parse the CCM vector file *fname* and check it as encryptions."""
            with cryptodev.KATCCMParser(fname) as parser:
                self.runCCMEncryptWithParser(parser)

        def runCCMEncryptWithParser(self, parser):
            """Encrypt each CCM vector from *parser* and compare with its CT.

            Vectors whose Nlen is not 12 are skipped, as are vectors the
            driver rejects with EOPNOTSUPP.
            """
            for data in next(parser):
                Nlen = int(data['Nlen'])
                if Nlen != 12:
                    # OCF only supports 12 byte IVs
                    continue
                key = binascii.unhexlify(data['Key'])
                nonce = binascii.unhexlify(data['Nonce'])
                Alen = int(data['Alen'])
                if Alen != 0:
                    aad = binascii.unhexlify(data['Adata'])
                else:
                    aad = None
                payload = binascii.unhexlify(data['Payload'])
                ct = binascii.unhexlify(data['CT'])

                try:
                    c = Crypto(crid=crid,
                        cipher=cryptodev.CRYPTO_AES_CCM_16,
                        key=key,
                        mac=cryptodev.CRYPTO_AES_CCM_CBC_MAC,
                        mackey=key, maclen=16)
                    r, tag = Crypto.encrypt(c, payload,
                        nonce, aad)
                except EnvironmentError as e:
                    if e.errno != errno.EOPNOTSUPP:
                        raise
                    continue

                # The vector's CT field is compared against the ciphertext
                # with the tag appended.
                out = r + tag
                self.assertEqual(out, ct,
                    "Count " + data['Count'] + " Actual: " + \
                    repr(binascii.hexlify(out)) + " Expected: " + \
                    repr(data) + " on " + cname)

        def runCCMDecrypt(self, fname):
            """Parse the CCM vector file *fname* and check it as decryptions."""
            with cryptodev.KATCCMParser(fname) as parser:
                self.runCCMDecryptWithParser(parser)

        def runCCMDecryptWithParser(self, parser):
            """Decrypt each CCM vector from *parser* and check the outcome.

            Vectors whose Result is 'Fail' must make decrypt() raise IOError;
            all others must decrypt to the first Plen bytes of Payload.
            Vectors whose Nlen is not 12 or whose Tlen is not 16 are skipped.
            """
            # XXX: Note that all of the current CCM
            # decryption test vectors use IV and tag sizes
            # that aren't supported by OCF, so none of the
            # tests are actually run.
            for data in next(parser):
                Nlen = int(data['Nlen'])
                if Nlen != 12:
                    # OCF only supports 12 byte IVs
                    continue
                Tlen = int(data['Tlen'])
                if Tlen != 16:
                    # OCF only supports 16 byte tags
                    continue
                key = binascii.unhexlify(data['Key'])
                nonce = binascii.unhexlify(data['Nonce'])
                Alen = int(data['Alen'])
                if Alen != 0:
                    aad = binascii.unhexlify(data['Adata'])
                else:
                    aad = None
                # Split the CT field into the ciphertext proper and the
                # trailing 16-byte tag.
                ct = binascii.unhexlify(data['CT'])
                tag = ct[-16:]
                ct = ct[:-16]

                try:
                    c = Crypto(crid=crid,
                        cipher=cryptodev.CRYPTO_AES_CCM_16,
                        key=key,
                        mac=cryptodev.CRYPTO_AES_CCM_CBC_MAC,
                        mackey=key, maclen=16)
                except EnvironmentError as e:
                    if e.errno != errno.EOPNOTSUPP:
                        raise
                    continue

                # The decrypt input is the tag-stripped ciphertext; the
                # expected plaintext is only loaded after decrypting.
                if data['Result'] == 'Fail':
                    self.assertRaises(IOError,
                        c.decrypt, ct, nonce, aad, tag)
                else:
                    # As with the other maclen=16 sessions in this class,
                    # the result is unpacked as a (data, tag) pair.
                    r, tag = Crypto.decrypt(c, ct, nonce,
                        aad, tag)

                    payload = binascii.unhexlify(data['Payload'])
                    plen = int(data['Plen'])
                    payload = payload[:plen]
                    self.assertEqual(r, payload,
                        "Count " + data['Count'] + \
                        " Actual: " + repr(binascii.hexlify(r)) + \
                        " Expected: " + repr(data) + \
                        " on " + cname)

        ###############
        ##### SHA #####
        ###############
        @unittest.skipIf(cname not in shamodules, 'skipping SHA on %s' % str(cname))
        def test_sha(self):
            """Run every SHA message response file through runSHA()."""
            for i in katg('shabytetestvectors', 'SHA*Msg.rsp'):
                self.runSHA(i)

        def runSHA(self, fname):
            """Parse the SHA vector file *fname* and check every digest.

            Files whose name contains 'SHA512_' are ignored.
            """
            # Skip SHA512_(224|256) tests
            if fname.find('SHA512_') != -1:
                return
            columns = [ 'Len', 'Msg', 'MD' ]
            with cryptodev.KATParser(fname, columns) as parser:
                self.runSHAWithParser(parser)

        def runSHAWithParser(self, parser):
            """Hash each message from *parser* and compare with its MD.

            The algorithm is chosen from the digest length in the section
            header; sections with an unrecognised length are skipped.
            """
            for hashlength, lines in next(parser):
                # E.g., hashlength will be "L=20" (bytes)
                hashlen = int(hashlength.split("=")[1])

                if hashlen == 20:
                    alg = cryptodev.CRYPTO_SHA1
                elif hashlen == 28:
                    alg = cryptodev.CRYPTO_SHA2_224
                elif hashlen == 32:
                    alg = cryptodev.CRYPTO_SHA2_256
                elif hashlen == 48:
                    alg = cryptodev.CRYPTO_SHA2_384
                elif hashlen == 64:
                    alg = cryptodev.CRYPTO_SHA2_512
                else:
                    # Skip unsupported hashes
                    # Slurp remaining input in section
                    for data in lines:
                        continue
                    continue

                for data in lines:
                    msg = binascii.unhexlify(data['Msg'])
                    # NOTE(review): if Len is a bit count, this slice only
                    # has an effect for Len = 0 - confirm against the
                    # vector format.
                    msg = msg[:int(data['Len'])]
                    md = binascii.unhexlify(data['MD'])

                    try:
                        c = Crypto(mac=alg, crid=crid,
                            maclen=hashlen)
                    except EnvironmentError as e:
                        # Can't test hashes the driver does not support.
                        if e.errno != errno.EOPNOTSUPP:
                            raise
                        continue

                    _, r = c.encrypt(msg, iv="")

                    self.assertEqual(r, md, "Actual: " + \
                        repr(binascii.hexlify(r)) + " Expected: " + repr(data) + " on " + cname)

        @unittest.skipIf(cname not in shamodules, 'skipping SHA-HMAC on %s' % str(cname))
        def test_sha1hmac(self):
            """Run the HMAC response file through runSHA1HMAC()."""
            for i in katg('hmactestvectors', 'HMAC.rsp'):
                self.runSHA1HMAC(i)

        def runSHA1HMAC(self, fname):
            """Parse the HMAC vector file *fname* and check every MAC."""
            columns = [ 'Count', 'Klen', 'Tlen', 'Key', 'Msg', 'Mac' ]
            with cryptodev.KATParser(fname, columns) as parser:
                self.runSHA1HMACWithParser(parser)

        def runSHA1HMACWithParser(self, parser):
            """MAC each message from *parser* and compare with its Mac.

            The algorithm and block size are chosen from the digest length
            in the section header; sections with an unrecognised length and
            vectors whose key is longer than the block size are skipped.
            """
            for hashlength, lines in next(parser):
                # E.g., hashlength will be "L=20" (bytes)
                hashlen = int(hashlength.split("=")[1])

                blocksize = None
                if hashlen == 20:
                    alg = cryptodev.CRYPTO_SHA1_HMAC
                    blocksize = 64
                elif hashlen == 28:
                    alg = cryptodev.CRYPTO_SHA2_224_HMAC
                    blocksize = 64
                elif hashlen == 32:
                    alg = cryptodev.CRYPTO_SHA2_256_HMAC
                    blocksize = 64
                elif hashlen == 48:
                    alg = cryptodev.CRYPTO_SHA2_384_HMAC
                    blocksize = 128
                elif hashlen == 64:
                    alg = cryptodev.CRYPTO_SHA2_512_HMAC
                    blocksize = 128
                else:
                    # Skip unsupported hashes
                    # Slurp remaining input in section
                    for data in lines:
                        continue
                    continue

                for data in lines:
                    key = binascii.unhexlify(data['Key'])
                    msg = binascii.unhexlify(data['Msg'])
                    mac = binascii.unhexlify(data['Mac'])
                    tlen = int(data['Tlen'])

                    if len(key) > blocksize:
                        continue

                    try:
                        c = Crypto(mac=alg, mackey=key,
                            crid=crid, maclen=hashlen)
                    except EnvironmentError as e:
                        # Can't test hashes the driver does not support.
                        if e.errno != errno.EOPNOTSUPP:
                            raise
                        continue

                    _, r = c.encrypt(msg, iv="")

                    # The vector's Mac holds only the first Tlen bytes.
                    self.assertEqual(r[:tlen], mac, "Actual: " + \
                        repr(binascii.hexlify(r)) + " Expected: " + repr(data) + " on " + cname)

    return GendCryptoTestCase
453
# One module-level test class per crypto driver.  GenTestCase() returns
# None instead of a class when Crypto.findcrid() raises IOError for the
# driver name.
cryptosoft = GenTestCase('cryptosoft0')
aesni = GenTestCase('aesni0')
armv8crypto = GenTestCase('armv8crypto0')
ccr = GenTestCase('ccr0')
ccp = GenTestCase('ccp0')
ossl = GenTestCase('ossl0')
safexcel = GenTestCase('safexcel0')
qat = GenTestCase('qat0')

# Run the tests when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()