# -*- coding: utf-8 -*-
# vim:fenc=utf-8
# Copyright (C) 2012-2019 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de>
#
# X2Go Session Broker is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# X2Go Session Broker is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
"""\
Here you find a collection of tools that are used by X2Go Session
Broker's code internally.
Everything that is not directly related to specific broker code and
potentially reusable at other places in the code tree, is placed into
this module.
"""
import os
import sys
import locale
import netaddr
import distutils.version
import pwd, grp
import socket
import binascii
import time
def _checkConfigFileDefaults(data_structure):
    """\
    Verify that an object has the two-level layout of an ini file.

    The outer object has to be a plain ``dict`` and every one of its
    values has to be a plain ``dict``, too. The test is done on the exact
    type, so subclasses of ``dict`` are rejected. An empty ``dict`` is
    accepted.

    :param data_structure: the object to be checked
    :type data_structure: ``dict`` of ``dict``s

    :returns: ``True`` if ``data_structure`` matches that of an ini file
        data structure, else ``False``
    :rtype: ``bool``

    """
    if type(data_structure) is not dict:
        # this rejects ``None`` as well
        return False
    return all(type(section) is dict for section in data_structure.values())
def touch_file(filename, mode='a'):
    """\
    Make sure a file exists, similar to the GNU/touch command.

    Missing parent directories are created with permissions ``0o700``.
    The file is then opened with the given ``mode`` and closed again
    immediately; nothing is written to it.

    :param filename: name of the file to touch
    :type filename: ``str``
    :param mode: the file mode (as used for Python file objects)
    :type mode: ``str``

    :raises OSError: if the directory or the file cannot be created/opened

    """
    directory = os.path.dirname(filename)
    # A bare file name has no directory part; os.makedirs('') would raise.
    if directory:
        # exist_ok avoids a race between testing for and creating the directory
        os.makedirs(directory, mode=0o700, exist_ok=True)
    with open(filename, mode=mode):
        pass
def get_encoding():
    """\
    Detect the system's default character encoding.

    The encoding of the default locale is preferred. If it cannot be
    determined, the interpreter's default encoding is used and, as a
    last resort, ``'ascii'``.

    :returns: The system's local character encoding.
    :rtype: ``str``

    """
    try:
        encoding = locale.getdefaultlocale()[1]
    except Exception:
        # The locale lookup is best-effort (it can fail on an unparsable
        # locale setting); only ordinary errors are swallowed here, so
        # KeyboardInterrupt / SystemExit still propagate.
        encoding = None

    if encoding is None:
        try:
            encoding = sys.getdefaultencoding()
        except Exception:
            encoding = 'ascii'

    return encoding
def _loose_version_components(version):
    """\
    Split a version string into a list of comparable components.

    The parsing rules are those of ``distutils.version.LooseVersion``:
    the string is split into runs of digits, runs of lowercase letters
    and dots; dots and empty pieces are dropped and numeric pieces are
    converted to ``int``.

    :param version: a version string
    :type version: ``str``

    :returns: the components of the version string
    :rtype: ``list`` of ``int`` and ``str``

    """
    import re

    components = []
    for token in re.split(r'(\d+|[a-z]+|\.)', version):
        if not token or token == '.':
            continue
        try:
            components.append(int(token))
        except ValueError:
            components.append(token)
    return components


