]> CyberLeo.Net >> Repos - FreeBSD/FreeBSD.git/blob - third_party/Python/module/pexpect-2.4/pxssh.py
Vendor import of lldb trunk r290819:
[FreeBSD/FreeBSD.git] / third_party / Python / module / pexpect-2.4 / pxssh.py
1 """This class extends pexpect.spawn to specialize setting up SSH connections.
2 This adds methods for login, logout, and expecting the shell prompt.
3
4 $Id: pxssh.py 513 2008-02-09 18:26:13Z noah $
5 """
6
7 from pexpect import *
8 import pexpect
9 import time
10
11 __all__ = ['ExceptionPxssh', 'pxssh']
12
13 # Exception classes used by this module.
14
15
16 class ExceptionPxssh(ExceptionPexpect):
17     """Raised for pxssh exceptions.
18     """
19
20
21 class pxssh (spawn):
22
23     """This class extends pexpect.spawn to specialize setting up SSH
24     connections. This adds methods for login, logout, and expecting the shell
25     prompt. It does various tricky things to handle many situations in the SSH
26     login process. For example, if the session is your first login, then pxssh
27     automatically accepts the remote certificate; or if you have public key
28     authentication setup then pxssh won't wait for the password prompt.
29
30     pxssh uses the shell prompt to synchronize output from the remote host. In
31     order to make this more robust it sets the shell prompt to something more
32     unique than just $ or #. This should work on most Borne/Bash or Csh style
33     shells.
34
35     Example that runs a few commands on a remote server and prints the result::
36
37         import pxssh
38         import getpass
39         try:
40             s = pxssh.pxssh()
41             hostname = raw_input('hostname: ')
42             username = raw_input('username: ')
43             password = getpass.getpass('password: ')
44             s.login (hostname, username, password)
45             s.sendline ('uptime')  # run a command
46             s.prompt()             # match the prompt
47             print s.before         # print everything before the prompt.
48             s.sendline ('ls -l')
49             s.prompt()
50             print s.before
51             s.sendline ('df')
52             s.prompt()
53             print s.before
54             s.logout()
55         except pxssh.ExceptionPxssh, e:
56             print "pxssh failed on login."
57             print str(e)
58
59     Note that if you have ssh-agent running while doing development with pxssh
60     then this can lead to a lot of confusion. Many X display managers (xdm,
61     gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI
62     dialog box popup asking for a password during development. You should turn
63     off any key agents during testing. The 'force_password' attribute will turn
64     off public key authentication. This will only work if the remote SSH server
65     is configured to allow password logins. Example of using 'force_password'
66     attribute::
67
68             s = pxssh.pxssh()
69             s.force_password = True
70             hostname = raw_input('hostname: ')
71             username = raw_input('username: ')
72             password = getpass.getpass('password: ')
73             s.login (hostname, username, password)
74     """
75
76     def __init__(
77             self,
78             timeout=30,
79             maxread=2000,
80             searchwindowsize=None,
81             logfile=None,
82             cwd=None,
83             env=None):
84         spawn.__init__(
85             self,
86             None,
87             timeout=timeout,
88             maxread=maxread,
89             searchwindowsize=searchwindowsize,
90             logfile=logfile,
91             cwd=cwd,
92             env=env)
93
94         self.name = '<pxssh>'
95
96         # SUBTLE HACK ALERT! Note that the command to set the prompt uses a
97         # slightly different string than the regular expression to match it. This
98         # is because when you set the prompt the command will echo back, but we
99         # don't want to match the echoed command. So if we make the set command
100         # slightly different than the regex we eliminate the problem. To make the
101         # set command different we add a backslash in front of $. The $ doesn't
102         # need to be escaped, but it doesn't hurt and serves to make the set
103         # prompt command different than the regex.
104
105         # used to match the command-line prompt
106         self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] "
107         self.PROMPT = self.UNIQUE_PROMPT
108
109         # used to set shell command-line prompt to UNIQUE_PROMPT.
110         self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '"
111         self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '"
112         self.SSH_OPTS = "-o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
113         # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from
114         # displaying a GUI password dialog. I have not figured out how to
115         # disable only SSH_ASKPASS without also disabling X11 forwarding.
116         # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
117         #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
118         self.force_password = False
119         self.auto_prompt_reset = True
120
121     def levenshtein_distance(self, a, b):
122         """This calculates the Levenshtein distance between a and b.
123         """
124
125         n, m = len(a), len(b)
126         if n > m:
127             a, b = b, a
128             n, m = m, n
129         current = range(n + 1)
130         for i in range(1, m + 1):
131             previous, current = current, [i] + [0] * n
132             for j in range(1, n + 1):
133                 add, delete = previous[j] + 1, current[j - 1] + 1
134                 change = previous[j - 1]
135                 if a[j - 1] != b[i - 1]:
136                     change = change + 1
137                 current[j] = min(add, delete, change)
138         return current[n]
139
140     def sync_original_prompt(self):
141         """This attempts to find the prompt. Basically, press enter and record
142         the response; press enter again and record the response; if the two
143         responses are similar then assume we are at the original prompt. This
144         is a slow function. It can take over 10 seconds. """
145
146         # All of these timing pace values are magic.
147         # I came up with these based on what seemed reliable for
148         # connecting to a heavily loaded machine I have.
149         # If latency is worse than these values then this will fail.
150
151         try:
152             # GAS: Clear out the cache before getting the prompt
153             self.read_nonblocking(size=10000, timeout=1)
154         except TIMEOUT:
155             pass
156         time.sleep(0.1)
157         self.sendline()
158         time.sleep(0.5)
159         x = self.read_nonblocking(size=1000, timeout=1)
160         time.sleep(0.1)
161         self.sendline()
162         time.sleep(0.5)
163         a = self.read_nonblocking(size=1000, timeout=1)
164         time.sleep(0.1)
165         self.sendline()
166         time.sleep(0.5)
167         b = self.read_nonblocking(size=1000, timeout=1)
168         ld = self.levenshtein_distance(a, b)
169         len_a = len(a)
170         if len_a == 0:
171             return False
172         if float(ld) / len_a < 0.4:
173             return True
174         return False
175
176     # TODO: This is getting messy and I'm pretty sure this isn't perfect.
177     # TODO: I need to draw a flow chart for this.
178     def login(
179             self,
180             server,
181             username,
182             password='',
183             terminal_type='ansi',
184             original_prompt=r"[#$]",
185             login_timeout=10,
186             port=None,
187             auto_prompt_reset=True):
188         """This logs the user into the given server. It uses the
189         'original_prompt' to try to find the prompt right after login. When it
190         finds the prompt it immediately tries to reset the prompt to something
191         more easily matched. The default 'original_prompt' is very optimistic
192         and is easily fooled. It's more reliable to try to match the original
193         prompt as exactly as possible to prevent false matches by server
194         strings such as the "Message Of The Day". On many systems you can
195         disable the MOTD on the remote server by creating a zero-length file
196         called "~/.hushlogin" on the remote server. If a prompt cannot be found
197         then this will not necessarily cause the login to fail. In the case of
198         a timeout when looking for the prompt we assume that the original
199         prompt was so weird that we could not match it, so we use a few tricks
200         to guess when we have reached the prompt. Then we hope for the best and
201         blindly try to reset the prompt to something more unique. If that fails
202         then login() raises an ExceptionPxssh exception.
203
204         In some situations it is not possible or desirable to reset the
205         original prompt. In this case, set 'auto_prompt_reset' to False to
206         inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
207         uses a unique prompt in the prompt() method. If the original prompt is
208         not reset then this will disable the prompt() method unless you
209         manually set the PROMPT attribute. """
210
211         ssh_options = '-q'
212         if self.force_password:
213             ssh_options = ssh_options + ' ' + self.SSH_OPTS
214         if port is not None:
215             ssh_options = ssh_options + ' -p %s' % (str(port))
216         cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
217
218         # This does not distinguish between a remote server 'password' prompt
219         # and a local ssh 'passphrase' prompt (for unlocking a private key).
220         spawn._spawn(self, cmd)
221         i = self.expect(
222             [
223                 "(?i)are you sure you want to continue connecting",
224                 original_prompt,
225                 "(?i)(?:password)|(?:passphrase for key)",
226                 "(?i)permission denied",
227                 "(?i)terminal type",
228                 TIMEOUT,
229                 "(?i)connection closed by remote host"],
230             timeout=login_timeout)
231
232         # First phase
233         if i == 0:
234             # New certificate -- always accept it.
235             # This is what you get if SSH does not have the remote host's
236             # public key stored in the 'known_hosts' cache.
237             self.sendline("yes")
238             i = self.expect(
239                 [
240                     "(?i)are you sure you want to continue connecting",
241                     original_prompt,
242                     "(?i)(?:password)|(?:passphrase for key)",
243                     "(?i)permission denied",
244                     "(?i)terminal type",
245                     TIMEOUT])
246         if i == 2:  # password or passphrase
247             self.sendline(password)
248             i = self.expect(
249                 [
250                     "(?i)are you sure you want to continue connecting",
251                     original_prompt,
252                     "(?i)(?:password)|(?:passphrase for key)",
253                     "(?i)permission denied",
254                     "(?i)terminal type",
255                     TIMEOUT])
256         if i == 4:
257             self.sendline(terminal_type)
258             i = self.expect(
259                 [
260                     "(?i)are you sure you want to continue connecting",
261                     original_prompt,
262                     "(?i)(?:password)|(?:passphrase for key)",
263                     "(?i)permission denied",
264                     "(?i)terminal type",
265                     TIMEOUT])
266
267         # Second phase
268         if i == 0:
269             # This is weird. This should not happen twice in a row.
270             self.close()
271             raise ExceptionPxssh(
272                 'Weird error. Got "are you sure" prompt twice.')
273         elif i == 1:  # can occur if you have a public key pair set to authenticate.
274             # TODO: May NOT be OK if expect() got tricked and matched a false
275             # prompt.
276             pass
277         elif i == 2:  # password prompt again
278             # For incorrect passwords, some ssh servers will
279             # ask for the password again, others return 'denied' right away.
280             # If we get the password prompt again then this means
281             # we didn't get the password right the first time.
282             self.close()
283             raise ExceptionPxssh('password refused')
284         elif i == 3:  # permission denied -- password was bad.
285             self.close()
286             raise ExceptionPxssh('permission denied')
287         elif i == 4:  # terminal type again? WTF?
288             self.close()
289             raise ExceptionPxssh(
290                 'Weird error. Got "terminal type" prompt twice.')
291         elif i == 5:  # Timeout
292             # This is tricky... I presume that we are at the command-line prompt.
293             # It may be that the shell prompt was so weird that we couldn't match
294             # it. Or it may be that we couldn't log in for some other reason. I
295             # can't be sure, but it's safe to guess that we did login because if
296             # I presume wrong and we are not logged in then this should be caught
297             # later when I try to set the shell prompt.
298             pass
299         elif i == 6:  # Connection closed by remote host
300             self.close()
301             raise ExceptionPxssh('connection closed')
302         else:  # Unexpected
303             self.close()
304             raise ExceptionPxssh('unexpected login response')
305         if not self.sync_original_prompt():
306             self.close()
307             raise ExceptionPxssh('could not synchronize with original prompt')
308         # We appear to be in.
309         # set shell prompt to something unique.
310         if auto_prompt_reset:
311             if not self.set_unique_prompt():
312                 self.close()
313                 raise ExceptionPxssh(
314                     'could not set shell prompt\n' + self.before)
315         return True
316
317     def logout(self):
318         """This sends exit to the remote shell. If there are stopped jobs then
319         this automatically sends exit twice. """
320
321         self.sendline("exit")
322         index = self.expect([EOF, "(?i)there are stopped jobs"])
323         if index == 1:
324             self.sendline("exit")
325             self.expect(EOF)
326         self.close()
327
328     def prompt(self, timeout=20):
329         """This matches the shell prompt. This is little more than a short-cut
330         to the expect() method. This returns True if the shell prompt was
331         matched. This returns False if there was a timeout. Note that if you
332         called login() with auto_prompt_reset set to False then you should have
333         manually set the PROMPT attribute to a regex pattern for matching the
334         prompt. """
335
336         i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
337         if i == 1:
338             return False
339         return True
340
341     def set_unique_prompt(self):
342         """This sets the remote prompt to something more unique than # or $.
343         This makes it easier for the prompt() method to match the shell prompt
344         unambiguously. This method is called automatically by the login()
345         method, but you may want to call it manually if you somehow reset the
346         shell prompt. For example, if you 'su' to a different user then you
347         will need to manually reset the prompt. This sends shell commands to
348         the remote host to set the prompt, so this assumes the remote host is
349         ready to receive commands.
350
351         Alternatively, you may use your own prompt pattern. Just set the PROMPT
352         attribute to a regular expression that matches it. In this case you
353         should call login() with auto_prompt_reset=False; then set the PROMPT
354         attribute. After that the prompt() method will try to match your prompt
355         pattern."""
356
357         self.sendline("unset PROMPT_COMMAND")
358         self.sendline(self.PROMPT_SET_SH)  # sh-style
359         i = self.expect([TIMEOUT, self.PROMPT], timeout=10)
360         if i == 0:  # csh-style
361             self.sendline(self.PROMPT_SET_CSH)
362             i = self.expect([TIMEOUT, self.PROMPT], timeout=10)
363             if i == 0:
364                 return False
365         return True
366
367 # vi:ts=4:sw=4:expandtab:ft=python: