2 Base class for gdb-remote test cases.
5 from __future__ import print_function
22 from lldbsuite.test import configuration
23 from lldbsuite.test.lldbtest import *
24 from lldbgdbserverutils import *
27 class _ConnectionRefused(IOError):
class GdbRemoteTestCaseBase(TestBase):
    """Base class for tests that drive a debug stub over the gdb-remote protocol."""

    # These tests speak the wire protocol directly; no debug info is required.
    NO_DEBUG_INFO_TESTCASE = True

    # Pre-built 'k' (kill) packet with its checksum (0x6b).
    _GDBREMOTE_KILL_PACKET = "$k#6b"

    # Start the inferior separately, attach to the inferior on the stub command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common exceptions
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    # File handler installed when verbose "gdb-remote" logging is requested (see setUp).
    _verbose_log_handler = None
    # Shared formatter for both the stderr and the verbose file handlers.
    _log_formatter = logging.Formatter(fmt='%(asctime)-15s %(levelname)-8s %(message)s')
def setUpBaseLogging(self):
    """Install the warning-level stderr handler on this test's logger (idempotent)."""
    self.logger = logging.getLogger(__name__)

    if len(self.logger.handlers) > 0:
        return # We have set up this handler already

    self.logger.propagate = False
    self.logger.setLevel(logging.DEBUG)

    # log all warnings to stderr
    stderr_handler = logging.StreamHandler()
    stderr_handler.setLevel(logging.WARNING)
    stderr_handler.setFormatter(self._log_formatter)
    self.logger.addHandler(stderr_handler)
def isVerboseLoggingRequested(self):
    # We will report our detailed logs if the user requested that the "gdb-remote"
    # channel be logged on any configured channel spec.
    for channel in lldbtest_config.channels:
        if "gdb-remote" in channel:
            return True
    return False
def setUp(self):
    """Per-test setup: logging, test sequence, port selection, and remote host/device resolution."""
    TestBase.setUp(self)
    self.setUpBaseLogging()
    self.debug_monitor_extra_args = []
    self._pump_queues = socket_packet_pump.PumpQueues()

    if self.isVerboseLoggingRequested():
        # If requested, full logs go to a log file
        self._verbose_log_handler = logging.FileHandler(self.log_basename + "-host.log")
        self._verbose_log_handler.setFormatter(self._log_formatter)
        self._verbose_log_handler.setLevel(logging.DEBUG)
        self.logger.addHandler(self._verbose_log_handler)

    self.test_sequence = GdbRemoteTestSequence(self.logger)
    self.set_inferior_startup_launch()
    self.port = self.get_next_port()
    self.named_pipe_path = None
    self.named_pipe = None
    self.named_pipe_fd = None
    self.stub_sends_two_stop_notifications_on_kill = False

    if configuration.lldb_platform_url:
        # Raw strings: these patterns contain regex escapes (\[, \d) that are
        # invalid escape sequences in plain string literals.
        if configuration.lldb_platform_url.startswith('unix-'):
            url_pattern = r'(.+)://\[?(.+?)\]?/.*'
        else:
            url_pattern = r'(.+)://(.+):\d+'
        scheme, host = re.match(url_pattern, configuration.lldb_platform_url).groups()
        if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
            # Talk to the adb-forwarded local end; remember the device for port forwarding.
            self.stub_device = host
            self.stub_hostname = 'localhost'
        else:
            self.stub_device = None
            self.stub_hostname = host
    else:
        self.stub_hostname = "localhost"
def tearDown(self):
    """Per-test teardown: verify pump queues drained, detach verbose logging, then base teardown."""
    self._pump_queues.verify_queues_empty()

    self.logger.removeHandler(self._verbose_log_handler)
    self._verbose_log_handler = None
    TestBase.tearDown(self)
def getLocalServerLogFile(self):
    # Path of the host-side server log file derived from this test's log basename.
    return self.log_basename + "-server.log"
def setUpServerLogging(self, is_llgs):
    """Append stub logging arguments to debug_monitor_extra_args, if any channels were requested.

    is_llgs selects lldb-server style flags; otherwise debugserver style flags are used.
    """
    if len(lldbtest_config.channels) == 0:
        return # No logging requested

    if lldb.remote_platform:
        log_file = lldbutil.join_remote_paths(lldb.remote_platform.GetWorkingDirectory(), "server.log")
    else:
        log_file = self.getLocalServerLogFile()

    if is_llgs:
        self.debug_monitor_extra_args.append("--log-file=" + log_file)
        self.debug_monitor_extra_args.append("--log-channels={}".format(":".join(lldbtest_config.channels)))
    else:
        # Fix: use the local log_file computed above; self.log_file is never defined.
        self.debug_monitor_extra_args = ["--log-file=" + log_file, "--log-flags=0x800000"]
def get_next_port(self):
    """Pick a random candidate listening port in [12000, 15999] for the stub."""
    return 12000 + random.randint(0, 3999)
def reset_test_sequence(self):
    # Discard any accumulated expected packets and start a fresh sequence.
    self.test_sequence = GdbRemoteTestSequence(self.logger)
def create_named_pipe(self):
    """Create a FIFO the stub can use to report its chosen port.

    Returns (named_pipe_path, named_pipe file object, named_pipe_fd); registers
    a teardown hook that closes and deletes the pipe and its temp directory.
    """
    # Create a temp dir and name for a pipe.
    temp_dir = tempfile.mkdtemp()
    named_pipe_path = os.path.join(temp_dir, "stub_port_number")

    # Create the named pipe.
    os.mkfifo(named_pipe_path)

    # Open the read side of the pipe in non-blocking mode.  This will return right away, ready or not.
    named_pipe_fd = os.open(named_pipe_path, os.O_RDONLY | os.O_NONBLOCK)

    # Create the file for the named pipe.  Note this will follow semantics of
    # a non-blocking read side of a named pipe, which has different semantics
    # than a named pipe opened for read in non-blocking mode.
    named_pipe = os.fdopen(named_pipe_fd, "r")
    self.assertIsNotNone(named_pipe)

    def shutdown_named_pipe():
        # Best-effort cleanup; narrow from bare except so we don't swallow
        # KeyboardInterrupt/SystemExit.
        try:
            named_pipe.close()
        except Exception:
            print("failed to close named pipe")

        # Delete the pipe.
        try:
            os.remove(named_pipe_path)
        except Exception:
            print("failed to delete named pipe: {}".format(named_pipe_path))

        # Delete the temp directory.
        try:
            os.rmdir(temp_dir)
        except Exception:
            print("failed to delete temp dir: {}, directory contents: '{}'".format(temp_dir, os.listdir(temp_dir)))

    # Add the shutdown hook to clean up the named pipe.
    self.addTearDownHook(shutdown_named_pipe)

    # Clear the port so the stub selects a port number.
    self.port = 0

    return (named_pipe_path, named_pipe, named_pipe_fd)
