]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - packages/Python/lldbsuite/test/tools/lldb-server/gdbremote_testcase.py
Vendor import of lldb trunk r256945:
[FreeBSD/FreeBSD.git] / packages / Python / lldbsuite / test / tools / lldb-server / gdbremote_testcase.py
1 """
2 Base class for gdb-remote test cases.
3 """
4
5 from __future__ import print_function
6
7
8
9 import errno
10 import os
11 import os.path
12 import platform
13 import random
14 import re
15 import select
16 import signal
17 import socket
18 import subprocess
19 import sys
20 import tempfile
21 import time
22 from lldbsuite.test import configuration
23 from lldbsuite.test.lldbtest import *
24 from lldbgdbserverutils import *
25 import logging
26
class GdbRemoteTestCaseBase(TestBase):
    """Base class for gdb-remote (lldb-server / debugserver) protocol tests.

    Provides helpers to launch a debug monitor stub, connect a socket to it,
    and build/verify expected gdb-remote packet exchange sequences.
    """

    # Default timeout used by expect_gdbremote_sequence() when none is given.
    _TIMEOUT_SECONDS = 5

    # Pre-computed gdb-remote 'k' (kill) packet, checksum included.
    _GDBREMOTE_KILL_PACKET = "$k#6b"

    # Logging level for this test's logger; switch to the commented DEBUG
    # line below for verbose protocol logging while diagnosing a test.
    _LOGGING_LEVEL = logging.WARNING
    # _LOGGING_LEVEL = logging.DEBUG

    # Start the inferior separately, attach to the inferior on the stub command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common exceptions
    TARGET_EXC_BAD_ACCESS      = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC      = 0x93
    TARGET_EXC_EMULATION       = 0x94
    TARGET_EXC_SOFTWARE        = 0x95
    TARGET_EXC_BREAKPOINT      = 0x96
50
    def setUp(self):
        """Initialize per-test state: logging, packet sequence, candidate port,
        named-pipe placeholders, and the stub hostname/device derived from the
        configured platform URL (remote-android goes through adb forwards)."""
        TestBase.setUp(self)
        FORMAT = '%(asctime)-15s %(levelname)-8s %(message)s'
        logging.basicConfig(format=FORMAT)
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(self._LOGGING_LEVEL)
        self.test_sequence = GdbRemoteTestSequence(self.logger)
        self.set_inferior_startup_launch()
        self.port = self.get_next_port()
        self.named_pipe_path = None
        self.named_pipe = None
        self.named_pipe_fd = None
        self.stub_sends_two_stop_notifications_on_kill = False
        if configuration.lldb_platform_url:
            # unix-* URLs carry a path component; plain TCP URLs end in host:port.
            if configuration.lldb_platform_url.startswith('unix-'):
                url_pattern = '(.+)://\[?(.+?)\]?/.*'
            else:
                url_pattern = '(.+)://(.+):\d+'
            scheme, host = re.match(url_pattern, configuration.lldb_platform_url).groups()
            if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
                # Remember the device id and talk to it via an adb forward on localhost.
                self.stub_device = host
                self.stub_hostname = 'localhost'
            else:
                self.stub_device = None
                self.stub_hostname = host
        else:
            self.stub_hostname = "localhost"
78
79     def get_next_port(self):
80         return 12000 + random.randint(0,3999)
81
    def reset_test_sequence(self):
        # Discard any accumulated expected packets and start a fresh sequence.
        self.test_sequence = GdbRemoteTestSequence(self.logger)
