]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
Vendor import of lldb release_39 branch r276489:
[FreeBSD/FreeBSD.git] / packages / Python / lldbsuite / test / tools / lldb-server / gdbremote_testcase.py
1 """
2 Base class for gdb-remote test cases.
3 """
4
5 from __future__ import print_function
6
7
8
9 import errno
10 import os
11 import os.path
12 import platform
13 import random
14 import re
15 import select
16 import signal
17 import socket
18 import subprocess
19 import sys
20 import tempfile
21 import time
22 from lldbsuite.test import configuration
23 from lldbsuite.test.lldbtest import *
24 from lldbgdbserverutils import *
25 import logging
26
class _ConnectionRefused(IOError):
    """Raised when a connection to the stub is refused, or (on ADB-forwarded
    android targets) accepted and then immediately dropped."""
    pass
29
class GdbRemoteTestCaseBase(TestBase):
    """Base class for gdb-remote (lldb-server / debugserver) protocol tests."""

    NO_DEBUG_INFO_TESTCASE = True

    # Default timeout, in seconds, used by expect_gdbremote_sequence().
    _TIMEOUT_SECONDS = 7

    # Pre-computed gdb-remote 'k' (kill) packet, checksum included.
    _GDBREMOTE_KILL_PACKET = "$k#6b"

    # Start the inferior separately, attach to the inferior on the stub command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common exceptions
    TARGET_EXC_BAD_ACCESS      = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC      = 0x93
    TARGET_EXC_EMULATION       = 0x94
    TARGET_EXC_SOFTWARE        = 0x95
    TARGET_EXC_BREAKPOINT      = 0x96

    # Per-test verbose log handler; installed by setUp() when the
    # "gdb-remote" log channel is requested, removed again in tearDown().
    _verbose_log_handler = None
    _log_formatter = logging.Formatter(fmt='%(asctime)-15s %(levelname)-8s %(message)s')
55
56     def setUpBaseLogging(self):
57         self.logger = logging.getLogger(__name__)
58
59         if len(self.logger.handlers) > 0:
60             return # We have set up this handler already
61
62         self.logger.propagate = False
63         self.logger.setLevel(logging.DEBUG)
64
65         # log all warnings to stderr
66         handler = logging.StreamHandler()
67         handler.setLevel(logging.WARNING)
68         handler.setFormatter(self._log_formatter)
69         self.logger.addHandler(handler)
70
71
72     def isVerboseLoggingRequested(self):
73         # We will report our detailed logs if the user requested that the "gdb-remote" channel is
74         # logged.
75         return any(("gdb-remote" in channel) for channel in lldbtest_config.channels)
76
77     def setUp(self):
78         TestBase.setUp(self)
79
80         self.setUpBaseLogging()
81         self.debug_monitor_extra_args = []
82         self._pump_queues = socket_packet_pump.PumpQueues()
83
84         if self.isVerboseLoggingRequested():
85             # If requested, full logs go to a log file
86             self._verbose_log_handler = logging.FileHandler(self.log_basename + "-host.log")
87             self._verbose_log_handler.setFormatter(self._log_formatter)
88             self._verbose_log_handler.setLevel(logging.DEBUG)
89             self.logger.addHandler(self._verbose_log_handler)
90
91         self.test_sequence = GdbRemoteTestSequence(self.logger)
92         self.set_inferior_startup_launch()
93         self.port = self.get_next_port()
94         self.named_pipe_path = None
95         self.named_pipe = None
96         self.named_pipe_fd = None
97         self.stub_sends_two_stop_notifications_on_kill = False
98         if configuration.lldb_platform_url:
99             if configuration.lldb_platform_url.startswith('unix-'):
100                 url_pattern = '(.+)://\[?(.+?)\]?/.*'
101             else:
102                 url_pattern = '(.+)://(.+):\d+'
103             scheme, host = re.match(url_pattern, configuration.lldb_platform_url).groups()
104             if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
105                 self.stub_device = host
106                 self.stub_hostname = 'localhost'
107             else:
108                 self.stub_device = None
109                 self.stub_hostname = host
110         else:
111             self.stub_hostname = "localhost"
112
    def tearDown(self):
        """Per-test cleanup: verify the packet pump queues drained, then detach
        the verbose log handler before delegating to TestBase.tearDown."""
        self._pump_queues.verify_queues_empty()

        # removeHandler(None) is a harmless no-op when verbose logging was off.
        self.logger.removeHandler(self._verbose_log_handler)
        self._verbose_log_handler = None
        TestBase.tearDown(self)
119
120     def getLocalServerLogFile(self):
121         return self.log_basename + "-server.log"
122
123     def setUpServerLogging(self, is_llgs):
124         if len(lldbtest_config.channels) == 0:
125             return # No logging requested
126
127         if lldb.remote_platform:
128             log_file = lldbutil.join_remote_paths(lldb.remote_platform.GetWorkingDirectory(), "server.log")
129         else:
130             log_file = self.getLocalServerLogFile()
131
132         if is_llgs:
133             self.debug_monitor_extra_args.append("--log-file=" + log_file)
134             self.debug_monitor_extra_args.append("--log-channels={}".format(":".join(lldbtest_config.channels)))
135         else:
136             self.debug_monitor_extra_args = ["--log-file=" + self.log_file, "--log-flags=0x800000"]
137
138     def get_next_port(self):
139         return 12000 + random.randint(0,3999)
140
    def reset_test_sequence(self):
        """Discard any accumulated expected packets and start a fresh sequence."""
        self.test_sequence = GdbRemoteTestSequence(self.logger)
