from __future__ import print_function

import binascii
import codecs
import re
import select
import threading
import traceback

from six.moves import queue
def _handle_output_packet_string(packet_contents):
    """Decode the payload of a gdb-remote ``$O`` output packet.

    Args:
        packet_contents: The packet text found between the leading ``$`` and
            the ``#`` that introduces the checksum.

    Returns:
        The hex-decoded text carried by the packet when ``packet_contents``
        starts with ``O`` and is not the ``OK`` reply. ``None`` for every
        other packet, including empty or missing content.

    Raises:
        TypeError or ValueError (``binascii.Error``): if the text after the
            ``O`` is not valid hex.
    """
    # Slicing (rather than indexing) keeps this safe for the empty string.
    if not packet_contents or packet_contents[:1] != "O":
        return None
    # "OK" is a status reply that merely happens to start with "O".
    if packet_contents == "OK":
        return None

    # str.decode("hex") only exists on Python 2; unhexlify works on 2 and 3.
    decoded = binascii.unhexlify(packet_contents[1:])
    if not isinstance(decoded, str):
        # Python 3 hands back bytes. latin-1 maps every byte to exactly one
        # character, so the result stays text without losing any data.
        decoded = decoded.decode("latin-1")
    return decoded
def _dump_queue(the_queue):
    """Drain ``the_queue``, printing every entry with escapes made visible.

    Each entry is removed from the queue and printed on its own line with
    non-printable characters (newlines, control bytes, ...) shown as
    backslash escapes.

    Args:
        the_queue: A queue of string entries; it is empty on return.
    """
    while not the_queue.empty():
        entry = the_queue.get(True)
        try:
            escaped = codecs.encode(entry, "string_escape")
        except LookupError:
            # The "string_escape" codec only exists on Python 2. Produce the
            # equivalent escaped text on Python 3.
            if isinstance(entry, bytes):
                entry = entry.decode("latin-1")
            escaped = entry.encode("unicode_escape").decode("ascii")
        print(escaped)
class PumpQueues(object):
    """Pair of queues that a packet pump fills.

    One queue receives accumulated ``$O`` output, the other receives every
    other packet.
    """

    def __init__(self):
        self._output_queue = queue.Queue()
        self._packet_queue = queue.Queue()

    def output_queue(self):
        """Return the queue holding accumulated ``$O`` output."""
        return self._output_queue

    def packet_queue(self):
        """Return the queue holding all non-``$O`` packets."""
        return self._packet_queue

    def verify_queues_empty(self):
        """Warn about, and drain, anything still sitting in either queue.

        Leftover entries represent unmatched packets. For each non-empty
        queue a warning, the escaped entries and the current stack are
        printed; the queue is empty afterwards.
        """
        # Go through the accessors so subclasses overriding them are honored.
        leftovers = (
            ("output", self.output_queue()),
            ("packet", self.packet_queue()),
        )
        for label, pending in leftovers:
            if pending.empty():
                continue
            print("warning: {} queue entries still exist:".format(label))
            _dump_queue(pending)
            traceback.print_stack()