def compare_versions(version_a, op, version_b):
    """\
    Compare <version_a> with <version_b> using operator <op>.

    The version strings are parsed with the rules of
    ``distutils.version.LooseVersion``. Those rules are implemented
    locally, because ``distutils`` is no longer part of the standard
    library as of Python 3.12.

    The operator is looked up in a fixed table and is never evaluated
    as Python code.

    Note: this comparison is not reliable with beta et al. version
    strings. Ordering a numeric component against an alphabetic one
    (e.g. ``'1.0'`` vs. ``'1.a'``) raises ``TypeError``.

    :param version_a: a version string
    :type version_a: ``str``
    :param op: an operator provided as string, one of ``'<'``, ``'<='``,
        ``'=='``, ``'!='``, ``'>='``, ``'>'``
    :type op: ``str``
    :param version_b: another version string that is to be compared with <version_a>
    :type version_b: ``str``

    :returns: if the comparison is ``True`` or ``False``
    :rtype: ``bool``

    :raises ValueError: if ``op`` is not one of the supported operators

    """
    import operator

    comparators = {
        '<': operator.lt,
        '<=': operator.le,
        '==': operator.eq,
        '!=': operator.ne,
        '>=': operator.ge,
        '>': operator.gt,
    }
    try:
        # surrounding whitespace in the operator string is tolerated
        compare = comparators[op.strip()]
    except (AttributeError, KeyError):
        raise ValueError('unsupported version comparison operator: %r' % (op,)) from None

    return compare(_loose_version_components(version_a), _loose_version_components(version_b))
def normalize_hostnames(servers):
    """\
    Strip the domain part off a collection of server names.

    Every server name is split at its first dot into a short hostname
    and a domain part. The distinct domain parts are collected in the
    order in which they are encountered. Items that are valid IPv4 or
    IPv6 addresses are skipped; they do not show up in the normalized
    result.

    If more than one distinct domain is found, the server names are
    ambiguous and ``servers`` is handed back unmodified (a ``tuple`` is
    handed back as a ``list``).

    E.g., for servers provided as a ``list`` (a ``tuple`` would be ok, too)::

        ['server1', 'server2']                 -> (['server1', 'server2'], [])
        ['server1.domain1', 'server2.domain1'] -> (['server1', 'server2'], ['domain1'])
        ['server1.domain1', 'server2.domain2'] -> (['server1.domain1', 'server2.domain2'], ['domain1', 'domain2'])

    E.g., for servers provided as a ``dict``::

        {'server1': <params>, 'server2': <params>}                 -> ({'server1': <params>, 'server2': <params>}, [])
        {'server1.domain1': <params>, 'server2.domain1': <params>} -> ({'server1': <params>, 'server2': <params>}, ['domain1'])
        {'server1.domain1': <params>, 'server2.domain2': <params>} -> ({'server1.domain1': <params>, 'server2.domain2': <params>}, ['domain1', 'domain2'])

    :param servers: a ``list``, ``tuple`` or ``dict`` hash with either server hostnames as items or dictionary keys
    :type servers: ``list``, ``tuple`` or ``dict``

    :returns: a tuple of the normalized servers (``list`` or ``dict``) and the ``list`` of domains found
    :rtype: ``tuple``

    :raises ValueError: if ``servers`` is neither a ``list``, a ``tuple`` nor a ``dict``

    """
    container_type = type(servers)
    if container_type is tuple:
        servers = list(servers)
    elif container_type not in (dict, list):
        raise ValueError('only lists, tuples and dictionaries are valid for x2gobroker.utils.normalize_hostnames()')

    keyed = container_type is dict
    short_names = {} if keyed else []
    domains = []

    for name in servers:

        # IPv4 and IPv6 addresses have no domain part, leave them out
        if netaddr.valid_ipv4(name) or netaddr.valid_ipv6(name):
            continue

        # everything behind the first dot is the domain (may be empty)
        host, _dot, domain = name.partition('.')

        if keyed:
            short_names[host] = servers[name]
        else:
            short_names.append(host)

        # remember each domain once, in order of appearance
        if domain and domain not in domains:
            domains.append(domain)

    # several domains in use: short names are ambiguous, hand back the input
    if len(domains) > 1:
        short_names = servers

    return short_names, domains