143
144     def create_named_pipe(self):
145         # Create a temp dir and name for a pipe.
146         temp_dir = tempfile.mkdtemp()
147         named_pipe_path = os.path.join(temp_dir, "stub_port_number")
148
149         # Create the named pipe.
150         os.mkfifo(named_pipe_path)
151
152         # Open the read side of the pipe in non-blocking mode.  This will return right away, ready or not.
153         named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK)
154
155         # Create the file for the named pipe.  Note this will follow semantics of
156         # a non-blocking read side of a named pipe, which has different semantics
157         # than a named pipe opened for read in non-blocking mode.
158         named_pipe = os.fdopen(named_pipe_fd, "r")
159         self.assertIsNotNone(named_pipe)
160
161         def shutdown_named_pipe():
162             # Close the pipe.
163             try:
164                 named_pipe.close()
165             except:
166                 print("failed to close named pipe")
167                 None
168
169             # Delete the pipe.
170             try:
171                 os.remove(named_pipe_path)
172             except:
173                 print("failed to delete named pipe: {}".format(named_pipe_path))
174                 None
175
176             # Delete the temp directory.
177             try:
178                 os.rmdir(temp_dir)
179             except:
180                 print("failed to delete temp dir: {}, directory contents: '{}'".format(temp_dir, os.listdir(temp_dir)))
181                 None
182
183         # Add the shutdown hook to clean up the named pipe.
184         self.addTearDownHook(shutdown_named_pipe)
185
186         # Clear the port so the stub selects a port number.
187         self.port = 0
188
189         return (named_pipe_path, named_pipe, named_pipe_fd)
190
191     def get_stub_port_from_named_socket(self, read_timeout_seconds=5):
192         # Wait for something to read with a max timeout.
193         (ready_readers, _, _) = select.select([self.named_pipe_fd], [], [], read_timeout_seconds)
194         self.assertIsNotNone(ready_readers, "write side of pipe has not written anything - stub isn't writing to pipe.")
195         self.assertNotEqual(len(ready_readers), 0, "write side of pipe has not written anything - stub isn't writing to pipe.")
196
197         # Read the port from the named pipe.
198         stub_port_raw = self.named_pipe.read()
199         self.assertIsNotNone(stub_port_raw)
200         self.assertNotEqual(len(stub_port_raw), 0, "no content to read on pipe")
201
202         # Trim null byte, convert to int.
203         stub_port_raw = stub_port_raw[:-1]
204         stub_port = int(stub_port_raw)
205         self.assertTrue(stub_port > 0)
206
207         return stub_port
208
    def init_llgs_test(self, use_named_pipe=True):
        """Point this test at an lldb-server gdbserver stub.

        Locally, the lldb-server binary is located via get_lldb_server_exe().
        On a remote platform, the server binary is found by resolving
        /proc/<shell parent pid>/exe over the platform connection (named-pipe
        port negotiation is disabled there).
        """
        if lldb.remote_platform:
            # Remote platforms don't support named pipe based port negotiation
            use_named_pipe = False

            # Grab the ppid from /proc/[shell pid]/stat
            err, retcode, shell_stat = self.run_platform_command("cat /proc/$$/stat")
            self.assertTrue(err.Success() and retcode == 0,
                    "Failed to read file /proc/$$/stat: %s, retcode: %d" % (err.GetCString(), retcode))

            # [pid] ([executable]) [state] [*ppid*]
            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
            err, retcode, ls_output = self.run_platform_command("ls -l /proc/%s/exe" % pid)
            self.assertTrue(err.Success() and retcode == 0,
                    "Failed to read file /proc/%s/exe: %s, retcode: %d" % (pid, err.GetCString(), retcode))
            # `ls -l` prints "... link -> target"; the last field is the target.
            exe = ls_output.split()[-1]

            # If the binary has been deleted, the link name has " (deleted)" appended.
            # Remove if it's there.
            self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
        else:
            self.debug_monitor_exe = get_lldb_server_exe()
            if not self.debug_monitor_exe:
                self.skipTest("lldb-server exe not found")

        self.debug_monitor_extra_args = ["gdbserver"]
        self.setUpServerLogging(is_llgs=True)

        if use_named_pipe:
            (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe()
239
240     def init_debugserver_test(self, use_named_pipe=True):
241         self.debug_monitor_exe = get_debugserver_exe()
242         if not self.debug_monitor_exe:
243             self.skipTest("debugserver exe not found")
244         self.setUpServerLogging(is_llgs=False)
245         if use_named_pipe:
246             (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe()
247         # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
248         # when the process truly dies.
249         self.stub_sends_two_stop_notifications_on_kill = True
250
251     def forward_adb_port(self, source, target, direction, device):
252         adb = [ 'adb' ] + ([ '-s', device ] if device else []) + [ direction ]
253         def remove_port_forward():
254             subprocess.call(adb + [ "--remove", "tcp:%d" % source])
255
256         subprocess.call(adb + [ "tcp:%d" % source, "tcp:%d" % target])
257         self.addTearDownHook(remove_port_forward)
258
259     def _verify_socket(self, sock):
260         # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
261         # connect() attempt. However, due to the way how ADB forwarding works, on android targets
262         # the connect() will always be successful, but the connection will be immediately dropped
263         # if ADB could not connect on the remote side. This function tries to detect this
264         # situation, and report it as "connection refused" so that the upper layers attempt the
265         # connection again.
266         triple = self.dbg.GetSelectedPlatform().GetTriple()
267         if not re.match(".*-.*-.*-android", triple):
268             return # Not android.
269         can_read, _, _ = select.select([sock], [], [], 0.1)
270         if sock not in can_read:
271             return # Data is not available, but the connection is alive.
272         if len(sock.recv(1, socket.MSG_PEEK)) == 0:
273             raise _ConnectionRefused() # Got EOF, connection dropped.
274
    def create_socket(self):
        """Connect a TCP socket to the stub and return it.

        Registers a teardown hook that sends the gdb-remote kill packet and
        closes the socket.  Raises _ConnectionRefused when the stub refuses
        the connection (including the ADB forwarding case detected by
        _verify_socket), so callers can retry.
        """
        sock = socket.socket()
        logger = self.logger

        # On android the stub port must first be forwarded through ADB.
        triple = self.dbg.GetSelectedPlatform().GetTriple()
        if re.match(".*-.*-.*-android", triple):
            self.forward_adb_port(self.port, self.port, "forward", self.stub_device)

        logger.info("Connecting to debug monitor on %s:%d", self.stub_hostname, self.port)
        connect_info = (self.stub_hostname, self.port)
        try:
            sock.connect(connect_info)
        except socket.error as serr:
            # Normalize "refused" into our retryable exception type.
            if serr.errno == errno.ECONNREFUSED:
                raise _ConnectionRefused()
            raise serr

        def shutdown_socket():
            if sock:
                try:
                    # send the kill packet so lldb-server shuts down gracefully
                    # NOTE(review): sends a str; under Python 3 sendall()
                    # requires bytes -- the failure would be caught and logged
                    # below.  Confirm intended interpreter before changing.
                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
                except:
                    logger.warning("failed to send kill packet to debug monitor: {}; ignoring".format(sys.exc_info()[0]))

                try:
                    sock.close()
                except:
                    logger.warning("failed to close socket to debug monitor: {}; ignoring".format(sys.exc_info()[0]))

        self.addTearDownHook(shutdown_socket)

        # Detect the ADB connect-then-drop case (raises _ConnectionRefused).
        self._verify_socket(sock)

        return sock
310
    def set_inferior_startup_launch(self):
        """Select launch mode: the stub launches the inferior via an $A packet."""
        self._inferior_startup = self._STARTUP_LAUNCH
313
    def set_inferior_startup_attach(self):
        """Select attach mode: the stub attaches via --attach on its command line."""
        self._inferior_startup = self._STARTUP_ATTACH
316
    def set_inferior_startup_attach_manually(self):
        """Select manual-attach mode: the test itself attaches (e.g. $vAttach;pid)."""
        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
319
320     def get_debug_monitor_command_line_args(self, attach_pid=None):
321         if lldb.remote_platform:
322             commandline_args = self.debug_monitor_extra_args + ["*:{}".format(self.port)]
323         else:
324             commandline_args = self.debug_monitor_extra_args + ["localhost:{}".format(self.port)]
325
326         if attach_pid:
327             commandline_args += ["--attach=%d" % attach_pid]
328         if self.named_pipe_path:
329             commandline_args += ["--named-pipe", self.named_pipe_path]
330         return commandline_args
331
332     def launch_debug_monitor(self, attach_pid=None, logfile=None):
333         # Create the command line.
334         commandline_args = self.get_debug_monitor_command_line_args(attach_pid=attach_pid)
335
336         # Start the server.
337         server = self.spawnSubprocess(self.debug_monitor_exe, commandline_args, install_remote=False)
338         self.addTearDownHook(self.cleanupSubprocesses)
339         self.assertIsNotNone(server)
340
341         # If we're receiving the stub's listening port from the named pipe, do that here.
342         if self.named_pipe:
343             self.port = self.get_stub_port_from_named_socket()
344
345         return server
346
347     def connect_to_debug_monitor(self, attach_pid=None):
348         if self.named_pipe:
349             # Create the stub.
350             server = self.launch_debug_monitor(attach_pid=attach_pid)
351             self.assertIsNotNone(server)
352
353             def shutdown_debug_monitor():
354                 try:
355                     server.terminate()
356                 except:
357                     logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
358             self.addTearDownHook(shutdown_debug_monitor)
359
360             # Schedule debug monitor to be shut down during teardown.
361             logger = self.logger
362
363             # Attach to the stub and return a socket opened to it.
364             self.sock = self.create_socket()
365             return server
366
367         # We're using a random port algorithm to try not to collide with other ports,
368         # and retry a max # times.
369         attempts = 0
370         MAX_ATTEMPTS = 20
371
372         while attempts < MAX_ATTEMPTS:
373             server = self.launch_debug_monitor(attach_pid=attach_pid)
374
375             # Schedule debug monitor to be shut down during teardown.
376             logger = self.logger
377             def shutdown_debug_monitor():
378                 try:
379                     server.terminate()
380                 except:
381                     logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
382             self.addTearDownHook(shutdown_debug_monitor)
383
384             connect_attemps = 0
385             MAX_CONNECT_ATTEMPTS = 10
386
387             while connect_attemps < MAX_CONNECT_ATTEMPTS:
388                 # Create a socket to talk to the server
389                 try:
390                     logger.info("Connect attempt %d", connect_attemps+1)
391                     self.sock = self.create_socket()
392                     return server
393                 except _ConnectionRefused as serr:
394                     # Ignore, and try again.
395                     pass
396                 time.sleep(0.5)
397                 connect_attemps += 1
398
399             # We should close the server here to be safe.
400             server.terminate()
401
402             # Increment attempts.
403             print("connect to debug monitor on port %d failed, attempt #%d of %d" % (self.port, attempts + 1, MAX_ATTEMPTS))
404             attempts += 1
405
406             # And wait a random length of time before next attempt, to avoid collisions.
407             time.sleep(random.randint(1,5))
408             
409             # Now grab a new port number.
410             self.port = self.get_next_port()
411
412         raise Exception("failed to create a socket to the launched debug monitor after %d tries" % attempts)
413
414     def launch_process_for_attach(self, inferior_args=None, sleep_seconds=3, exe_path=None):
415         # We're going to start a child process that the debug monitor stub can later attach to.
416         # This process needs to be started so that it just hangs around for a while.  We'll
417         # have it sleep.
418         if not exe_path:
419             exe_path = os.path.abspath("a.out")
420
421         args = []
422         if inferior_args:
423             args.extend(inferior_args)
424         if sleep_seconds:
425             args.append("sleep:%d" % sleep_seconds)
426
427         inferior = self.spawnSubprocess(exe_path, args)
428         def shutdown_process_for_attach():
429             try:
430                 inferior.terminate()
431             except:
432                 logger.warning("failed to terminate inferior process for attach: {}; ignoring".format(sys.exc_info()[0]))
433         self.addTearDownHook(shutdown_process_for_attach)
434         return inferior
435
    def prep_debug_monitor_and_inferior(self, inferior_args=None, inferior_sleep_seconds=3, inferior_exe_path=None):
        """Prep the debug monitor, the inferior, and the expected packet stream.

        Handle the separate cases of using the debug monitor in attach-to-inferior mode
        and in launch-inferior mode.

        For attach-to-inferior mode, the inferior process is first started, then
        the debug monitor is started in attach to pid mode (using --attach on the
        stub command line), and the no-ack-mode setup is appended to the packet
        stream.  The packet stream is not yet executed, ready to have more expected
        packet entries added to it.

        For launch-inferior mode, the stub is first started, then no ack mode is
        setup on the expected packet stream, then the verified launch packets are added
        to the expected socket stream.  The packet stream is not yet executed, ready
        to have more expected packet entries added to it.

        The return value is:
        {inferior:<inferior>, server:<server>}
        """
        inferior = None
        attach_pid = None

        if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
            # Launch the process that we'll use as the inferior.
            inferior = self.launch_process_for_attach(inferior_args=inferior_args, sleep_seconds=inferior_sleep_seconds, exe_path=inferior_exe_path)
            self.assertIsNotNone(inferior)
            self.assertTrue(inferior.pid > 0)
            if self._inferior_startup == self._STARTUP_ATTACH:
                # In this case, we want the stub to attach via the command line, so set the command line attach pid here.
                attach_pid = inferior.pid

        if self._inferior_startup == self._STARTUP_LAUNCH:
            # Build launch args
            if not inferior_exe_path:
                inferior_exe_path = os.path.abspath("a.out")

            if lldb.remote_platform:
                # Copy the inferior binary to the remote working directory first.
                remote_path = lldbutil.append_to_process_working_directory(os.path.basename(inferior_exe_path))
                remote_file_spec = lldb.SBFileSpec(remote_path, False)
                err = lldb.remote_platform.Install(lldb.SBFileSpec(inferior_exe_path, True), remote_file_spec)
                if err.Fail():
                    raise Exception("remote_platform.Install('%s', '%s') failed: %s" % (inferior_exe_path, remote_path, err))
                inferior_exe_path = remote_path

            launch_args = [inferior_exe_path]
            if inferior_args:
                launch_args.extend(inferior_args)

        # Launch the debug monitor stub, attaching to the inferior.
        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        # Build the expected protocol stream
        self.add_no_ack_remote_stream()
        if self._inferior_startup == self._STARTUP_LAUNCH:
            self.add_verified_launch_packets(launch_args)

        return {"inferior":inferior, "server":server}
495
    def expect_socket_recv(self, sock, expected_content_regex, timeout_seconds):
        """Receive from sock until expected_content_regex matches the data
        accumulated so far or timeout_seconds elapses; assert the match.

        NOTE(review): `response` starts as str and recv() results are
        concatenated directly -- Python 2 semantics.  Under Python 3 recv()
        returns bytes, so decoding would be needed; confirm the intended
        interpreter before changing.
        """
        response = ""
        timeout_time = time.time() + timeout_seconds

        while not expected_content_regex.match(response) and time.time() < timeout_time:
            can_read, _, _ = select.select([sock], [], [], timeout_seconds)
            if can_read and sock in can_read:
                recv_bytes = sock.recv(4096)
                if recv_bytes:
                    response += recv_bytes

        self.assertTrue(expected_content_regex.match(response))
508
509     def expect_socket_send(self, sock, content, timeout_seconds):
510         request_bytes_remaining = content
511         timeout_time = time.time() + timeout_seconds
512
513         while len(request_bytes_remaining) > 0 and time.time() < timeout_time:
514             _, can_write, _ = select.select([], [sock], [], timeout_seconds)
515             if can_write and sock in can_write:
516                 written_byte_count = sock.send(request_bytes_remaining)
517                 request_bytes_remaining = request_bytes_remaining[written_byte_count:]
518         self.assertEqual(len(request_bytes_remaining), 0)
519
520     def do_handshake(self, stub_socket, timeout_seconds=5):
521         # Write the ack.
522         self.expect_socket_send(stub_socket, "+", timeout_seconds)
523
524         # Send the start no ack mode packet.
525         NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0"
526         bytes_sent = stub_socket.send(NO_ACK_MODE_REQUEST)
527         self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST))
528
529         # Receive the ack and "OK"
530         self.expect_socket_recv(stub_socket, re.compile(r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)
531
532         # Send the final ack.
533         self.expect_socket_send(stub_socket, "+", timeout_seconds)
534
535     def add_no_ack_remote_stream(self):
536         self.test_sequence.add_log_lines(
537             ["read packet: +",
538              "read packet: $QStartNoAckMode#b0",
539              "send packet: +",
540              "send packet: $OK#9a",
541              "read packet: +"],
542             True)
543
544     def add_verified_launch_packets(self, launch_args):
545         self.test_sequence.add_log_lines(
546             ["read packet: %s" % build_gdbremote_A_packet(launch_args),
547              "send packet: $OK#00",
548              "read packet: $qLaunchSuccess#a5",
549              "send packet: $OK#00"],
550             True)
551
552     def add_thread_suffix_request_packets(self):
553         self.test_sequence.add_log_lines(
554             ["read packet: $QThreadSuffixSupported#e4",
555              "send packet: $OK#00",
556             ], True)
557
558     def add_process_info_collection_packets(self):
559         self.test_sequence.add_log_lines(
560             ["read packet: $qProcessInfo#dc",
561               { "direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"process_info_raw"} }],
562             True)
563
    # Keys a stub may legitimately return in a qProcessInfo response; used by
    # parse_process_info_response() to flag unknown keys.
    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "ptrsize"
        ]
579
580     def parse_process_info_response(self, context):
581         # Ensure we have a process info response.
582         self.assertIsNotNone(context)
583         process_info_raw = context.get("process_info_raw")
584         self.assertIsNotNone(process_info_raw)
585
586         # Pull out key:value; pairs.
587         process_info_dict = { match.group(1):match.group(2) for match in re.finditer(r"([^:]+):([^;]+);", process_info_raw) }
588
589         # Validate keys are known.
590         for (key, val) in list(process_info_dict.items()):
591             self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
592             self.assertIsNotNone(val)
593
594         return process_info_dict
595
596     def add_register_info_collection_packets(self):
597         self.test_sequence.add_log_lines(
598             [ { "type":"multi_response", "query":"qRegisterInfo", "append_iteration_suffix":True,
599               "end_regex":re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
600               "save_key":"reg_info_responses" } ],
601             True)
602
603     def parse_register_info_packets(self, context):
604         """Return an array of register info dictionaries, one per register info."""
605         reg_info_responses = context.get("reg_info_responses")
606         self.assertIsNotNone(reg_info_responses)
607
608         # Parse register infos.
609         return [parse_reg_info_response(reg_info_response) for reg_info_response in reg_info_responses]
610
611     def expect_gdbremote_sequence(self, timeout_seconds=None):
612         if not timeout_seconds:
613             timeout_seconds = self._TIMEOUT_SECONDS
614         return expect_lldb_gdbserver_replay(self, self.sock, self.test_sequence,
615                 self._pump_queues, timeout_seconds, self.logger)
616
    # Keys a stub may legitimately return in a qRegisterInfo response; used by
    # assert_valid_reg_info() to flag unknown keys.
    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "gcc",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs"
    ]
632
633     def assert_valid_reg_info(self, reg_info):
634         # Assert we know about all the reginfo keys parsed.
635         for key in reg_info:
636             self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
637
638         # Check the bare-minimum expected set of register info keys.
639         self.assertTrue("name" in reg_info)
640         self.assertTrue("bitsize" in reg_info)
641         self.assertTrue("offset" in reg_info)
642         self.assertTrue("encoding" in reg_info)
643         self.assertTrue("format" in reg_info)
644
645     def find_pc_reg_info(self, reg_infos):
646         lldb_reg_index = 0
647         for reg_info in reg_infos:
648             if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
649                 return (lldb_reg_index, reg_info)
650             lldb_reg_index += 1
651
652         return (None, None)
653
654     def add_lldb_register_index(self, reg_infos):
655         """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
656
657         We'll use this when we want to call packets like P/p with a register index but do so
658         on only a subset of the full register info set.
659         """
660         self.assertIsNotNone(reg_infos)
661
662         reg_index = 0
663         for reg_info in reg_infos:
664             reg_info["lldb_register_index"] = reg_index
665             reg_index += 1
666
667     def add_query_memory_region_packets(self, address):
668         self.test_sequence.add_log_lines(
669             ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
670              {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"memory_region_response"} }],
671             True)
672
673     def parse_key_val_dict(self, key_val_text, allow_dupes=True):
674         self.assertIsNotNone(key_val_text)
675         kv_dict = {}
676         for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
677             key = match.group(1)
678             val = match.group(2)
679             if key in kv_dict:
680                 if allow_dupes:
681                     if type(kv_dict[key]) == list:
682                         kv_dict[key].append(val)
683                     else:
684                         # Promote to list
685                         kv_dict[key] = [kv_dict[key], val]
686                 else:
687                     self.fail("key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(key, val, key_val_text, kv_dict))
688             else:
689                 kv_dict[key] = val
690         return kv_dict
691
692     def parse_memory_region_packet(self, context):
693         # Ensure we have a context.
694         self.assertIsNotNone(context.get("memory_region_response"))
695
696         # Pull out key:value; pairs.
697         mem_region_dict = self.parse_key_val_dict(context.get("memory_region_response"))
698
699         # Validate keys are known.
700         for (key, val) in list(mem_region_dict.items()):
701             self.assertTrue(key in ["start", "size", "permissions", "error"])
702             self.assertIsNotNone(val)
703
704         # Return the dictionary of key-value pairs for the memory region.
705         return mem_region_dict
706
    def assert_address_within_memory_region(self, test_address, mem_region_dict):
        """Fail unless test_address lies inside [start, start+size) of the
        parsed memory region dict."""
        self.assertIsNotNone(mem_region_dict)
        self.assertTrue("start" in mem_region_dict)
        self.assertTrue("size" in mem_region_dict)

        # start/size arrive as hex strings in the packet.
        range_start = int(mem_region_dict["start"], 16)
        range_size = int(mem_region_dict["size"], 16)
        range_end = range_start + range_size

        if test_address < range_start:
            self.fail("address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
        elif test_address >= range_end:
            self.fail("address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
720
721     def add_threadinfo_collection_packets(self):
722         self.test_sequence.add_log_lines(
723             [ { "type":"multi_response", "first_query":"qfThreadInfo", "next_query":"qsThreadInfo",
724                 "append_iteration_suffix":False, "end_regex":re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
725               "save_key":"threadinfo_responses" } ],
726             True)
727
728     def parse_threadinfo_packets(self, context):
729         """Return an array of thread ids (decimal ints), one per thread."""
730         threadinfo_responses = context.get("threadinfo_responses")
731         self.assertIsNotNone(threadinfo_responses)
732
733         thread_ids = []
734         for threadinfo_response in threadinfo_responses:
735             new_thread_infos = parse_threadinfo_response(threadinfo_response)
736             thread_ids.extend(new_thread_infos)
737         return thread_ids
738
739     def wait_for_thread_count(self, thread_count, timeout_seconds=3):
740         start_time = time.time()
741         timeout_time = start_time + timeout_seconds
742
743         actual_thread_count = 0
744         while actual_thread_count < thread_count:
745             self.reset_test_sequence()
746             self.add_threadinfo_collection_packets()
747
748             context = self.expect_gdbremote_sequence()
749             self.assertIsNotNone(context)
750
751             threads = self.parse_threadinfo_packets(context)
752             self.assertIsNotNone(threads)
753
754             actual_thread_count = len(threads)
755
756             if time.time() > timeout_time:
757                 raise Exception(
758                     'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
759                         timeout_seconds, thread_count, actual_thread_count))
760
761         return threads
762
763     def add_set_breakpoint_packets(self, address, do_continue=True, breakpoint_kind=1):
764         self.test_sequence.add_log_lines(
765             [# Set the breakpoint.
766              "read packet: $Z0,{0:x},{1}#00".format(address, breakpoint_kind),
767              # Verify the stub could set it.
768              "send packet: $OK#00",
769              ], True)
770
771         if (do_continue):
772             self.test_sequence.add_log_lines(
773                 [# Continue the inferior.
774                  "read packet: $c#63",
775                  # Expect a breakpoint stop report.
776                  {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
777                  ], True)        
778
779     def add_remove_breakpoint_packets(self, address, breakpoint_kind=1):
780         self.test_sequence.add_log_lines(
781             [# Remove the breakpoint.
782              "read packet: $z0,{0:x},{1}#00".format(address, breakpoint_kind),
783              # Verify the stub could unset it.
784              "send packet: $OK#00",
785             ], True)
786
787     def add_qSupported_packets(self):
788         self.test_sequence.add_log_lines(
789             ["read packet: $qSupported#00",
790              {"direction":"send", "regex":r"^\$(.*)#[0-9a-fA-F]{2}", "capture":{1: "qSupported_response"}},
791             ], True)
792
    # Stub feature names accepted by parse_qSupported_response(); any other
    # feature reported by the stub raises an exception there.
    _KNOWN_QSUPPORTED_STUB_FEATURES = [
        "augmented-libraries-svr4-read",
        "PacketSize",
        "QStartNoAckMode",
        "QThreadSuffixSupported",
        "QListThreadsInStopReply",
        "qXfer:auxv:read",
        "qXfer:libraries:read",
        "qXfer:libraries-svr4:read",
        "qXfer:features:read",
        "qEcho"
    ]
805
806     def parse_qSupported_response(self, context):
807         self.assertIsNotNone(context)
808
809         raw_response = context.get("qSupported_response")
810         self.assertIsNotNone(raw_response)
811
812         # For values with key=val, the dict key and vals are set as expected.  For feature+, feature- and feature?, the
813         # +,-,? is stripped from the key and set as the value.
814         supported_dict = {}
815         for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
816             key = match.group(1)
817             val = match.group(3)
818
819             # key=val: store as is
820             if val and len(val) > 0:
821                 supported_dict[key] = val
822             else:
823                 if len(key) < 2:
824                     raise Exception("singular stub feature is too short: must be stub_feature{+,-,?}")
825                 supported_type = key[-1]
826                 key = key[:-1]
827                 if not supported_type in ["+", "-", "?"]:
828                     raise Exception("malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
829                 supported_dict[key] = supported_type 
830             # Ensure we know the supported element
831             if not key in self._KNOWN_QSUPPORTED_STUB_FEATURES:
832                 raise Exception("unknown qSupported stub feature reported: %s" % key)
833
834         return supported_dict
835
    def run_process_then_stop(self, run_seconds=1):
        """Continue the inferior, let it run for run_seconds, then interrupt it.

        Returns the context from the interrupt sequence; context["stop_result"]
        carries the captured signal-number portion of the $T stop reply.
        """
        # Tell the stub to continue.
        self.test_sequence.add_log_lines(
             ["read packet: $vCont;c#a8"],
             True)
        context = self.expect_gdbremote_sequence()

        # Wait for run_seconds.
        time.sleep(run_seconds)

        # Send an interrupt, capture a T response.  chr(3) is the raw ctrl-c
        # byte the remote protocol uses to interrupt a running inferior.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            ["read packet: {}".format(chr(3)),
             {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result"} }],
            True)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        self.assertIsNotNone(context.get("stop_result"))

        return context
857
858     def select_modifiable_register(self, reg_infos):
859         """Find a register that can be read/written freely."""
860         PREFERRED_REGISTER_NAMES = set(["rax",])
861
862         # First check for the first register from the preferred register name set.
863         alternative_register_index = None
864
865         self.assertIsNotNone(reg_infos)
866         for reg_info in reg_infos:
867             if ("name" in reg_info) and (reg_info["name"] in PREFERRED_REGISTER_NAMES):
868                 # We found a preferred register.  Use it.
869                 return reg_info["lldb_register_index"]
870             if ("generic" in reg_info) and (reg_info["generic"] == "fp"):
871                 # A frame pointer register will do as a register to modify temporarily.
872                 alternative_register_index = reg_info["lldb_register_index"]
873
874         # We didn't find a preferred register.  Return whatever alternative register
875         # we found, if any.
876         return alternative_register_index
877
878     def extract_registers_from_stop_notification(self, stop_key_vals_text):
879         self.assertIsNotNone(stop_key_vals_text)
880         kv_dict = self.parse_key_val_dict(stop_key_vals_text)
881
882         registers = {}
883         for (key, val) in list(kv_dict.items()):
884             if re.match(r"^[0-9a-fA-F]+$", key):
885                 registers[int(key, 16)] = val
886         return registers
887
888     def gather_register_infos(self):
889         self.reset_test_sequence()
890         self.add_register_info_collection_packets()
891
892         context = self.expect_gdbremote_sequence()
893         self.assertIsNotNone(context)
894
895         reg_infos = self.parse_register_info_packets(context)
896         self.assertIsNotNone(reg_infos)
897         self.add_lldb_register_index(reg_infos)
898
899         return reg_infos
900
901     def find_generic_register_with_name(self, reg_infos, generic_name):
902         self.assertIsNotNone(reg_infos)
903         for reg_info in reg_infos:
904             if ("generic" in reg_info) and (reg_info["generic"] == generic_name):
905                 return reg_info
906         return None
907
908     def decode_gdbremote_binary(self, encoded_bytes):
909         decoded_bytes = ""
910         i = 0
911         while i < len(encoded_bytes):
912             if encoded_bytes[i] == "}":
913                 # Handle escaped char.
914                 self.assertTrue(i + 1 < len(encoded_bytes))
915                 decoded_bytes += chr(ord(encoded_bytes[i+1]) ^ 0x20)
916                 i +=2
917             elif encoded_bytes[i] == "*":
918                 # Handle run length encoding.
919                 self.assertTrue(len(decoded_bytes) > 0)
920                 self.assertTrue(i + 1 < len(encoded_bytes))
921                 repeat_count = ord(encoded_bytes[i+1]) - 29
922                 decoded_bytes += decoded_bytes[-1] * repeat_count
923                 i += 2
924             else:
925                 decoded_bytes += encoded_bytes[i]
926                 i += 1
927         return decoded_bytes
928
929     def build_auxv_dict(self, endian, word_size, auxv_data):
930         self.assertIsNotNone(endian)
931         self.assertIsNotNone(word_size)
932         self.assertIsNotNone(auxv_data)
933
934         auxv_dict = {}
935
936         while len(auxv_data) > 0:
937             # Chop off key.
938             raw_key = auxv_data[:word_size]
939             auxv_data = auxv_data[word_size:]
940
941             # Chop of value.
942             raw_value = auxv_data[:word_size]
943             auxv_data = auxv_data[word_size:]
944
945             # Convert raw text from target endian.
946             key = unpack_endian_binary_string(endian, raw_key)
947             value = unpack_endian_binary_string(endian, raw_value)
948
949             # Handle ending entry.
950             if key == 0:
951                 self.assertEqual(value, 0)
952                 return auxv_dict
953
954             # The key should not already be present.
955             self.assertFalse(key in auxv_dict)
956             auxv_dict[key] = value
957
958         self.fail("should not reach here - implies required double zero entry not found")
959         return auxv_dict
960
    def read_binary_data_in_chunks(self, command_prefix, chunk_length):
        """Collect command_prefix{offset:x},{chunk_length:x} responses until an
        'l' response (with or without data) signals the end of the transfer.

        Returns the accumulated payload after gdb-remote binary decoding.
        """
        offset = 0
        done = False
        decoded_data = ""

        while not done:
            # Grab the next iteration of data.
            self.reset_test_sequence()
            # NOTE(review): the trailing ':' before '#00' looks unusual for a
            # qXfer-style request -- confirm the stub accepts this form.
            self.test_sequence.add_log_lines([
                "read packet: ${}{:x},{:x}:#00".format(command_prefix, offset, chunk_length),
                {"direction":"send", "regex":re.compile(r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", re.MULTILINE|re.DOTALL), "capture":{1:"response_type", 2:"content_raw"} }
                ], True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # 'm' means more data follows; 'l' marks the last chunk.
            response_type = context.get("response_type")
            self.assertIsNotNone(response_type)
            self.assertTrue(response_type in ["l", "m"])

            # Move offset along.
            offset += chunk_length

            # Figure out if we're done.  We're done if the response type is l.
            done = response_type == "l"

            # Decode binary data.
            content_raw = context.get("content_raw")
            if content_raw and len(content_raw) > 0:
                self.assertIsNotNone(content_raw)
                decoded_data += self.decode_gdbremote_binary(content_raw)
        return decoded_data
994
995     def add_interrupt_packets(self):
996         self.test_sequence.add_log_lines([
997             # Send the intterupt.
998             "read packet: {}".format(chr(3)),
999             # And wait for the stop notification.
1000             {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", "capture":{1:"stop_signo", 2:"stop_key_val_text" } },
1001             ], True)
1002
1003     def parse_interrupt_packets(self, context):
1004         self.assertIsNotNone(context.get("stop_signo"))
1005         self.assertIsNotNone(context.get("stop_key_val_text"))
1006         return (int(context["stop_signo"], 16), self.parse_key_val_dict(context["stop_key_val_text"]))
1007
1008     def add_QSaveRegisterState_packets(self, thread_id):
1009         if thread_id:
1010             # Use the thread suffix form.
1011             request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(thread_id)
1012         else:
1013             request = "read packet: $QSaveRegisterState#00"
1014             
1015         self.test_sequence.add_log_lines([
1016             request,
1017             {"direction":"send", "regex":r"^\$(E?.*)#[0-9a-fA-F]{2}$", "capture":{1:"save_response" } },
1018             ], True)
1019
1020     def parse_QSaveRegisterState_response(self, context):
1021         self.assertIsNotNone(context)
1022
1023         save_response = context.get("save_response")
1024         self.assertIsNotNone(save_response)
1025
1026         if len(save_response) < 1 or save_response[0] == "E":
1027             # error received
1028             return (False, None)
1029         else:
1030             return (True, int(save_response))
1031
1032     def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
1033         if thread_id:
1034             # Use the thread suffix form.
1035             request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(save_id, thread_id)
1036         else:
1037             request = "read packet: $QRestoreRegisterState:{}#00".format(save_id)
1038
1039         self.test_sequence.add_log_lines([
1040             request,
1041             "send packet: $OK#00"
1042             ], True)
1043
1044     def flip_all_bits_in_each_register_value(self, reg_infos, endian, thread_id=None):
1045         self.assertIsNotNone(reg_infos)
1046
1047         successful_writes = 0
1048         failed_writes = 0
1049
1050         for reg_info in reg_infos:
1051             # Use the lldb register index added to the reg info.  We're not necessarily
1052             # working off a full set of register infos, so an inferred register index could be wrong. 
1053             reg_index = reg_info["lldb_register_index"]
1054             self.assertIsNotNone(reg_index)
1055
1056             reg_byte_size = int(reg_info["bitsize"])/8
1057             self.assertTrue(reg_byte_size > 0)
1058
1059             # Handle thread suffix.
1060             if thread_id:
1061                 p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id)
1062             else:
1063                 p_request = "read packet: $p{:x}#00".format(reg_index)
1064
1065             # Read the existing value.
1066             self.reset_test_sequence()
1067             self.test_sequence.add_log_lines([
1068                 p_request,
1069                 { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
1070                 ], True)
1071             context = self.expect_gdbremote_sequence()
1072             self.assertIsNotNone(context)
1073
1074             # Verify the response length.
1075             p_response = context.get("p_response")
1076             self.assertIsNotNone(p_response)
1077             initial_reg_value = unpack_register_hex_unsigned(endian, p_response)
1078
1079             # Flip the value by xoring with all 1s
1080             all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) / 8)
1081             flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
1082             # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))
1083
1084             # Handle thread suffix for P.
1085             if thread_id:
1086                 P_request = "read packet: $P{:x}={};thread:{:x}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
1087             else:
1088                 P_request = "read packet: $P{:x}={}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size))
1089
1090             # Write the flipped value to the register.
1091             self.reset_test_sequence()
1092             self.test_sequence.add_log_lines([
1093                 P_request,
1094                 { "direction":"send", "regex":r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", "capture":{1:"P_response"} },
1095                 ], True)
1096             context = self.expect_gdbremote_sequence()
1097             self.assertIsNotNone(context)
1098
1099             # Determine if the write succeeded.  There are a handful of registers that can fail, or partially fail
1100             # (e.g. flags, segment selectors, etc.) due to register value restrictions.  Don't worry about them
1101             # all flipping perfectly.
1102             P_response = context.get("P_response")
1103             self.assertIsNotNone(P_response)
1104             if P_response == "OK":
1105                 successful_writes += 1
1106             else:
1107                 failed_writes += 1
1108                 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))
1109
1110             # Read back the register value, ensure it matches the flipped value.
1111             if P_response == "OK":
1112                 self.reset_test_sequence()
1113                 self.test_sequence.add_log_lines([
1114                     p_request,
1115                     { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
1116                     ], True)
1117                 context = self.expect_gdbremote_sequence()
1118                 self.assertIsNotNone(context)
1119
1120                 verify_p_response_raw = context.get("p_response")
1121                 self.assertIsNotNone(verify_p_response_raw)
1122                 verify_bits = unpack_register_hex_unsigned(endian, verify_p_response_raw)
1123
1124                 if verify_bits != flipped_bits_int:
1125                     # Some registers, like mxcsrmask and others, will permute what's written.  Adjust succeed/fail counts.
1126                     # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
1127                     successful_writes -= 1
1128                     failed_writes +=1
1129
1130         return (successful_writes, failed_writes)
1131
1132     def is_bit_flippable_register(self, reg_info):
1133         if not reg_info:
1134             return False
1135         if not "set" in reg_info:
1136             return False
1137         if reg_info["set"] != "General Purpose Registers":
1138             return False
1139         if ("container-regs" in reg_info) and (len(reg_info["container-regs"]) > 0):
1140             # Don't try to bit flip registers contained in another register.
1141             return False
1142         if re.match("^.s$", reg_info["name"]):
1143             # This is a 2-letter register name that ends in "s", like a segment register.
1144             # Don't try to bit flip these.
1145             return False
1146         if re.match("^(c|)psr$", reg_info["name"]):
1147             # This is an ARM program status register; don't flip it.
1148             return False
1149         # Okay, this looks fine-enough.
1150         return True
1151
    def read_register_values(self, reg_infos, endian, thread_id=None):
        """Read each register with a $p packet; return {lldb_register_index: value}.

        Values are converted from target-endian hex to unsigned ints.  An "E"
        (error) response for any register fails the test.
        """
        self.assertIsNotNone(reg_infos)
        values = {}

        for reg_info in reg_infos:
            # We append a register index when load reg infos so we can work with subsets.
            reg_index = reg_info.get("lldb_register_index")
            self.assertIsNotNone(reg_index)

            # Handle thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read it with p.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
                ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Convert value from target endian to integral.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            self.assertTrue(len(p_response) > 0)
            self.assertFalse(p_response[0] == "E")

            values[reg_index] = unpack_register_hex_unsigned(endian, p_response)

        return values
1185
1186     def add_vCont_query_packets(self):
1187         self.test_sequence.add_log_lines([
1188             "read packet: $vCont?#49",
1189             {"direction":"send", "regex":r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", "capture":{2:"vCont_query_response" } },
1190             ], True)
1191
1192     def parse_vCont_query_response(self, context):
1193         self.assertIsNotNone(context)
1194         vCont_query_response = context.get("vCont_query_response")
1195
1196         # Handle case of no vCont support at all - in which case the capture group will be none or zero length.
1197         if not vCont_query_response or len(vCont_query_response) == 0:
1198             return {}
1199
1200         return {key:1 for key in vCont_query_response.split(";") if key and len(key) > 0}
1201
    def count_single_steps_until_true(self, thread_id, predicate, args, max_step_count=100, use_Hc_packet=True, step_instruction="s"):
        """Single-step thread_id until predicate(args) is true or the step budget runs out.

        Used by single step tests that appear in a few different contexts.

        Args:
            thread_id: thread to step; required.
            predicate: callable evaluated after each step with `args`.
            args: opaque argument passed through to predicate.
            max_step_count: maximum number of steps to attempt.
            use_Hc_packet: when True, select the continue thread via $Hc first;
                when False, step_instruction must encode the thread itself.
            step_instruction: step packet body; "{thread}" is replaced with
                the hex thread id.

        Returns:
            (predicate_became_true, steps_taken) tuple.
        """
        single_step_count = 0

        while single_step_count < max_step_count:
            self.assertIsNotNone(thread_id)

            # Build the packet for the single step instruction.  We replace {thread}, if present, with the thread_id.
            step_packet = "read packet: ${}#00".format(re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
            # print("\nstep_packet created: {}\n".format(step_packet))

            # Single step.
            self.reset_test_sequence()
            if use_Hc_packet:
                self.test_sequence.add_log_lines(
                    [# Set the continue thread.
                     "read packet: $Hc{0:x}#00".format(thread_id),
                     "send packet: $OK#00",
                     ], True)
            self.test_sequence.add_log_lines([
                 # Single step.
                 step_packet,
                 # "read packet: $vCont;s:{0:x}#00".format(thread_id),
                 # Expect a breakpoint stop report.
                 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
                 ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            self.assertIsNotNone(context.get("stop_signo"))
            # Each step must stop with SIGTRAP.
            self.assertEqual(int(context.get("stop_signo"), 16),
                    lldbutil.get_signal_number('SIGTRAP'))

            single_step_count += 1

            # See if the predicate is true.  If so, we're done.
            if predicate(args):
                return (True, single_step_count)

        # The predicate didn't return true within the runaway step count.
        return (False, single_step_count)
1242
1243     def g_c1_c2_contents_are(self, args):
1244         """Used by single step test that appears in a few different contexts."""
1245         g_c1_address = args["g_c1_address"]
1246         g_c2_address = args["g_c2_address"]
1247         expected_g_c1 = args["expected_g_c1"]
1248         expected_g_c2 = args["expected_g_c2"]
1249
1250         # Read g_c1 and g_c2 contents.
1251         self.reset_test_sequence()
1252         self.test_sequence.add_log_lines(
1253             ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
1254              {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c1_contents"} },
1255              "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
1256              {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c2_contents"} }],
1257             True)
1258
1259         # Run the packet stream.
1260         context = self.expect_gdbremote_sequence()
1261         self.assertIsNotNone(context)
1262
1263         # Check if what we read from inferior memory is what we are expecting.
1264         self.assertIsNotNone(context.get("g_c1_contents"))
1265         self.assertIsNotNone(context.get("g_c2_contents"))
1266
1267         return (context.get("g_c1_contents").decode("hex") == expected_g_c1) and (context.get("g_c2_contents").decode("hex") == expected_g_c2)
1268
    def single_step_only_steps_one_instruction(self, use_Hc_packet=True, step_instruction="s"):
        """Verify that a single step advances exactly one instruction.

        Launches the inferior so it calls swap_chars(), breaks at the function,
        then single-steps while watching the g_c1/g_c2 globals to confirm each
        state transition takes the expected number of steps.

        Args:
            use_Hc_packet: when True, select the continue thread with $Hc
                before stepping; when False, step_instruction must encode the
                thread itself (e.g. "vCont;s:{thread}").
            step_instruction: step packet body; "{thread}" is replaced with
                the stop thread id.
        """
        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["get-code-address-hex:swap_chars", "get-data-address-hex:g_c1", "get-data-address-hex:g_c2", "sleep:1", "call-function:swap_chars", "sleep:5"])

        # Run the process
        self.test_sequence.add_log_lines(
            [# Start running after initial stop.
             "read packet: $c#63",
             # Match output line that prints the memory address of the function call entry point.
             # Note we require launch-only testing so we can get inferior output.
             { "type":"output_match", "regex":r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
               "capture":{ 1:"function_address", 2:"g_c1_address", 3:"g_c2_address"} },
             # Now stop the inferior.
             "read packet: {}".format(chr(3)),
             # And wait for the stop notification.
             {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} }],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the main thread id.
        self.assertIsNotNone(context.get("stop_thread_id"))
        main_thread_id = int(context.get("stop_thread_id"), 16)

        # Grab the function address.
        self.assertIsNotNone(context.get("function_address"))
        function_address = int(context.get("function_address"), 16)

        # Grab the data addresses.
        self.assertIsNotNone(context.get("g_c1_address"))
        g_c1_address = int(context.get("g_c1_address"), 16)

        self.assertIsNotNone(context.get("g_c2_address"))
        g_c2_address = int(context.get("g_c2_address"), 16)

        # Set a breakpoint at the given address.
        if self.getArchitecture() == "arm":
            # TODO: Handle case when setting breakpoint in thumb code
            BREAKPOINT_KIND = 4
        else:
            BREAKPOINT_KIND = 1
        self.reset_test_sequence()
        self.add_set_breakpoint_packets(function_address, do_continue=True, breakpoint_kind=BREAKPOINT_KIND)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Remove the breakpoint.
        self.reset_test_sequence()
        self.add_remove_breakpoint_packets(function_address, breakpoint_kind=BREAKPOINT_KIND)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Verify g_c1 and g_c2 match expected initial state.
        args = {}
        args["g_c1_address"] = g_c1_address
        args["g_c2_address"] = g_c2_address
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "1"

        self.assertTrue(self.g_c1_c2_contents_are(args))

        # Verify we take only a small number of steps to hit the first state.  Might need to work through function entry prologue code.
        args["expected_g_c1"] = "1"
        args["expected_g_c2"] = "1"
        (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=25, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
        self.assertTrue(state_reached)

        # Verify we hit the next state.
        args["expected_g_c1"] = "1"
        args["expected_g_c2"] = "0"
        (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
        self.assertTrue(state_reached)
        expected_step_count = 1
        arch = self.getArchitecture()

        # MIPS requires 3 machine instructions (ADDIU, SB, LD) to update the variable value
        if re.match("mips",arch):
           expected_step_count = 3
        # S390X requires 2 machine instructions (LARL, MVI) to update the variable value
        if re.match("s390x",arch):
           expected_step_count = 2
        self.assertEqual(step_count, expected_step_count)

        # Verify we hit the next state.
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "0"
        (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
        self.assertTrue(state_reached)
        self.assertEqual(step_count, expected_step_count)

        # Verify we hit the next state.
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "1"
        (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
        self.assertTrue(state_reached)
        self.assertEqual(step_count, expected_step_count)
1369
1370     def maybe_strict_output_regex(self, regex):
1371         return '.*'+regex+'.*' if lldbplatformutil.hasChattyStderr(self) else '^'+regex+'$'
1372