from binascii import hexlify
import os
[docs]def make_auth_external() -> bytes:
"""Prepare an AUTH command line with the current effective user ID.
This is the preferred authentication method for typical D-Bus connections
over a Unix domain socket.
"""
hex_uid = hexlify(str(os.geteuid()).encode('ascii'))
return b'AUTH EXTERNAL %b\r\n' % hex_uid
[docs]def make_auth_anonymous() -> bytes:
"""Format an AUTH command line for the ANONYMOUS mechanism
Jeepney's higher-level wrappers don't currently use this mechanism,
but third-party code may choose to.
See <https://tools.ietf.org/html/rfc4505> for details.
"""
from . import __version__
trace = hexlify(('Jeepney %s' % __version__).encode('ascii'))
return b'AUTH ANONYMOUS %s\r\n' % trace
BEGIN = b'BEGIN\r\n'
[docs]class AuthenticationError(ValueError):
"""Raised by integration code when DBus authentication fails"""
def __init__(self, data):
self.data = data
def __str__(self):
return "Authentication failed. Bus sent: %r" % self.data
[docs]class SASLParser:
"""Parse authentication messages received"""
def __init__(self):
self.buffer = b''
self.authenticated = False
self.error = None
def process_line(self, line):
if line.startswith(b'OK '):
self.authenticated = True
else:
self.error = line
[docs] def feed(self, data: bytes):
"""Process received data"""
self.buffer += data
while (b'\r\n' in self.buffer) and not self.authenticated:
line, self.buffer = self.buffer.split(b'\r\n', 1)
self.process_line(line)