84
85     def create_named_pipe(self):
86         # Create a temp dir and name for a pipe.
87         temp_dir = tempfile.mkdtemp()
88         named_pipe_path = os.path.join(temp_dir, "stub_port_number")
89
90         # Create the named pipe.
91         os.mkfifo(named_pipe_path)
92
93         # Open the read side of the pipe in non-blocking mode.  This will return right away, ready or not.
94         named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK)
95
96         # Create the file for the named pipe.  Note this will follow semantics of
97         # a non-blocking read side of a named pipe, which has different semantics
98         # than a named pipe opened for read in non-blocking mode.
99         named_pipe = os.fdopen(named_pipe_fd, "r")
100         self.assertIsNotNone(named_pipe)
101
102         def shutdown_named_pipe():
103             # Close the pipe.
104             try:
105                 named_pipe.close()
106             except:
107                 print("failed to close named pipe")
108                 None
109
110             # Delete the pipe.
111             try:
112                 os.remove(named_pipe_path)
113             except:
114                 print("failed to delete named pipe: {}".format(named_pipe_path))
115                 None
116
117             # Delete the temp directory.
118             try:
119                 os.rmdir(temp_dir)
120             except:
121                 print("failed to delete temp dir: {}, directory contents: '{}'".format(temp_dir, os.listdir(temp_dir)))
122                 None
123
124         # Add the shutdown hook to clean up the named pipe.
125         self.addTearDownHook(shutdown_named_pipe)
126
127         # Clear the port so the stub selects a port number.
128         self.port = 0
129
130         return (named_pipe_path, named_pipe, named_pipe_fd)
131
132     def get_stub_port_from_named_socket(self, read_timeout_seconds=5):
133         # Wait for something to read with a max timeout.
134         (ready_readers, _, _) = select.select([self.named_pipe_fd], [], [], read_timeout_seconds)
135         self.assertIsNotNone(ready_readers, "write side of pipe has not written anything - stub isn't writing to pipe.")
136         self.assertNotEqual(len(ready_readers), 0, "write side of pipe has not written anything - stub isn't writing to pipe.")
137
138         # Read the port from the named pipe.
139         stub_port_raw = self.named_pipe.read()
140         self.assertIsNotNone(stub_port_raw)
141         self.assertNotEqual(len(stub_port_raw), 0, "no content to read on pipe")
142
143         # Trim null byte, convert to int.
144         stub_port_raw = stub_port_raw[:-1]
145         stub_port = int(stub_port_raw)
146         self.assertTrue(stub_port > 0)
147
148         return stub_port
149
150     def run_shell_cmd(self, cmd):
151         platform = self.dbg.GetSelectedPlatform()
152         shell_cmd = lldb.SBPlatformShellCommand(cmd)
153         err = platform.Run(shell_cmd)
154         if err.Fail() or shell_cmd.GetStatus():
155             m = "remote_platform.RunShellCommand('%s') failed:\n" % cmd
156             m += ">>> return code: %d\n" % shell_cmd.GetStatus()
157             if err.Fail():
158                 m += ">>> %s\n" % str(err).strip()
159             m += ">>> %s\n" % (shell_cmd.GetOutput() or
160                                "Command generated no output.")
161             raise Exception(m)
162         return shell_cmd.GetOutput().strip()
163
    def init_llgs_test(self, use_named_pipe=True):
        """Locate the lldb-server binary under test and build its base argv.

        On a remote platform, named-pipe port negotiation is unsupported and
        the server binary is found by inspecting /proc for the executable of
        the remote shell's parent process (presumably the remote lldb-server
        platform instance — see the regex below).
        """
        if lldb.remote_platform:
            # Remote platforms don't support named pipe based port negotiation
            use_named_pipe = False

            # Grab the ppid from /proc/[shell pid]/stat
            shell_stat = self.run_shell_cmd("cat /proc/$$/stat")
            # [pid] ([executable]) [state] [*ppid*]
            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
            ls_output = self.run_shell_cmd("ls -l /proc/%s/exe" % pid)
            exe = ls_output.split()[-1]

            # If the binary has been deleted, the link name has " (deleted)" appended.
            # Remove if it's there.
            self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
        else:
            self.debug_monitor_exe = get_lldb_server_exe()
            if not self.debug_monitor_exe:
                self.skipTest("lldb-server exe not found")

        self.debug_monitor_extra_args = ["gdbserver"]

        # Forward any requested log channels into a per-test server log file.
        if len(lldbtest_config.channels) > 0:
            self.debug_monitor_extra_args.append("--log-file={}-server.log".format(self.log_basename))
            self.debug_monitor_extra_args.append("--log-channels={}".format(":".join(lldbtest_config.channels)))

        if use_named_pipe:
            (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe()
192
193     def init_debugserver_test(self, use_named_pipe=True):
194         self.debug_monitor_exe = get_debugserver_exe()
195         if not self.debug_monitor_exe:
196             self.skipTest("debugserver exe not found")
197         self.debug_monitor_extra_args = ["--log-file={}-server.log".format(self.log_basename), "--log-flags=0x800000"]
198         if use_named_pipe:
199             (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe()
200         # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
201         # when the process truly dies.
202         self.stub_sends_two_stop_notifications_on_kill = True
203
204     def forward_adb_port(self, source, target, direction, device):
205         adb = [ 'adb' ] + ([ '-s', device ] if device else []) + [ direction ]
206         def remove_port_forward():
207             subprocess.call(adb + [ "--remove", "tcp:%d" % source])
208
209         subprocess.call(adb + [ "tcp:%d" % source, "tcp:%d" % target])
210         self.addTearDownHook(remove_port_forward)
211
212     def create_socket(self):
213         sock = socket.socket()
214         logger = self.logger
215
216         triple = self.dbg.GetSelectedPlatform().GetTriple()
217         if re.match(".*-.*-.*-android", triple):
218             self.forward_adb_port(self.port, self.port, "forward", self.stub_device)
219
220         connect_info = (self.stub_hostname, self.port)
221         sock.connect(connect_info)
222
223         def shutdown_socket():
224             if sock:
225                 try:
226                     # send the kill packet so lldb-server shuts down gracefully
227                     sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
228                 except:
229                     logger.warning("failed to send kill packet to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
230
231                 try:
232                     sock.close()
233                 except:
234                     logger.warning("failed to close socket to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
235
236         self.addTearDownHook(shutdown_socket)
237
238         return sock
239
    def set_inferior_startup_launch(self):
        # Subsequent preps will have the stub launch the inferior via an $A packet.
        self._inferior_startup = self._STARTUP_LAUNCH
242
    def set_inferior_startup_attach(self):
        # Subsequent preps will start the inferior first and attach via --attach on the stub command line.
        self._inferior_startup = self._STARTUP_ATTACH
245
    def set_inferior_startup_attach_manually(self):
        # Subsequent preps will start the inferior but leave attaching to the test itself.
        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
248
249     def get_debug_monitor_command_line_args(self, attach_pid=None):
250         if lldb.remote_platform:
251             commandline_args = self.debug_monitor_extra_args + ["*:{}".format(self.port)]
252         else:
253             commandline_args = self.debug_monitor_extra_args + ["localhost:{}".format(self.port)]
254
255         if attach_pid:
256             commandline_args += ["--attach=%d" % attach_pid]
257         if self.named_pipe_path:
258             commandline_args += ["--named-pipe", self.named_pipe_path]
259         return commandline_args
260
261     def run_platform_command(self, cmd):
262         platform = self.dbg.GetSelectedPlatform()
263         shell_command = lldb.SBPlatformShellCommand(cmd)
264         err = platform.Run(shell_command)
265         return (err, shell_command.GetOutput())
266
    def launch_debug_monitor(self, attach_pid=None, logfile=None):
        """Spawn the debug monitor stub and return its process object.

        If a named pipe is in use, blocks reading the stub's self-selected
        listening port from it.  (logfile is accepted but unused here.)
        """
        # Create the command line.
        commandline_args = self.get_debug_monitor_command_line_args(attach_pid=attach_pid)

        # Start the server.
        server = self.spawnSubprocess(self.debug_monitor_exe, commandline_args, install_remote=False)
        self.addTearDownHook(self.cleanupSubprocesses)
        self.assertIsNotNone(server)

        # If we're receiving the stub's listening port from the named pipe, do that here.
        if self.named_pipe:
            self.port = self.get_stub_port_from_named_socket()

        return server
281
282     def connect_to_debug_monitor(self, attach_pid=None):
283         if self.named_pipe:
284             # Create the stub.
285             server = self.launch_debug_monitor(attach_pid=attach_pid)
286             self.assertIsNotNone(server)
287
288             def shutdown_debug_monitor():
289                 try:
290                     server.terminate()
291                 except:
292                     logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
293             self.addTearDownHook(shutdown_debug_monitor)
294
295             # Schedule debug monitor to be shut down during teardown.
296             logger = self.logger
297
298             # Attach to the stub and return a socket opened to it.
299             self.sock = self.create_socket()
300             return server
301
302         # We're using a random port algorithm to try not to collide with other ports,
303         # and retry a max # times.
304         attempts = 0
305         MAX_ATTEMPTS = 20
306
307         while attempts < MAX_ATTEMPTS:
308             server = self.launch_debug_monitor(attach_pid=attach_pid)
309
310             # Schedule debug monitor to be shut down during teardown.
311             logger = self.logger
312             def shutdown_debug_monitor():
313                 try:
314                     server.terminate()
315                 except:
316                     logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
317             self.addTearDownHook(shutdown_debug_monitor)
318
319             connect_attemps = 0
320             MAX_CONNECT_ATTEMPTS = 10
321
322             while connect_attemps < MAX_CONNECT_ATTEMPTS:
323                 # Create a socket to talk to the server
324                 try:
325                     self.sock = self.create_socket()
326                     return server
327                 except socket.error as serr:
328                     # We're only trying to handle connection refused.
329                     if serr.errno != errno.ECONNREFUSED:
330                         raise serr
331                 time.sleep(0.5)
332                 connect_attemps += 1
333
334             # We should close the server here to be safe.
335             server.terminate()
336
337             # Increment attempts.
338             print("connect to debug monitor on port %d failed, attempt #%d of %d" % (self.port, attempts + 1, MAX_ATTEMPTS))
339             attempts += 1
340
341             # And wait a random length of time before next attempt, to avoid collisions.
342             time.sleep(random.randint(1,5))
343             
344             # Now grab a new port number.
345             self.port = self.get_next_port()
346
347         raise Exception("failed to create a socket to the launched debug monitor after %d tries" % attempts)
348
349     def launch_process_for_attach(self, inferior_args=None, sleep_seconds=3, exe_path=None):
350         # We're going to start a child process that the debug monitor stub can later attach to.
351         # This process needs to be started so that it just hangs around for a while.  We'll
352         # have it sleep.
353         if not exe_path:
354             exe_path = os.path.abspath("a.out")
355
356         args = []
357         if inferior_args:
358             args.extend(inferior_args)
359         if sleep_seconds:
360             args.append("sleep:%d" % sleep_seconds)
361
362         inferior = self.spawnSubprocess(exe_path, args)
363         def shutdown_process_for_attach():
364             try:
365                 inferior.terminate()
366             except:
367                 logger.warning("failed to terminate inferior process for attach: {}; ignoring".format(sys.exc_info()[0]))
368         self.addTearDownHook(shutdown_process_for_attach)
369         return inferior
370
    def prep_debug_monitor_and_inferior(self, inferior_args=None, inferior_sleep_seconds=3, inferior_exe_path=None):
        """Prep the debug monitor, the inferior, and the expected packet stream.

        Handle the separate cases of using the debug monitor in attach-to-inferior mode
        and in launch-inferior mode.

        For attach-to-inferior mode, the inferior process is first started, then
        the debug monitor is started in attach to pid mode (using --attach on the
        stub command line), and the no-ack-mode setup is appended to the packet
        stream.  The packet stream is not yet executed, ready to have more expected
        packet entries added to it.

        For launch-inferior mode, the stub is first started, then no ack mode is
        setup on the expected packet stream, then the verified launch packets are added
        to the expected socket stream.  The packet stream is not yet executed, ready
        to have more expected packet entries added to it.

        The return value is:
        {inferior:<inferior>, server:<server>}
        """
        inferior = None
        attach_pid = None

        if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
            # Launch the process that we'll use as the inferior.
            inferior = self.launch_process_for_attach(inferior_args=inferior_args, sleep_seconds=inferior_sleep_seconds, exe_path=inferior_exe_path)
            self.assertIsNotNone(inferior)
            self.assertTrue(inferior.pid > 0)
            if self._inferior_startup == self._STARTUP_ATTACH:
                # In this case, we want the stub to attach via the command line, so set the command line attach pid here.
                attach_pid = inferior.pid

        if self._inferior_startup == self._STARTUP_LAUNCH:
            # Build launch args
            if not inferior_exe_path:
                inferior_exe_path = os.path.abspath("a.out")

            if lldb.remote_platform:
                # Copy the inferior binary over to the remote before launching it there.
                remote_path = lldbutil.append_to_process_working_directory(os.path.basename(inferior_exe_path))
                remote_file_spec = lldb.SBFileSpec(remote_path, False)
                err = lldb.remote_platform.Install(lldb.SBFileSpec(inferior_exe_path, True), remote_file_spec)
                if err.Fail():
                    raise Exception("remote_platform.Install('%s', '%s') failed: %s" % (inferior_exe_path, remote_path, err))
                inferior_exe_path = remote_path

            launch_args = [inferior_exe_path]
            if inferior_args:
                launch_args.extend(inferior_args)

        # Launch the debug monitor stub, attaching to the inferior.
        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        # Build the expected protocol stream
        self.add_no_ack_remote_stream()
        if self._inferior_startup == self._STARTUP_LAUNCH:
            self.add_verified_launch_packets(launch_args)

        return {"inferior":inferior, "server":server}
430
431     def expect_socket_recv(self, sock, expected_content_regex, timeout_seconds):
432         response = ""
433         timeout_time = time.time() + timeout_seconds
434
435         while not expected_content_regex.match(response) and time.time() < timeout_time: 
436             can_read, _, _ = select.select([sock], [], [], timeout_seconds)
437             if can_read and sock in can_read:
438                 recv_bytes = sock.recv(4096)
439                 if recv_bytes:
440                     response += recv_bytes
441
442         self.assertTrue(expected_content_regex.match(response))
443
444     def expect_socket_send(self, sock, content, timeout_seconds):
445         request_bytes_remaining = content
446         timeout_time = time.time() + timeout_seconds
447
448         while len(request_bytes_remaining) > 0 and time.time() < timeout_time:
449             _, can_write, _ = select.select([], [sock], [], timeout_seconds)
450             if can_write and sock in can_write:
451                 written_byte_count = sock.send(request_bytes_remaining)
452                 request_bytes_remaining = request_bytes_remaining[written_byte_count:]
453         self.assertEqual(len(request_bytes_remaining), 0)
454
    def do_handshake(self, stub_socket, timeout_seconds=5):
        """Perform the initial gdb-remote handshake, leaving the stub in no-ack mode."""
        # Write the ack.
        self.expect_socket_send(stub_socket, "+", timeout_seconds)

        # Send the start no ack mode packet.
        NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0"
        bytes_sent = stub_socket.send(NO_ACK_MODE_REQUEST)
        self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST))

        # Receive the ack and "OK"
        self.expect_socket_recv(stub_socket, re.compile(r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)

        # Send the final ack.
        self.expect_socket_send(stub_socket, "+", timeout_seconds)
469
    def add_no_ack_remote_stream(self):
        """Append the QStartNoAckMode handshake exchange to the expected packet sequence."""
        self.test_sequence.add_log_lines(
            ["read packet: +",
             "read packet: $QStartNoAckMode#b0",
             "send packet: +",
             "send packet: $OK#9a",
             "read packet: +"],
            True)
478
    def add_verified_launch_packets(self, launch_args):
        """Append an $A launch of launch_args plus a qLaunchSuccess check to the sequence."""
        self.test_sequence.add_log_lines(
            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
             "send packet: $OK#00",
             "read packet: $qLaunchSuccess#a5",
             "send packet: $OK#00"],
            True)
486
    def add_thread_suffix_request_packets(self):
        """Append a QThreadSuffixSupported request (expected to succeed) to the sequence."""
        self.test_sequence.add_log_lines(
            ["read packet: $QThreadSuffixSupported#e4",
             "send packet: $OK#00",
            ], True)
492
    def add_process_info_collection_packets(self):
        """Append a qProcessInfo request, capturing the raw reply as "process_info_raw"."""
        self.test_sequence.add_log_lines(
            ["read packet: $qProcessInfo#dc",
              { "direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"process_info_raw"} }],
            True)
498
    # Keys a stub may legitimately report in a qProcessInfo response;
    # parse_process_info_response() asserts every returned key is listed here.
    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "ptrsize"
        ]
514
515     def parse_process_info_response(self, context):
516         # Ensure we have a process info response.
517         self.assertIsNotNone(context)
518         process_info_raw = context.get("process_info_raw")
519         self.assertIsNotNone(process_info_raw)
520
521         # Pull out key:value; pairs.
522         process_info_dict = { match.group(1):match.group(2) for match in re.finditer(r"([^:]+):([^;]+);", process_info_raw) }
523
524         # Validate keys are known.
525         for (key, val) in list(process_info_dict.items()):
526             self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
527             self.assertIsNotNone(val)
528
529         return process_info_dict
530
    def add_register_info_collection_packets(self):
        """Append qRegisterInfo{n} queries until an error reply, saving replies as "reg_info_responses"."""
        self.test_sequence.add_log_lines(
            [ { "type":"multi_response", "query":"qRegisterInfo", "append_iteration_suffix":True,
              "end_regex":re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
              "save_key":"reg_info_responses" } ],
            True)
537
538     def parse_register_info_packets(self, context):
539         """Return an array of register info dictionaries, one per register info."""
540         reg_info_responses = context.get("reg_info_responses")
541         self.assertIsNotNone(reg_info_responses)
542
543         # Parse register infos.
544         return [parse_reg_info_response(reg_info_response) for reg_info_response in reg_info_responses]
545
546     def expect_gdbremote_sequence(self, timeout_seconds=None):
547         if not timeout_seconds:
548             timeout_seconds = self._TIMEOUT_SECONDS
549         return expect_lldb_gdbserver_replay(self, self.sock, self.test_sequence, timeout_seconds, self.logger)
550
    # Keys a stub may legitimately report in a qRegisterInfo response;
    # assert_valid_reg_info() asserts every parsed key is listed here.
    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs"
    ]
565
566     def assert_valid_reg_info(self, reg_info):
567         # Assert we know about all the reginfo keys parsed.
568         for key in reg_info:
569             self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
570
571         # Check the bare-minimum expected set of register info keys.
572         self.assertTrue("name" in reg_info)
573         self.assertTrue("bitsize" in reg_info)
574         self.assertTrue("offset" in reg_info)
575         self.assertTrue("encoding" in reg_info)
576         self.assertTrue("format" in reg_info)
577
578     def find_pc_reg_info(self, reg_infos):
579         lldb_reg_index = 0
580         for reg_info in reg_infos:
581             if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
582                 return (lldb_reg_index, reg_info)
583             lldb_reg_index += 1
584
585         return (None, None)
586
587     def add_lldb_register_index(self, reg_infos):
588         """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
589
590         We'll use this when we want to call packets like P/p with a register index but do so
591         on only a subset of the full register info set.
592         """
593         self.assertIsNotNone(reg_infos)
594
595         reg_index = 0
596         for reg_info in reg_infos:
597             reg_info["lldb_register_index"] = reg_index
598             reg_index += 1
599
    def add_query_memory_region_packets(self, address):
        """Append a qMemoryRegionInfo query for address, capturing the reply as "memory_region_response"."""
        self.test_sequence.add_log_lines(
            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
             {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"memory_region_response"} }],
            True)