def get_stub_port_from_named_socket(self, read_timeout_seconds=5):
    """Read and return (as int) the port number the stub wrote to the named pipe."""
    # Wait for something to read with a max timeout.
    (ready_readers, _, _) = select.select([self.named_pipe_fd], [], [], read_timeout_seconds)
    self.assertIsNotNone(ready_readers, "write side of pipe has not written anything - stub isn't writing to pipe.")
    self.assertNotEqual(len(ready_readers), 0, "write side of pipe has not written anything - stub isn't writing to pipe.")

    # Read the port from the named pipe.
    stub_port_raw = self.named_pipe.read()
    self.assertIsNotNone(stub_port_raw)
    self.assertNotEqual(len(stub_port_raw), 0, "no content to read on pipe")

    # Trim null byte, convert to int.
    stub_port_raw = stub_port_raw[:-1]
    stub_port = int(stub_port_raw)
    self.assertTrue(stub_port > 0)

    return stub_port
def init_llgs_test(self, use_named_pipe=True):
    """Locate lldb-server (local or remote) and configure it for gdbserver mode."""
    if lldb.remote_platform:
        # Remote platforms don't support named pipe based port negotiation
        use_named_pipe = False

        # Grab the ppid from /proc/[shell pid]/stat
        err, retcode, shell_stat = self.run_platform_command("cat /proc/$$/stat")
        self.assertTrue(err.Success() and retcode == 0,
                "Failed to read file /proc/$$/stat: %s, retcode: %d" % (err.GetCString(), retcode))

        # [pid] ([executable]) [state] [*ppid*]
        pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
        err, retcode, ls_output = self.run_platform_command("ls -l /proc/%s/exe" % pid)
        self.assertTrue(err.Success() and retcode == 0,
                "Failed to read file /proc/%s/exe: %s, retcode: %d" % (pid, err.GetCString(), retcode))
        exe = ls_output.split()[-1]

        # If the binary has been deleted, the link name has " (deleted)" appended.
        # Remove if it's there.
        self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
    else:
        self.debug_monitor_exe = get_lldb_server_exe()
        if not self.debug_monitor_exe:
            self.skipTest("lldb-server exe not found")

    self.debug_monitor_extra_args = ["gdbserver"]
    self.setUpServerLogging(is_llgs=True)

    if use_named_pipe:
        (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe()
def init_debugserver_test(self, use_named_pipe=True):
    """Locate debugserver and configure logging and port negotiation for it."""
    self.debug_monitor_exe = get_debugserver_exe()
    if not self.debug_monitor_exe:
        self.skipTest("debugserver exe not found")
    self.setUpServerLogging(is_llgs=False)

    if use_named_pipe:
        (self.named_pipe_path, self.named_pipe, self.named_pipe_fd) = self.create_named_pipe()

    # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
    # when the process truly dies.
    self.stub_sends_two_stop_notifications_on_kill = True
def forward_adb_port(self, source, target, direction, device):
    """Set up adb port forwarding (or reversing) and register teardown to remove it."""
    adb = ['adb'] + (['-s', device] if device else []) + [direction]

    def remove_port_forward():
        subprocess.call(adb + ["--remove", "tcp:%d" % source])

    subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
    self.addTearDownHook(remove_port_forward)
259 def _verify_socket(self, sock):
260 # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
261 # connect() attempt. However, due to the way how ADB forwarding works, on android targets
262 # the connect() will always be successful, but the connection will be immediately dropped
263 # if ADB could not connect on the remote side. This function tries to detect this
264 # situation, and report it as "connection refused" so that the upper layers attempt the
266 triple = self.dbg.GetSelectedPlatform().GetTriple()
267 if not re.match(".*-.*-.*-android", triple):
268 return # Not android.
269 can_read, _, _ = select.select([sock], [], [], 0.1)
270 if sock not in can_read:
271 return # Data is not available, but the connection is alive.
272 if len(sock.recv(1, socket.MSG_PEEK)) == 0:
273 raise _ConnectionRefused() # Got EOF, connection dropped.
def create_socket(self):
    """Open and return a TCP socket to the stub; raises _ConnectionRefused when unreachable."""
    sock = socket.socket()
    logger = self.logger

    triple = self.dbg.GetSelectedPlatform().GetTriple()
    if re.match(".*-.*-.*-android", triple):
        # Connect through an adb-forwarded local port.
        self.forward_adb_port(self.port, self.port, "forward", self.stub_device)

    logger.info("Connecting to debug monitor on %s:%d", self.stub_hostname, self.port)
    connect_info = (self.stub_hostname, self.port)
    try:
        sock.connect(connect_info)
    except socket.error as serr:
        if serr.errno == errno.ECONNREFUSED:
            raise _ConnectionRefused()
        # Propagate any other socket error unchanged.
        raise serr

    def shutdown_socket():
        if sock:
            try:
                # send the kill packet so lldb-server shuts down gracefully
                sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
            except Exception:
                logger.warning("failed to send kill packet to debug monitor: {}; ignoring".format(sys.exc_info()[0]))

            try:
                sock.close()
            except Exception:
                logger.warning("failed to close socket to debug monitor: {}; ignoring".format(sys.exc_info()[0]))

    self.addTearDownHook(shutdown_socket)

    # On android a successful connect can still be a dead link; detect and report.
    self._verify_socket(sock)

    return sock
def set_inferior_startup_launch(self):
    # Subsequent preps will have the stub launch the inferior via an $A packet.
    self._inferior_startup = self._STARTUP_LAUNCH
def set_inferior_startup_attach(self):
    # Subsequent preps will start the inferior first and attach via --attach=<pid>.
    self._inferior_startup = self._STARTUP_ATTACH
def set_inferior_startup_attach_manually(self):
    # Subsequent preps start the inferior and stub separately; the test attaches itself.
    self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
def get_debug_monitor_command_line_args(self, attach_pid=None):
    """Build the stub's command line: listen address, optional attach pid, optional named pipe."""
    if lldb.remote_platform:
        commandline_args = self.debug_monitor_extra_args + ["*:{}".format(self.port)]
    else:
        commandline_args = self.debug_monitor_extra_args + ["localhost:{}".format(self.port)]

    if attach_pid:
        commandline_args += ["--attach=%d" % attach_pid]
    if self.named_pipe_path:
        commandline_args += ["--named-pipe", self.named_pipe_path]
    return commandline_args
def launch_debug_monitor(self, attach_pid=None, logfile=None):
    """Spawn the debug monitor stub process and return it."""
    # Create the command line.
    commandline_args = self.get_debug_monitor_command_line_args(attach_pid=attach_pid)

    # Start the server.
    server = self.spawnSubprocess(self.debug_monitor_exe, commandline_args, install_remote=False)
    self.addTearDownHook(self.cleanupSubprocesses)
    self.assertIsNotNone(server)

    # If we're receiving the stub's listening port from the named pipe, do that here.
    if self.named_pipe_path:
        self.port = self.get_stub_port_from_named_socket()

    return server
def connect_to_debug_monitor(self, attach_pid=None):
    """Launch the stub and open self.sock to it, retrying with fresh ports on collision.

    Returns the server process object.  Raises Exception when no connection
    could be established after the maximum number of attempts.
    """
    logger = self.logger

    if self.named_pipe_path:
        # The stub reports the port it chose via the named pipe; one launch suffices.
        server = self.launch_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        # Schedule debug monitor to be shut down during teardown.
        # Bind 'server' as a default arg so the hook keeps this launch's process.
        def shutdown_debug_monitor(server=server):
            try:
                server.terminate()
            except Exception:
                logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
        self.addTearDownHook(shutdown_debug_monitor)

        # Attach to the stub and return a socket opened to it.
        self.sock = self.create_socket()
        return server

    # We're using a random port algorithm to try not to collide with other ports,
    # and retry a max # times.
    attempts = 0
    MAX_ATTEMPTS = 20

    while attempts < MAX_ATTEMPTS:
        server = self.launch_debug_monitor(attach_pid=attach_pid)

        # Schedule debug monitor to be shut down during teardown.
        # Default arg avoids late binding of the loop variable in the closure.
        def shutdown_debug_monitor(server=server):
            try:
                server.terminate()
            except Exception:
                logger.warning("failed to terminate server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
        self.addTearDownHook(shutdown_debug_monitor)

        connect_attemps = 0
        MAX_CONNECT_ATTEMPTS = 10

        while connect_attemps < MAX_CONNECT_ATTEMPTS:
            # Create a socket to talk to the server
            try:
                logger.info("Connect attempt %d", connect_attemps+1)
                self.sock = self.create_socket()
                return server
            except _ConnectionRefused:
                # Ignore, and try again.
                pass
            time.sleep(0.5)
            connect_attemps += 1

        # We should close the server here to be safe.
        server.terminate()

        # Increment attempts.
        print("connect to debug monitor on port %d failed, attempt #%d of %d" % (self.port, attempts + 1, MAX_ATTEMPTS))
        attempts += 1

        # And wait a random length of time before next attempt, to avoid collisions.
        time.sleep(random.randint(1,5))

        # Now grab a new port number.
        self.port = self.get_next_port()

    raise Exception("failed to create a socket to the launched debug monitor after %d tries" % attempts)
def launch_process_for_attach(self, inferior_args=None, sleep_seconds=3, exe_path=None):
    """Start an inferior that sleeps so a stub can attach to it later; returns the process.

    We're going to start a child process that the debug monitor stub can later attach to.
    This process needs to be started so that it just hangs around for a while.  We'll
    have it sleep.
    """
    if not exe_path:
        exe_path = os.path.abspath("a.out")

    args = []
    if inferior_args:
        args.extend(inferior_args)
    if sleep_seconds:
        args.append("sleep:%d" % sleep_seconds)

    inferior = self.spawnSubprocess(exe_path, args)

    def shutdown_process_for_attach():
        try:
            inferior.terminate()
        except Exception:
            # Fix: the original referenced an unbound 'logger' here (NameError on failure)
            # and used a bare except.
            self.logger.warning("failed to terminate inferior process for attach: {}; ignoring".format(sys.exc_info()[0]))
    self.addTearDownHook(shutdown_process_for_attach)
    return inferior
def prep_debug_monitor_and_inferior(self, inferior_args=None, inferior_sleep_seconds=3, inferior_exe_path=None):
    """Prep the debug monitor, the inferior, and the expected packet stream.

    Handle the separate cases of using the debug monitor in attach-to-inferior mode
    and in launch-inferior mode.

    For attach-to-inferior mode, the inferior process is first started, then
    the debug monitor is started in attach to pid mode (using --attach on the
    stub command line), and the no-ack-mode setup is appended to the packet
    stream.  The packet stream is not yet executed, ready to have more expected
    packet entries added to it.

    For launch-inferior mode, the stub is first started, then no ack mode is
    setup on the expected packet stream, then the verified launch packets are added
    to the expected socket stream.  The packet stream is not yet executed, ready
    to have more expected packet entries added to it.

    Returns:
        A dictionary of the form {inferior:<inferior>, server:<server>}.
    """
    inferior = None
    attach_pid = None

    if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
        # Launch the process that we'll use as the inferior.
        inferior = self.launch_process_for_attach(inferior_args=inferior_args, sleep_seconds=inferior_sleep_seconds, exe_path=inferior_exe_path)
        self.assertIsNotNone(inferior)
        self.assertTrue(inferior.pid > 0)
        if self._inferior_startup == self._STARTUP_ATTACH:
            # In this case, we want the stub to attach via the command line, so set the command line attach pid here.
            attach_pid = inferior.pid

    if self._inferior_startup == self._STARTUP_LAUNCH:
        # Build the launch args.
        if not inferior_exe_path:
            inferior_exe_path = os.path.abspath("a.out")

        if lldb.remote_platform:
            remote_path = lldbutil.append_to_process_working_directory(os.path.basename(inferior_exe_path))
            remote_file_spec = lldb.SBFileSpec(remote_path, False)
            err = lldb.remote_platform.Install(lldb.SBFileSpec(inferior_exe_path, True), remote_file_spec)
            if err.Fail():
                raise Exception("remote_platform.Install('%s', '%s') failed: %s" % (inferior_exe_path, remote_path, err))
            inferior_exe_path = remote_path

        launch_args = [inferior_exe_path]
        if inferior_args:
            launch_args.extend(inferior_args)

    # Launch the debug monitor stub, attaching to the inferior.
    server = self.connect_to_debug_monitor(attach_pid=attach_pid)
    self.assertIsNotNone(server)

    # Build the expected protocol stream
    self.add_no_ack_remote_stream()
    if self._inferior_startup == self._STARTUP_LAUNCH:
        self.add_verified_launch_packets(launch_args)

    return {"inferior":inferior, "server":server}
def expect_socket_recv(self, sock, expected_content_regex, timeout_seconds):
    """Receive from sock until expected_content_regex matches or the timeout elapses; asserts the match."""
    response = ""
    timeout_time = time.time() + timeout_seconds

    while not expected_content_regex.match(response) and time.time() < timeout_time:
        can_read, _, _ = select.select([sock], [], [], timeout_seconds)
        if can_read and sock in can_read:
            recv_bytes = sock.recv(4096)
            if recv_bytes:
                response += recv_bytes

    self.assertTrue(expected_content_regex.match(response))
def expect_socket_send(self, sock, content, timeout_seconds):
    """Send all of content on sock before the timeout; asserts everything was written."""
    remaining = content
    deadline = time.time() + timeout_seconds

    while len(remaining) > 0 and time.time() < deadline:
        _, can_write, _ = select.select([], [sock], [], timeout_seconds)
        if can_write and sock in can_write:
            written_byte_count = sock.send(remaining)
            remaining = remaining[written_byte_count:]

    self.assertEqual(len(remaining), 0)
def do_handshake(self, stub_socket, timeout_seconds=5):
    """Perform the initial ack exchange and switch the stub into no-ack mode."""
    # Write the initial ack.
    self.expect_socket_send(stub_socket, "+", timeout_seconds)

    # Send the start no ack mode packet.
    NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0"
    bytes_sent = stub_socket.send(NO_ACK_MODE_REQUEST)
    self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST))

    # Receive the ack and "OK"
    self.expect_socket_recv(stub_socket, re.compile(r"^\+\$OK#[0-9a-fA-F]{2}$"), timeout_seconds)

    # Send the final ack.
    self.expect_socket_send(stub_socket, "+", timeout_seconds)
