Package x2go :: Module rforward
[frames] | no frames]

Source Code for Module x2go.rforward

  1  # Copyright (C) 2010-2016 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de> 
  2  # 
  3  # Python X2Go is free software; you can redistribute it and/or modify 
  4  # it under the terms of the GNU Affero General Public License as published by 
  5  # the Free Software Foundation; either version 3 of the License, or 
  6  # (at your option) any later version. 
  7  # 
  8  # Python X2Go is distributed in the hope that it will be useful, 
  9  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 10  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 11  # GNU Affero General Public License for more details. 
 12  # 
 13  # You should have received a copy of the GNU Affero General Public License 
 14  # along with this program; if not, write to the 
 15  # Free Software Foundation, Inc., 
 16  # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. 
 17   
 18  """\ 
 19  X2Go reverse SSH/Paramiko tunneling provides X2Go sound, X2Go printing and 
 20  X2Go sshfs for folder sharing and mounting remote devices in X2Go terminal 
 21  server sessions. 
 22   
 23  """ 
 24  __NAME__ = 'x2gorevtunnel-pylib' 
 25   
 26  # modules 
 27  import copy 
 28  import threading 
 29  import gevent 
 30  import paramiko 
 31   
 32  # gevent/greenlet 
 33  from gevent import select, socket, Timeout 
 34   
 35  # Python X2Go modules 
 36  import log 
 37   
 38   
def x2go_transport_tcp_handler(chan, origin, server):
    """\
    An X2Go customized TCP handler for the Paramiko/SSH C{Transport()} class.

    Incoming channels will be put into Paramiko's default accept queue. This corresponds to
    the default behaviour of Paramiko's C{Transport} class.

    However, additionally this handler function checks the server port of the incoming channel
    and detects if there are Paramiko/SSH reverse forwarding tunnels waiting for the incoming
    channels. The Paramiko/SSH reverse forwarding tunnels are initiated by an L{X2GoSession} instance
    (currently supported: reverse tunneling of audio data, reverse tunneling of SSH requests).

    If the server port of an incoming Paramiko/SSH channel matches the configured port of an L{X2GoRevFwTunnel}
    instance, this instance gets notified of the incoming channel and a new L{X2GoRevFwChannelThread} is
    started. This L{X2GoRevFwChannelThread} then takes care of the new channel's incoming data stream.

    The transport's C{reverse_tunnels} attribute is read as a mapping
    C{session_name -> {'snd': (port, tunnel), 'sshfs': (port, tunnel)}}. Entries that are
    missing or C{None} are skipped.

    @param chan: the incoming channel; must provide C{get_transport()}
    @type chan: C{paramiko.Channel} instance
    @param origin: C{(origin_addr, origin_port)} of the connecting peer (not evaluated here)
    @type origin: C{tuple}
    @param server: C{(server_addr, server_port)} the channel came in on; only the port is evaluated
    @type server: C{tuple}

    """
    # The two address pairs used to be unpacked directly in the signature. Tuple
    # parameters are Python-2-only syntax; callers hand over both pairs positionally,
    # so unpacking them here is fully compatible.
    server_port = int(server[1])

    transport = chan.get_transport()
    transport._queue_incoming_channel(chan)
    rev_tuns = transport.reverse_tunnels

    # Work on a snapshot, sessions may get (un)registered while we are iterating.
    for session_tunnels in list(rev_tuns.values()):

        # 'snd' takes precedence over 'sshfs', at most one tunnel per session gets notified
        for tunnel_type in ('snd', 'sshfs'):
            tunnel = session_tunnels.get(tunnel_type)
            if tunnel is not None and server_port == int(tunnel[0]):
                tunnel[1].notify()
                break