def matching_hostnames(server_list_a, server_list_b):
    """\
    Find the hostnames that appear in both of two server lists.

    This function tries to smoothly ship around asymmetric usage of
    FQDN hostnames and short hostnames: if each list uses at most one
    domain and at least one of the lists uses no domain at all, the
    lists are compared by their short hostnames. If that yields nothing
    (or does not apply), the server names are compared as given.

    :param server_list_a: list of servers
    :type server_list_a: ``list`` of ``str``
    :param server_list_b: list of servers to compare the first list with
    :type server_list_b: ``list`` of ``str``

    :returns: a sorted list of matching server hostnames (hostnames that
        appear in both provided server lists)
    :rtype: ``list`` of ``str``

    """
    common = []

    # reduce list A to short hostnames, if possible
    short_a, domains_a = normalize_hostnames(server_list_a)

    # list B only needs to be looked at if list A is unambiguous
    if len(domains_a) <= 1:
        short_b, domains_b = normalize_hostnames(server_list_b)
        one_list_without_domain = len(domains_a) == 0 or len(domains_b) == 0
        if len(domains_b) <= 1 and one_list_without_domain:
            common = list(set(short_a) & set(short_b))

    # fall back to comparing the names exactly as they were given
    if not common:
        common = list(set(server_list_a) & set(server_list_b))

    return sorted(common)
def drop_privileges(uid, gid):
    """\
    Drop privileges from super-user root to given ``<uid>`` and ``<gid>``.

    Nothing happens (and ``None`` is returned) if the current process
    does not run as root.

    Otherwise the supplementary groups are cleared, the process switches
    to the given group and user, the umask is set to ``0o077`` and the
    environment's HOME variable is set to the new user account's home
    directory path.

    :param uid: the name of the user account to drop privileges to
        (looked up via ``pwd.getpwnam()``)
    :type uid: ``str``
    :param gid: the name of the group to drop privileges to
        (looked up via ``grp.getgrnam()``)
    :type gid: ``str``

    """
    if os.getuid() == 0:

        # resolve the account and group names to numeric IDs
        target_uid = pwd.getpwnam(uid).pw_uid
        target_gid = grp.getgrnam(gid).gr_gid

        # get rid of all supplementary groups
        os.setgroups([])

        # the group has to be switched before the user
        os.setgid(target_gid)
        os.setuid(target_uid)

        # files created from now on are private to the new user
        os.umask(0o077)

        # point $HOME to the new user's home directory
        os.environ['HOME'] = pwd.getpwnam(uid).pw_dir
def split_host_address(host, default_address=None, default_port=22):
    """\
    Try to split a ``<host_addr>:<port>`` expression into hostname and port.

    This function is supposed to work with DNS hostnames, IPv4 and IPv6
    addresses. IPv6 addresses have to be given in brackets
    (``[<addr>]`` or ``[<addr>]:<port>``) and are returned in brackets.

    Surrounding whitespace as well as leading ``*`` and ``:`` characters
    are stripped off ``host`` before it is evaluated, so ``*:22`` and
    ``:22`` are both read as "port 22 only". Note that this also affects
    an unbracketed IPv6 address that starts with a colon (``::1`` is
    read as port 1).

    How the result is assembled:

      * port only: the address is ``default_address`` or, if that is
        not set, ``0.0.0.0``
      * address only: the port is ``default_port`` (22, if
        ``default_port`` is not an ``int``)
      * empty ``host``: the address is ``0.0.0.0`` and the port is
        ``default_port``; ``default_address`` is not used in this case

    :param host: an expression like ``<host_addr>:<port>`` (where either
        the host address or the port can be optional)
    :type host: ``str`` or ``int``
    :param default_address: a fallback host address to be used (default: None)
    :type default_address: ``str``
    :param default_port: a fallback port to be used (default: 22)
    :type default_port: ``int``

    :returns: a tuple of host address and port
    :rtype: ``tuple(<host_addr>, <port>)``

    """
    if type(host) is int:
        host = str(host)

    # normalize the expression: ' *:22 ' -> '22'
    spec = host.strip().lstrip('*').lstrip(':')

    # a leading bracket marks an IPv6 address
    bracketed = spec.startswith('[')

    address = None
    if ':' in spec:
        head, _colon, tail = spec.rpartition(':')
        try:
            port = int(tail)
            address = head
        except ValueError:
            # the last colon belonged to an IPv6 address, there is no port
            address = spec
            port = int(default_port)
    else:
        try:
            # only a port number has been given
            port = int(spec)
        except ValueError:
            # only a hostname / an address (or nothing at all) has been given
            address = spec or '0.0.0.0'
            port = default_port if type(default_port) is int else 22

    if address is None:
        # only the port is known so far, pick the fallback address
        address = default_address or '0.0.0.0'

    # make sure brackets appear around IPv6 addresses exactly once
    address = address.lstrip('[').rstrip(']')
    if bracketed:
        address = '[' + address + ']'

    return address, port