def add_no_ack_remote_stream(self):
    """Append the no-ack-mode handshake exchange to the expected packet stream."""
    self.test_sequence.add_log_lines(
        ["read packet: +",
         "read packet: $QStartNoAckMode#b0",
         "send packet: +",
         "send packet: $OK#9a",
         "read packet: +"],
        True)
def add_verified_launch_packets(self, launch_args):
    """Append the $A launch exchange (with qLaunchSuccess check) to the expected stream."""
    self.test_sequence.add_log_lines(
        ["read packet: %s" % build_gdbremote_A_packet(launch_args),
         "send packet: $OK#00",
         "read packet: $qLaunchSuccess#a5",
         "send packet: $OK#00"],
        True)
def add_thread_suffix_request_packets(self):
    """Append the QThreadSuffixSupported request/OK exchange to the expected stream."""
    self.test_sequence.add_log_lines(
        ["read packet: $QThreadSuffixSupported#e4",
         "send packet: $OK#00",
        ], True)
def add_process_info_collection_packets(self):
    """Append a qProcessInfo query, capturing the raw response as "process_info_raw"."""
    self.test_sequence.add_log_lines(
        ["read packet: $qProcessInfo#dc",
         { "direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"process_info_raw"} }],
        True)
# Keys a stub may legitimately report in a qProcessInfo response.
# NOTE(review): list entries reconstructed from upstream lldb — verify against the stub in use.
_KNOWN_PROCESS_INFO_KEYS = [
    "pid",
    "parent-pid",
    "real-uid",
    "real-gid",
    "effective-uid",
    "effective-gid",
    "cputype",
    "cpusubtype",
    "ostype",
    "triple",
    "vendor",
    "endian",
    "elf_abi",
    "ptrsize"
    ]
def parse_process_info_response(self, context):
    """Parse the captured qProcessInfo response into a {key: value} dict, validating keys."""
    # Ensure we have a process info response.
    self.assertIsNotNone(context)
    process_info_raw = context.get("process_info_raw")
    self.assertIsNotNone(process_info_raw)

    # Pull out key:value; pairs.
    process_info_dict = { match.group(1):match.group(2) for match in re.finditer(r"([^:]+):([^;]+);", process_info_raw) }

    # Validate keys are known.
    for (key, val) in list(process_info_dict.items()):
        self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
        self.assertIsNotNone(val)

    return process_info_dict
def add_register_info_collection_packets(self):
    """Append iterated qRegisterInfo queries; raw responses saved under "reg_info_responses"."""
    self.test_sequence.add_log_lines(
        [ { "type":"multi_response", "query":"qRegisterInfo", "append_iteration_suffix":True,
            "end_regex":re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
            "save_key":"reg_info_responses" } ],
        True)
def parse_register_info_packets(self, context):
    """Return an array of register info dictionaries, one per register info."""
    reg_info_responses = context.get("reg_info_responses")
    self.assertIsNotNone(reg_info_responses)

    # Parse register infos.
    return [parse_reg_info_response(reg_info_response) for reg_info_response in reg_info_responses]
def expect_gdbremote_sequence(self, timeout_seconds=None):
    # Run the accumulated expected packet sequence against the live stub socket.
    # Falls back to the class default timeout when none (or 0) is given.
    if not timeout_seconds:
        timeout_seconds = self._TIMEOUT_SECONDS
    return expect_lldb_gdbserver_replay(self, self.sock, self.test_sequence,
            self._pump_queues, timeout_seconds, self.logger)
# Keys a stub may legitimately report in a qRegisterInfo response.
# NOTE(review): list entries reconstructed from upstream lldb — verify against the stub in use.
_KNOWN_REGINFO_KEYS = [
    "name",
    "alt-name",
    "bitsize",
    "offset",
    "encoding",
    "format",
    "set",
    "gcc",
    "ehframe",
    "dwarf",
    "generic",
    "container-regs",
    "invalidate-regs"
    ]
def assert_valid_reg_info(self, reg_info):
    """Assert the parsed reg_info has only known keys and the required minimum set."""
    # Assert we know about all the reginfo keys parsed.
    for key in reg_info:
        self.assertTrue(key in self._KNOWN_REGINFO_KEYS)

    # Check the bare-minimum expected set of register info keys.
    self.assertTrue("name" in reg_info)
    self.assertTrue("bitsize" in reg_info)
    self.assertTrue("offset" in reg_info)
    self.assertTrue("encoding" in reg_info)
    self.assertTrue("format" in reg_info)
def find_pc_reg_info(self, reg_infos):
    """Return (positional_index, reg_info) for the generic 'pc' register, or (None, None)."""
    # enumerate replaces the original hand-maintained counter.
    for lldb_reg_index, reg_info in enumerate(reg_infos):
        if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
            return (lldb_reg_index, reg_info)

    return (None, None)
def add_lldb_register_index(self, reg_infos):
    """Add a "lldb_register_index" key containing the 0-based index of each reg_infos entry.

    We'll use this when we want to call packets like P/p with a register index but do so
    on only a subset of the full register info set.
    """
    self.assertIsNotNone(reg_infos)

    # enumerate replaces the original hand-maintained counter.
    for reg_index, reg_info in enumerate(reg_infos):
        reg_info["lldb_register_index"] = reg_index
def add_query_memory_region_packets(self, address):
    """Append a qMemoryRegionInfo query for address; response captured as "memory_region_response"."""
    self.test_sequence.add_log_lines(
        ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
         {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"memory_region_response"} }],
        True)