class SocketPacketPump(object):
    """A threaded packet reader that partitions packets into two streams.

    All incoming $O packet content is accumulated with the current accumulation
    state put into the OutputQueue.

    All other incoming packets are placed in the packet queue.

    A select thread can be started and stopped, and runs to place packet
    content into the two queues.
    """

    _GDB_REMOTE_PACKET_REGEX = re.compile(r'^\$([^\#]*)#[0-9a-fA-F]{2}')

    # Upper bound on how long one select() call may block. A zero timeout
    # would make the pump thread spin at full CPU while the socket is idle;
    # this value also bounds how long stop_pump_thread() waits for the loop
    # to notice the stop request.
    _SELECT_TIMEOUT_SECONDS = 0.05

    def __init__(self, pump_socket, pump_queues, logger=None):
        """Create a pump; the thread is not started until requested.

        Args:
            pump_socket: Socket to read packets from.
            pump_queues: Object exposing ``output_queue()`` and
                ``packet_queue()``, each returning a queue to fill.
            logger: Optional logger used for progress and debug messages.

        Raises:
            Exception: if ``pump_socket`` is missing.
        """
        if not pump_socket:
            raise Exception("pump_socket cannot be None")

        self._thread = None
        self._stop_thread = False
        self._socket = pump_socket
        self._logger = logger
        self._receive_buffer = ""
        self._accumulated_output = ""
        self._pump_queues = pump_queues

    def __enter__(self):
        """Support the python 'with' statement.

        Start the pump thread."""
        self.start_pump_thread()
        return self

    def __exit__(self, exit_type, value, the_traceback):
        """Support the python 'with' statement.

        Shut down the pump thread."""
        self.stop_pump_thread()

    def start_pump_thread(self):
        """Start the background read thread.

        Raises:
            Exception: if a pump thread is already running.
        """
        if self._thread:
            raise Exception("pump thread is already running")
        self._stop_thread = False
        self._thread = threading.Thread(target=self._run_method)
        self._thread.start()

    def stop_pump_thread(self):
        """Ask the read thread to stop and wait until it has exited."""
        self._stop_thread = True
        if self._thread:
            self._thread.join()
            # Forget the finished thread so the pump can be started again.
            self._thread = None

    def _process_new_bytes(self, new_bytes):
        """Append ``new_bytes`` to the receive buffer and queue whole packets.

        ``+`` acks and complete ``$...#xx`` packets at the front of the
        buffer are consumed in order. ``$O`` packets with content extend the
        accumulated output, which is then put on the output queue; every
        other packet is put on the packet queue verbatim. Parsing stops at
        the first position that does not yet hold a complete packet.
        """
        if not new_bytes:
            return

        if isinstance(new_bytes, bytes) and not isinstance(new_bytes, str):
            # Python 3 sockets deliver bytes while the buffer is text.
            # latin-1 maps each byte to one character, so nothing is lost.
            new_bytes = new_bytes.decode("latin-1")

        # Add new bytes to our accumulated unprocessed packet bytes.
        self._receive_buffer += new_bytes

        # Parse fully-formed packets into individual packets.
        while self._receive_buffer:
            if self._receive_buffer[0] == "+":
                # A bare '+' is an ack; forward it as a packet of its own.
                self._pump_queues.packet_queue().put("+")
                self._receive_buffer = self._receive_buffer[1:]
                if self._logger:
                    self._logger.debug(
                        "parsed packet from stub: +\n" +
                        "new receive_buffer: {}".format(
                            self._receive_buffer))
                continue

            packet_match = self._GDB_REMOTE_PACKET_REGEX.match(
                self._receive_buffer)
            if not packet_match:
                # We don't have enough in the receive buffer to make a full
                # packet. Stop trying until we read more.
                break

            # Our receive buffer matches a packet at the
            # start of the receive buffer.
            packet_text = packet_match.group(0)
            new_output_content = _handle_output_packet_string(
                packet_match.group(1))
            if new_output_content:
                # This was an $O packet with new content.
                self._accumulated_output += new_output_content
                self._pump_queues.output_queue().put(self._accumulated_output)
            else:
                # Any packet other than $O.
                self._pump_queues.packet_queue().put(packet_text)

            # Remove the parsed packet from the receive buffer.
            self._receive_buffer = self._receive_buffer[len(packet_text):]
            if self._logger:
                self._logger.debug(
                    "parsed packet from stub: " +
                    packet_text)
                self._logger.debug(
                    "new receive_buffer: " +
                    self._receive_buffer)

    def _run_method(self):
        """Thread body: read from the socket until stopped, closed or failed."""
        self._receive_buffer = ""
        self._accumulated_output = ""

        if self._logger:
            self._logger.info("socket pump starting")

        # Keep looping around until we're asked to stop the thread.
        while not self._stop_thread:
            try:
                can_read, _, _ = select.select(
                    [self._socket], [], [], self._SELECT_TIMEOUT_SECONDS)
                if not (can_read and self._socket in can_read):
                    continue
                new_bytes = self._socket.recv(4096)
            except Exception:
                # Likely a closed socket. Done with the pump thread.
                if self._logger:
                    self._logger.warning(
                        "socket read failed, stopping pump read thread\n" +
                        traceback.format_exc(3))
                break

            if not new_bytes:
                # A readable socket that yields no data has been closed by
                # the peer; looping again would spin forever on the same EOF.
                if self._logger:
                    self._logger.info(
                        "socket closed by peer, stopping pump read thread")
                break

            if self._logger:
                self._logger.debug(
                    "pump received bytes: {}".format(new_bytes))
            self._process_new_bytes(new_bytes)

        if self._logger:
            self._logger.info("socket pump exiting")

    def get_accumulated_output(self):
        """Return all ``$O`` output text accumulated so far."""
        return self._accumulated_output

    def get_receive_buffer(self):
        """Return the received text not yet parsed into a packet."""
        return self._receive_buffer