605
606     def parse_key_val_dict(self, key_val_text, allow_dupes=True):
607         self.assertIsNotNone(key_val_text)
608         kv_dict = {}
609         for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
610             key = match.group(1)
611             val = match.group(2)
612             if key in kv_dict:
613                 if allow_dupes:
614                     if type(kv_dict[key]) == list:
615                         kv_dict[key].append(val)
616                     else:
617                         # Promote to list
618                         kv_dict[key] = [kv_dict[key], val]
619                 else:
620                     self.fail("key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(key, val, key_val_text, kv_dict))
621             else:
622                 kv_dict[key] = val
623         return kv_dict
624
625     def parse_memory_region_packet(self, context):
626         # Ensure we have a context.
627         self.assertIsNotNone(context.get("memory_region_response"))
628
629         # Pull out key:value; pairs.
630         mem_region_dict = self.parse_key_val_dict(context.get("memory_region_response"))
631
632         # Validate keys are known.
633         for (key, val) in list(mem_region_dict.items()):
634             self.assertTrue(key in ["start", "size", "permissions", "error"])
635             self.assertIsNotNone(val)
636
637         # Return the dictionary of key-value pairs for the memory region.
638         return mem_region_dict
639
640     def assert_address_within_memory_region(self, test_address, mem_region_dict):
641         self.assertIsNotNone(mem_region_dict)
642         self.assertTrue("start" in mem_region_dict)
643         self.assertTrue("size" in mem_region_dict)
644
645         range_start = int(mem_region_dict["start"], 16)
646         range_size = int(mem_region_dict["size"], 16)
647         range_end = range_start + range_size
648
649         if test_address < range_start:
650             self.fail("address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
651         elif test_address >= range_end:
652             self.fail("address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
653
    def add_threadinfo_collection_packets(self):
        """Append qfThreadInfo/qsThreadInfo queries, saving replies as "threadinfo_responses"."""
        self.test_sequence.add_log_lines(
            [ { "type":"multi_response", "first_query":"qfThreadInfo", "next_query":"qsThreadInfo",
                "append_iteration_suffix":False, "end_regex":re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
              "save_key":"threadinfo_responses" } ],
            True)
660
661     def parse_threadinfo_packets(self, context):
662         """Return an array of thread ids (decimal ints), one per thread."""
663         threadinfo_responses = context.get("threadinfo_responses")
664         self.assertIsNotNone(threadinfo_responses)
665
666         thread_ids = []
667         for threadinfo_response in threadinfo_responses:
668             new_thread_infos = parse_threadinfo_response(threadinfo_response)
669             thread_ids.extend(new_thread_infos)
670         return thread_ids
671
672     def wait_for_thread_count(self, thread_count, timeout_seconds=3):
673         start_time = time.time()
674         timeout_time = start_time + timeout_seconds
675
676         actual_thread_count = 0
677         while actual_thread_count < thread_count:
678             self.reset_test_sequence()
679             self.add_threadinfo_collection_packets()
680
681             context = self.expect_gdbremote_sequence()
682             self.assertIsNotNone(context)
683
684             threads = self.parse_threadinfo_packets(context)
685             self.assertIsNotNone(threads)
686
687             actual_thread_count = len(threads)
688
689             if time.time() > timeout_time:
690                 raise Exception(
691                     'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
692                         timeout_seconds, thread_count, actual_thread_count))
693
694         return threads
695
    def add_set_breakpoint_packets(self, address, do_continue=True, breakpoint_kind=1):
        """Append a Z0 software-breakpoint set at address; optionally continue and expect a stop."""
        self.test_sequence.add_log_lines(
            [# Set the breakpoint.
             "read packet: $Z0,{0:x},{1}#00".format(address, breakpoint_kind),
             # Verify the stub could set it.
             "send packet: $OK#00",
             ], True)

        if (do_continue):
            self.test_sequence.add_log_lines(
                [# Continue the inferior.
                 "read packet: $c#63",
                 # Expect a breakpoint stop report.
                 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
                 ], True)
711
    def add_remove_breakpoint_packets(self, address, breakpoint_kind=1):
        """Append a z0 software-breakpoint removal at address, expected to succeed."""
        self.test_sequence.add_log_lines(
            [# Remove the breakpoint.
             "read packet: $z0,{0:x},{1}#00".format(address, breakpoint_kind),
             # Verify the stub could unset it.
             "send packet: $OK#00",
            ], True)
719
720     def add_qSupported_packets(self):
721         self.test_sequence.add_log_lines(
722             ["read packet: $qSupported#00",
723              {"direction":"send", "regex":r"^\$(.*)#[0-9a-fA-F]{2}", "capture":{1: "qSupported_response"}},
724             ], True)
725
    # qSupported feature names this test suite recognizes.  Both the key of
    # key=value entries and the name portion of feature{+,-,?} entries in a
    # stub's qSupported reply must appear here, or
    # parse_qSupported_response() raises an Exception.
    _KNOWN_QSUPPORTED_STUB_FEATURES = [
        "augmented-libraries-svr4-read",
        "PacketSize",
        "QStartNoAckMode",
        "QThreadSuffixSupported",
        "QListThreadsInStopReply",
        "qXfer:auxv:read",
        "qXfer:libraries:read",
        "qXfer:libraries-svr4:read",
        "qXfer:features:read",
        "qEcho"
    ]
738
739     def parse_qSupported_response(self, context):
740         self.assertIsNotNone(context)
741
742         raw_response = context.get("qSupported_response")
743         self.assertIsNotNone(raw_response)
744
745         # For values with key=val, the dict key and vals are set as expected.  For feature+, feature- and feature?, the
746         # +,-,? is stripped from the key and set as the value.
747         supported_dict = {}
748         for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
749             key = match.group(1)
750             val = match.group(3)
751
752             # key=val: store as is
753             if val and len(val) > 0:
754                 supported_dict[key] = val
755             else:
756                 if len(key) < 2:
757                     raise Exception("singular stub feature is too short: must be stub_feature{+,-,?}")
758                 supported_type = key[-1]
759                 key = key[:-1]
760                 if not supported_type in ["+", "-", "?"]:
761                     raise Exception("malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
762                 supported_dict[key] = supported_type 
763             # Ensure we know the supported element
764             if not key in self._KNOWN_QSUPPORTED_STUB_FEATURES:
765                 raise Exception("unknown qSupported stub feature reported: %s" % key)
766
767         return supported_dict
768
769     def run_process_then_stop(self, run_seconds=1):
770         # Tell the stub to continue.
771         self.test_sequence.add_log_lines(
772              ["read packet: $vCont;c#a8"],
773              True)
774         context = self.expect_gdbremote_sequence()
775
776         # Wait for run_seconds.
777         time.sleep(run_seconds)
778
779         # Send an interrupt, capture a T response.
780         self.reset_test_sequence()
781         self.test_sequence.add_log_lines(
782             ["read packet: {}".format(chr(3)),
783              {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result"} }],
784             True)
785         context = self.expect_gdbremote_sequence()
786         self.assertIsNotNone(context)
787         self.assertIsNotNone(context.get("stop_result"))
788
789         return context
790
791     def select_modifiable_register(self, reg_infos):
792         """Find a register that can be read/written freely."""
793         PREFERRED_REGISTER_NAMES = set(["rax",])
794
795         # First check for the first register from the preferred register name set.
796         alternative_register_index = None
797
798         self.assertIsNotNone(reg_infos)
799         for reg_info in reg_infos:
800             if ("name" in reg_info) and (reg_info["name"] in PREFERRED_REGISTER_NAMES):
801                 # We found a preferred register.  Use it.
802                 return reg_info["lldb_register_index"]
803             if ("generic" in reg_info) and (reg_info["generic"] == "fp"):
804                 # A frame pointer register will do as a register to modify temporarily.
805                 alternative_register_index = reg_info["lldb_register_index"]
806
807         # We didn't find a preferred register.  Return whatever alternative register
808         # we found, if any.
809         return alternative_register_index
810
811     def extract_registers_from_stop_notification(self, stop_key_vals_text):
812         self.assertIsNotNone(stop_key_vals_text)
813         kv_dict = self.parse_key_val_dict(stop_key_vals_text)
814
815         registers = {}
816         for (key, val) in list(kv_dict.items()):
817             if re.match(r"^[0-9a-fA-F]+$", key):
818                 registers[int(key, 16)] = val
819         return registers
820
821     def gather_register_infos(self):
822         self.reset_test_sequence()
823         self.add_register_info_collection_packets()
824
825         context = self.expect_gdbremote_sequence()
826         self.assertIsNotNone(context)
827
828         reg_infos = self.parse_register_info_packets(context)
829         self.assertIsNotNone(reg_infos)
830         self.add_lldb_register_index(reg_infos)
831
832         return reg_infos
833
834     def find_generic_register_with_name(self, reg_infos, generic_name):
835         self.assertIsNotNone(reg_infos)
836         for reg_info in reg_infos:
837             if ("generic" in reg_info) and (reg_info["generic"] == generic_name):
838                 return reg_info
839         return None
840
841     def decode_gdbremote_binary(self, encoded_bytes):
842         decoded_bytes = ""
843         i = 0
844         while i < len(encoded_bytes):
845             if encoded_bytes[i] == "}":
846                 # Handle escaped char.
847                 self.assertTrue(i + 1 < len(encoded_bytes))
848                 decoded_bytes += chr(ord(encoded_bytes[i+1]) ^ 0x20)
849                 i +=2
850             elif encoded_bytes[i] == "*":
851                 # Handle run length encoding.
852                 self.assertTrue(len(decoded_bytes) > 0)
853                 self.assertTrue(i + 1 < len(encoded_bytes))
854                 repeat_count = ord(encoded_bytes[i+1]) - 29
855                 decoded_bytes += decoded_bytes[-1] * repeat_count
856                 i += 2
857             else:
858                 decoded_bytes += encoded_bytes[i]
859                 i += 1
860         return decoded_bytes
861
862     def build_auxv_dict(self, endian, word_size, auxv_data):
863         self.assertIsNotNone(endian)
864         self.assertIsNotNone(word_size)
865         self.assertIsNotNone(auxv_data)
866
867         auxv_dict = {}
868
869         while len(auxv_data) > 0:
870             # Chop off key.
871             raw_key = auxv_data[:word_size]
872             auxv_data = auxv_data[word_size:]
873
874             # Chop of value.
875             raw_value = auxv_data[:word_size]
876             auxv_data = auxv_data[word_size:]
877
878             # Convert raw text from target endian.
879             key = unpack_endian_binary_string(endian, raw_key)
880             value = unpack_endian_binary_string(endian, raw_value)
881
882             # Handle ending entry.
883             if key == 0:
884                 self.assertEqual(value, 0)
885                 return auxv_dict
886
887             # The key should not already be present.
888             self.assertFalse(key in auxv_dict)
889             auxv_dict[key] = value
890
891         self.fail("should not reach here - implies required double zero entry not found")
892         return auxv_dict
893
894     def read_binary_data_in_chunks(self, command_prefix, chunk_length):
895         """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned."""
896         offset = 0
897         done = False
898         decoded_data = ""
899
900         while not done:
901             # Grab the next iteration of data.
902             self.reset_test_sequence()
903             self.test_sequence.add_log_lines([
904                 "read packet: ${}{:x},{:x}:#00".format(command_prefix, offset, chunk_length),
905                 {"direction":"send", "regex":re.compile(r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", re.MULTILINE|re.DOTALL), "capture":{1:"response_type", 2:"content_raw"} }
906                 ], True)
907
908             context = self.expect_gdbremote_sequence()
909             self.assertIsNotNone(context)
910
911             response_type = context.get("response_type")
912             self.assertIsNotNone(response_type)
913             self.assertTrue(response_type in ["l", "m"])
914
915             # Move offset along.
916             offset += chunk_length
917
918             # Figure out if we're done.  We're done if the response type is l.
919             done = response_type == "l"
920
921             # Decode binary data.
922             content_raw = context.get("content_raw")
923             if content_raw and len(content_raw) > 0:
924                 self.assertIsNotNone(content_raw)
925                 decoded_data += self.decode_gdbremote_binary(content_raw)
926         return decoded_data
927
928     def add_interrupt_packets(self):
929         self.test_sequence.add_log_lines([
930             # Send the intterupt.
931             "read packet: {}".format(chr(3)),
932             # And wait for the stop notification.
933             {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", "capture":{1:"stop_signo", 2:"stop_key_val_text" } },
934             ], True)
935
936     def parse_interrupt_packets(self, context):
937         self.assertIsNotNone(context.get("stop_signo"))
938         self.assertIsNotNone(context.get("stop_key_val_text"))
939         return (int(context["stop_signo"], 16), self.parse_key_val_dict(context["stop_key_val_text"]))
940
941     def add_QSaveRegisterState_packets(self, thread_id):
942         if thread_id:
943             # Use the thread suffix form.
944             request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(thread_id)
945         else:
946             request = "read packet: $QSaveRegisterState#00"
947             
948         self.test_sequence.add_log_lines([
949             request,
950             {"direction":"send", "regex":r"^\$(E?.*)#[0-9a-fA-F]{2}$", "capture":{1:"save_response" } },
951             ], True)
952
953     def parse_QSaveRegisterState_response(self, context):
954         self.assertIsNotNone(context)
955
956         save_response = context.get("save_response")
957         self.assertIsNotNone(save_response)
958
959         if len(save_response) < 1 or save_response[0] == "E":
960             # error received
961             return (False, None)
962         else:
963             return (True, int(save_response))
964
965     def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
966         if thread_id:
967             # Use the thread suffix form.
968             request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(save_id, thread_id)
969         else:
970             request = "read packet: $QRestoreRegisterState:{}#00".format(save_id)
971
972         self.test_sequence.add_log_lines([
973             request,
974             "send packet: $OK#00"
975             ], True)
976
977     def flip_all_bits_in_each_register_value(self, reg_infos, endian, thread_id=None):
978         self.assertIsNotNone(reg_infos)
979
980         successful_writes = 0
981         failed_writes = 0
982
983         for reg_info in reg_infos:
984             # Use the lldb register index added to the reg info.  We're not necessarily
985             # working off a full set of register infos, so an inferred register index could be wrong. 
986             reg_index = reg_info["lldb_register_index"]
987             self.assertIsNotNone(reg_index)
988
989             reg_byte_size = int(reg_info["bitsize"])/8
990             self.assertTrue(reg_byte_size > 0)
991
992             # Handle thread suffix.
993             if thread_id:
994                 p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id)
995             else:
996                 p_request = "read packet: $p{:x}#00".format(reg_index)
997
998             # Read the existing value.
999             self.reset_test_sequence()
1000             self.test_sequence.add_log_lines([
1001                 p_request,
1002                 { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
1003                 ], True)
1004             context = self.expect_gdbremote_sequence()
1005             self.assertIsNotNone(context)
1006
1007             # Verify the response length.
1008             p_response = context.get("p_response")
1009             self.assertIsNotNone(p_response)
1010             initial_reg_value = unpack_register_hex_unsigned(endian, p_response)
1011
1012             # Flip the value by xoring with all 1s
1013             all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) / 8)
1014             flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
1015             # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))
1016
1017             # Handle thread suffix for P.
1018             if thread_id:
1019                 P_request = "read packet: $P{:x}={};thread:{:x}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
1020             else:
1021                 P_request = "read packet: $P{:x}={}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size))
1022
1023             # Write the flipped value to the register.
1024             self.reset_test_sequence()
1025             self.test_sequence.add_log_lines([
1026                 P_request,
1027                 { "direction":"send", "regex":r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", "capture":{1:"P_response"} },
1028                 ], True)
1029             context = self.expect_gdbremote_sequence()
1030             self.assertIsNotNone(context)
1031
1032             # Determine if the write succeeded.  There are a handful of registers that can fail, or partially fail
1033             # (e.g. flags, segment selectors, etc.) due to register value restrictions.  Don't worry about them
1034             # all flipping perfectly.
1035             P_response = context.get("P_response")
1036             self.assertIsNotNone(P_response)
1037             if P_response == "OK":
1038                 successful_writes += 1
1039             else:
1040                 failed_writes += 1
1041                 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))
1042
1043             # Read back the register value, ensure it matches the flipped value.
1044             if P_response == "OK":
1045                 self.reset_test_sequence()
1046                 self.test_sequence.add_log_lines([
1047                     p_request,
1048                     { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
1049                     ], True)
1050                 context = self.expect_gdbremote_sequence()
1051                 self.assertIsNotNone(context)
1052
1053                 verify_p_response_raw = context.get("p_response")
1054                 self.assertIsNotNone(verify_p_response_raw)
1055                 verify_bits = unpack_register_hex_unsigned(endian, verify_p_response_raw)
1056
1057                 if verify_bits != flipped_bits_int:
1058                     # Some registers, like mxcsrmask and others, will permute what's written.  Adjust succeed/fail counts.
1059                     # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
1060                     successful_writes -= 1
1061                     failed_writes +=1
1062
1063         return (successful_writes, failed_writes)
1064
1065     def is_bit_flippable_register(self, reg_info):
1066         if not reg_info:
1067             return False
1068         if not "set" in reg_info:
1069             return False
1070         if reg_info["set"] != "General Purpose Registers":
1071             return False
1072         if ("container-regs" in reg_info) and (len(reg_info["container-regs"]) > 0):
1073             # Don't try to bit flip registers contained in another register.
1074             return False
1075         if re.match("^.s$", reg_info["name"]):
1076             # This is a 2-letter register name that ends in "s", like a segment register.
1077             # Don't try to bit flip these.
1078             return False
1079         if re.match("^(c|)psr$", reg_info["name"]):
1080             # This is an ARM program status register; don't flip it.
1081             return False
1082         # Okay, this looks fine-enough.
1083         return True
1084
1085     def read_register_values(self, reg_infos, endian, thread_id=None):
1086         self.assertIsNotNone(reg_infos)
1087         values = {}
1088
1089         for reg_info in reg_infos:
1090             # We append a register index when load reg infos so we can work with subsets.
1091             reg_index = reg_info.get("lldb_register_index")
1092             self.assertIsNotNone(reg_index)
1093
1094             # Handle thread suffix.
1095             if thread_id:
1096                 p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id)
1097             else:
1098                 p_request = "read packet: $p{:x}#00".format(reg_index)
1099
1100             # Read it with p.
1101             self.reset_test_sequence()
1102             self.test_sequence.add_log_lines([
1103                 p_request,
1104                 { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
1105                 ], True)
1106             context = self.expect_gdbremote_sequence()
1107             self.assertIsNotNone(context)
1108
1109             # Convert value from target endian to integral.
1110             p_response = context.get("p_response")
1111             self.assertIsNotNone(p_response)
1112             self.assertTrue(len(p_response) > 0)
1113             self.assertFalse(p_response[0] == "E")
1114             
1115             values[reg_index] = unpack_register_hex_unsigned(endian, p_response)
1116             
1117         return values
1118
1119     def add_vCont_query_packets(self):
1120         self.test_sequence.add_log_lines([
1121             "read packet: $vCont?#49",
1122             {"direction":"send", "regex":r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", "capture":{2:"vCont_query_response" } },
1123             ], True)
1124
1125     def parse_vCont_query_response(self, context):
1126         self.assertIsNotNone(context)
1127         vCont_query_response = context.get("vCont_query_response")
1128
1129         # Handle case of no vCont support at all - in which case the capture group will be none or zero length.
1130         if not vCont_query_response or len(vCont_query_response) == 0:
1131             return {}
1132
1133         return {key:1 for key in vCont_query_response.split(";") if key and len(key) > 0}
1134
    def count_single_steps_until_true(self, thread_id, predicate, args, max_step_count=100, use_Hc_packet=True, step_instruction="s"):
        """Single-step thread_id until predicate(args) is true or the step budget runs out.

        Used by single step test that appears in a few different contexts.

        Args:
            thread_id: id of the thread to step; substituted for "{thread}"
                in step_instruction when that placeholder is present.
            predicate: callable evaluated after every step; stepping stops
                as soon as it returns true.
            args: opaque argument passed straight through to predicate.
            max_step_count: maximum number of single steps to attempt.
            use_Hc_packet: when True, select the continue thread with $Hc
                before each step; otherwise step_instruction itself is
                expected to name the thread (e.g. vCont;s:{thread}).
            step_instruction: packet body used to perform the step.

        Returns:
            (predicate_became_true, number_of_steps_taken).
        """
        single_step_count = 0

        while single_step_count < max_step_count:
            self.assertIsNotNone(thread_id)

            # Build the packet for the single step instruction.  We replace {thread}, if present, with the thread_id.
            step_packet = "read packet: ${}#00".format(re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
            # print("\nstep_packet created: {}\n".format(step_packet))

            # Single step.
            self.reset_test_sequence()
            if use_Hc_packet:
                self.test_sequence.add_log_lines(
                    [# Set the continue thread.
                     "read packet: $Hc{0:x}#00".format(thread_id),
                     "send packet: $OK#00",
                     ], True)
            self.test_sequence.add_log_lines([
                 # Single step.
                 step_packet,
                 # "read packet: $vCont;s:{0:x}#00".format(thread_id),
                 # Expect a breakpoint stop report.
                 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
                 ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            self.assertIsNotNone(context.get("stop_signo"))
            # Every step should stop with SIGTRAP.
            self.assertEqual(int(context.get("stop_signo"), 16),
                    lldbutil.get_signal_number('SIGTRAP'))

            single_step_count += 1

            # See if the predicate is true.  If so, we're done.
            if predicate(args):
                return (True, single_step_count)

        # The predicate didn't return true within the runaway step count.
        return (False, single_step_count)
1175
1176     def g_c1_c2_contents_are(self, args):
1177         """Used by single step test that appears in a few different contexts."""
1178         g_c1_address = args["g_c1_address"]
1179         g_c2_address = args["g_c2_address"]
1180         expected_g_c1 = args["expected_g_c1"]
1181         expected_g_c2 = args["expected_g_c2"]
1182
1183         # Read g_c1 and g_c2 contents.
1184         self.reset_test_sequence()
1185         self.test_sequence.add_log_lines(
1186             ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
1187              {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c1_contents"} },
1188              "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
1189              {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c2_contents"} }],
1190             True)
1191
1192         # Run the packet stream.
1193         context = self.expect_gdbremote_sequence()
1194         self.assertIsNotNone(context)
1195
1196         # Check if what we read from inferior memory is what we are expecting.
1197         self.assertIsNotNone(context.get("g_c1_contents"))
1198         self.assertIsNotNone(context.get("g_c2_contents"))
1199
1200         return (context.get("g_c1_contents").decode("hex") == expected_g_c1) and (context.get("g_c2_contents").decode("hex") == expected_g_c2)
1201
    def single_step_only_steps_one_instruction(self, use_Hc_packet=True, step_instruction="s"):
        """Verify that single stepping advances exactly one instruction.

        Used by single step test that appears in a few different contexts.

        Launches an inferior that swaps the g_c1/g_c2 globals, breaks at the
        swap function, then single-steps while watching the globals change
        state.  Asserts each state transition after the first takes exactly
        one step (three on MIPS, where the update spans ADDIU/SB/LD).

        Args:
            use_Hc_packet: select the step thread via $Hc before stepping.
            step_instruction: the step packet body (e.g. "s" or
                "vCont;s:{thread}").
        """
        # Start up the inferior.
        procs = self.prep_debug_monitor_and_inferior(
            inferior_args=["get-code-address-hex:swap_chars", "get-data-address-hex:g_c1", "get-data-address-hex:g_c2", "sleep:1", "call-function:swap_chars", "sleep:5"])

        # Run the process
        self.test_sequence.add_log_lines(
            [# Start running after initial stop.
             "read packet: $c#63",
             # Match output line that prints the memory address of the function call entry point.
             # Note we require launch-only testing so we can get inferior output.
             { "type":"output_match", "regex":r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$", 
               "capture":{ 1:"function_address", 2:"g_c1_address", 3:"g_c2_address"} },
             # Now stop the inferior.
             "read packet: {}".format(chr(3)),
             # And wait for the stop notification.
             {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} }],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Grab the main thread id.
        self.assertIsNotNone(context.get("stop_thread_id"))
        main_thread_id = int(context.get("stop_thread_id"), 16)

        # Grab the function address.
        self.assertIsNotNone(context.get("function_address"))
        function_address = int(context.get("function_address"), 16)

        # Grab the data addresses.
        self.assertIsNotNone(context.get("g_c1_address"))
        g_c1_address = int(context.get("g_c1_address"), 16)

        self.assertIsNotNone(context.get("g_c2_address"))
        g_c2_address = int(context.get("g_c2_address"), 16)

        # Set a breakpoint at the given address.
        if self.getArchitecture() == "arm":
            # TODO: Handle case when setting breakpoint in thumb code
            BREAKPOINT_KIND = 4
        else:
            BREAKPOINT_KIND = 1
        self.reset_test_sequence()
        self.add_set_breakpoint_packets(function_address, do_continue=True, breakpoint_kind=BREAKPOINT_KIND)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Remove the breakpoint.
        self.reset_test_sequence()
        self.add_remove_breakpoint_packets(function_address, breakpoint_kind=BREAKPOINT_KIND)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Verify g_c1 and g_c2 match expected initial state.
        args = {}
        args["g_c1_address"] = g_c1_address
        args["g_c2_address"] = g_c2_address
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "1"

        self.assertTrue(self.g_c1_c2_contents_are(args))

        # Verify we take only a small number of steps to hit the first state.  Might need to work through function entry prologue code.
        args["expected_g_c1"] = "1"
        args["expected_g_c2"] = "1"
        (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=25, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
        self.assertTrue(state_reached)

        # Verify we hit the next state.
        args["expected_g_c1"] = "1"
        args["expected_g_c2"] = "0"
        (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
        self.assertTrue(state_reached)
        expected_step_count = 1
        arch = self.getArchitecture()

        # MIPS requires 3 machine instructions (ADDIU, SB, LD) to update the variable value.
        if re.match("mips",arch):
           expected_step_count = 3
        self.assertEqual(step_count, expected_step_count)

        # Verify we hit the next state.
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "0"
        (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
        self.assertTrue(state_reached)
        self.assertEqual(step_count, expected_step_count)

        # Verify we hit the next state.
        args["expected_g_c1"] = "0"
        args["expected_g_c2"] = "1"
        (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
        self.assertTrue(state_reached)
        self.assertEqual(step_count, expected_step_count)
1299