def parse_key_val_dict(self, key_val_text, allow_dupes=True):
    """Parse "key:value;" text into a dict.

    When allow_dupes is True, repeated keys collect their values into a list
    (in order of appearance); otherwise a duplicate key fails the test.
    """
    self.assertIsNotNone(key_val_text)
    kv_dict = {}
    for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
        key = match.group(1)
        val = match.group(2)
        if key in kv_dict:
            if allow_dupes:
                # isinstance replaces the original type(...) == list comparison.
                if isinstance(kv_dict[key], list):
                    kv_dict[key].append(val)
                else:
                    # Promote the existing single value to a list.
                    kv_dict[key] = [kv_dict[key], val]
            else:
                self.fail("key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(key, val, key_val_text, kv_dict))
        else:
            kv_dict[key] = val
    return kv_dict
def parse_memory_region_packet(self, context):
    """Parse the captured qMemoryRegionInfo response into a validated key/value dict."""
    # Ensure we have a context.
    self.assertIsNotNone(context.get("memory_region_response"))

    # Pull out key:value; pairs.
    mem_region_dict = self.parse_key_val_dict(context.get("memory_region_response"))

    # Validate keys are known.
    for (key, val) in list(mem_region_dict.items()):
        self.assertTrue(key in ["start", "size", "permissions", "error"])
        self.assertIsNotNone(val)

    # Return the dictionary of key-value pairs for the memory region.
    return mem_region_dict
def assert_address_within_memory_region(self, test_address, mem_region_dict):
    """Fail the test unless test_address lies inside [start, start+size) of the region dict."""
    self.assertIsNotNone(mem_region_dict)
    self.assertTrue("start" in mem_region_dict)
    self.assertTrue("size" in mem_region_dict)

    range_start = int(mem_region_dict["start"], 16)
    range_size = int(mem_region_dict["size"], 16)
    range_end = range_start + range_size

    if test_address < range_start:
        self.fail("address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
    elif test_address >= range_end:
        self.fail("address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
def add_threadinfo_collection_packets(self):
    """Append qfThreadInfo/qsThreadInfo iteration; raw responses saved under "threadinfo_responses"."""
    self.test_sequence.add_log_lines(
        [ { "type":"multi_response", "first_query":"qfThreadInfo", "next_query":"qsThreadInfo",
            "append_iteration_suffix":False, "end_regex":re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
            "save_key":"threadinfo_responses" } ],
        True)
def parse_threadinfo_packets(self, context):
    """Return an array of thread ids (decimal ints), one per thread."""
    threadinfo_responses = context.get("threadinfo_responses")
    self.assertIsNotNone(threadinfo_responses)

    thread_ids = []
    for threadinfo_response in threadinfo_responses:
        new_thread_infos = parse_threadinfo_response(threadinfo_response)
        thread_ids.extend(new_thread_infos)
    return thread_ids
def wait_for_thread_count(self, thread_count, timeout_seconds=3):
    """Poll the stub until at least thread_count threads exist; returns the thread id list.

    Raises Exception if the count is not reached before timeout_seconds.
    """
    start_time = time.time()
    timeout_time = start_time + timeout_seconds

    # Initialize so a thread_count of 0 returns [] instead of raising NameError.
    threads = []
    actual_thread_count = 0
    while actual_thread_count < thread_count:
        self.reset_test_sequence()
        self.add_threadinfo_collection_packets()

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        threads = self.parse_threadinfo_packets(context)
        self.assertIsNotNone(threads)

        actual_thread_count = len(threads)

        if time.time() > timeout_time:
            # Fixed "theads" typo in the original message.
            raise Exception(
                'timed out after {} seconds while waiting for threads: waiting for at least {} threads, found {}'.format(
                    timeout_seconds, thread_count, actual_thread_count))

    return threads
def add_set_breakpoint_packets(self, address, do_continue=True, breakpoint_kind=1):
    """Append a Z0 set-breakpoint exchange and, optionally, a continue with expected stop."""
    self.test_sequence.add_log_lines(
        [# Set the breakpoint.
         "read packet: $Z0,{0:x},{1}#00".format(address, breakpoint_kind),
         # Verify the stub could set it.
         "send packet: $OK#00",
         ], True)

    if do_continue:
        self.test_sequence.add_log_lines(
            [# Continue the inferior.
             "read packet: $c#63",
             # Expect a breakpoint stop report.
             {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
             ], True)
def add_remove_breakpoint_packets(self, address, breakpoint_kind=1):
    """Append a z0 remove-breakpoint exchange to the expected stream."""
    self.test_sequence.add_log_lines(
        [# Remove the breakpoint.
         "read packet: $z0,{0:x},{1}#00".format(address, breakpoint_kind),
         # Verify the stub could unset it.
         "send packet: $OK#00",
         ], True)
def add_qSupported_packets(self):
    """Append a qSupported query, capturing the raw response as "qSupported_response"."""
    self.test_sequence.add_log_lines(
        ["read packet: $qSupported#00",
         {"direction":"send", "regex":r"^\$(.*)#[0-9a-fA-F]{2}", "capture":{1: "qSupported_response"}},
         ], True)
# Stub features a qSupported response may legitimately report.
# NOTE(review): non-visible entries reconstructed from upstream lldb — verify against the stub.
_KNOWN_QSUPPORTED_STUB_FEATURES = [
    "augmented-libraries-svr4-read",
    "PacketSize",
    "QStartNoAckMode",
    "QThreadSuffixSupported",
    "QListThreadsInStopReply",
    "qEcho",
    "qXfer:auxv:read",
    "qXfer:libraries:read",
    "qXfer:libraries-svr4:read",
    "qXfer:features:read",
    ]
def parse_qSupported_response(self, context):
    """Parse the captured qSupported response into a feature dict.

    For values with key=val, the dict key and vals are set as expected.  For feature+,
    feature- and feature?, the +,-,? is stripped from the key and stored as the value.
    Raises Exception for malformed or unknown features.
    """
    self.assertIsNotNone(context)

    raw_response = context.get("qSupported_response")
    self.assertIsNotNone(raw_response)

    supported_dict = {}
    for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
        key = match.group(1)
        val = match.group(3)

        # key=val: store as is
        if val and len(val) > 0:
            supported_dict[key] = val
        else:
            if len(key) < 2:
                raise Exception("singular stub feature is too short: must be stub_feature{+,-,?}")
            supported_type = key[-1]
            key = key[:-1]
            if not supported_type in ["+", "-", "?"]:
                raise Exception("malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
            supported_dict[key] = supported_type

        # Ensure we know the supported element
        if not key in self._KNOWN_QSUPPORTED_STUB_FEATURES:
            raise Exception("unknown qSupported stub feature reported: %s" % key)

    return supported_dict
def run_process_then_stop(self, run_seconds=1):
    """Continue the inferior, let it run for run_seconds, then interrupt and capture the stop."""
    # Tell the stub to continue.
    self.test_sequence.add_log_lines(
        ["read packet: $vCont;c#a8"],
        True)
    context = self.expect_gdbremote_sequence()

    # Wait for run_seconds.
    time.sleep(run_seconds)

    # Send an interrupt, capture a T response.
    self.reset_test_sequence()
    self.test_sequence.add_log_lines(
        ["read packet: {}".format(chr(3)),
         {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result"} }],
        True)
    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)
    self.assertIsNotNone(context.get("stop_result"))

    return context
def select_modifiable_register(self, reg_infos):
    """Find a register that can be read/written freely.

    Prefers a known scratch register (e.g. rax); falls back to a frame-pointer
    register.  Returns the register's lldb_register_index, or None.
    """
    PREFERRED_REGISTER_NAMES = set(["rax",])

    self.assertIsNotNone(reg_infos)

    # Scan once: return immediately on a preferred register, remember any frame pointer.
    fallback_index = None
    for reg_info in reg_infos:
        if ("name" in reg_info) and (reg_info["name"] in PREFERRED_REGISTER_NAMES):
            # We found a preferred register.  Use it.
            return reg_info["lldb_register_index"]
        if ("generic" in reg_info) and (reg_info["generic"] == "fp"):
            # A frame pointer register will do as a register to modify temporarily.
            fallback_index = reg_info["lldb_register_index"]

    # No preferred register found; use the alternative register, if any.
    return fallback_index
def extract_registers_from_stop_notification(self, stop_key_vals_text):
    """Pull expedited register values out of a stop notification's key/value text.

    Returns {register_number (int): hex_value_text} for every all-hex key.
    """
    self.assertIsNotNone(stop_key_vals_text)
    kv_dict = self.parse_key_val_dict(stop_key_vals_text)

    registers = {}
    for (key, val) in list(kv_dict.items()):
        if re.match(r"^[0-9a-fA-F]+$", key):
            registers[int(key, 16)] = val
    return registers
def gather_register_infos(self):
    """Query the stub for all register infos; returns them with lldb_register_index added."""
    self.reset_test_sequence()
    self.add_register_info_collection_packets()

    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    reg_infos = self.parse_register_info_packets(context)
    self.assertIsNotNone(reg_infos)
    self.add_lldb_register_index(reg_infos)

    return reg_infos
def find_generic_register_with_name(self, reg_infos, generic_name):
    """Return the first reg_info whose "generic" field equals generic_name, else None."""
    self.assertIsNotNone(reg_infos)
    for reg_info in reg_infos:
        if ("generic" in reg_info) and (reg_info["generic"] == generic_name):
            return reg_info
    return None
def decode_gdbremote_binary(self, encoded_bytes):
    """Decode a gdb-remote binary payload: }-escaped chars and '*' run-length encoding."""
    decoded = ""
    idx = 0
    while idx < len(encoded_bytes):
        ch = encoded_bytes[idx]
        if ch == "}":
            # Escaped char: the following byte XOR 0x20 is the real value.
            self.assertTrue(idx + 1 < len(encoded_bytes))
            decoded += chr(ord(encoded_bytes[idx + 1]) ^ 0x20)
            idx += 2
        elif ch == "*":
            # Run-length encoding: repeat the previous char (count byte - 29) more times.
            self.assertTrue(len(decoded) > 0)
            self.assertTrue(idx + 1 < len(encoded_bytes))
            repeat_count = ord(encoded_bytes[idx + 1]) - 29
            decoded += decoded[-1] * repeat_count
            idx += 2
        else:
            decoded += ch
            idx += 1
    return decoded
def build_auxv_dict(self, endian, word_size, auxv_data):
    """Decode raw auxv bytes into a {key: value} dict, stopping at the double-zero terminator.

    Fails the test if the terminator entry is missing or a key repeats.
    """
    self.assertIsNotNone(endian)
    self.assertIsNotNone(word_size)
    self.assertIsNotNone(auxv_data)

    auxv_dict = {}

    while len(auxv_data) > 0:
        # Chop off the key word.
        raw_key = auxv_data[:word_size]
        auxv_data = auxv_data[word_size:]

        # Chop off the value word.
        raw_value = auxv_data[:word_size]
        auxv_data = auxv_data[word_size:]

        # Convert raw text from target endian.
        key = unpack_endian_binary_string(endian, raw_key)
        value = unpack_endian_binary_string(endian, raw_value)

        # Handle ending entry.
        if key == 0:
            self.assertEqual(value, 0)
            return auxv_dict

        # The key should not already be present.
        self.assertFalse(key in auxv_dict)
        auxv_dict[key] = value

    self.fail("should not reach here - implies required double zero entry not found")
    return auxv_dict
def read_binary_data_in_chunks(self, command_prefix, chunk_length):
    """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned."""
    offset = 0
    done = False
    decoded_data = ""

    while not done:
        # Grab the next iteration of data.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines([
            "read packet: ${}{:x},{:x}:#00".format(command_prefix, offset, chunk_length),
            {"direction":"send", "regex":re.compile(r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", re.MULTILINE|re.DOTALL), "capture":{1:"response_type", 2:"content_raw"} }
            ], True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        response_type = context.get("response_type")
        self.assertIsNotNone(response_type)
        self.assertTrue(response_type in ["l", "m"])

        # Move the offset along for the next request.
        offset += chunk_length

        # Figure out if we're done.  We're done if the response type is l.
        done = response_type == "l"

        # Decode binary data.
        content_raw = context.get("content_raw")
        if content_raw and len(content_raw) > 0:
            self.assertIsNotNone(content_raw)
            decoded_data += self.decode_gdbremote_binary(content_raw)

    return decoded_data
def add_interrupt_packets(self):
    """Append a ^C interrupt and its expected T stop notification to the stream."""
    self.test_sequence.add_log_lines([
        # Send the interrupt.
        "read packet: {}".format(chr(3)),
        # And wait for the stop notification.
        {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", "capture":{1:"stop_signo", 2:"stop_key_val_text" } },
        ], True)
def parse_interrupt_packets(self, context):
    """Return (stop_signo_int, stop_key_val_dict) parsed from an interrupt's stop notification."""
    self.assertIsNotNone(context.get("stop_signo"))
    self.assertIsNotNone(context.get("stop_key_val_text"))
    return (int(context["stop_signo"], 16), self.parse_key_val_dict(context["stop_key_val_text"]))
1008 def add_QSaveRegisterState_packets(self, thread_id):
1010 # Use the thread suffix form.
1011 request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(thread_id)
1013 request = "read packet: $QSaveRegisterState#00"
1015 self.test_sequence.add_log_lines([
1017 {"direction":"send", "regex":r"^\$(E?.*)#[0-9a-fA-F]{2}$", "capture":{1:"save_response" } },
1020 def parse_QSaveRegisterState_response(self, context):
1021 self.assertIsNotNone(context)
1023 save_response = context.get("save_response")
1024 self.assertIsNotNone(save_response)
1026 if len(save_response) < 1 or save_response[0] == "E":
1028 return (False, None)
1030 return (True, int(save_response))
1032 def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
1034 # Use the thread suffix form.
1035 request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(save_id, thread_id)
1037 request = "read packet: $QRestoreRegisterState:{}#00".format(save_id)
1039 self.test_sequence.add_log_lines([
1041 "send packet: $OK#00"
1044 def flip_all_bits_in_each_register_value(self, reg_infos, endian, thread_id=None):
1045 self.assertIsNotNone(reg_infos)
1047 successful_writes = 0
1050 for reg_info in reg_infos:
1051 # Use the lldb register index added to the reg info. We're not necessarily
1052 # working off a full set of register infos, so an inferred register index could be wrong.
1053 reg_index = reg_info["lldb_register_index"]
1054 self.assertIsNotNone(reg_index)
1056 reg_byte_size = int(reg_info["bitsize"])/8
1057 self.assertTrue(reg_byte_size > 0)
1059 # Handle thread suffix.
1061 p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id)
1063 p_request = "read packet: $p{:x}#00".format(reg_index)
1065 # Read the existing value.
1066 self.reset_test_sequence()
1067 self.test_sequence.add_log_lines([
1069 { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
1071 context = self.expect_gdbremote_sequence()
1072 self.assertIsNotNone(context)
1074 # Verify the response length.
1075 p_response = context.get("p_response")
1076 self.assertIsNotNone(p_response)
1077 initial_reg_value = unpack_register_hex_unsigned(endian, p_response)
1079 # Flip the value by xoring with all 1s
1080 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) / 8)
1081 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
1082 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))
1084 # Handle thread suffix for P.
1086 P_request = "read packet: $P{:x}={};thread:{:x}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
1088 P_request = "read packet: $P{:x}={}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size))
1090 # Write the flipped value to the register.
1091 self.reset_test_sequence()
1092 self.test_sequence.add_log_lines([
1094 { "direction":"send", "regex":r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", "capture":{1:"P_response"} },
1096 context = self.expect_gdbremote_sequence()
1097 self.assertIsNotNone(context)
1099 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail
1100 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them
1101 # all flipping perfectly.
1102 P_response = context.get("P_response")
1103 self.assertIsNotNone(P_response)
1104 if P_response == "OK":
1105 successful_writes += 1
1108 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))
1110 # Read back the register value, ensure it matches the flipped value.
1111 if P_response == "OK":
1112 self.reset_test_sequence()
1113 self.test_sequence.add_log_lines([
1115 { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
1117 context = self.expect_gdbremote_sequence()
1118 self.assertIsNotNone(context)
1120 verify_p_response_raw = context.get("p_response")
1121 self.assertIsNotNone(verify_p_response_raw)
1122 verify_bits = unpack_register_hex_unsigned(endian, verify_p_response_raw)
1124 if verify_bits != flipped_bits_int:
1125 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts.
1126 # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
1127 successful_writes -= 1
1130 return (successful_writes, failed_writes)
1132 def is_bit_flippable_register(self, reg_info):
1135 if not "set" in reg_info:
1137 if reg_info["set"] != "General Purpose Registers":
1139 if ("container-regs" in reg_info) and (len(reg_info["container-regs"]) > 0):
1140 # Don't try to bit flip registers contained in another register.
1142 if re.match("^.s$", reg_info["name"]):
1143 # This is a 2-letter register name that ends in "s", like a segment register.
1144 # Don't try to bit flip these.
1146 if re.match("^(c|)psr$", reg_info["name"]):
1147 # This is an ARM program status register; don't flip it.
1149 # Okay, this looks fine-enough.
1152 def read_register_values(self, reg_infos, endian, thread_id=None):
1153 self.assertIsNotNone(reg_infos)
1156 for reg_info in reg_infos:
1157 # We append a register index when load reg infos so we can work with subsets.
1158 reg_index = reg_info.get("lldb_register_index")
1159 self.assertIsNotNone(reg_index)
1161 # Handle thread suffix.
1163 p_request = "read packet: $p{:x};thread:{:x}#00".format(reg_index, thread_id)
1165 p_request = "read packet: $p{:x}#00".format(reg_index)
1168 self.reset_test_sequence()
1169 self.test_sequence.add_log_lines([
1171 { "direction":"send", "regex":r"^\$([0-9a-fA-F]+)#", "capture":{1:"p_response"} },
1173 context = self.expect_gdbremote_sequence()
1174 self.assertIsNotNone(context)
1176 # Convert value from target endian to integral.
1177 p_response = context.get("p_response")
1178 self.assertIsNotNone(p_response)
1179 self.assertTrue(len(p_response) > 0)
1180 self.assertFalse(p_response[0] == "E")
1182 values[reg_index] = unpack_register_hex_unsigned(endian, p_response)
1186 def add_vCont_query_packets(self):
1187 self.test_sequence.add_log_lines([
1188 "read packet: $vCont?#49",
1189 {"direction":"send", "regex":r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", "capture":{2:"vCont_query_response" } },
1192 def parse_vCont_query_response(self, context):
1193 self.assertIsNotNone(context)
1194 vCont_query_response = context.get("vCont_query_response")
1196 # Handle case of no vCont support at all - in which case the capture group will be none or zero length.
1197 if not vCont_query_response or len(vCont_query_response) == 0:
1200 return {key:1 for key in vCont_query_response.split(";") if key and len(key) > 0}
1202 def count_single_steps_until_true(self, thread_id, predicate, args, max_step_count=100, use_Hc_packet=True, step_instruction="s"):
1203 """Used by single step test that appears in a few different contexts."""
1204 single_step_count = 0
1206 while single_step_count < max_step_count:
1207 self.assertIsNotNone(thread_id)
1209 # Build the packet for the single step instruction. We replace {thread}, if present, with the thread_id.
1210 step_packet = "read packet: ${}#00".format(re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
1211 # print("\nstep_packet created: {}\n".format(step_packet))
1214 self.reset_test_sequence()
1216 self.test_sequence.add_log_lines(
1217 [# Set the continue thread.
1218 "read packet: $Hc{0:x}#00".format(thread_id),
1219 "send packet: $OK#00",
1221 self.test_sequence.add_log_lines([
1224 # "read packet: $vCont;s:{0:x}#00".format(thread_id),
1225 # Expect a breakpoint stop report.
1226 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
1228 context = self.expect_gdbremote_sequence()
1229 self.assertIsNotNone(context)
1230 self.assertIsNotNone(context.get("stop_signo"))
1231 self.assertEqual(int(context.get("stop_signo"), 16),
1232 lldbutil.get_signal_number('SIGTRAP'))
1234 single_step_count += 1
1236 # See if the predicate is true. If so, we're done.
1238 return (True, single_step_count)
1240 # The predicate didn't return true within the runaway step count.
1241 return (False, single_step_count)
1243 def g_c1_c2_contents_are(self, args):
1244 """Used by single step test that appears in a few different contexts."""
1245 g_c1_address = args["g_c1_address"]
1246 g_c2_address = args["g_c2_address"]
1247 expected_g_c1 = args["expected_g_c1"]
1248 expected_g_c2 = args["expected_g_c2"]
1250 # Read g_c1 and g_c2 contents.
1251 self.reset_test_sequence()
1252 self.test_sequence.add_log_lines(
1253 ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
1254 {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c1_contents"} },
1255 "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
1256 {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"g_c2_contents"} }],
1259 # Run the packet stream.
1260 context = self.expect_gdbremote_sequence()
1261 self.assertIsNotNone(context)
1263 # Check if what we read from inferior memory is what we are expecting.
1264 self.assertIsNotNone(context.get("g_c1_contents"))
1265 self.assertIsNotNone(context.get("g_c2_contents"))
1267 return (context.get("g_c1_contents").decode("hex") == expected_g_c1) and (context.get("g_c2_contents").decode("hex") == expected_g_c2)
1269 def single_step_only_steps_one_instruction(self, use_Hc_packet=True, step_instruction="s"):
1270 """Used by single step test that appears in a few different contexts."""
1271 # Start up the inferior.
1272 procs = self.prep_debug_monitor_and_inferior(
1273 inferior_args=["get-code-address-hex:swap_chars", "get-data-address-hex:g_c1", "get-data-address-hex:g_c2", "sleep:1", "call-function:swap_chars", "sleep:5"])
1276 self.test_sequence.add_log_lines(
1277 [# Start running after initial stop.
1278 "read packet: $c#63",
1279 # Match output line that prints the memory address of the function call entry point.
1280 # Note we require launch-only testing so we can get inferior otuput.
1281 { "type":"output_match", "regex":r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
1282 "capture":{ 1:"function_address", 2:"g_c1_address", 3:"g_c2_address"} },
1283 # Now stop the inferior.
1284 "read packet: {}".format(chr(3)),
1285 # And wait for the stop notification.
1286 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} }],
1289 # Run the packet stream.
1290 context = self.expect_gdbremote_sequence()
1291 self.assertIsNotNone(context)
1293 # Grab the main thread id.
1294 self.assertIsNotNone(context.get("stop_thread_id"))
1295 main_thread_id = int(context.get("stop_thread_id"), 16)
1297 # Grab the function address.
1298 self.assertIsNotNone(context.get("function_address"))
1299 function_address = int(context.get("function_address"), 16)
1301 # Grab the data addresses.
1302 self.assertIsNotNone(context.get("g_c1_address"))
1303 g_c1_address = int(context.get("g_c1_address"), 16)
1305 self.assertIsNotNone(context.get("g_c2_address"))
1306 g_c2_address = int(context.get("g_c2_address"), 16)
1308 # Set a breakpoint at the given address.
1309 if self.getArchitecture() == "arm":
1310 # TODO: Handle case when setting breakpoint in thumb code
1314 self.reset_test_sequence()
1315 self.add_set_breakpoint_packets(function_address, do_continue=True, breakpoint_kind=BREAKPOINT_KIND)
1316 context = self.expect_gdbremote_sequence()
1317 self.assertIsNotNone(context)
1319 # Remove the breakpoint.
1320 self.reset_test_sequence()
1321 self.add_remove_breakpoint_packets(function_address, breakpoint_kind=BREAKPOINT_KIND)
1322 context = self.expect_gdbremote_sequence()
1323 self.assertIsNotNone(context)
1325 # Verify g_c1 and g_c2 match expected initial state.
1327 args["g_c1_address"] = g_c1_address
1328 args["g_c2_address"] = g_c2_address
1329 args["expected_g_c1"] = "0"
1330 args["expected_g_c2"] = "1"
1332 self.assertTrue(self.g_c1_c2_contents_are(args))
1334 # Verify we take only a small number of steps to hit the first state. Might need to work through function entry prologue code.
1335 args["expected_g_c1"] = "1"
1336 args["expected_g_c2"] = "1"
1337 (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=25, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
1338 self.assertTrue(state_reached)
1340 # Verify we hit the next state.
1341 args["expected_g_c1"] = "1"
1342 args["expected_g_c2"] = "0"
1343 (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
1344 self.assertTrue(state_reached)
1345 expected_step_count = 1
1346 arch = self.getArchitecture()
1348 #MIPS required "3" (ADDIU, SB, LD) machine instructions for updation of variable value
1349 if re.match("mips",arch):
1350 expected_step_count = 3
1351 #S390X requires "2" (LARL, MVI) machine instructions for updation of variable value
1352 if re.match("s390x",arch):
1353 expected_step_count = 2
1354 self.assertEqual(step_count, expected_step_count)
1356 # Verify we hit the next state.
1357 args["expected_g_c1"] = "0"
1358 args["expected_g_c2"] = "0"
1359 (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
1360 self.assertTrue(state_reached)
1361 self.assertEqual(step_count, expected_step_count)
1363 # Verify we hit the next state.
1364 args["expected_g_c1"] = "0"
1365 args["expected_g_c2"] = "1"
1366 (state_reached, step_count) = self.count_single_steps_until_true(main_thread_id, self.g_c1_c2_contents_are, args, max_step_count=5, use_Hc_packet=use_Hc_packet, step_instruction=step_instruction)
1367 self.assertTrue(state_reached)
1368 self.assertEqual(step_count, expected_step_count)
1370 def maybe_strict_output_regex(self, regex):
1371 return '.*'+regex+'.*' if lldbplatformutil.hasChattyStderr(self) else '^'+regex+'$'