]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - packages/Python/lldbsuite/test/tools/lldb-server/socket_packet_pump.py
Vendor import of lldb trunk r290819:
[FreeBSD/FreeBSD.git] / packages / Python / lldbsuite / test / tools / lldb-server / socket_packet_pump.py
1
2 from __future__ import print_function
3
4
5 import re
6 import select
7 import threading
8 import traceback
9 import codecs
10
11 from six.moves import queue
12
13
14 def _handle_output_packet_string(packet_contents):
15     if (not packet_contents) or (len(packet_contents) < 1):
16         return None
17     elif packet_contents[0] != "O":
18         return None
19     elif packet_contents == "OK":
20         return None
21     else:
22         return packet_contents[1:].decode("hex")
23
24
25 def _dump_queue(the_queue):
26     while not the_queue.empty():
27         print(codecs.encode(the_queue.get(True), "string_escape"))
28         print("\n")
29
30
31 class PumpQueues(object):
32
33     def __init__(self):
34         self._output_queue = queue.Queue()
35         self._packet_queue = queue.Queue()
36
37     def output_queue(self):
38         return self._output_queue
39
40     def packet_queue(self):
41         return self._packet_queue
42
43     def verify_queues_empty(self):
44         # Warn if there is any content left in any of the queues.
45         # That would represent unmatched packets.
46         if not self.output_queue().empty():
47             print("warning: output queue entries still exist:")
48             _dump_queue(self.output_queue())
49             print("from here:")
50             traceback.print_stack()
51
52         if not self.packet_queue().empty():
53             print("warning: packet queue entries still exist:")
54             _dump_queue(self.packet_queue())
55             print("from here:")
56             traceback.print_stack()
57
58
59 class SocketPacketPump(object):
60     """A threaded packet reader that partitions packets into two streams.
61
62     All incoming $O packet content is accumulated with the current accumulation
63     state put into the OutputQueue.
64
65     All other incoming packets are placed in the packet queue.
66
67     A select thread can be started and stopped, and runs to place packet
68     content into the two queues.
69     """
70
71     _GDB_REMOTE_PACKET_REGEX = re.compile(r'^\$([^\#]*)#[0-9a-fA-F]{2}')
72
73     def __init__(self, pump_socket, pump_queues, logger=None):
74         if not pump_socket:
75             raise Exception("pump_socket cannot be None")
76
77         self._thread = None
78         self._stop_thread = False
79         self._socket = pump_socket
80         self._logger = logger
81         self._receive_buffer = ""
82         self._accumulated_output = ""
83         self._pump_queues = pump_queues
84
85     def __enter__(self):
86         """Support the python 'with' statement.
87
88         Start the pump thread."""
89         self.start_pump_thread()
90         return self
91
92     def __exit__(self, exit_type, value, the_traceback):
93         """Support the python 'with' statement.
94
95         Shut down the pump thread."""
96         self.stop_pump_thread()
97
98     def start_pump_thread(self):
99         if self._thread:
100             raise Exception("pump thread is already running")
101         self._stop_thread = False
102         self._thread = threading.Thread(target=self._run_method)
103         self._thread.start()
104
105     def stop_pump_thread(self):
106         self._stop_thread = True
107         if self._thread:
108             self._thread.join()
109
110     def _process_new_bytes(self, new_bytes):
111         if not new_bytes:
112             return
113         if len(new_bytes) < 1:
114             return
115
116         # Add new bytes to our accumulated unprocessed packet bytes.
117         self._receive_buffer += new_bytes
118
119         # Parse fully-formed packets into individual packets.
120         has_more = len(self._receive_buffer) > 0
121         while has_more:
122             if len(self._receive_buffer) <= 0:
123                 has_more = False
124             # handle '+' ack
125             elif self._receive_buffer[0] == "+":
126                 self._pump_queues.packet_queue().put("+")
127                 self._receive_buffer = self._receive_buffer[1:]
128                 if self._logger:
129                     self._logger.debug(
130                         "parsed packet from stub: +\n" +
131                         "new receive_buffer: {}".format(
132                             self._receive_buffer))
133             else:
134                 packet_match = self._GDB_REMOTE_PACKET_REGEX.match(
135                     self._receive_buffer)
136                 if packet_match:
137                     # Our receive buffer matches a packet at the
138                     # start of the receive buffer.
139                     new_output_content = _handle_output_packet_string(
140                         packet_match.group(1))
141                     if new_output_content:
142                         # This was an $O packet with new content.
143                         self._accumulated_output += new_output_content
144                         self._pump_queues.output_queue().put(self._accumulated_output)
145                     else:
146                         # Any packet other than $O.
147                         self._pump_queues.packet_queue().put(packet_match.group(0))
148
149                     # Remove the parsed packet from the receive
150                     # buffer.
151                     self._receive_buffer = self._receive_buffer[
152                         len(packet_match.group(0)):]
153                     if self._logger:
154                         self._logger.debug(
155                             "parsed packet from stub: " +
156                             packet_match.group(0))
157                         self._logger.debug(
158                             "new receive_buffer: " +
159                             self._receive_buffer)
160                 else:
161                     # We don't have enough in the receive bufferto make a full
162                     # packet. Stop trying until we read more.
163                     has_more = False
164
165     def _run_method(self):
166         self._receive_buffer = ""
167         self._accumulated_output = ""
168
169         if self._logger:
170             self._logger.info("socket pump starting")
171
172         # Keep looping around until we're asked to stop the thread.
173         while not self._stop_thread:
174             can_read, _, _ = select.select([self._socket], [], [], 0)
175             if can_read and self._socket in can_read:
176                 try:
177                     new_bytes = self._socket.recv(4096)
178                     if self._logger and new_bytes and len(new_bytes) > 0:
179                         self._logger.debug(
180                             "pump received bytes: {}".format(new_bytes))
181                 except:
182                     # Likely a closed socket.  Done with the pump thread.
183                     if self._logger:
184                         self._logger.debug(
185                             "socket read failed, stopping pump read thread\n" +
186                             traceback.format_exc(3))
187                     break
188                 self._process_new_bytes(new_bytes)
189
190         if self._logger:
191             self._logger.info("socket pump exiting")
192
193     def get_accumulated_output(self):
194         return self._accumulated_output
195
196     def get_receive_buffer(self):
197         return self._receive_buffer