def portscan(addr, port=22):
    """\
    Probe a TCP port on the requested host.

    A TCP connection to ``addr``/``port`` is attempted and closed again
    right away. All addresses the host name resolves to (IPv4 and IPv6)
    are tried in turn, each with a timeout of two seconds.

    :param addr: address (IPv4, IPv6 or hostname) of the host
        we want to probe
    :type addr: ``str``
    :param port: port number (default: 22)
    :type port: ``int``

    :returns: ``True`` if the port is in use, else ``False`` (also on errors)
    :rtype: ``bool``

    """
    try:
        probe = socket.create_connection((addr, port), timeout=2)
    except (OSError, UnicodeError):
        # OSError covers failed name resolution, refused connections and
        # timeouts; UnicodeError can be raised for host names that cannot
        # be encoded for the resolver.
        return False
    probe.close()
    return True
def get_key_fingerprint(key):
    """\
    Retrieve the host key fingerprint of the server to be validated.

    The fingerprint is obtained from the key's ``get_fingerprint()``
    method and rendered as a string of hexadecimal digits.

    :param key: a Python Paramiko :class:`PKey` object
    :type key: ``PKey``

    :returns: host key fingerprint
    :rtype: ``str``

    """
    raw_fingerprint = key.get_fingerprint()
    hex_fingerprint = binascii.hexlify(raw_fingerprint)
    return hex_fingerprint.decode()
def get_key_fingerprint_with_colons(key):
    """\
    Retrieve the (colonized) host key fingerprint of the server
    to be validated.

    The fingerprint returned by :func:`get_key_fingerprint()` is cut
    into groups of two characters which are joined by colons
    (e.g. ``'001fabff'`` becomes ``'00:1f:ab:ff'``).

    :param key: a Python Paramiko :class:`PKey` object
    :type key: ``PKey``

    :returns: host key fingerprint (with colons)
    :rtype: ``str``

    """
    plain = get_key_fingerprint(key)
    pairs = [plain[pos:pos + 2] for pos in range(0, len(plain), 2)]
    return ':'.join(pairs).rstrip(':')
def delayed_execution(agent_func, delay, *args, **kwargs):
    """\
    Delay execution of a function.

    The process is forked. The parent returns immediately and does not
    wait for the child. The child closes stdin, stdout and stderr, waits
    for the delay period, calls ``agent_func`` and then terminates
    without ever returning to the caller's code: with exit code 0 if
    ``agent_func`` returned normally, with exit code 1 otherwise.

    The delay is waited for in steps of one second.

    :param agent_func: function to be executed.
    :type agent_func: ``func``
    :param delay: delay of the function start in seconds
    :type delay: ``int``
    :param args: arg parameters to be handed over to the
        to-be-delayed function
    :type args: ``list``
    :param kwargs: kwarg parameters to be handed over to the
        to-be-delayed function
    :type kwargs: ``dict``

    """
    if os.fork() != 0:
        # parent process: nothing more to do here
        return

    # From here on we are in the forked child. Whatever happens, the
    # child must never fall back into the code of the parent process,
    # hence the unconditional os._exit() below.
    exit_code = 1
    try:
        # close stdin, stdout and stderr in the forked process...
        for fd in (0, 1, 2):
            try:
                os.close(fd)
            except OSError:
                # this descriptor was not open
                pass

        # wait for the given delay period
        waited = 0
        while waited < delay:
            time.sleep(1)
            waited += 1

        # execute the function requested
        agent_func(*args, **kwargs)
        exit_code = 0
    finally:
        os._exit(exit_code)