2 Base class for gdb-remote test cases.
5 from __future__ import print_function
22 from lldbsuite.test import configuration
23 from lldbsuite.test.lldbtest import *
24 from lldbgdbserverutils import *
# NOTE(review): this listing carries original file line numbers fused into each
# line and elides many lines; tokens below are preserved verbatim.
# Base class for gdb-remote protocol tests; provides stub/inferior plumbing.
27 class GdbRemoteTestCaseBase(TestBase):
# Raw $k (kill) packet with its precomputed two-hex-digit checksum (0x6b).
31     _GDBREMOTE_KILL_PACKET = "$k#6b"
33 _LOGGING_LEVEL = logging.WARNING
34 # _LOGGING_LEVEL = logging.DEBUG
36 # Start the inferior separately, attach to the inferior on the stub command line.
37 _STARTUP_ATTACH = "attach"
38 # Start the inferior separately, start the stub without attaching, allow the test to attach to the inferior however it wants (e.g. $vAttach;pid).
39 _STARTUP_ATTACH_MANUALLY = "attach_manually"
40 # Start the stub, and launch the inferior with an $A packet via the initial packet stream.
41 _STARTUP_LAUNCH = "launch"
43 # GDB Signal numbers that are not target-specific used for common exceptions
# NOTE(review): values 0x91-0x96 presumably mirror debugserver's
# exception-to-signo mapping — confirm against the stub sources.
44 TARGET_EXC_BAD_ACCESS = 0x91
45 TARGET_EXC_BAD_INSTRUCTION = 0x92
46 TARGET_EXC_ARITHMETIC = 0x93
47 TARGET_EXC_EMULATION = 0x94
48 TARGET_EXC_SOFTWARE = 0x95
49 TARGET_EXC_BREAKPOINT = 0x96
# NOTE(review): interior of setUp() — the `def setUp(self):` line (orig ~51-52)
# is elided from this view, as are the `else:` lines at orig 67, 73 and 76.
53 FORMAT = '%(asctime)-15s %(levelname)-8s %(message)s'
54 logging.basicConfig(format=FORMAT)
55 self.logger = logging.getLogger(__name__)
56 self.logger.setLevel(self._LOGGING_LEVEL)
# Fresh expected-packet sequence per test; default to launch-mode startup.
57 self.test_sequence = GdbRemoteTestSequence(self.logger)
58 self.set_inferior_startup_launch()
59 self.port = self.get_next_port()
60 self.named_pipe_path = None
61 self.named_pipe = None
62 self.named_pipe_fd = None
63 self.stub_sends_two_stop_notifications_on_kill = False
# Derive stub host/device from the configured remote platform URL, if any.
64 if configuration.lldb_platform_url:
65 if configuration.lldb_platform_url.startswith('unix-'):
66 url_pattern = '(.+)://\[?(.+?)\]?/.*'
# (else branch elided at orig 67) — TCP-style URLs carry an explicit port.
68 url_pattern = '(.+)://(.+):\d+'
69 scheme, host = re.match(url_pattern, configuration.lldb_platform_url).groups()
# remote-android: talk to localhost and let adb forward to the device.
70 if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
71 self.stub_device = host
72 self.stub_hostname = 'localhost'
# (else branch elided at orig 73)
74 self.stub_device = None
75 self.stub_hostname = host
# (else branch elided at orig 76) — no platform URL: local stub.
77 self.stub_hostname = "localhost"
def get_next_port(self):
    """Pick a pseudo-random stub listening port in [12000, 15999].

    Randomizing reduces (but does not eliminate) collisions between
    concurrently running test suites.
    """
    base_port = 12000
    return base_port + random.randint(0, 3999)
def reset_test_sequence(self):
    """Discard any accumulated expected packets and start a fresh sequence."""
    fresh_sequence = GdbRemoteTestSequence(self.logger)
    self.test_sequence = fresh_sequence