69 70
class X2GoRevFwTunnel(threading.Thread):
    """\
    L{X2GoRevFwTunnel} class objects are used to reversely tunnel
    X2Go audio, X2Go printing and X2Go folder sharing / device mounting
    through Paramiko/SSH.

    """
    def __init__(self, server_port, remote_host, remote_port, ssh_transport, session_instance=None, logger=None, loglevel=log.loglevel_DEFAULT):
        """\
        Setup a reverse tunnel through Paramiko/SSH.

        After the reverse tunnel has been setup up with L{X2GoRevFwTunnel.start()} it waits
        for notification from L{X2GoRevFwTunnel.notify()} to accept incoming channels. This
        notification (L{X2GoRevFwTunnel.notify()} gets called from within the transport's
        TCP handler function L{x2go_transport_tcp_handler} of the L{X2GoSession} instance.

        @param server_port: the TCP/IP port on the X2Go server (starting point of the tunnel),
            normally some number above 30000
        @type server_port: int
        @param remote_host: the target address for reversely tunneled traffic. With X2Go this should
            always be set to the localhost (IPv4) address.
        @type remote_host: str
        @param remote_port: the TCP/IP port on the X2Go client (end point of the tunnel),
            normally an application's standard port (22 for SSH, 4713 for pulse audio, etc.)
        @type remote_port: int
        @param ssh_transport: the L{X2GoSession}'s Paramiko/SSH transport instance
        @type ssh_transport: C{paramiko.Transport} instance
        @param session_instance: if given, its C{HOOK_rforward_request_denied()} method gets
            called when the server denies the port forwarding request
        @type session_instance: L{X2GoSession} instance
        @param logger: you can pass an L{X2GoLogger} object to the
            L{X2GoRevFwTunnel} constructor
        @type logger: L{X2GoLogger} instance
        @param loglevel: if no L{X2GoLogger} object has been supplied a new one will be
            constructed with the given loglevel
        @type loglevel: int

        """
        threading.Thread.__init__(self)
        self.daemon = True

        if logger is None:
            self.logger = log.X2GoLogger(loglevel=loglevel)
        else:
            self.logger = copy.deepcopy(logger)
        self.logger.tag = __NAME__

        self.server_port = server_port
        self.remote_host = remote_host
        self.remote_port = remote_port
        self.ssh_transport = ssh_transport
        self.session_instance = session_instance

        self.open_channels = {}
        self.incoming_channel = threading.Condition()

        self._accept_channels = True
        # Initialized here (and not in run()), so that a stop_thread() call that arrives
        # before run() has come up does not get overwritten again.
        self._keepalive = True

    def __del__(self):
        """\
        Class destructor.

        """
        self.stop_thread()
        self.cancel_port_forward('', self.server_port)

    def cancel_port_forward(self, address, port):
        """\
        Cancel a port forwarding request. This cancellation request is sent to the server and
        on the server the port forwarding should be unregistered.

        The request is best-effort: it gets abandoned after 10 seconds and failures are
        only logged (at debug level), never raised.

        @param address: remote server address
        @type address: C{str}
        @param port: remote port
        @type port: C{int}

        """
        timeout = Timeout(10)
        timeout.start()
        try:
            self.ssh_transport.global_request('cancel-tcpip-forward', (address, port), wait=True)
        except (Exception, Timeout) as e:
            # gevent's Timeout is derived from BaseException, so it has to be named explicitly.
            # KeyboardInterrupt and SystemExit are deliberately not swallowed here.
            self.logger('cancelling port forward request for [%s]:%s failed: %r' % (address, port, e), loglevel=log.loglevel_DEBUG)
        finally:
            timeout.cancel()

    def pause(self):
        """\
        Prevent acceptance of new incoming connections through the Paramiko/SSH
        reverse forwarding tunnel. Also, any active connection on this L{X2GoRevFwTunnel}
        instance will be closed immediately, if this method is called.

        """
        if self._accept_channels:
            self.cancel_port_forward('', self.server_port)
            self._accept_channels = False
            self.logger('paused thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)

    def resume(self):
        """\
        Resume operation of the Paramiko/SSH reverse forwarding tunnel
        and continue accepting new incoming connections.

        """
        if not self._accept_channels:
            self._accept_channels = True
            self._requested_port = self.ssh_transport.request_port_forward('127.0.0.1', self.server_port, handler=x2go_transport_tcp_handler)
            self.logger('resumed thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)

    def notify(self):
        """\
        Notify an L{X2GoRevFwTunnel} instance of an incoming Paramiko/SSH channel.

        If an incoming reverse tunnel channel appropriate for this instance has
        been detected, this method gets called from the L{X2GoSession}'s transport
        TCP handler.

        The sent notification will trigger a C{thread.Condition()} waiting for notification
        in L{X2GoRevFwTunnel.run()}.

        """
        # the with-statement guarantees that the condition gets released, even if logging fails
        with self.incoming_channel:
            self.logger('notifying thread of incoming channel: %s' % repr(self), loglevel=log.loglevel_DEBUG)
            self.incoming_channel.notify()

    def stop_thread(self):
        """\
        Stops this L{X2GoRevFwTunnel} thread completely.

        """
        self.pause()
        self._keepalive = False
        self.logger('stopping thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
        # wake up run(), so that it notices the cleared keepalive flag
        self.notify()

    def _request_port_forwarding(self):
        """\
        Ask the server to forward C{server_port} to us. If the request fails, cancel any
        foregoing forward on that port and retry once. If the retry fails as well, the
        session instance's C{HOOK_rforward_request_denied()} gets called (or, without a
        session instance, a warning gets logged).

        """
        try:
            self._requested_port = self.ssh_transport.request_port_forward('127.0.0.1', self.server_port, handler=x2go_transport_tcp_handler)
        except paramiko.SSHException:
            # if port forward request fails, we try to tell the server to cancel all foregoing port forward requests on
            # self.server_port
            self.cancel_port_forward('', self.server_port)
            gevent.sleep(1)
            try:
                self._requested_port = self.ssh_transport.request_port_forward('127.0.0.1', self.server_port, handler=x2go_transport_tcp_handler)
            except paramiko.SSHException as e:
                if self.session_instance:
                    self.session_instance.HOOK_rforward_request_denied(server_port=self.server_port)
                else:
                    self.logger('Encountered SSHException: %s (for reverse TCP port forward with local destination port %s' % (str(e), self.server_port), loglevel=log.loglevel_WARN)

    def run(self):
        """\
        This method gets run once an L{X2GoRevFwTunnel} has been started with its
        L{start()} method. Use L{X2GoRevFwTunnel}.stop_thread() to stop the
        reverse forwarding tunnel again. You can also temporarily lock the tunnel
        down with L{X2GoRevFwTunnel.pause()} and L{X2GoRevFwTunnel.resume()}).

        L{X2GoRevFwTunnel.run()} waits for notifications of an appropriate incoming
        Paramiko/SSH channel (issued by L{X2GoRevFwTunnel.notify()}). Appropriate in
        this context means, that its start point on the X2Go server matches the class's
        property C{server_port}.

        Once a new incoming channel gets announced by the L{notify()} method, a new
        L{X2GoRevFwChannelThread} instance will be initialized. As a data stream handler,
        the function L{x2go_rev_forward_channel_handler()} will be used.

        The channel will last till the connection gets dropped on the X2Go server side or
        until the tunnel gets paused by an L{X2GoRevFwTunnel.pause()} call or stopped via the
        L{X2GoRevFwTunnel.stop_thread()} method.

        """
        self._request_port_forwarding()
        while self._keepalive:

            _chan = None
            with self.incoming_channel:

                # Re-check under the lock: stop_thread() may have fired between the loop
                # test above and acquiring the condition, its notification would be lost.
                if self._keepalive:
                    self.logger('waiting for incoming data channel on X2Go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
                    self.incoming_channel.wait()

                if self._keepalive:
                    self.logger('detected incoming data channel on X2Go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
                    _chan = self.ssh_transport.accept()
                    if _chan is not None:
                        self.logger('data channel %s for server port [127.0.0.1]:%s is up' % (_chan, self.server_port), loglevel=log.loglevel_DEBUG)
                else:
                    self.logger('closing down rev forwarding tunnel on remote end [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)

            if _chan is None:
                # either we are shutting down or accept() did not hand out a channel
                continue

            if not (self._accept_channels and self._keepalive):
                # tunnel got paused/stopped in the meantime, do not leave the channel dangling
                _chan.close()
                continue

            _new_chan_thread = X2GoRevFwChannelThread(_chan, (self.remote_host, self.remote_port),
                                                      target=x2go_rev_forward_channel_handler,
                                                      kwargs={
                                                          'chan': _chan,
                                                          'addr': self.remote_host,
                                                          'port': self.remote_port,
                                                          'parent_thread': self,
                                                          'logger': self.logger,
                                                      }
                                                     )
            _new_chan_thread.start()
            self.open_channels['[%s]:%s' % _chan.origin_addr] = _new_chan_thread
270 271
def x2go_rev_forward_channel_handler(chan=None, addr='', port=0, parent_thread=None, logger=None, ):
    """\
    Handle the data stream of a requested channel that got set up by a L{X2GoRevFwTunnel} (Paramiko/SSH
    reverse forwarding tunnel).

    The channel (and the corresponding connections) close either ...

        - ... if the connecting application closes the connection and thus, drops
        the channel, or
        - ... if the L{X2GoRevFwTunnel} parent thread gets paused. The call
        of L{X2GoRevFwTunnel.pause()} on the instance can be used to shut down all incoming
        tunneled SSH connections associated to this L{X2GoRevFwTunnel} instance
        from within a Python X2Go application, or
        - ... if a socket error occurs on either end of the tunnel.

    Both the channel and the forwarding socket get closed in any case, also if the
    connection to C{addr:port} cannot be established.

    @param chan: channel
    @type chan: C{class}
    @param addr: bind address
    @type addr: C{str}
    @param port: bind port
    @type port: C{int}
    @param parent_thread: the calling L{X2GoRevFwTunnel} instance; if C{None}, the tunnel
        stays open until one of its ends closes the connection
    @type parent_thread: L{X2GoRevFwTunnel} instance
    @param logger: you can pass an L{X2GoLogger} object to the
        L{X2GoRevFwTunnel} constructor
    @type logger: L{X2GoLogger} instance

    """
    if logger is None:
        # must accept the loglevel keyword, all logger calls below use it
        def _dummy_logger(msg, loglevel=None):
            pass
        logger = _dummy_logger

    fw_socket = socket.socket()
    try:
        fw_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        try:
            fw_socket.connect((addr, port))
        except Exception as e:
            logger('Reverse forwarding request to %s:%d failed: %r' % (addr, port, e), loglevel=log.loglevel_INFO)
            return

        logger('Connected!  Reverse tunnel open %r -> %r -> %r' % (chan.origin_addr,
                                                                   chan.getpeername(), (addr, port)),
               loglevel=log.loglevel_INFO)
        while parent_thread is None or parent_thread._accept_channels:
            readable, _, _ = select.select([fw_socket, chan], [], [])
            try:
                if fw_socket in readable:
                    data = fw_socket.recv(1024)
                    if len(data) == 0:
                        break
                    # sendall(): a plain send() may transmit only a part of the buffer
                    chan.sendall(data)
                if chan in readable:
                    data = chan.recv(1024)
                    if len(data) == 0:
                        break
                    fw_socket.sendall(data)
            except socket.error as e:
                logger('Reverse tunnel %s encoutered socket error: %s' % (chan, str(e)), loglevel=log.loglevel_WARN)
                # the connection is broken, looping on would only spin on the dead socket
                break
    finally:
        # release both ends on every exit path (connect failure, EOF, pause, error)
        if chan is not None:
            chan.close()
        fw_socket.close()

    logger('Reverse tunnel %s closed from %r' % (chan, chan.origin_addr,), loglevel=log.loglevel_INFO)
class X2GoRevFwChannelThread(threading.Thread):
    """\
    Starts a thread for each incoming Paramiko/SSH data channel through the reverse
    forwarding tunnel.

    """
    def __init__(self, channel, remote=None, **kwargs):
        """\
        Initializes a reverse forwarding channel thread.

        The thread is flagged as daemon thread. The attributes C{remote_host} and
        C{remote_port} are always available; both are C{None} if no C{remote} has been given.

        @param channel: incoming Paramiko/SSH channel from the L{X2GoSession}'s transport
            accept queue
        @type channel: class
        @param remote: tuple (addr, port) that specifies the data endpoint of the channel
        @type remote: C{tuple(str, int)}
        @param kwargs: passed through to C{threading.Thread.__init__()} (e.g. C{target}, C{kwargs})
        @type kwargs: C{dict}

        """
        threading.Thread.__init__(self, **kwargs)
        self.daemon = True

        self.channel = channel
        # always define both attributes, so that readers never hit an AttributeError
        self.remote_host = None
        self.remote_port = None
        if remote is not None:
            self.remote_host = remote[0]
            self.remote_port = remote[1]
359