2 Test lldb Python event APIs.
5 from __future__ import print_function
12 from lldbsuite.test.decorators import *
13 from lldbsuite.test.lldbtest import *
14 from lldbsuite.test import lldbutil
# Test case exercising the lldb Python event APIs: SBEvent, SBListener.WaitForEvent() and SBBroadcaster.
16 @skipIfLinux # llvm.org/pr25924, sometimes generating SIGSEGV
17 class EventAPITestCase(TestBase):
19 mydir = TestBase.compute_mydir(__file__)
22 # Call super's setUp().
24 # Find the line number of function 'c'.
25 self.line = line_number('main.c', '// Find the line number of function "c" here.')
27 @add_test_categories(['pyapi'])
28 @expectedFailureAll(oslist=["linux"], bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases")
29 def test_listen_for_and_print_event(self):
30 """Exercise SBEvent API."""
32 exe = os.path.join(os.getcwd(), "a.out")
34 self.dbg.SetAsync(True)
36 # Create a target by the debugger.
37 target = self.dbg.CreateTarget(exe)
38 self.assertTrue(target, VALID_TARGET)
40 # Now create a breakpoint on main.c by name 'c'.
41 breakpoint = target.BreakpointCreateByName('c', 'a.out')
43 listener = lldb.SBListener("my listener")
45 # Now launch the process, and do not stop at the entry point.
46 error = lldb.SBError()
47 process = target.Launch (listener,
53 None, # working directory
55 False, # Stop at entry
58 self.assertTrue(process.GetState() == lldb.eStateStopped, PROCESS_STOPPED)
60 # Create an empty event object.
61 event = lldb.SBEvent()
63 traceOn = self.TraceOn()
65 lldbutil.print_stacktraces(process)
67 # Create MyListeningThread class to wait for any kind of event.
69 class MyListeningThread(threading.Thread):
72 # Let's only try at most 4 times to retrieve any kind of event.
73 # After that, the thread exits.
76 print("Try wait for event...")
77 if listener.WaitForEvent(5, event):
79 desc = lldbutil.get_description(event)
80 print("Event description:", desc)
81 print("Event data flavor:", event.GetDataFlavor())
82 print("Process state:", lldbutil.state_type_to_str(process.GetState()))
86 print("timeout occurred waiting for event...")
91 # Let's start the listening thread to retrieve the events.
92 my_thread = MyListeningThread()
95 # Use Python API to continue the process. The listening thread should be
96 # able to receive the state changed events.
99 # Use Python API to kill the process. The listening thread should be
100 # able to receive the state changed event, too.
103 # Wait until the 'MyListeningThread' terminates.
106 # TODO(review): no assertion on the received events is visible here; shouldn't we be testing against some kind of expectation?
108 @add_test_categories(['pyapi'])
109 @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases
110 @expectedFlakeyOS(oslist=["windows"])
111 def test_wait_for_event(self):
112 """Exercise SBListener.WaitForEvent() API."""
114 exe = os.path.join(os.getcwd(), "a.out")
116 self.dbg.SetAsync(True)
118 # Create a target by the debugger.
119 target = self.dbg.CreateTarget(exe)
120 self.assertTrue(target, VALID_TARGET)
122 # Now create a breakpoint on main.c by name 'c'.
123 breakpoint = target.BreakpointCreateByName('c', 'a.out')
124 #print("breakpoint:", breakpoint)
125 self.assertTrue(breakpoint and
126 breakpoint.GetNumLocations() == 1,
129 # Get the debugger listener.
130 listener = self.dbg.GetListener()
132 # Now launch the process, and do not stop at entry point.
133 error = lldb.SBError()
134 process = target.Launch (listener,
140 None, # working directory
142 False, # Stop at entry
144 self.assertTrue(error.Success() and process, PROCESS_IS_VALID)
146 # Create an empty event object.
147 event = lldb.SBEvent()
148 self.assertFalse(event, "Event should not be valid initially")
150 # Create MyListeningThread to wait for any kind of event.
152 class MyListeningThread(threading.Thread):
155 # Let's only try at most 3 times to retrieve any kind of event.
157 if listener.WaitForEvent(5, event):
158 #print("Got a valid event:", event)
159 #print("Event data flavor:", event.GetDataFlavor())
160 #print("Event type:", lldbutil.state_type_to_str(event.GetType()))
164 print("Timeout: listener.WaitForEvent")
168 # Use Python API to kill the process. The listening thread should be
169 # able to receive a state changed event.
172 # Let's start the listening thread to retrieve the event.
173 my_thread = MyListeningThread()
176 # Wait until the 'MyListeningThread' terminates.
179 self.assertTrue(event,
180 "My listening thread successfully received an event")
182 @skipIfFreeBSD # llvm.org/pr21325
183 @add_test_categories(['pyapi'])
184 @expectedFailureAll(oslist=["linux"], bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases")
185 @expectedFailureAll(oslist=["windows"], bugnumber="llvm.org/pr24778")
186 def test_add_listener_to_broadcaster(self):
187 """Exercise some SBBroadcaster APIs."""
189 exe = os.path.join(os.getcwd(), "a.out")
191 self.dbg.SetAsync(True)
193 # Create a target by the debugger.
194 target = self.dbg.CreateTarget(exe)
195 self.assertTrue(target, VALID_TARGET)
197 # Now create a breakpoint on main.c by name 'c'.
198 breakpoint = target.BreakpointCreateByName('c', 'a.out')
199 #print("breakpoint:", breakpoint)
200 self.assertTrue(breakpoint and
201 breakpoint.GetNumLocations() == 1,
204 listener = lldb.SBListener("my listener")
206 # Now launch the process, and do not stop at the entry point.
207 error = lldb.SBError()
208 process = target.Launch (listener,
214 None, # working directory
216 False, # Stop at entry
219 # Create an empty event object.
220 event = lldb.SBEvent()
221 self.assertFalse(event, "Event should not be valid initially")
224 # The finite state machine for our custom listening thread, with an
225 # initial state of None, which means no event has been received.
226 # It changes to 'connected' after a 'connected' event is received (for remote platforms).
227 # It changes to 'running' after a 'running' event is received (should happen only if the
228 # current state is either None or 'connected').
229 # It changes to 'stopped' if a 'stopped' event is received (should happen only if the
230 # current state is 'running').
233 # Create MyListeningThread to wait for state changed events.
234 # By design, a "running" event is expected followed by a "stopped" event.
236 class MyListeningThread(threading.Thread):
238 #print("Running MyListeningThread:", self)
240 # Regular expression pattern for the event description.
# Group 1 captures the text after "state = " up to the closing "}" at the
# end of the description; it is compared against 'connected', 'running'
# and 'stopped' below.
241 pattern = re.compile("data = {.*, state = (.*)}$")
243 # Let's only try at most 6 times to retrieve our events.
246 if listener.WaitForEvent(5, event):
247 desc = lldbutil.get_description(event)
248 #print("Event description:", desc)
249 match = pattern.search(desc)
252 if match.group(1) == 'connected':
253 # When debugging remote targets with lldb-server, we
254 # first get the 'connected' event.
# NOTE(review): the `== None` comparisons here and below would conventionally be `is None`.
255 self.context.assertTrue(self.context.state == None)
256 self.context.state = 'connected'
258 elif match.group(1) == 'running':
259 self.context.assertTrue(self.context.state == None or self.context.state == 'connected')
260 self.context.state = 'running'
262 elif match.group(1) == 'stopped':
263 self.context.assertTrue(self.context.state == 'running')
264 # Whoopee, both events have been received!
265 self.context.state = 'stopped'
269 print("Timeout: listener.WaitForEvent")
276 # Use Python API to continue the process. The listening thread should be
277 # able to receive the state changed events.
280 # Start the listening thread to receive the "running" followed by the
282 my_thread = MyListeningThread()
283 # Supply the enclosing context so that our listening thread can access
284 # the 'state' variable.
285 my_thread.context = self
288 # Wait until the 'MyListeningThread' terminates.
291 # The final judgement. :-)
292 self.assertTrue(self.state == 'stopped',
293 "Both expected state changed events received")