]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - packages/Python/lldbsuite/test/tools/lldb-server/TestGdbRemote_qThreadStopInfo.py
Vendor import of lldb trunk r290819:
[FreeBSD/FreeBSD.git] / packages / Python / lldbsuite / test / tools / lldb-server / TestGdbRemote_qThreadStopInfo.py
1 from __future__ import print_function
2
3
4 import sys
5
6 import unittest2
7 import gdbremote_testcase
8 from lldbsuite.test.decorators import *
9 from lldbsuite.test.lldbtest import *
10 from lldbsuite.test import lldbutil
11
12
class TestGdbRemote_qThreadStopInfo(gdbremote_testcase.GdbRemoteTestCaseBase):
    """Tests for the qThreadStopInfo packet of the gdb-remote stub.

    Each scenario is written once as a plain helper method and is then
    exposed through a ``*_debugserver`` and a ``*_llgs`` test method, which
    select the stub under test, build the inferior and delegate to the
    helper.
    """

    mydir = TestBase.compute_mydir(__file__)
    THREAD_COUNT = 5

    # Stop-reply packet of the form "$T<hex><key-vals>#<2 hex digits>".
    # Group 1 is the hex value right after "T"; group 2 is the raw key-value
    # text up to (but not including) the "#".
    _T_STOP_REPLY_REGEX = r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$"

    def _request_stop_reply(self, packet):
        """Send ``packet`` on a fresh test sequence and expect a $T reply.

        Args:
            packet: raw text placed after "read packet: " in the sequence.

        Returns:
            The context returned by ``expect_gdbremote_sequence()``, holding
            the "stop_result" and "key_vals_text" captures of the reply.
        """
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                "read packet: {}".format(packet),
                {
                    "direction": "send",
                    "regex": self._T_STOP_REPLY_REGEX,
                    "capture": {
                        1: "stop_result",
                        2: "key_vals_text"}},
            ],
            True)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        return context

    def gather_stop_replies_via_qThreadStopInfo(self, thread_count):
        """Interrupt a multi-threaded inferior and query every thread.

        The inferior is started and continued, given one second to spin up
        its threads, interrupted, and then asked for a stop reply per thread
        via ``qThreadStopInfo{tid:hex}``.

        Assumes test_sequence already has anything added that is needed to
        set up the initial state (like optionally enabling
        QThreadsInStopReply).

        Args:
            thread_count: total number of threads expected in the inferior.

        Returns:
            A ``(stop_replies, thread_dicts)`` tuple of dicts keyed by thread
            id: ``stop_replies`` maps to the integer parsed from the hex
            field following "T" in the reply, ``thread_dicts`` maps to the
            parsed key-value dictionary of the reply.
        """
        # Imported here so this method does not depend on `time` leaking in
        # through the wildcard lldbsuite imports at the top of the file.
        import time

        # One "thread:new" per thread beyond the first (presumably each makes
        # the test inferior spawn a thread), followed by a sleep request.
        inferior_args = ["thread:new"] * (thread_count - 1) + ["sleep:10"]
        self.prep_debug_monitor_and_inferior(inferior_args=inferior_args)

        # Let the inferior run.  No reset here: whatever the caller queued
        # on test_sequence beforehand must be played back first.
        self.test_sequence.add_log_lines([
            "read packet: $c#63"
        ], True)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Give threads time to start up, then break with the interrupt byte.
        time.sleep(1)
        self._request_stop_reply(chr(3))

        # Wait until all threads have started.
        threads = self.wait_for_thread_count(thread_count, timeout_seconds=3)
        self.assertIsNotNone(threads)
        self.assertEqual(len(threads), thread_count)

        # Grab stop reply for each thread via qThreadStopInfo{tid:hex}.
        stop_replies = {}
        thread_dicts = {}
        for thread in threads:
            context = self._request_stop_reply(
                "$qThreadStopInfo{:x}#00".format(thread))

            # Parse stop reply contents.
            key_vals_text = context.get("key_vals_text")
            self.assertIsNotNone(key_vals_text)
            kv_dict = self.parse_key_val_dict(key_vals_text)
            self.assertIsNotNone(kv_dict)

            # Verify there is a thread and that it matches the expected
            # thread id.
            kv_thread = kv_dict.get("thread")
            self.assertIsNotNone(kv_thread)
            kv_thread_id = int(kv_thread, 16)
            self.assertEqual(kv_thread_id, thread)

            # Record the hex value reported right after "T" (zero is treated
            # as "no stop reason" by the checks in this class).
            stop_result_text = context.get("stop_result")
            self.assertIsNotNone(stop_result_text)
            stop_replies[kv_thread_id] = int(stop_result_text, 16)

            # Hang on to the key-val dictionary for the thread.
            thread_dicts[kv_thread_id] = kv_dict

        return (stop_replies, thread_dicts)

    def qThreadStopInfo_works_for_multiple_threads(self, thread_count):
        """Check that every thread yields its own qThreadStopInfo reply."""
        (stop_replies, _) = self.gather_stop_replies_via_qThreadStopInfo(
            thread_count)
        self.assertEqual(len(stop_replies), thread_count)

    # debugserver variant of the multiple-threads scenario.
    @debugserver_test
    def test_qThreadStopInfo_works_for_multiple_threads_debugserver(self):
        self.init_debugserver_test()
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadStopInfo_works_for_multiple_threads(self.THREAD_COUNT)

    # lldb-server (llgs) variant of the multiple-threads scenario.
    @llgs_test
    def test_qThreadStopInfo_works_for_multiple_threads_llgs(self):
        self.init_llgs_test()
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadStopInfo_works_for_multiple_threads(self.THREAD_COUNT)

    def qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
            self, thread_count):
        """Check that exactly one thread reports a non-zero stop value."""
        (stop_replies, _) = self.gather_stop_replies_via_qThreadStopInfo(
            thread_count)
        self.assertIsNotNone(stop_replies)

        # Every value is an int, so "not zero" is simply the remainder.
        stop_reasons = list(stop_replies.values())
        no_stop_reason_count = stop_reasons.count(0)
        with_stop_reason_count = len(stop_reasons) - no_stop_reason_count

        # All but one thread should report no stop reason.
        self.assertEqual(no_stop_reason_count, thread_count - 1)

        # Only one thread should indicate a stop reason.
        self.assertEqual(with_stop_reason_count, 1)

    # debugserver variant of the single-stop-reason scenario.
    @debugserver_test
    def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt_debugserver(
            self):
        self.init_debugserver_test()
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
            self.THREAD_COUNT)

    # lldb-server (llgs) variant of the single-stop-reason scenario.
    @llgs_test
    def test_qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt_llgs(
            self):
        self.init_llgs_test()
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadStopInfo_only_reports_one_thread_stop_reason_during_interrupt(
            self.THREAD_COUNT)

    def qThreadStopInfo_has_valid_thread_names(
            self, thread_count, expected_thread_name):
        """Check that every thread's reply carries the expected "name"."""
        (_, thread_dicts) = self.gather_stop_replies_via_qThreadStopInfo(
            thread_count)
        self.assertIsNotNone(thread_dicts)

        for thread_dict in thread_dicts.values():
            name = thread_dict.get("name")
            self.assertIsNotNone(name)
            self.assertEqual(name, expected_thread_name)

    # debugserver variant of the thread-name scenario (unconditionally
    # skipped, see the skip reason).
    @unittest2.skip("MacOSX doesn't have a default thread name")
    @debugserver_test
    def test_qThreadStopInfo_has_valid_thread_names_debugserver(self):
        self.init_debugserver_test()
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadStopInfo_has_valid_thread_names(self.THREAD_COUNT, "a.out")

    # test requires OS with set, equal thread names by default.
    @skipUnlessPlatform(["linux"])
    @llgs_test
    def test_qThreadStopInfo_has_valid_thread_names_llgs(self):
        self.init_llgs_test()
        self.build()
        self.set_inferior_startup_launch()
        self.qThreadStopInfo_has_valid_thread_names(self.THREAD_COUNT, "a.out")