]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - tests/rateLimiter.py
import zstd 1.3.7
[FreeBSD/FreeBSD.git] / tests / rateLimiter.py
1 #!/usr/bin/env python3
2
3 # ################################################################
4 # Copyright (c) 2018-present, Facebook, Inc.
5 # All rights reserved.
6 #
7 # This source code is licensed under both the BSD-style license (found in the
8 # LICENSE file in the root directory of this source tree) and the GPLv2 (found
9 # in the COPYING file in the root directory of this source tree).
10 # ##########################################################################
11
12 # Rate limiter, replacement for pv
13 # this rate limiter does not "catch up" after a blocking period
14 # Limitations:
15 # - only accepts limit speed in MB/s
16
17 import sys
18 import time
19
20 MB = 1024 * 1024
21 rate = float(sys.argv[1]) * MB
22 start = time.time()
23 total_read = 0
24
25 # sys.stderr.close()  # remove error message, for Ctrl+C
26
27 try:
28   buf = " "
29   while len(buf):
30     now = time.time()
31     to_read = max(int(rate * (now - start)), 1)
32     max_buf_size = 1 * MB
33     to_read = min(to_read, max_buf_size)
34     start = now
35
36     buf = sys.stdin.buffer.read(to_read)
37     sys.stdout.buffer.write(buf)
38
39 except (KeyboardInterrupt, BrokenPipeError) as e:
40     pass