]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - packages/Python/lldbsuite/test/python_api/event/TestEvents.py
Vendor import of lldb release_39 branch r276489:
[FreeBSD/FreeBSD.git] / packages / Python / lldbsuite / test / python_api / event / TestEvents.py
1 """
2 Test lldb Python event APIs.
3 """
4
5 from __future__ import print_function
6
7
8
9 import os, time
10 import re
11 import lldb
12 from lldbsuite.test.decorators import *
13 from lldbsuite.test.lldbtest import *
14 from lldbsuite.test import lldbutil
15
16 @skipIfLinux   # llvm.org/pr25924, sometimes generating SIGSEGV
17 class EventAPITestCase(TestBase):
18
19     mydir = TestBase.compute_mydir(__file__)
20
21     def setUp(self):
22         # Call super's setUp().
23         TestBase.setUp(self)
24         # Find the line number to of function 'c'.
25         self.line = line_number('main.c', '// Find the line number of function "c" here.')
26
27     @add_test_categories(['pyapi'])
28     @expectedFailureAll(oslist=["linux"], bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases")
29     def test_listen_for_and_print_event(self):
30         """Exercise SBEvent API."""
31         self.build()
32         exe = os.path.join(os.getcwd(), "a.out")
33
34         self.dbg.SetAsync(True)
35
36         # Create a target by the debugger.
37         target = self.dbg.CreateTarget(exe)
38         self.assertTrue(target, VALID_TARGET)
39
40         # Now create a breakpoint on main.c by name 'c'.
41         breakpoint = target.BreakpointCreateByName('c', 'a.out')
42
43         listener = lldb.SBListener("my listener")
44
45         # Now launch the process, and do not stop at the entry point.
46         error = lldb.SBError()
47         process = target.Launch (listener, 
48                                  None,      # argv
49                                  None,      # envp
50                                  None,      # stdin_path
51                                  None,      # stdout_path
52                                  None,      # stderr_path
53                                  None,      # working directory
54                                  0,         # launch flags
55                                  False,     # Stop at entry
56                                  error)     # error
57
58         self.assertTrue(process.GetState() == lldb.eStateStopped, PROCESS_STOPPED)
59
60         # Create an empty event object.
61         event = lldb.SBEvent()
62
63         traceOn = self.TraceOn()
64         if traceOn:
65             lldbutil.print_stacktraces(process)
66
67         # Create MyListeningThread class to wait for any kind of event.
68         import threading
69         class MyListeningThread(threading.Thread):
70             def run(self):
71                 count = 0
72                 # Let's only try at most 4 times to retrieve any kind of event.
73                 # After that, the thread exits.
74                 while not count > 3:
75                     if traceOn:
76                         print("Try wait for event...")
77                     if listener.WaitForEvent(5, event):
78                         if traceOn:
79                             desc = lldbutil.get_description(event)
80                             print("Event description:", desc)
81                             print("Event data flavor:", event.GetDataFlavor())
82                             print("Process state:", lldbutil.state_type_to_str(process.GetState()))
83                             print()
84                     else:
85                         if traceOn:
86                             print("timeout occurred waiting for event...")
87                     count = count + 1
88                 listener.Clear()
89                 return
90
91         # Let's start the listening thread to retrieve the events.
92         my_thread = MyListeningThread()
93         my_thread.start()
94
95         # Use Python API to continue the process.  The listening thread should be
96         # able to receive the state changed events.
97         process.Continue()
98
99         # Use Python API to kill the process.  The listening thread should be
100         # able to receive the state changed event, too.
101         process.Kill()
102
103         # Wait until the 'MyListeningThread' terminates.
104         my_thread.join()
105
106         # Shouldn't we be testing against some kind of expectation here?
107
108     @add_test_categories(['pyapi'])
109     @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases
110     @expectedFlakeyOS(oslist=["windows"])
111     def test_wait_for_event(self):
112         """Exercise SBListener.WaitForEvent() API."""
113         self.build()
114         exe = os.path.join(os.getcwd(), "a.out")
115
116         self.dbg.SetAsync(True)
117
118         # Create a target by the debugger.
119         target = self.dbg.CreateTarget(exe)
120         self.assertTrue(target, VALID_TARGET)
121
122         # Now create a breakpoint on main.c by name 'c'.
123         breakpoint = target.BreakpointCreateByName('c', 'a.out')
124         #print("breakpoint:", breakpoint)
125         self.assertTrue(breakpoint and
126                         breakpoint.GetNumLocations() == 1,
127                         VALID_BREAKPOINT)
128
129         # Get the debugger listener.
130         listener = self.dbg.GetListener()
131
132         # Now launch the process, and do not stop at entry point.
133         error = lldb.SBError()
134         process = target.Launch (listener, 
135                                  None,      # argv
136                                  None,      # envp
137                                  None,      # stdin_path
138                                  None,      # stdout_path
139                                  None,      # stderr_path
140                                  None,      # working directory
141                                  0,         # launch flags
142                                  False,     # Stop at entry
143                                  error)     # error
144         self.assertTrue(error.Success() and process, PROCESS_IS_VALID)
145
146         # Create an empty event object.
147         event = lldb.SBEvent()
148         self.assertFalse(event, "Event should not be valid initially")
149
150         # Create MyListeningThread to wait for any kind of event.
151         import threading
152         class MyListeningThread(threading.Thread):
153             def run(self):
154                 count = 0
155                 # Let's only try at most 3 times to retrieve any kind of event.
156                 while not count > 3:
157                     if listener.WaitForEvent(5, event):
158                         #print("Got a valid event:", event)
159                         #print("Event data flavor:", event.GetDataFlavor())
160                         #print("Event type:", lldbutil.state_type_to_str(event.GetType()))
161                         listener.Clear()
162                         return
163                     count = count + 1
164                     print("Timeout: listener.WaitForEvent")
165                 listener.Clear()
166                 return
167
168         # Use Python API to kill the process.  The listening thread should be
169         # able to receive a state changed event.
170         process.Kill()
171
172         # Let's start the listening thread to retrieve the event.
173         my_thread = MyListeningThread()
174         my_thread.start()
175
176         # Wait until the 'MyListeningThread' terminates.
177         my_thread.join()
178
179         self.assertTrue(event,
180                         "My listening thread successfully received an event")
181
182     @skipIfFreeBSD # llvm.org/pr21325
183     @add_test_categories(['pyapi'])
184     @expectedFailureAll(oslist=["linux"], bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases")
185     @expectedFailureAll(oslist=["windows"], bugnumber="llvm.org/pr24778")
186     def test_add_listener_to_broadcaster(self):
187         """Exercise some SBBroadcaster APIs."""
188         self.build()
189         exe = os.path.join(os.getcwd(), "a.out")
190
191         self.dbg.SetAsync(True)
192
193         # Create a target by the debugger.
194         target = self.dbg.CreateTarget(exe)
195         self.assertTrue(target, VALID_TARGET)
196
197         # Now create a breakpoint on main.c by name 'c'.
198         breakpoint = target.BreakpointCreateByName('c', 'a.out')
199         #print("breakpoint:", breakpoint)
200         self.assertTrue(breakpoint and
201                         breakpoint.GetNumLocations() == 1,
202                         VALID_BREAKPOINT)
203
204         listener = lldb.SBListener("my listener")
205
206         # Now launch the process, and do not stop at the entry point.
207         error = lldb.SBError()
208         process = target.Launch (listener, 
209                                  None,      # argv
210                                  None,      # envp
211                                  None,      # stdin_path
212                                  None,      # stdout_path
213                                  None,      # stderr_path
214                                  None,      # working directory
215                                  0,         # launch flags
216                                  False,     # Stop at entry
217                                  error)     # error
218
219         # Create an empty event object.
220         event = lldb.SBEvent()
221         self.assertFalse(event, "Event should not be valid initially")
222
223
224         # The finite state machine for our custom listening thread, with an
225         # initial state of None, which means no event has been received.
226         # It changes to 'connected' after 'connected' event is received (for remote platforms)
227         # It changes to 'running' after 'running' event is received (should happen only if the
228         # currentstate is either 'None' or 'connected')
229         # It changes to 'stopped' if a 'stopped' event is received (should happen only if the
230         # current state is 'running'.)
231         self.state = None
232
233         # Create MyListeningThread to wait for state changed events.
234         # By design, a "running" event is expected following by a "stopped" event.
235         import threading
236         class MyListeningThread(threading.Thread):
237             def run(self):
238                 #print("Running MyListeningThread:", self)
239
240                 # Regular expression pattern for the event description.
241                 pattern = re.compile("data = {.*, state = (.*)}$")
242
243                 # Let's only try at most 6 times to retrieve our events.
244                 count = 0
245                 while True:
246                     if listener.WaitForEvent(5, event):
247                         desc = lldbutil.get_description(event)
248                         #print("Event description:", desc)
249                         match = pattern.search(desc)
250                         if not match:
251                             break;
252                         if match.group(1) == 'connected':
253                             # When debugging remote targets with lldb-server, we
254                             # first get the 'connected' event.
255                             self.context.assertTrue(self.context.state == None)
256                             self.context.state = 'connected'
257                             continue
258                         elif match.group(1) == 'running':
259                             self.context.assertTrue(self.context.state == None or self.context.state == 'connected')
260                             self.context.state = 'running'
261                             continue
262                         elif match.group(1) == 'stopped':
263                             self.context.assertTrue(self.context.state == 'running')
264                             # Whoopee, both events have been received!
265                             self.context.state = 'stopped'
266                             break
267                         else:
268                             break
269                     print("Timeout: listener.WaitForEvent")
270                     count = count + 1
271                     if count > 6:
272                         break
273                 listener.Clear()
274                 return
275
276         # Use Python API to continue the process.  The listening thread should be
277         # able to receive the state changed events.
278         process.Continue()
279
280         # Start the listening thread to receive the "running" followed by the
281         # "stopped" events.
282         my_thread = MyListeningThread()
283         # Supply the enclosing context so that our listening thread can access
284         # the 'state' variable.
285         my_thread.context = self
286         my_thread.start()
287
288         # Wait until the 'MyListeningThread' terminates.
289         my_thread.join()
290
291         # The final judgement. :-)
292         self.assertTrue(self.state == 'stopped',
293                         "Both expected state changed events received")