# NOTE(review): creates a FIFO the stub writes its chosen port number into.
# The try/except bodies wrapping close/remove/rmdir in shutdown_named_pipe
# (orig ~103-123, incl. the port reset near orig 127-129) are elided here.
85 def create_named_pipe(self):
86 # Create a temp dir and name for a pipe.
87 temp_dir = tempfile.mkdtemp()
88 named_pipe_path = os.path.join(temp_dir, "stub_port_number")
90 # Create the named pipe.
91 os.mkfifo(named_pipe_path)
93 # Open the read side of the pipe in non-blocking mode. This will return right away, ready or not.
94 named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK)
96 # Create the file for the named pipe. Note this will follow semantics of
97 # a non-blocking read side of a named pipe, which has different semantics
98 # than a named pipe opened for read in non-blocking mode.
99 named_pipe = os.fdopen(named_pipe_fd, "r")
100 self.assertIsNotNone(named_pipe)
# Best-effort cleanup: close the pipe, unlink it, then remove the temp dir.
102 def shutdown_named_pipe():
107 print("failed to close named pipe")
112 os.remove(named_pipe_path)
114 print("failed to delete named pipe: {}".format(named_pipe_path))
117 # Delete the temp directory.
121 print("failed to delete temp dir: {}, directory contents: '{}'".format(temp_dir, os.listdir(temp_dir)))
124 # Add the shutdown hook to clean up the named pipe.
125 self.addTearDownHook(shutdown_named_pipe)
127 # Clear the port so the stub selects a port number.
130 return (named_pipe_path, named_pipe, named_pipe_fd)
# NOTE(review): blocks (up to read_timeout_seconds) until the stub writes its
# listening port into the named pipe, then parses it. The trailing
# `return stub_port` (orig ~147-148) is elided from this view.
132 def get_stub_port_from_named_socket(self, read_timeout_seconds=5):
133 # Wait for something to read with a max timeout.
134 (ready_readers, _, _) = select.select([self.named_pipe_fd], [], [], read_timeout_seconds)
135 self.assertIsNotNone(ready_readers, "write side of pipe has not written anything - stub isn't writing to pipe.")
136 self.assertNotEqual(len(ready_readers), 0, "write side of pipe has not written anything - stub isn't writing to pipe.")
138 # Read the port from the named pipe.
139 stub_port_raw = self.named_pipe.read()
140 self.assertIsNotNone(stub_port_raw)
141 self.assertNotEqual(len(stub_port_raw), 0, "no content to read on pipe")
143 # Trim null byte, convert to int.
# Assumes the stub NUL-terminates the ASCII port number — TODO confirm.
144 stub_port_raw = stub_port_raw[:-1]
145 stub_port = int(stub_port_raw)
146 self.assertTrue(stub_port > 0)
# NOTE(review): runs cmd on the selected platform and returns stripped stdout;
# on failure it builds a diagnostic message `m` (one `m +=` line at orig 157
# and the failure call at orig ~161 are elided from this view).
150 def run_shell_cmd(self, cmd):
151 platform = self.dbg.GetSelectedPlatform()
152 shell_cmd = lldb.SBPlatformShellCommand(cmd)
153 err = platform.Run(shell_cmd)
# Fail on either an SB API error or a nonzero command exit status.
154 if err.Fail() or shell_cmd.GetStatus():
155 m = "remote_platform.RunShellCommand('%s') failed:\n" % cmd
156 m += ">>> return code: %d\n" % shell_cmd.GetStatus()
158 m += ">>> %s\n" % str(err).strip()
159 m += ">>> %s\n" % (shell_cmd.GetOutput() or
160 "Command generated no output.")
162 return shell_cmd.GetOutput().strip()
# NOTE(review): locates the lldb-server binary to test. On a remote platform it
# resolves /proc/<shell-ppid>/exe of the platform shell; locally it uses
# get_lldb_server_exe(). Several else/conditional lines (orig 168, 175, 179,
# 183, 185, 189-190, 192) are elided from this view.
164 def init_llgs_test(self, use_named_pipe=True):
165 if lldb.remote_platform:
166 # Remote platforms don't support named pipe based port negotiation
167 use_named_pipe = False
169 # Grab the ppid from /proc/[shell pid]/stat
170 shell_stat = self.run_shell_cmd("cat /proc/$$/stat")
171 # [pid] ([executable]) [state] [*ppid*]
172 pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
173 ls_output = self.run_shell_cmd("ls -l /proc/%s/exe" % pid)
174 exe = ls_output.split()[-1]
176 # If the binary has been deleted, the link name has " (deleted)" appended.
177 # Remove if it's there.
178 self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
# (local branch — elided `else:` at orig 179)
180 self.debug_monitor_exe = get_lldb_server_exe()
181 if not self.debug_monitor_exe:
182 self.skipTest("lldb-server exe not found")
184 self.debug_monitor_extra_args = ["gdbserver"]
# Forward any configured lldb log channels to the server's log file.
186 if len(lldbtest_config.channels) > 0:
187 self.debug_monitor_extra_args.append("--log-file={}-server.log".format(self.log_basename))
188 self.debug_monitor_extra_args.append("--log-channels={}".format(":".join(lldbtest_config.channels)))
# (guarded by `if use_named_pipe:` — elided at orig ~190)
191 (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe()
# NOTE(review): debugserver (Darwin) counterpart of init_llgs_test. The
# `if use_named_pipe:` guard (orig ~198) before the pipe creation is elided.
193 def init_debugserver_test(self, use_named_pipe=True):
194 self.debug_monitor_exe = get_debugserver_exe()
195 if not self.debug_monitor_exe:
196 self.skipTest("debugserver exe not found")
197 self.debug_monitor_extra_args = ["--log-file={}-server.log".format(self.log_basename), "--log-flags=0x800000"]
199 (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe()
200 # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
201 # when the process truly dies.
202 self.stub_sends_two_stop_notifications_on_kill = True
def forward_adb_port(self, source, target, direction, device):
    """Set up an adb port forward/reverse and register its teardown.

    direction is the adb subcommand to use (e.g. "forward"); device, if
    given, is passed via `adb -s`.
    """
    adb_base = ['adb']
    if device:
        adb_base += ['-s', device]
    adb_base.append(direction)

    def remove_port_forward():
        # Undo the mapping when the test tears down.
        subprocess.call(adb_base + ["--remove", "tcp:%d" % source])

    subprocess.call(adb_base + ["tcp:%d" % source, "tcp:%d" % target])
    self.addTearDownHook(remove_port_forward)
# NOTE(review): connects a TCP socket to the stub, routing through adb for
# android targets. The socket timeout setup (orig ~214-215), the try/except
# scaffolding in shutdown_socket (orig ~224-235) and the trailing
# `return sock` (orig ~238) are elided from this view.
212 def create_socket(self):
213 sock = socket.socket()
216 triple = self.dbg.GetSelectedPlatform().GetTriple()
217 if re.match(".*-.*-.*-android", triple):
218 self.forward_adb_port(self.port, self.port, "forward", self.stub_device)
220 connect_info = (self.stub_hostname, self.port)
221 sock.connect(connect_info)
# Best-effort shutdown: ask the stub to die ($k), then close our socket.
223 def shutdown_socket():
226 # send the kill packet so lldb-server shuts down gracefully
227 sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
229 logger.warning("failed to send kill packet to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
234 logger.warning("failed to close socket to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
236 self.addTearDownHook(shutdown_socket)
def set_inferior_startup_launch(self):
    """Have the stub launch the inferior itself (via the $A packet)."""
    self._inferior_startup = self._STARTUP_LAUNCH
def set_inferior_startup_attach(self):
    """Start the inferior separately; stub attaches via --attach on its command line."""
    self._inferior_startup = self._STARTUP_ATTACH
def set_inferior_startup_attach_manually(self):
    """Start the inferior separately; the test attaches however it wants (e.g. $vAttach;pid)."""
    self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
# NOTE(review): builds the stub's argv: extra args + listen address, plus
# optional --attach and --named-pipe. The `else:` (orig ~252) and the
# `if attach_pid:` guard (orig ~255) are elided from this view.
249 def get_debug_monitor_command_line_args(self, attach_pid=None):
250 if lldb.remote_platform:
# Remote stub must listen on all interfaces so the host can reach it.
251 commandline_args = self.debug_monitor_extra_args + ["*:{}".format(self.port)]
253 commandline_args = self.debug_monitor_extra_args + ["localhost:{}".format(self.port)]
256 commandline_args += ["--attach=%d" % attach_pid]
# Ask the stub to report its actual port through the named pipe, if set up.
257 if self.named_pipe_path:
258 commandline_args += ["--named-pipe", self.named_pipe_path]
259 return commandline_args
def run_platform_command(self, cmd):
    """Run cmd on the selected lldb platform.

    Returns a (SBError, output-string) tuple; callers inspect the error
    themselves (contrast with run_shell_cmd, which fails the test).
    """
    selected_platform = self.dbg.GetSelectedPlatform()
    shell_command = lldb.SBPlatformShellCommand(cmd)
    run_error = selected_platform.Run(shell_command)
    output = shell_command.GetOutput()
    return (run_error, output)
# NOTE(review): spawns the stub process. The `if self.named_pipe_path:` guard
# (orig ~277) before the port read, and the trailing `return server`
# (orig ~279-280), are elided from this view.
267 def launch_debug_monitor(self, attach_pid=None, logfile=None):
268 # Create the command line.
269 commandline_args = self.get_debug_monitor_command_line_args(attach_pid=attach_pid)
272 server = self.spawnSubprocess(self.debug_monitor_exe, commandline_args, install_remote=False)
273 self.addTearDownHook(self.cleanupSubprocesses)
274 self.assertIsNotNone(server)
276 # If we're receiving the stub's listening port from the named pipe, do that here.
278 self.port = self.get_stub_port_from_named_socket()
# NOTE(review): launches the stub and connects a socket to it. Two paths:
# named-pipe port negotiation (single launch) vs. random-port retry loop.
# Many scaffolding lines are elided: the `if self.named_pipe:` split
# (orig ~283-284), try/except bodies in both shutdown_debug_monitor closures,
# `return self.sock` (orig ~300), attempts/connect_attemps initialization
# (orig ~304-306, 318-321), and the retry bookkeeping around orig 324-346.
282 def connect_to_debug_monitor(self, attach_pid=None):
285 server = self.launch_debug_monitor(attach_pid=attach_pid)
286 self.assertIsNotNone(server)
288 def shutdown_debug_monitor():
292 logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
293 self.addTearDownHook(shutdown_debug_monitor)
295 # Schedule debug monitor to be shut down during teardown.
298 # Attach to the stub and return a socket opened to it.
299 self.sock = self.create_socket()
302 # We're using a random port algorithm to try not to collide with other ports,
303 # and retry a max # times.
307 while attempts < MAX_ATTEMPTS:
308 server = self.launch_debug_monitor(attach_pid=attach_pid)
310 # Schedule debug monitor to be shut down during teardown.
312 def shutdown_debug_monitor():
316 logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
317 self.addTearDownHook(shutdown_debug_monitor)
# Inner loop: the stub may not be listening yet; retry refused connections.
320 MAX_CONNECT_ATTEMPTS = 10
322 while connect_attemps < MAX_CONNECT_ATTEMPTS:
323 # Create a socket to talk to the server
325 self.sock = self.create_socket()
327 except socket.error as serr:
328 # We're only trying to handle connection refused.
329 if serr.errno != errno.ECONNREFUSED:
334 # We should close the server here to be safe.
337 # Increment attempts.
338 print("connect to debug monitor on port %d failed, attempt #%d of %d" % (self.port, attempts + 1, MAX_ATTEMPTS))
341 # And wait a random length of time before next attempt, to avoid collisions.
342 time.sleep(random.randint(1,5))
344 # Now grab a new port number.
345 self.port = self.get_next_port()
347 raise Exception("failed to create a socket to the launched debug monitor after %d tries" % attempts)
# NOTE(review): starts a child the stub can attach to later; the inferior is
# told to sleep so it stays alive. Elided: default exe_path guard (orig ~353),
# args initialization (orig ~355-357), the sleep_seconds guard (orig ~359),
# try/except in shutdown_process_for_attach, and `return inferior` (orig ~369).
349 def launch_process_for_attach(self, inferior_args=None, sleep_seconds=3, exe_path=None):
350 # We're going to start a child process that the debug monitor stub can later attach to.
351 # This process needs to be started so that it just hangs around for a while. We'll
354 exe_path = os.path.abspath("a.out")
358 args.extend(inferior_args)
360 args.append("sleep:%d" % sleep_seconds)
362 inferior = self.spawnSubprocess(exe_path, args)
363 def shutdown_process_for_attach():
367 logger.warning("failed to terminate inferior process for attach: {}; ignoring".format(sys.exc_info()[0]))
368 self.addTearDownHook(shutdown_process_for_attach)
# NOTE(review): orchestrates stub+inferior startup per the configured
# _inferior_startup mode. Elided lines include docstring details and returns
# documentation (orig ~387-393), `attach_pid = None` init (orig ~394 area),
# error-check guard before the raise (orig ~412), and launch_args guards.
371 def prep_debug_monitor_and_inferior(self, inferior_args=None, inferior_sleep_seconds=3, inferior_exe_path=None):
372 """Prep the debug monitor, the inferior, and the expected packet stream.
374 Handle the separate cases of using the debug monitor in attach-to-inferior mode
375 and in launch-inferior mode.
377 For attach-to-inferior mode, the inferior process is first started, then
378 the debug monitor is started in attach to pid mode (using --attach on the
379 stub command line), and the no-ack-mode setup is appended to the packet
380 stream. The packet stream is not yet executed, ready to have more expected
381 packet entries added to it.
383 For launch-inferior mode, the stub is first started, then no ack mode is
384 setup on the expected packet stream, then the verified launch packets are added
385 to the expected socket stream. The packet stream is not yet executed, ready
386 to have more expected packet entries added to it.
389 {inferior:<inferior>, server:<server>}
394 if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
395 # Launch the process that we'll use as the inferior.
396 inferior = self.launch_process_for_attach(inferior_args=inferior_args, sleep_seconds=inferior_sleep_seconds, exe_path=inferior_exe_path)
397 self.assertIsNotNone(inferior)
398 self.assertTrue(inferior.pid > 0)
399 if self._inferior_startup == self._STARTUP_ATTACH:
400 # In this case, we want the stub to attach via the command line, so set the command line attach pid here.
401 attach_pid = inferior.pid
403 if self._inferior_startup == self._STARTUP_LAUNCH:
405 if not inferior_exe_path:
406 inferior_exe_path = os.path.abspath("a.out")
# For remote platforms, push the inferior binary to the target first.
408 if lldb.remote_platform:
409 remote_path = lldbutil.append_to_process_working_directory(os.path.basename(inferior_exe_path))
410 remote_file_spec = lldb.SBFileSpec(remote_path, False)
411 err = lldb.remote_platform.Install(lldb.SBFileSpec(inferior_exe_path, True), remote_file_spec)
413 raise Exception("remote_platform.Install('%s', '%s') failed: %s" % (inferior_exe_path, remote_path, err))
414 inferior_exe_path = remote_path
416 launch_args = [inferior_exe_path]
418 launch_args.extend(inferior_args)
420 # Launch the debug monitor stub, attaching to the inferior.
421 server = self.connect_to_debug_monitor(attach_pid=attach_pid)
422 self.assertIsNotNone(server)
424 # Build the expected protocol stream
425 self.add_no_ack_remote_stream()
426 if self._inferior_startup == self._STARTUP_LAUNCH:
427 self.add_verified_launch_packets(launch_args)
429 return {"inferior":inferior, "server":server}
# NOTE(review): accumulates data from sock until expected_content_regex
# matches or the deadline passes. The `response = ""` initialization
# (orig ~434) and empty-recv/EOF handling (orig ~439) are elided.
431 def expect_socket_recv(self, sock, expected_content_regex, timeout_seconds):
433 timeout_time = time.time() + timeout_seconds
435 while not expected_content_regex.match(response) and time.time() < timeout_time:
436 can_read, _, _ = select.select([sock], [], [], timeout_seconds)
437 if can_read and sock in can_read:
438 recv_bytes = sock.recv(4096)
440 response += recv_bytes
442 self.assertTrue(expected_content_regex.match(response))
def expect_socket_send(self, sock, content, timeout_seconds):
    """Send all of content over sock, retrying partial sends until done or timeout."""
    deadline = time.time() + timeout_seconds
    remaining = content

    while len(remaining) > 0 and time.time() < deadline:
        # Wait until the socket is writable (bounded by the timeout).
        _, writable, _ = select.select([], [sock], [], timeout_seconds)
        if writable and sock in writable:
            sent_count = sock.send(remaining)
            remaining = remaining[sent_count:]
    self.assertEqual(len(remaining), 0)
def do_handshake(self, stub_socket, timeout_seconds=5):
    """Ack the stub and switch the gdb-remote connection into no-ack mode."""
    # Send the initial ack.
    self.expect_socket_send(stub_socket, "+", timeout_seconds)

    # Send the start no ack mode packet.
    no_ack_request = "$QStartNoAckMode#b0"
    sent_count = stub_socket.send(no_ack_request)
    self.assertEqual(sent_count, len(no_ack_request))

    # Receive the ack and "OK".
    ok_pattern = re.compile(r"^\+\$OK#[0-9a-fA-F]{2}$")
    self.expect_socket_recv(stub_socket, ok_pattern, timeout_seconds)

    # Send the final ack.
    self.expect_socket_send(stub_socket, "+", timeout_seconds)
# NOTE(review): queues the expected no-ack handshake packets. The list
# brackets/ack lines and the closing `True)` argument (orig ~472, 474,
# 476-477) are elided from this view.
470 def add_no_ack_remote_stream(self):
471 self.test_sequence.add_log_lines(
473 "read packet: $QStartNoAckMode#b0",
475 "send packet: $OK#9a",
# NOTE(review): queues the $A launch packet plus its verification
# ($qLaunchSuccess). The closing `True)` argument (orig ~485-486) is elided.
479 def add_verified_launch_packets(self, launch_args):
480 self.test_sequence.add_log_lines(
481 ["read packet: %s" % build_gdbremote_A_packet(launch_args),
482 "send packet: $OK#00",
483 "read packet: $qLaunchSuccess#a5",
484 "send packet: $OK#00"],
# NOTE(review): enables the ;thread:<tid> suffix for register packets.
# The closing list/`True)` lines (orig ~491-492) are elided.
487 def add_thread_suffix_request_packets(self):
488 self.test_sequence.add_log_lines(
489 ["read packet: $QThreadSuffixSupported#e4",
490 "send packet: $OK#00",
# NOTE(review): queues $qProcessInfo and captures the raw reply as
# "process_info_raw". The closing `True)` (orig ~497-498) is elided.
493 def add_process_info_collection_packets(self):
494 self.test_sequence.add_log_lines(
495 ["read packet: $qProcessInfo#dc",
496 { "direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"process_info_raw"} }],
# Valid keys in a qProcessInfo reply; the list body (orig 500-514) is
# elided from this view.
499 _KNOWN_PROCESS_INFO_KEYS = [
def parse_process_info_response(self, context):
    """Parse the captured qProcessInfo reply into a validated key/value dict."""
    # Ensure we have a process info response.
    self.assertIsNotNone(context)
    process_info_raw = context.get("process_info_raw")
    self.assertIsNotNone(process_info_raw)

    # Pull out key:value; pairs.
    process_info_dict = {}
    for match in re.finditer(r"([^:]+):([^;]+);", process_info_raw):
        process_info_dict[match.group(1)] = match.group(2)

    # Validate keys are known.
    for key, val in list(process_info_dict.items()):
        self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
        self.assertIsNotNone(val)

    return process_info_dict
# NOTE(review): queues an iterated qRegisterInfo{n} multi-response query,
# stopping on an $E..# error reply; raw replies saved under
# "reg_info_responses". Closing `True)` (orig ~536-537) is elided.
531 def add_register_info_collection_packets(self):
532 self.test_sequence.add_log_lines(
533 [ { "type":"multi_response", "query":"qRegisterInfo", "append_iteration_suffix":True,
534 "end_regex":re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
535 "save_key":"reg_info_responses" } ],
def parse_register_info_packets(self, context):
    """Return an array of register info dictionaries, one per register info."""
    reg_info_responses = context.get("reg_info_responses")
    self.assertIsNotNone(reg_info_responses)

    # Parse each raw qRegisterInfo reply into a dictionary.
    parsed_infos = []
    for raw_response in reg_info_responses:
        parsed_infos.append(parse_reg_info_response(raw_response))
    return parsed_infos
def expect_gdbremote_sequence(self, timeout_seconds=None):
    """Replay the accumulated packet sequence against the stub.

    Falls back to the class default timeout when none (or 0) is given.
    Returns the context dict produced by the replay helper.
    """
    effective_timeout = timeout_seconds or self._TIMEOUT_SECONDS
    return expect_lldb_gdbserver_replay(self, self.sock, self.test_sequence, effective_timeout, self.logger)
# Valid keys in a qRegisterInfo reply; the list body (orig 552-565) is
# elided from this view.
551 _KNOWN_REGINFO_KEYS = [
# NOTE(review): validates a parsed register-info dict. The `for key in
# reg_info:` loop header (orig ~568) over the keys is elided from this view.
566 def assert_valid_reg_info(self, reg_info):
567 # Assert we know about all the reginfo keys parsed.
569 self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
571 # Check the bare-minimum expected set of register info keys.
572 self.assertTrue("name" in reg_info)
573 self.assertTrue("bitsize" in reg_info)
574 self.assertTrue("offset" in reg_info)
575 self.assertTrue("encoding" in reg_info)
576 self.assertTrue("format" in reg_info)
# NOTE(review): returns (index, info) of the generic "pc" register. The
# lldb_reg_index initialization/increment and the not-found return
# (orig ~579, 583-585) are elided from this view.
578 def find_pc_reg_info(self, reg_infos):
580 for reg_info in reg_infos:
581 if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
582 return (lldb_reg_index, reg_info)
# NOTE(review): annotates each reg_info dict with its 0-based position. The
# docstring closer, reg_index initialization and its increment
# (orig ~589, 592, 594-595, 598-599) are elided from this view.
587 def add_lldb_register_index(self, reg_infos):
588 """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
590 We'll use this when we want to call packets like P/p with a register index but do so
591 on only a subset of the full register info set.
593 self.assertIsNotNone(reg_infos)
596 for reg_info in reg_infos:
597 reg_info["lldb_register_index"] = reg_index
# NOTE(review): queues $qMemoryRegionInfo:<addr> and captures the raw reply
# as "memory_region_response". Closing `True)` (orig ~604-605) is elided.
600 def add_query_memory_region_packets(self, address):
601 self.test_sequence.add_log_lines(
602 ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
603 {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"memory_region_response"} }],
# NOTE(review): parses "key:value;"-style text into a dict; with
# allow_dupes, repeated keys accumulate into a list. The dict init, the
# key/val extraction, the first-occurrence branch, and the return
# (orig ~608, 610-613, 616-617, 619, 621-623) are elided from this view.
606 def parse_key_val_dict(self, key_val_text, allow_dupes=True):
607 self.assertIsNotNone(key_val_text)
609 for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
# Key already present: append to an existing list or promote to one.
614 if type(kv_dict[key]) == list:
615 kv_dict[key].append(val)
618 kv_dict[key] = [kv_dict[key], val]
# Duplicate keys are a test failure when allow_dupes is False.
620 self.fail("key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(key, val, key_val_text, kv_dict))
def parse_memory_region_packet(self, context):
    """Parse the captured qMemoryRegionInfo reply into a validated dict."""
    # Ensure we have a context.
    raw_response = context.get("memory_region_response")
    self.assertIsNotNone(raw_response)

    # Pull out key:value; pairs.
    mem_region_dict = self.parse_key_val_dict(raw_response)

    # Validate keys are known.
    expected_keys = ["start", "size", "permissions", "error"]
    for key, val in list(mem_region_dict.items()):
        self.assertTrue(key in expected_keys)
        self.assertIsNotNone(val)

    # Return the dictionary of key-value pairs for the memory region.
    return mem_region_dict
def assert_address_within_memory_region(self, test_address, mem_region_dict):
    """Fail the test unless test_address lies within [start, start+size)."""
    self.assertIsNotNone(mem_region_dict)
    self.assertTrue("start" in mem_region_dict)
    self.assertTrue("size" in mem_region_dict)

    # Region fields are hex strings per the qMemoryRegionInfo reply format.
    range_start = int(mem_region_dict["start"], 16)
    range_size = int(mem_region_dict["size"], 16)
    range_end = range_start + range_size

    if range_start <= test_address < range_end:
        return
    if test_address < range_start:
        self.fail("address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
    self.fail("address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
# NOTE(review): queues the qfThreadInfo/qsThreadInfo iteration, ending on
# the $l# terminator; raw replies saved under "threadinfo_responses".
# Closing `True)` (orig ~659-660) is elided.
654 def add_threadinfo_collection_packets(self):
655 self.test_sequence.add_log_lines(
656 [ { "type":"multi_response", "first_query":"qfThreadInfo", "next_query":"qsThreadInfo",
657 "append_iteration_suffix":False, "end_regex":re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
658 "save_key":"threadinfo_responses" } ],
# NOTE(review): the `thread_ids = []` initialization and the
# `return thread_ids` (orig ~665-666, 670-671) are elided from this view.
661 def parse_threadinfo_packets(self, context):
662 """Return an array of thread ids (decimal ints), one per thread."""
663 threadinfo_responses = context.get("threadinfo_responses")
664 self.assertIsNotNone(threadinfo_responses)
667 for threadinfo_response in threadinfo_responses:
668 new_thread_infos = parse_threadinfo_response(threadinfo_response)
669 thread_ids.extend(new_thread_infos)
# NOTE(review): polls qfThreadInfo until at least thread_count threads are
# reported or the deadline passes. The self.fail( opener before the timeout
# message and the `return threads` (orig ~690, 693-695) are elided.
672 def wait_for_thread_count(self, thread_count, timeout_seconds=3):
673 start_time = time.time()
674 timeout_time = start_time + timeout_seconds
676 actual_thread_count = 0
677 while actual_thread_count < thread_count:
678 self.reset_test_sequence()
679 self.add_threadinfo_collection_packets()
681 context = self.expect_gdbremote_sequence()
682 self.assertIsNotNone(context)
684 threads = self.parse_threadinfo_packets(context)
685 self.assertIsNotNone(threads)
687 actual_thread_count = len(threads)
689 if time.time() > timeout_time:
691 'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
692 timeout_seconds, thread_count, actual_thread_count))
# NOTE(review): queues a $Z0 software-breakpoint set (and, with do_continue,
# a $c plus the expected T-stop capture). The closing `True)` arguments and
# the `if do_continue:` guard (orig ~702-704, 710-711) are elided.
696 def add_set_breakpoint_packets(self, address, do_continue=True, breakpoint_kind=1):
697 self.test_sequence.add_log_lines(
698 [# Set the breakpoint.
699 "read packet: $Z0,{0:x},{1}#00".format(address, breakpoint_kind),
700 # Verify the stub could set it.
701 "send packet: $OK#00",
705 self.test_sequence.add_log_lines(
706 [# Continue the inferior.
707 "read packet: $c#63",
708 # Expect a breakpoint stop report.
709 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
# NOTE(review): queues the matching $z0 breakpoint removal.
# Closing `True)` (orig ~718-719) is elided.
712 def add_remove_breakpoint_packets(self, address, breakpoint_kind=1):
713 self.test_sequence.add_log_lines(
714 [# Remove the breakpoint.
715 "read packet: $z0,{0:x},{1}#00".format(address, breakpoint_kind),
716 # Verify the stub could unset it.
717 "send packet: $OK#00",
# NOTE(review): queues $qSupported and captures the raw feature reply.
# Closing `True)` (orig ~724-725) is elided.
720 def add_qSupported_packets(self):
721 self.test_sequence.add_log_lines(
722 ["read packet: $qSupported#00",
723 {"direction":"send", "regex":r"^\$(.*)#[0-9a-fA-F]{2}", "capture":{1: "qSupported_response"}},
# Recognized qSupported stub feature names; several entries
# (orig 728-729, 732, 736-738) are elided from this view.
726 _KNOWN_QSUPPORTED_STUB_FEATURES = [
727 "augmented-libraries-svr4-read",
730 "QThreadSuffixSupported",
731 "QListThreadsInStopReply",
733 "qXfer:libraries:read",
734 "qXfer:libraries-svr4:read",
735 "qXfer:features:read",
# NOTE(review): parses the qSupported reply. key=val pairs keep their value;
# bare feature{+,-,?} entries store the trailing sigil as the value. The
# supported_dict init, key/val extraction, else branch and length guard
# (orig ~744, 749-751, 755-756, 759, 766) are elided from this view.
739 def parse_qSupported_response(self, context):
740 self.assertIsNotNone(context)
742 raw_response = context.get("qSupported_response")
743 self.assertIsNotNone(raw_response)
745 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the
746 # +,-,? is stripped from the key and set as the value.
748 for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
752 # key=val: store as is
753 if val and len(val) > 0:
754 supported_dict[key] = val
757 raise Exception("singular stub feature is too short: must be stub_feature{+,-,?}")
758 supported_type = key[-1]
760 if not supported_type in ["+", "-", "?"]:
761 raise Exception("malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
762 supported_dict[key] = supported_type
763 # Ensure we know the supported element
764 if not key in self._KNOWN_QSUPPORTED_STUB_FEATURES:
765 raise Exception("unknown qSupported stub feature reported: %s" % key)
767 return supported_dict
# NOTE(review): continues the inferior, sleeps, interrupts with a raw ^C
# byte (chr(3)), and captures the resulting T-stop. Closing `True)` args and
# `return context` (orig ~773, 784, 788-789) are elided from this view.
769 def run_process_then_stop(self, run_seconds=1):
770 # Tell the stub to continue.
771 self.test_sequence.add_log_lines(
772 ["read packet: $vCont;c#a8"],
774 context = self.expect_gdbremote_sequence()
776 # Wait for run_seconds.
777 time.sleep(run_seconds)
779 # Send an interrupt, capture a T response.
780 self.reset_test_sequence()
781 self.test_sequence.add_log_lines(
782 ["read packet: {}".format(chr(3)),
783 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result"} }],
785 context = self.expect_gdbremote_sequence()
786 self.assertIsNotNone(context)
787 self.assertIsNotNone(context.get("stop_result"))
def select_modifiable_register(self, reg_infos):
    """Find a register that can be read/written freely.

    Prefers a register from PREFERRED_REGISTER_NAMES; otherwise falls back
    to the generic frame pointer. Returns the lldb_register_index, or None
    if nothing suitable is present.
    """
    PREFERRED_REGISTER_NAMES = set(["rax", ])

    self.assertIsNotNone(reg_infos)
    fallback_index = None
    for info in reg_infos:
        if info.get("name") in PREFERRED_REGISTER_NAMES:
            # We found a preferred register. Use it.
            return info["lldb_register_index"]
        if info.get("generic") == "fp":
            # A frame pointer register will do as a register to modify temporarily.
            fallback_index = info["lldb_register_index"]

    # No preferred register found; use the fallback (possibly None).
    return fallback_index
# NOTE(review): pulls register values out of a T-stop's key/val text; keys
# that are pure hex are register numbers. The `registers = {}` init and
# `return registers` (orig ~814-815, 819-820) are elided from this view.
811 def extract_registers_from_stop_notification(self, stop_key_vals_text):
812 self.assertIsNotNone(stop_key_vals_text)
813 kv_dict = self.parse_key_val_dict(stop_key_vals_text)
816 for (key, val) in list(kv_dict.items()):
817 if re.match(r"^[0-9a-fA-F]+$", key):
818 registers[int(key, 16)] = val
# NOTE(review): runs the qRegisterInfo collection sequence and returns the
# parsed, index-annotated infos. `return reg_infos` (orig ~831-833) is elided.
821 def gather_register_infos(self):
822 self.reset_test_sequence()
823 self.add_register_info_collection_packets()
825 context = self.expect_gdbremote_sequence()
826 self.assertIsNotNone(context)
828 reg_infos = self.parse_register_info_packets(context)
829 self.assertIsNotNone(reg_infos)
830 self.add_lldb_register_index(reg_infos)
# NOTE(review): returns the first reg_info whose "generic" field matches
# generic_name; the `return reg_info` / not-found `return None`
# (orig ~838-840) are elided from this view.
834 def find_generic_register_with_name(self, reg_infos, generic_name):
835 self.assertIsNotNone(reg_infos)
836 for reg_info in reg_infos:
837 if ("generic" in reg_info) and (reg_info["generic"] == generic_name):
# NOTE(review): decodes gdb-remote binary payloads: `}` escapes (next byte
# XOR 0x20) and `*` run-length encoding (repeat count = next byte - 29).
# The i/decoded_bytes initialization, the per-branch index advances and the
# return (orig ~842-843, 849, 856-857, 859-861) are elided from this view.
841 def decode_gdbremote_binary(self, encoded_bytes):
844 while i < len(encoded_bytes):
845 if encoded_bytes[i] == "}":
846 # Handle escaped char.
847 self.assertTrue(i + 1 < len(encoded_bytes))
848 decoded_bytes += chr(ord(encoded_bytes[i+1]) ^ 0x20)
850 elif encoded_bytes[i] == "*":
851 # Handle run length encoding.
852 self.assertTrue(len(decoded_bytes) > 0)
853 self.assertTrue(i + 1 < len(encoded_bytes))
854 repeat_count = ord(encoded_bytes[i+1]) - 29
855 decoded_bytes += decoded_bytes[-1] * repeat_count
858 decoded_bytes += encoded_bytes[i]
# NOTE(review): decodes a raw auxv blob into a key->value dict of
# word_size-wide entries, terminated by a (0, 0) pair. The dict init, the
# terminator check `if key == 0...`, the `return auxv_dict`, and loop
# scaffolding (orig ~866-868, 870, 873-874, 877, 881, 883, 885-886, 890)
# are elided from this view.
862 def build_auxv_dict(self, endian, word_size, auxv_data):
863 self.assertIsNotNone(endian)
864 self.assertIsNotNone(word_size)
865 self.assertIsNotNone(auxv_data)
869 while len(auxv_data) > 0:
871 raw_key = auxv_data[:word_size]
872 auxv_data = auxv_data[word_size:]
875 raw_value = auxv_data[:word_size]
876 auxv_data = auxv_data[word_size:]
878 # Convert raw text from target endian.
879 key = unpack_endian_binary_string(endian, raw_key)
880 value = unpack_endian_binary_string(endian, raw_value)
882 # Handle ending entry.
884 self.assertEqual(value, 0)
887 # The key should not already be present.
888 self.assertFalse(key in auxv_dict)
889 auxv_dict[key] = value
# Falling out of the loop means the double-zero terminator was never seen.
891 self.fail("should not reach here - implies required double zero entry not found")
# NOTE(review): issues repeated ${prefix}{offset:x},{length:x} reads,
# accumulating decoded data until an 'l' (last-chunk) response. The
# offset/decoded_data init, loop header, closing `True)`, and
# `return decoded_data` (orig ~896-900, 906-907, 910, 914-915, 917, 920,
# 926-927) are elided from this view.
894 def read_binary_data_in_chunks(self, command_prefix, chunk_length):
895 """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned."""
901 # Grab the next iteration of data.
902 self.reset_test_sequence()
903 self.test_sequence.add_log_lines([
904 "read packet: ${}{:x},{:x}:#00".format(command_prefix, offset, chunk_length),
905 {"direction":"send", "regex":re.compile(r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", re.MULTILINE|re.DOTALL), "capture":{1:"response_type", 2:"content_raw"} }
908 context = self.expect_gdbremote_sequence()
909 self.assertIsNotNone(context)
911 response_type = context.get("response_type")
912 self.assertIsNotNone(response_type)
913 self.assertTrue(response_type in ["l", "m"])
916 offset += chunk_length
918 # Figure out if we're done. We're done if the response type is l.
919 done = response_type == "l"
921 # Decode binary data.
922 content_raw = context.get("content_raw")
923 if content_raw and len(content_raw) > 0:
924 self.assertIsNotNone(content_raw)
925 decoded_data += self.decode_gdbremote_binary(content_raw)
# NOTE(review): queues a raw ^C (chr(3)) interrupt and the expected T-stop
# capture. Closing `True)` (orig ~934-935) is elided.
928 def add_interrupt_packets(self):
929 self.test_sequence.add_log_lines([
930 # Send the intterupt.
931 "read packet: {}".format(chr(3)),
932 # And wait for the stop notification.
933 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", "capture":{1:"stop_signo", 2:"stop_key_val_text" } },
def parse_interrupt_packets(self, context):
    """Return (stop_signo, stop key/val dict) from a captured interrupt stop reply."""
    stop_signo = context.get("stop_signo")
    stop_key_val_text = context.get("stop_key_val_text")
    self.assertIsNotNone(stop_signo)
    self.assertIsNotNone(stop_key_val_text)
    return (int(stop_signo, 16), self.parse_key_val_dict(stop_key_val_text))
# NOTE(review): queues $QSaveRegisterState, with or without the
# ;thread:<tid> suffix. The `if thread_id:`/`else:` scaffolding, the request
# line in add_log_lines, and closing `True)` (orig ~942, 945, 949, 951-952)
# are elided from this view.
941 def add_QSaveRegisterState_packets(self, thread_id):
943 # Use the thread suffix form.
944 request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(thread_id)
946 request = "read packet: $QSaveRegisterState#00"
948 self.test_sequence.add_log_lines([
950 {"direction":"send", "regex":r"^\$(E?.*)#[0-9a-fA-F]{2}$", "capture":{1:"save_response" } },
# NOTE(review): returns (success, save_id). The error-path
# `return (False, None)` under the E-response check (orig ~960-962) is
# elided from this view.
953 def parse_QSaveRegisterState_response(self, context):
954 self.assertIsNotNone(context)
956 save_response = context.get("save_response")
957 self.assertIsNotNone(save_response)
# An E-prefixed (or empty) reply means the save failed.
959 if len(save_response) < 1 or save_response[0] == "E":
# Success: the reply is the decimal save id.
963 return (True, int(save_response))
# NOTE(review): queues $QRestoreRegisterState:<save_id>, with or without the
# thread suffix, expecting $OK. The `if thread_id:`/`else:` scaffolding,
# request line, and closing `True)` (orig ~966, 969, 973, 975-976) are
# elided from this view.
965 def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
967 # Use the thread suffix form.
968 request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(save_id, thread_id)
970 request = "read packet: $QRestoreRegisterState:{}#00".format(save_id)
972 self.test_sequence.add_log_lines([
974 "send packet: $OK#00"
# NOTE(review): for each register, reads its value ($p), writes back the
# bitwise complement ($P), and re-reads to verify, tallying
# successful_writes/failed_writes. The body continues past this view
# (verification-count adjustment, final return) and many scaffolding lines
# (if thread_id/else branches, add_log_lines arguments, counters) are elided.
# NOTE(review): `int(...)/8` at orig 989 and 1013 is true division under
# Python 3 (yields a float); if this file is ever run on py3 these likely
# need `// 8` — TODO confirm the suite's Python version.
977 def flip_all_bits_in_each_register_value(self, reg_infos, endian, thread_id=None):
978 self.assertIsNotNone(reg_infos)
980 successful_writes = 0
983 for reg_info in reg_infos:
984 # Use the lldb register index added to the reg info. We're not necessarily
985 # working off a full set of register infos, so an inferred register index could be wrong.
986 reg_index = reg_info["lldb_register_index"]
987 self.assertIsNotNone(reg_index)
989 reg_byte_size = int(reg_info["bitsize"])/8
990 self.assertTrue(reg_byte_size > 0)
992 # Handle thread suffix.
994 p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id)
996 p_request = "read packet: $p{:x}#00".format(reg_index)
998 # Read the existing value.
999 self.reset_test_sequence()
1000 self.test_sequence.add_log_lines([
1002 { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
1004 context = self.expect_gdbremote_sequence()
1005 self.assertIsNotNone(context)
1007 # Verify the response length.
1008 p_response = context.get("p_response")
1009 self.assertIsNotNone(p_response)
1010 initial_reg_value = unpack_register_hex_unsigned(endian, p_response)
1012 # Flip the value by xoring with all 1s
1013 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) / 8)
1014 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
1015 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))
1017 # Handle thread suffix for P.
1019 P_request = "read packet: $P{:x}={};thread:{:x}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
1021 P_request = "read packet: $P{:x}={}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size))
1023 # Write the flipped value to the register.
1024 self.reset_test_sequence()
1025 self.test_sequence.add_log_lines([
1027 { "direction":"send", "regex":r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", "capture":{1:"P_response"} },
1029 context = self.expect_gdbremote_sequence()
1030 self.assertIsNotNone(context)
1032 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail
1033 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them
1034 # all flipping perfectly.
1035 P_response = context.get("P_response")
1036 self.assertIsNotNone(P_response)
1037 if P_response == "OK":
1038 successful_writes += 1
1041 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))
1043 # Read back the register value, ensure it matches the flipped value.
1044 if P_response == "OK":
1045 self.reset_test_sequence()
1046 self.test_sequence.add_log_lines([
1048 { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
1050 context = self.expect_gdbremote_sequence()
1051 self.assertIsNotNone(context)
1053 verify_p_response_raw = context.get("p_response")
1054 self.assertIsNotNone(verify_p_response_raw)
1055 verify_bits = unpack_register_hex_unsigned(endian, verify_p_response_raw)
1057 if verify_bits != flipped_bits_int:
1058 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts.
1059 # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
1060 successful_writes -= 1
1063 return (successful_writes, failed_writes)
1065 def is_bit_flippable_register(self, reg_info):
1068 if not "set" in reg_info:
1070 if reg_info["set"] != "General Purpose Registers":
1072 if ("container-regs" in reg_info) and (len(reg_info["container-regs"]) > 0):
1073 # Don't try to bit flip registers contained in another register.
1075 if re.match("^.s$", reg_info["name"]):
1076 # This is a 2-letter register name that ends in "s", like a segment register.
1077 # Don't try to bit flip these.
1079 if re.match("^(c|)psr$", reg_info["name"]):
1080 # This is an ARM program status register; don't flip it.
1082 # Okay, this looks fine-enough.
1085 def read_register_values(self, reg_infos, endian, thread_id=None):
1086 self.assertIsNotNone(reg_infos)
1089 for reg_info in reg_infos:
1090 # We append a register index when load reg infos so we can work with subsets.
1091 reg_index = reg_info.get("lldb_register_index")
1092 self.assertIsNotNone(reg_index)
1094 # Handle thread suffix.
1096 p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id)
1098 p_request = "read packet: $p{:x}#00".format(reg_index)
1101 self.reset_test_sequence()
1102 self.test_sequence.add_log_lines([
1104 { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
1106 context = self.expect_gdbremote_sequence()
1107 self.assertIsNotNone(context)
1109 # Convert value from target endian to integral.
1110 p_response = context.get("p_response")
1111 self.assertIsNotNone(p_response)
1112 self.assertTrue(len(p_response) > 0)
1113 self.assertFalse(p_response[0] == "E")
1115 values[reg_index] = unpack_register_hex_unsigned(endian, p_response)
1119 def add_vCont_query_packets(self):
1120 self.test_sequence.add_log_lines([
1121 "read packet: $vCont?#49",
1122 {"direction":"send", "regex":r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", "capture":{2:"vCont_query_response" } },
1125 def parse_vCont_query_response(self, context):
1126 self.assertIsNotNone(context)
1127 vCont_query_response = context.get("vCont_query_response")
1129 # Handle case of no vCont support at all - in which case the capture group will be none or zero length.
1130 if not vCont_query_response or len(vCont_query_response) == 0:
1133 return {key:1 for key in vCont_query_response.split(";") if key and len(key) > 0}
1135 def count_single_steps_until_true(self, thread_id, predicate, args, max_step_count=100, use_Hc_packet=True, step_instruction="s"):
1136 """Used by single step test that appears in a few different contexts."""
1137 single_step_count = 0
1139 while single_step_count < max_step_count:
1140 self.assertIsNotNone(thread_id)
1142 # Build the packet for the single step instruction. We replace {thread}, if present, with the thread_id.
1143 step_packet = "read packet: ${}#00".format(re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
1144 # print("\nstep_packet created: {}\n".format(step_packet))
1147 self.reset_test_sequence()
1149 self.test_sequence.add_log_lines(
1150 [# Set the continue thread.
1151 "read packet: $Hc{0:x}#00".format(thread_id),
1152 "send packet: $OK#00",
1154 self.test_sequence.add_log_lines([
1157 # "read packet: $vCont;s:{0:x}#00".format(thread_id),
1158 # Expect a breakpoint stop report.
1159 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
1161 context = self.expect_gdbremote_sequence()
1162 self.assertIsNotNone(context)
1163 self.assertIsNotNone(context.get("stop_signo"))
1164 self.assertEqual(int(context.get("stop_signo"), 16),
1165 lldbutil.get_signal_number('SIGTRAP'))
1167 single_step_count += 1
1169 # See if the predicate is true. If so, we're done.
1171 return (True, single_step_count)
1173 # The predicate didn't return true within the runaway step count.
1174 return (False, single_step_count)
1176 def g_c1_c2_contents_are(self, args):
1177 """Used by single step test that appears in a few different contexts."""
1178 g_c1_address = args["g_c1_address"]
1179 g_c2_address = args["g_c2_address"]
1180 expected_g_c1 = args["expected_g_c1"]
1181 expected_g_c2 = args["expected_g_c2"]
1183 # Read g_c1 and g_c2 contents.
1184 self.reset_test_sequence()
1185 self.test_sequence.add_log_lines(
1186 ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
1187 {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c1_contents"} },
1188 "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
1189 {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c2_contents"} }],
1192 # Run the packet stream.
1193 context = self.expect_gdbremote_sequence()
1194 self.assertIsNotNone(context)
1196 # Check if what we read from inferior memory is what we are expecting.
1197 self.assertIsNotNone(context.get("g_c1_contents"))
1198 self.assertIsNotNone(context.get("g_c2_contents"))
1200 return (context.get("g_c1_contents").decode("hex") == expected_g_c1) and (context.get("g_c2_contents").decode("hex") == expected_g_c2)
1202 def single_step_only_steps_one_instruction(self, use_Hc_packet=True, step_instruction="s"):
1203 """Used by single step test that appears in a few different contexts."""
1204 # Start up the inferior.
1205 procs = self.prep_debug_monitor_and_inferior(
1206 inferior_args=["get-code-address-hex:swap_chars", "get-data-address-hex:g_c1", "get-data-address-hex:g_c2", "sleep:1", "call-function:swap_chars", "sleep:5"])
1209 self.test_sequence.add_log_lines(
1210 [# Start running after initial stop.
1211 "read packet: $c#63",
1212 # Match output line that prints the memory address of the function call entry point.
1213 # Note we require launch-only testing so we can get inferior otuput.
1214 { "type":"output_match", "regex":r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
1215 "capture":{ 1:"function_address", 2:"g_c1_address", 3:"g_c2_address"} },
1216 # Now stop the inferior.
1217 "read packet: {}".format(chr(3)),
1218 # And wait for the stop notification.
1219 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} }],
1222 # Run the packet stream.
1223 context = self.expect_gdbremote_sequence()
1224 self.assertIsNotNone(context)
1226 # Grab the main thread id.
1227 self.assertIsNotNone(context.get("stop_thread_id"))
1228 main_thread_id = int(context.get("stop_thread_id"), 16)
1230 # Grab the function address.
1231 self.assertIsNotNone(context.get("function_address"))
1232 function_address = int(context.get("function_address"), 16)
1234 # Grab the data addresses.
1235 self.assertIsNotNone(context.get("g_c1_address"))
1236 g_c1_address = int(context.get("g_c1_address"), 16)
1238 self.assertIsNotNone(context.get("g_c2_address"))
1239 g_c2_address = int(context.get("g_c2_address"), 16)
1241 # Set a breakpoint at the given address.
1242 if self.getArchitecture() == "arm":
1243 # TODO: Handle case when setting breakpoint in thumb code
1247 self.reset_test_sequence()
1248 self.add_set_breakpoint_packets(function_address, do_continue=True, breakpoint_kind=BREAKPOINT_KIND)
1249 context = self.expect_gdbremote_sequence()
1250 self.assertIsNotNone(context)
1252 # Remove the breakpoint.
1253 self.reset_test_sequence()
1254 self.add_remove_breakpoint_packets(function_address, breakpoint_kind=BREAKPOINT_KIND)
1255 context = self.expect_gdbremote_sequence()
1256 self.assertIsNotNone(context)
1258 # Verify g_c1 and g_c2 match expected initial state.
1260 args["g_c1_address"] = g_c1_address
1261 args["g_c2_address"] = g_c2_address
1262 args["expected_g_c1"] = "0"
1263 args["expected_g_c2"] = "1"
1265 self.assertTrue(self.g_c1_c2_contents_are(args))
1267 # Verify we take only a small number of steps to hit the first state. Might need to work through function entry prologue code.
1268 args["expected_g_c1"] = "1"
1269 args["expected_g_c2"] = "1"
1270 (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=25, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
1271 self.assertTrue(state_reached)
1273 # Verify we hit the next state.
1274 args["expected_g_c1"] = "1"
1275 args["expected_g_c2"] = "0"
1276 (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
1277 self.assertTrue(state_reached)
1278 expected_step_count = 1
1279 arch = self.getArchitecture()
1281 #MIPS required "3" (ADDIU, SB, LD) machine instructions for updation of variable value
1282 if re.match("mips",arch):
1283 expected_step_count = 3
1284 self.assertEqual(step_count, expected_step_count)
1286 # Verify we hit the next state.
1287 args["expected_g_c1"] = "0"
1288 args["expected_g_c2"] = "0"
1289 (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
1290 self.assertTrue(state_reached)
1291 self.assertEqual(step_count, expected_step_count)
1293 # Verify we hit the next state.
1294 args["expected_g_c1"] = "0"
1295 args["expected_g_c2"] = "1"
1296 (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
1297 self.assertTrue(state_reached)
1298 self.assertEqual(step_count, expected_step_count)