]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - packages/Python/lldbsuite/test/tools/lldb-server/TestGdbRemote_qThreadStopInfo.py
Vendor import of lldb trunk r321017:
[FreeBSD/FreeBSD.git] / packages / Python / lldbsuite / test / tools / lldb-server / TestGdbRemote_qThreadStopInfo.py
1 from __future__ import print_function
2
3
4 import sys
5
6 import unittest2
7 import gdbremote_testcase
8 from lldbsuite.test.decorators import *
9 from lldbsuite.test.lldbtest import *
10 from lldbsuite.test import lldbutil
11
12
13 class TestGdbRemote_qThreadStopInfo(gdbremote_testcase.GdbRemoteTestCaseBase):
14
15     mydir = TestBase.compute_mydir(__file__)
16     THREAD_COUNT = 5
17
18     @skipIfDarwinEmbedded # <rdar://problem/34539270> lldb-server tests not updated to work on ios etc yet
19     @skipIfDarwinEmbedded # <rdar://problem/27005337> 
20     def gather_stop_replies_via_qThreadStopInfo(self, thread_count):
21         # Set up the inferior args.
22         inferior_args = []
23         for i in range(thread_count - 1):
24             inferior_args.append("thread:new")
25         inferior_args.append("sleep:10")
26         procs = self.prep_debug_monitor_and_inferior(
27             inferior_args=inferior_args)
28
29         # Assumes test_sequence has anything added needed to setup the initial state.
30         # (Like optionally enabling QThreadsInStopReply.)
31         self.test_sequence.add_log_lines([
32             "read packet: $c#63"
33         ], True)
34         context = self.expect_gdbremote_sequence()
35         self.assertIsNotNone(context)
36
37         # Give threads time to start up, then break.
38         time.sleep(1)
39         self.reset_test_sequence()
40         self.test_sequence.add_log_lines(
41             [
42                 "read packet: {}".format(
43                     chr(3)),
44                 {
45                     "direction": "send",
46                     "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
47                     "capture": {
48                         1: "stop_result",
49                         2: "key_vals_text"}},
50             ],
51             True)
52         context = self.expect_gdbremote_sequence()
53         self.assertIsNotNone(context)
54
55         # Wait until all threads have started.
56         threads = self.wait_for_thread_count(thread_count, timeout_seconds=3)
57         self.assertIsNotNone(threads)
58         self.assertEqual(len(threads), thread_count)
59
60         # Grab stop reply for each thread via qThreadStopInfo{tid:hex}.
61         stop_replies = {}
62         thread_dicts = {}
63         for thread in threads:
64             # Run the qThreadStopInfo command.
65             self.reset_test_sequence()
66             self.test_sequence.add_log_lines(
67                 [
68                     "read packet: $qThreadStopInfo{:x}#00".format(thread),
69                     {
70                         "direction": "send",
71                         "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$",
72                         "capture": {
73                             1: "stop_result",
74                             2: "key_vals_text"}},
75                 ],
76                 True)
77             context = self.expect_gdbremote_sequence()
78             self.assertIsNotNone(context)
79
80             # Parse stop reply contents.
81             key_vals_text = context.get("key_vals_text")
82             self.assertIsNotNone(key_vals_text)
83             kv_dict = self.parse_key_val_dict(key_vals_text)
84             self.assertIsNotNone(kv_dict)
85
86             # Verify there is a thread and that it matches the expected thread
87             # id.
88             kv_thread = kv_dict.get("thread")
89             self.assertIsNotNone(kv_thread)
90             kv_thread_id = int(kv_thread, 16)
91             self.assertEqual(kv_thread_id, thread)
92
93             # Grab the stop id reported.
94             stop_result_text = context.get("stop_result")
95             self.assertIsNotNone(stop_result_text)
96             stop_replies[kv_thread_id] = int(stop_result_text, 16)
97
98             # Hang on to the key-val dictionary for the thread.
99             thread_dicts[kv_thread_id] = kv_dict
100
101         return (stop_replies, thread_dicts)
102
103     def qThreadStopInfo_works_for_multiple_threads(self, thread_count):
104         (stop_replies, _) = self.gather_stop_replies_via_qThreadStopInfo(thread_count)
105         self.assertEqual(len(stop_replies), thread_count)
106
107     @debugserver_test
108     def test_qThreadStopInfo_works_for_multiple_threads_debugserver(self):
109         self.init_debugserver_test()
110         self.build()
111         self.set_inferior_startup_launch()
112         self.qThreadStopInfo_works_for_multiple_threads(self.THREAD_COUNT)
113
114     @llgs_test
115     def test_qThreadStopInfo_works_for_multiple_threads_llgs(self):
116         self.init_llgs_test()
117         self.build()
118         self.set_inferior_startup_launch()
119         self.qThreadStopInfo_works_for_multiple_threads(self.THREAD_COUNT)
120
121     def qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
122             self, thread_count):
123         (stop_replies, _) = self.gather_stop_replies_via_qThreadStopInfo(thread_count)
124         self.assertIsNotNone(stop_replies)
125
126         no_stop_reason_count = sum(
127             1 for stop_reason in list(
128                 stop_replies.values()) if stop_reason == 0)
129         with_stop_reason_count = sum(
130             1 for stop_reason in list(
131                 stop_replies.values()) if stop_reason != 0)
132
133         # All but one thread should report no stop reason.
134         self.assertEqual(no_stop_reason_count, thread_count - 1)
135
136         # Only one thread should should indicate a stop reason.
137         self.assertEqual(with_stop_reason_count, 1)
138
139     @debugserver_test
140     def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt_debugserver(
141             self):
142         self.init_debugserver_test()
143         self.build()
144         self.set_inferior_startup_launch()
145         self.qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
146             self.THREAD_COUNT)
147
148     @llgs_test
149     def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt_llgs(
150             self):
151         self.init_llgs_test()
152         self.build()
153         self.set_inferior_startup_launch()
154         self.qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
155             self.THREAD_COUNT)
156
157     def qThreadStopInfo_has_valid_thread_names(
158             self, thread_count, expected_thread_name):
159         (_, thread_dicts) = self.gather_stop_replies_via_qThreadStopInfo(thread_count)
160         self.assertIsNotNone(thread_dicts)
161
162         for thread_dict in list(thread_dicts.values()):
163             name = thread_dict.get("name")
164             self.assertIsNotNone(name)
165             self.assertEqual(name, expected_thread_name)
166
167     @unittest2.skip("MacOSX doesn't have a default thread name")
168     @debugserver_test
169     def test_qThreadStopInfo_has_valid_thread_names_debugserver(self):
170         self.init_debugserver_test()
171         self.build()
172         self.set_inferior_startup_launch()
173         self.qThreadStopInfo_has_valid_thread_names(self.THREAD_COUNT, "a.out")
174
175     # test requires OS with set, equal thread names by default.
176     @skipUnlessPlatform(["linux"])
177     @llgs_test
178     def test_qThreadStopInfo_has_valid_thread_names_llgs(self):
179         self.init_llgs_test()
180         self.build()
181         self.set_inferior_startup_launch()
182         self.qThreadStopInfo_has_valid_thread_names(self.THREAD_COUNT, "a.out")