385
def check_option_syntax(parser, options):
386
"""Apply additional restrictions on options, not expressible in
389
def has_actions(options):
390
return any((options.enable,
392
options.bump_timeout,
393
options.start_checker,
394
options.stop_checker,
397
options.checker is not None,
398
options.timeout is not None,
399
options.extended_timeout is not None,
400
options.interval is not None,
401
options.approved_by_default is not None,
402
options.approval_delay is not None,
403
options.approval_duration is not None,
404
options.host is not None,
405
options.secret is not None,
271
class TableOfClients(object):
274
"Enabled": "Enabled",
275
"Timeout": "Timeout",
276
"LastCheckedOK": "Last Successful Check",
277
"LastApprovalRequest": "Last Approval Request",
278
"Created": "Created",
279
"Interval": "Interval",
281
"Fingerprint": "Fingerprint",
283
"CheckerRunning": "Check Is Running",
284
"LastEnabled": "Last Enabled",
285
"ApprovalPending": "Approval Is Pending",
286
"ApprovedByDefault": "Approved By Default",
287
"ApprovalDelay": "Approval Delay",
288
"ApprovalDuration": "Approval Duration",
289
"Checker": "Checker",
290
"ExtendedTimeout": "Extended Timeout",
291
"Expires": "Expires",
292
"LastCheckerStatus": "Last Checker Status",
295
def __init__(self, clients, keywords, tableheaders=None):
296
self.clients = clients
297
self.keywords = keywords
298
if tableheaders is not None:
299
self.tableheaders = tableheaders
302
return "\n".join(self.rows())
304
if sys.version_info.major == 2:
305
__unicode__ = __str__
307
return str(self).encode(locale.getpreferredencoding())
310
format_string = self.row_formatting_string()
311
rows = [self.header_line(format_string)]
312
rows.extend(self.client_line(client, format_string)
313
for client in self.clients)
316
def row_formatting_string(self):
317
"Format string used to format table rows"
318
return " ".join("{{{key}:{width}}}".format(
319
width=max(len(self.tableheaders[key]),
320
*(len(self.string_from_client(client, key))
321
for client in self.clients)),
323
for key in self.keywords)
325
def string_from_client(self, client, key):
326
return self.valuetostring(client[key], key)
329
def valuetostring(value, keyword):
330
if isinstance(value, dbus.Boolean):
331
return "Yes" if value else "No"
332
if keyword in ("Timeout", "Interval", "ApprovalDelay",
333
"ApprovalDuration", "ExtendedTimeout"):
334
return milliseconds_to_string(value)
337
def header_line(self, format_string):
338
return format_string.format(**self.tableheaders)
340
def client_line(self, client, format_string):
341
return format_string.format(
342
**{key: self.string_from_client(client, key)
343
for key in self.keywords})
346
## Classes for commands.
348
# Abstract classes first
349
class Command(object):
350
"""Abstract class for commands"""
351
def run(self, mandos, clients):
352
"""Normal commands should implement run_on_one_client(), but
353
commands which want to operate on all clients at the same time
354
can override this run() method instead."""
356
for client in clients:
357
self.run_on_one_client(client)
359
class PrintCmd(Command):
    """Abstract base for commands which print details about clients.

    A subclass provides output(clients), returning the text which
    run() then prints.
    """
    all_keywords = (
        "Name",
        "Enabled",
        "Timeout",
        "LastCheckedOK",
        "Created",
        "Interval",
        "Host",
        "KeyID",
        "Fingerprint",
        "CheckerRunning",
        "LastEnabled",
        "ApprovalPending",
        "ApprovedByDefault",
        "LastApprovalRequest",
        "ApprovalDelay",
        "ApprovalDuration",
        "Checker",
        "ExtendedTimeout",
        "Expires",
        "LastCheckerStatus",
    )

    def run(self, mandos, clients):
        """Print the text produced by output() for all the clients.

        The "mandos" argument is accepted but not used here.
        """
        text = self.output(clients)
        print(text)
371
class PropertyCmd(Command):
    """Abstract base for actions which set one property on a client.

    The property name is read from self.property and the new value
    from self.value_to_set.
    """
    def run_on_one_client(self, client):
        """Write self.value_to_set to the client's D-Bus property"""
        set_property = client.Set
        set_property(client_interface,
                     self.property,
                     self.value_to_set,
                     dbus_interface=dbus.PROPERTIES_IFACE)
378
class ValueArgumentMixIn(object):
    """Mixin for commands constructed with the value they should set.

    The constructor argument is stored as the "value_to_set"
    attribute.
    """
    def __init__(self, value):
        # Plain assignment; a subclass may turn "value_to_set" into a
        # property to convert the value on the way in.
        self.value_to_set = value
383
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
384
"""Mixin class for commands taking a value argument as
387
def value_to_set(self):
390
def value_to_set(self, value):
391
"""When setting, convert value to a datetime.timedelta"""
392
self._vts = string_to_delta(value).total_seconds() * 1000
394
# Actual (non-abstract) command classes
396
class PrintTableCmd(PrintCmd):
397
def __init__(self, verbose=False):
398
self.verbose = verbose
399
def output(self, clients):
401
keywords = self.all_keywords
403
keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
404
return str(TableOfClients(clients.values(), keywords))
406
class DumpJSONCmd(PrintCmd):
407
def output(self, clients):
408
data = {client["Name"]:
409
{key: self.dbus_boolean_to_bool(client[key])
410
for key in self.all_keywords}
411
for client in clients.values()}
412
return json.dumps(data, indent=4, separators=(',', ': '))
414
def dbus_boolean_to_bool(value):
415
if isinstance(value, dbus.Boolean):
419
class IsEnabledCmd(Command):
420
def run_on_one_client(self, client):
421
if self.is_enabled(client):
424
def is_enabled(self, client):
425
return client.Get(client_interface, "Enabled",
426
dbus_interface=dbus.PROPERTIES_IFACE)
428
class RemoveCmd(Command):
    """Command which calls RemoveClient() on self.mandos, passing the
    D-Bus object path of each client."""
    def run_on_one_client(self, client):
        remove_client = self.mandos.RemoveClient
        remove_client(client.__dbus_object_path__)
432
class ApproveCmd(Command):
    """Command which calls Approve() with a true D-Bus boolean on
    each client."""
    def run_on_one_client(self, client):
        approve = client.Approve
        approve(dbus.Boolean(True), dbus_interface=client_interface)
437
class DenyCmd(Command):
    """Command which calls Approve() with a false D-Bus boolean on
    each client."""
    def run_on_one_client(self, client):
        approve = client.Approve
        approve(dbus.Boolean(False), dbus_interface=client_interface)
442
class EnableCmd(PropertyCmd):
444
value_to_set = dbus.Boolean(True)
446
class DisableCmd(PropertyCmd):
448
value_to_set = dbus.Boolean(False)
450
class BumpTimeoutCmd(PropertyCmd):
451
property = "LastCheckedOK"
454
class StartCheckerCmd(PropertyCmd):
    """Set the "CheckerRunning" property of a client to true"""
    property = "CheckerRunning"
    value_to_set = dbus.Boolean(True)
458
class StopCheckerCmd(PropertyCmd):
    """Set the "CheckerRunning" property of a client to false"""
    property = "CheckerRunning"
    value_to_set = dbus.Boolean(False)
462
class ApproveByDefaultCmd(PropertyCmd):
    """Set the "ApprovedByDefault" property of a client to true"""
    property = "ApprovedByDefault"
    value_to_set = dbus.Boolean(True)
466
class DenyByDefaultCmd(PropertyCmd):
    """Set the "ApprovedByDefault" property of a client to false"""
    property = "ApprovedByDefault"
    value_to_set = dbus.Boolean(False)
470
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
473
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
476
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
479
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
482
class SetExtendedTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
    """Set the "ExtendedTimeout" property of a client; the value is
    handled by MillisecondsValueArgumentMixIn."""
    property = "ExtendedTimeout"
486
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
    """Set the "Interval" property of a client; the value is handled
    by MillisecondsValueArgumentMixIn."""
    property = "Interval"
489
class SetApprovalDelayCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
    """Set the "ApprovalDelay" property of a client; the value is
    handled by MillisecondsValueArgumentMixIn."""
    property = "ApprovalDelay"
493
class SetApprovalDurationCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
    """Set the "ApprovalDuration" property of a client; the value is
    handled by MillisecondsValueArgumentMixIn."""
    property = "ApprovalDuration"
497
def has_actions(options):
498
return any((options.enable,
500
options.bump_timeout,
501
options.start_checker,
502
options.stop_checker,
505
options.checker is not None,
506
options.timeout is not None,
507
options.extended_timeout is not None,
508
options.interval is not None,
509
options.approved_by_default is not None,
510
options.approval_delay is not None,
511
options.approval_duration is not None,
512
options.host is not None,
513
options.secret is not None,
518
def commands_and_clients_from_options():
519
parser = argparse.ArgumentParser()
520
parser.add_argument("--version", action="version",
521
version="%(prog)s {}".format(version),
522
help="show version number and exit")
523
parser.add_argument("-a", "--all", action="store_true",
524
help="Select all clients")
525
parser.add_argument("-v", "--verbose", action="store_true",
526
help="Print all fields")
527
parser.add_argument("-j", "--dump-json", action="store_true",
528
help="Dump client data in JSON format")
529
enable_disable = parser.add_mutually_exclusive_group()
530
enable_disable.add_argument("-e", "--enable", action="store_true",
531
help="Enable client")
532
enable_disable.add_argument("-d", "--disable",
534
help="disable client")
535
parser.add_argument("-b", "--bump-timeout", action="store_true",
536
help="Bump timeout for client")
537
start_stop_checker = parser.add_mutually_exclusive_group()
538
start_stop_checker.add_argument("--start-checker",
540
help="Start checker for client")
541
start_stop_checker.add_argument("--stop-checker",
543
help="Stop checker for client")
544
parser.add_argument("-V", "--is-enabled", action="store_true",
545
help="Check if client is enabled")
546
parser.add_argument("-r", "--remove", action="store_true",
547
help="Remove client")
548
parser.add_argument("-c", "--checker",
549
help="Set checker command for client")
550
parser.add_argument("-t", "--timeout",
551
help="Set timeout for client")
552
parser.add_argument("--extended-timeout",
553
help="Set extended timeout for client")
554
parser.add_argument("-i", "--interval",
555
help="Set checker interval for client")
556
approve_deny_default = parser.add_mutually_exclusive_group()
557
approve_deny_default.add_argument(
558
"--approve-by-default", action="store_true",
559
default=None, dest="approved_by_default",
560
help="Set client to be approved by default")
561
approve_deny_default.add_argument(
562
"--deny-by-default", action="store_false",
563
dest="approved_by_default",
564
help="Set client to be denied by default")
565
parser.add_argument("--approval-delay",
566
help="Set delay before client approve/deny")
567
parser.add_argument("--approval-duration",
568
help="Set duration of one client approval")
569
parser.add_argument("-H", "--host", help="Set host for client")
570
parser.add_argument("-s", "--secret",
571
type=argparse.FileType(mode="rb"),
572
help="Set password blob (file) for client")
573
approve_deny = parser.add_mutually_exclusive_group()
574
approve_deny.add_argument(
575
"-A", "--approve", action="store_true",
576
help="Approve any current client request")
577
approve_deny.add_argument("-D", "--deny", action="store_true",
578
help="Deny any current client request")
579
parser.add_argument("--check", action="store_true",
580
help="Run self-test")
581
parser.add_argument("client", nargs="*", help="Client name")
582
options = parser.parse_args()
409
584
if has_actions(options) and not (options.client or options.all):
410
585
parser.error("Options require clients names or --all.")
416
591
if options.all and not has_actions(options):
417
592
parser.error("--all requires an action.")
418
593
if options.is_enabled and len(options.client) > 1:
419
parser.error("--is-enabled requires exactly one client")
421
options.remove = False
422
if has_actions(options) and not options.deny:
423
parser.error("--remove can only be combined with --deny")
424
options.remove = True
427
def get_mandos_dbus_object(bus):
428
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
429
dbus_busname, server_dbus_path)
430
with if_dbus_exception_log_with_exception_and_exit(
431
"Could not connect to Mandos server: %s"):
432
mandos_dbus_object = bus.get_object(dbus_busname,
434
return mandos_dbus_object
437
@contextlib.contextmanager
438
def if_dbus_exception_log_with_exception_and_exit(*args, **kwargs):
441
except dbus.exceptions.DBusException as e:
442
log.critical(*(args + (e,)), **kwargs)
446
def get_managed_objects(object_manager):
    """Return the result of object_manager.GetManagedObjects().

    The call is announced with a debug log message and is made while
    the "dbus.proxies" logger is silenced.  D-Bus exceptions are left
    to if_dbus_exception_log_with_exception_and_exit(), which gets the
    message template used here.
    """
    log.debug("D-Bus: %s:%s:%s.GetManagedObjects()",
              dbus_busname,
              server_dbus_path,
              dbus.OBJECT_MANAGER_IFACE)
    error_guard = if_dbus_exception_log_with_exception_and_exit(
        "Failed to access Mandos server through D-Bus:\n%s")
    with error_guard, SilenceLogger("dbus.proxies"):
        managed_objects = object_manager.GetManagedObjects()
    return managed_objects
456
class SilenceLogger(object):
457
"Simple context manager to silence a particular logger"
458
def __init__(self, loggername):
459
self.logger = logging.getLogger(loggername)
462
self.logger.addFilter(self.nullfilter)
465
class NullFilter(logging.Filter):
466
def filter(self, record):
469
nullfilter = NullFilter()
471
def __exit__(self, exc_type, exc_val, exc_tb):
472
self.logger.removeFilter(self.nullfilter)
475
def commands_from_options(options):
594
parser.error("--is-enabled requires exactly one client")
479
if options.is_enabled:
480
commands.append(command.IsEnabled())
483
commands.append(command.Approve())
486
commands.append(command.Deny())
489
commands.append(command.Remove())
491
598
if options.dump_json:
492
commands.append(command.DumpJSON())
599
commands.append(DumpJSONCmd())
494
601
if options.enable:
495
commands.append(command.Enable())
602
commands.append(EnableCmd())
497
604
if options.disable:
498
commands.append(command.Disable())
605
commands.append(DisableCmd())
500
607
if options.bump_timeout:
501
commands.append(command.BumpTimeout())
608
commands.append(BumpTimeoutCmd(options.bump_timeout))
503
610
if options.start_checker:
504
commands.append(command.StartChecker())
611
commands.append(StartCheckerCmd())
506
613
if options.stop_checker:
507
commands.append(command.StopChecker())
509
if options.approved_by_default is not None:
510
if options.approved_by_default:
511
commands.append(command.ApproveByDefault())
513
commands.append(command.DenyByDefault())
614
commands.append(StopCheckerCmd())
616
if options.is_enabled:
617
commands.append(IsEnabledCmd())
620
commands.append(RemoveCmd())
515
622
if options.checker is not None:
516
commands.append(command.SetChecker(options.checker))
518
if options.host is not None:
519
commands.append(command.SetHost(options.host))
521
if options.secret is not None:
522
commands.append(command.SetSecret(options.secret))
623
commands.append(SetCheckerCmd())
524
625
if options.timeout is not None:
525
commands.append(command.SetTimeout(options.timeout))
626
commands.append(SetTimeoutCmd(options.timeout))
527
628
if options.extended_timeout:
529
command.SetExtendedTimeout(options.extended_timeout))
630
SetExtendedTimeoutCmd(options.extended_timeout))
531
632
if options.interval is not None:
532
commands.append(command.SetInterval(options.interval))
633
command.append(SetIntervalCmd(options.interval))
635
if options.approved_by_default is not None:
636
if options.approved_by_default:
637
command.append(ApproveByDefaultCmd())
639
command.append(DenyByDefaultCmd())
534
641
if options.approval_delay is not None:
536
command.SetApprovalDelay(options.approval_delay))
642
command.append(SetApprovalDelayCmd(options.approval_delay))
538
644
if options.approval_duration is not None:
540
command.SetApprovalDuration(options.approval_duration))
646
SetApprovalDurationCmd(options.approval_duration))
648
if options.host is not None:
649
command.append(SetHostCmd(options.host))
651
if options.secret is not None:
652
command.append(SetSecretCmd(options.secret))
655
commands.append(ApproveCmd())
658
commands.append(DenyCmd())
542
660
# If no command option has been given, show table of clients,
543
661
# optionally verbosely
545
commands.append(command.PrintTable(verbose=options.verbose))
550
class command(object):
551
"""A namespace for command classes"""
554
"""Abstract base class for commands"""
555
def run(self, clients, bus=None, mandos=None):
556
"""Normal commands should implement run_on_one_client(),
557
but commands which want to operate on all clients at the same time can
558
override this run() method instead.
561
for clientpath, properties in clients.items():
562
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
563
dbus_busname, str(clientpath))
564
client = bus.get_object(dbus_busname, clientpath)
565
self.run_on_one_client(client, properties)
568
class IsEnabled(Base):
569
def run(self, clients, bus=None, mandos=None):
570
client, properties = next(iter(clients.items()))
571
if self.is_enabled(client, properties):
574
def is_enabled(self, client, properties):
575
return properties["Enabled"]
579
def run_on_one_client(self, client, properties):
580
log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
581
client.__dbus_object_path__,
582
client_dbus_interface)
583
client.Approve(dbus.Boolean(True),
584
dbus_interface=client_dbus_interface)
588
def run_on_one_client(self, client, properties):
589
log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
590
client.__dbus_object_path__,
591
client_dbus_interface)
592
client.Approve(dbus.Boolean(False),
593
dbus_interface=client_dbus_interface)
597
def run_on_one_client(self, client, properties):
598
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)",
599
dbus_busname, server_dbus_path,
600
server_dbus_interface,
601
str(client.__dbus_object_path__))
602
self.mandos.RemoveClient(client.__dbus_object_path__)
606
"""Abstract class for commands outputting client details"""
607
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
608
"Created", "Interval", "Host", "KeyID",
609
"Fingerprint", "CheckerRunning",
610
"LastEnabled", "ApprovalPending",
611
"ApprovedByDefault", "LastApprovalRequest",
612
"ApprovalDelay", "ApprovalDuration",
613
"Checker", "ExtendedTimeout", "Expires",
616
def run(self, clients, bus=None, mandos=None):
617
print(self.output(clients.values()))
619
def output(self, clients):
620
raise NotImplementedError()
623
class DumpJSON(Output):
624
def output(self, clients):
625
data = {client["Name"]:
626
{key: self.dbus_boolean_to_bool(client[key])
627
for key in self.all_keywords}
628
for client in clients}
629
return json.dumps(data, indent=4, separators=(',', ': '))
632
def dbus_boolean_to_bool(value):
633
if isinstance(value, dbus.Boolean):
638
class PrintTable(Output):
639
def __init__(self, verbose=False):
640
self.verbose = verbose
642
def output(self, clients):
643
default_keywords = ("Name", "Enabled", "Timeout",
645
keywords = default_keywords
647
keywords = self.all_keywords
648
return str(self.TableOfClients(clients, keywords))
650
class TableOfClients(object):
653
"Enabled": "Enabled",
654
"Timeout": "Timeout",
655
"LastCheckedOK": "Last Successful Check",
656
"LastApprovalRequest": "Last Approval Request",
657
"Created": "Created",
658
"Interval": "Interval",
660
"Fingerprint": "Fingerprint",
662
"CheckerRunning": "Check Is Running",
663
"LastEnabled": "Last Enabled",
664
"ApprovalPending": "Approval Is Pending",
665
"ApprovedByDefault": "Approved By Default",
666
"ApprovalDelay": "Approval Delay",
667
"ApprovalDuration": "Approval Duration",
668
"Checker": "Checker",
669
"ExtendedTimeout": "Extended Timeout",
670
"Expires": "Expires",
671
"LastCheckerStatus": "Last Checker Status",
674
def __init__(self, clients, keywords):
675
self.clients = clients
676
self.keywords = keywords
679
return "\n".join(self.rows())
681
if sys.version_info.major == 2:
682
__unicode__ = __str__
684
return str(self).encode(
685
locale.getpreferredencoding())
688
format_string = self.row_formatting_string()
689
rows = [self.header_line(format_string)]
690
rows.extend(self.client_line(client, format_string)
691
for client in self.clients)
694
def row_formatting_string(self):
695
"Format string used to format table rows"
696
return " ".join("{{{key}:{width}}}".format(
697
width=max(len(self.tableheaders[key]),
698
*(len(self.string_from_client(client,
700
for client in self.clients)),
702
for key in self.keywords)
704
def string_from_client(self, client, key):
705
return self.valuetostring(client[key], key)
708
def valuetostring(cls, value, keyword):
709
if isinstance(value, dbus.Boolean):
710
return "Yes" if value else "No"
711
if keyword in ("Timeout", "Interval", "ApprovalDelay",
712
"ApprovalDuration", "ExtendedTimeout"):
713
return cls.milliseconds_to_string(value)
716
def header_line(self, format_string):
717
return format_string.format(**self.tableheaders)
719
def client_line(self, client, format_string):
720
return format_string.format(
721
**{key: self.string_from_client(client, key)
722
for key in self.keywords})
725
def milliseconds_to_string(ms):
726
td = datetime.timedelta(0, 0, 0, ms)
727
return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
728
.format(days="{}T".format(td.days)
730
hours=td.seconds // 3600,
731
minutes=(td.seconds % 3600) // 60,
732
seconds=td.seconds % 60))
735
class Property(Base):
736
"Abstract class for Actions for setting one client property"
738
def run_on_one_client(self, client, properties):
739
"""Set the Client's D-Bus property"""
740
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
741
client.__dbus_object_path__,
742
dbus.PROPERTIES_IFACE, client_dbus_interface,
743
self.propname, self.value_to_set
744
if not isinstance(self.value_to_set,
746
else bool(self.value_to_set))
747
client.Set(client_dbus_interface, self.propname,
749
dbus_interface=dbus.PROPERTIES_IFACE)
753
raise NotImplementedError()
756
class Enable(Property):
758
value_to_set = dbus.Boolean(True)
761
class Disable(Property):
763
value_to_set = dbus.Boolean(False)
766
class BumpTimeout(Property):
767
propname = "LastCheckedOK"
771
class StartChecker(Property):
    """Set the "CheckerRunning" property of a client to true"""
    propname = "CheckerRunning"
    value_to_set = dbus.Boolean(True)
776
class StopChecker(Property):
    """Set the "CheckerRunning" property of a client to false"""
    propname = "CheckerRunning"
    value_to_set = dbus.Boolean(False)
781
class ApproveByDefault(Property):
    """Set the "ApprovedByDefault" property of a client to true"""
    propname = "ApprovedByDefault"
    value_to_set = dbus.Boolean(True)
786
class DenyByDefault(Property):
    """Set the "ApprovedByDefault" property of a client to false"""
    propname = "ApprovedByDefault"
    value_to_set = dbus.Boolean(False)
791
class PropertyValue(Property):
    """Abstract class for a Property which receives its value as a
    constructor argument."""
    def __init__(self, value):
        # Stored under the attribute name which Property reads when
        # setting the D-Bus property.
        self.value_to_set = value
797
class SetChecker(PropertyValue):
801
class SetHost(PropertyValue):
805
class SetSecret(PropertyValue):
809
def value_to_set(self):
813
def value_to_set(self, value):
814
"""When setting, read data from supplied file object"""
815
self._vts = value.read()
819
class MillisecondsPropertyValueArgument(PropertyValue):
820
"""Abstract class for PropertyValue taking a value argument as
821
a datetime.timedelta() but should store it as milliseconds."""
824
def value_to_set(self):
828
def value_to_set(self, value):
829
"When setting, convert value from a datetime.timedelta"
830
self._vts = int(round(value.total_seconds() * 1000))
833
class SetTimeout(MillisecondsPropertyValueArgument):
837
class SetExtendedTimeout(MillisecondsPropertyValueArgument):
    """Set the "ExtendedTimeout" property of a client"""
    propname = "ExtendedTimeout"
841
class SetInterval(MillisecondsPropertyValueArgument):
    """Set the "Interval" property of a client"""
    propname = "Interval"
845
class SetApprovalDelay(MillisecondsPropertyValueArgument):
    """Set the "ApprovalDelay" property of a client"""
    propname = "ApprovalDelay"
849
class SetApprovalDuration(MillisecondsPropertyValueArgument):
    """Set the "ApprovalDuration" property of a client"""
    propname = "ApprovalDuration"
663
commands.append(PrintTableCmd(verbose=options.verbose))
665
return commands, options.client
669
commands, clientnames = commands_and_clients_from_options()
672
bus = dbus.SystemBus()
673
mandos_dbus_objc = bus.get_object(busname, server_path)
674
except dbus.exceptions.DBusException:
675
log.critical("Could not connect to Mandos server")
678
mandos_serv = dbus.Interface(mandos_dbus_objc,
679
dbus_interface=server_interface)
680
mandos_serv_object_manager = dbus.Interface(
681
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
683
# Filter out log message from dbus module
684
dbus_logger = logging.getLogger("dbus.proxies")
685
class NullFilter(logging.Filter):
686
def filter(self, record):
688
dbus_filter = NullFilter()
689
dbus_logger.addFilter(dbus_filter)
692
mandos_clients = {path: ifs_and_props[client_interface]
693
for path, ifs_and_props in
694
mandos_serv_object_manager
695
.GetManagedObjects().items()
696
if client_interface in ifs_and_props}
698
# restore dbus logger
699
dbus_logger.removeFilter(dbus_filter)
700
except dbus.exceptions.DBusException as e:
701
log.critical("Failed to access Mandos server through D-Bus:"
705
# Compile dict of (clients: properties) to process
709
clients = {bus.get_object(busname, path): properties
710
for path, properties in mandos_clients.items()}
712
for name in clientnames:
713
for path, client in mandos_clients.items():
714
if client["Name"] == name:
715
client_objc = bus.get_object(busname, path)
716
clients[client_objc] = client
719
log.critical("Client not found on server: %r", name)
722
# Run all commands on clients
723
for command in commands:
724
command.run(mandos_serv, clients)
854
class TestCaseWithAssertLogs(unittest.TestCase):
855
"""unittest.TestCase.assertLogs only exists in Python 3.4"""
857
if not hasattr(unittest.TestCase, "assertLogs"):
858
@contextlib.contextmanager
859
def assertLogs(self, logger, level=logging.INFO):
860
capturing_handler = self.CapturingLevelHandler(level)
861
old_level = logger.level
862
old_propagate = logger.propagate
863
logger.addHandler(capturing_handler)
864
logger.setLevel(level)
865
logger.propagate = False
867
yield capturing_handler.watcher
869
logger.propagate = old_propagate
870
logger.removeHandler(capturing_handler)
871
logger.setLevel(old_level)
872
self.assertGreater(len(capturing_handler.watcher.records),
875
class CapturingLevelHandler(logging.Handler):
876
def __init__(self, level, *args, **kwargs):
877
logging.Handler.__init__(self, *args, **kwargs)
878
self.watcher = self.LoggingWatcher([], [])
879
def emit(self, record):
880
self.watcher.records.append(record)
881
self.watcher.output.append(self.format(record))
883
LoggingWatcher = collections.namedtuple("LoggingWatcher",
888
class Test_string_to_delta(TestCaseWithAssertLogs):
889
# Just test basic RFC 3339 functionality here, the doc string for
890
# rfc3339_duration_to_delta() already has more comprehensive
891
# tests, which is run by doctest.
893
def test_rfc3339_zero_seconds(self):
    """The string "PT0S" must convert to an empty timedelta"""
    parsed = string_to_delta("PT0S")
    self.assertEqual(parsed, datetime.timedelta())
897
def test_rfc3339_zero_days(self):
    """The string "P0D" must convert to an empty timedelta"""
    parsed = string_to_delta("P0D")
    self.assertEqual(parsed, datetime.timedelta())
901
def test_rfc3339_one_second(self):
    """The string "PT1S" must convert to a timedelta of one second"""
    parsed = string_to_delta("PT1S")
    self.assertEqual(parsed, datetime.timedelta(0, 1))
905
def test_rfc3339_two_hours(self):
727
class Test_milliseconds_to_string(unittest.TestCase):
729
self.assertEqual(milliseconds_to_string(93785000),
731
def test_no_days(self):
    """7385000 ms is formatted as hours, minutes and seconds only"""
    formatted = milliseconds_to_string(7385000)
    self.assertEqual(formatted, "02:03:05")
733
def test_all_zero(self):
    """Zero milliseconds is formatted as "00:00:00" """
    formatted = milliseconds_to_string(0)
    self.assertEqual(formatted, "00:00:00")
735
def test_no_fractional_seconds(self):
    """Sub-second parts must not show up in the formatted string"""
    cases = (
        (400, "00:00:00"),
        (900, "00:00:00"),
        (1900, "00:00:01"),
    )
    # Checked in order; the first mismatch fails the test.
    for milliseconds, expected in cases:
        self.assertEqual(milliseconds_to_string(milliseconds),
                         expected)
740
class Test_string_to_delta(unittest.TestCase):
741
def test_handles_basic_rfc3339(self):
906
742
self.assertEqual(string_to_delta("PT2H"),
907
743
datetime.timedelta(0, 7200))
909
744
def test_falls_back_to_pre_1_6_1_with_warning(self):
910
with self.assertLogs(log, logging.WARNING):
911
value = string_to_delta("2h")
745
# assertLogs only exists in Python 3.4
746
if hasattr(self, "assertLogs"):
747
with self.assertLogs(log, logging.WARNING):
748
value = string_to_delta("2h")
750
class WarningFilter(logging.Filter):
751
"""Don't show, but record the presence of, warnings"""
752
def filter(self, record):
753
is_warning = record.levelno >= logging.WARNING
754
self.found = is_warning or getattr(self, "found",
756
return not is_warning
757
warning_filter = WarningFilter()
758
log.addFilter(warning_filter)
760
value = string_to_delta("2h")
762
log.removeFilter(warning_filter)
763
self.assertTrue(getattr(warning_filter, "found", False))
912
764
self.assertEqual(value, datetime.timedelta(0, 7200))
915
class Test_check_option_syntax(unittest.TestCase):
917
self.parser = argparse.ArgumentParser()
918
add_command_line_options(self.parser)
920
def test_actions_requires_client_or_all(self):
    """Each action, set alone on freshly parsed options, must be
    rejected with a parse error."""
    for name, setting in self.actions.items():
        parsed = self.parser.parse_args()
        setattr(parsed, name, setting)
        with self.assertParseError():
            self.check_option_syntax(parsed)
927
# This mostly corresponds to the definition from has_actions() in
928
# check_option_syntax()
930
# The actual values set here are not that important, but we do
931
# at least stick to the correct types, even though they are
935
"bump_timeout": True,
936
"start_checker": True,
937
"stop_checker": True,
941
"timeout": datetime.timedelta(),
942
"extended_timeout": datetime.timedelta(),
943
"interval": datetime.timedelta(),
944
"approved_by_default": True,
945
"approval_delay": datetime.timedelta(),
946
"approval_duration": datetime.timedelta(),
948
"secret": io.BytesIO(b"x"),
953
@contextlib.contextmanager
954
def assertParseError(self):
955
with self.assertRaises(SystemExit) as e:
956
with self.redirect_stderr_to_devnull():
958
# Exit code from argparse is guaranteed to be "2". Reference:
959
# https://docs.python.org/3/library
960
# /argparse.html#exiting-methods
961
self.assertEqual(e.exception.code, 2)
964
@contextlib.contextmanager
965
def redirect_stderr_to_devnull():
966
null = os.open(os.path.devnull, os.O_RDWR)
967
stderrcopy = os.dup(sys.stderr.fileno())
968
os.dup2(null, sys.stderr.fileno())
974
os.dup2(stderrcopy, sys.stderr.fileno())
977
def check_option_syntax(self, options):
    """Run the module-level check_option_syntax() using this test
    case's parser."""
    parser = self.parser
    check_option_syntax(parser, options)
980
def test_actions_all_conflicts_with_verbose(self):
981
for action, value in self.actions.items():
982
options = self.parser.parse_args()
983
setattr(options, action, value)
985
options.verbose = True
986
with self.assertParseError():
987
self.check_option_syntax(options)
989
def test_actions_with_client_conflicts_with_verbose(self):
    """Each action combined with "verbose" and one named client
    must be rejected with a parse error."""
    for name, setting in self.actions.items():
        parsed = self.parser.parse_args()
        setattr(parsed, name, setting)
        parsed.verbose = True
        parsed.client = ["foo"]
        with self.assertParseError():
            self.check_option_syntax(parsed)
998
def test_dump_json_conflicts_with_verbose(self):
    """Setting both "dump_json" and "verbose" must be rejected with
    a parse error."""
    parsed = self.parser.parse_args()
    for flag in ("dump_json", "verbose"):
        setattr(parsed, flag, True)
    with self.assertParseError():
        self.check_option_syntax(parsed)
1005
def test_dump_json_conflicts_with_action(self):
    """Each action combined with "dump_json" must be rejected with
    a parse error."""
    for name, setting in self.actions.items():
        parsed = self.parser.parse_args()
        setattr(parsed, name, setting)
        parsed.dump_json = True
        with self.assertParseError():
            self.check_option_syntax(parsed)
1013
def test_all_can_not_be_alone(self):
1014
options = self.parser.parse_args()
1016
with self.assertParseError():
1017
self.check_option_syntax(options)
1019
def test_all_is_ok_with_any_action(self):
1020
for action, value in self.actions.items():
1021
options = self.parser.parse_args()
1022
setattr(options, action, value)
1024
self.check_option_syntax(options)
1026
def test_any_action_is_ok_with_one_client(self):
    """Each action combined with one named client must pass the
    syntax check without error."""
    for name, setting in self.actions.items():
        parsed = self.parser.parse_args()
        setattr(parsed, name, setting)
        parsed.client = ["foo"]
        self.check_option_syntax(parsed)
1033
def test_one_client_with_all_actions_except_is_enabled(self):
1034
options = self.parser.parse_args()
1035
for action, value in self.actions.items():
1036
if action == "is_enabled":
1038
setattr(options, action, value)
1039
options.client = ["foo"]
1040
self.check_option_syntax(options)
1042
def test_two_clients_with_all_actions_except_is_enabled(self):
1043
options = self.parser.parse_args()
1044
for action, value in self.actions.items():
1045
if action == "is_enabled":
1047
setattr(options, action, value)
1048
options.client = ["foo", "barbar"]
1049
self.check_option_syntax(options)
1051
def test_two_clients_are_ok_with_actions_except_is_enabled(self):
1052
for action, value in self.actions.items():
1053
if action == "is_enabled":
1055
options = self.parser.parse_args()
1056
setattr(options, action, value)
1057
options.client = ["foo", "barbar"]
1058
self.check_option_syntax(options)
1060
def test_is_enabled_fails_without_client(self):
    """Setting "is_enabled" on freshly parsed options must be
    rejected with a parse error."""
    parsed = self.parser.parse_args()
    parsed.is_enabled = True
    with self.assertParseError():
        self.check_option_syntax(parsed)
1066
def test_is_enabled_fails_with_two_clients(self):
    """is_enabled with two clients named must be a parse error."""
    opts = self.parser.parse_args()
    setattr(opts, "is_enabled", True)
    setattr(opts, "client", ["foo", "barbar"])
    with self.assertParseError():
        self.check_option_syntax(opts)
1073
def test_remove_can_only_be_combined_with_action_deny(self):
1074
for action, value in self.actions.items():
1075
if action in {"remove", "deny"}:
1077
options = self.parser.parse_args()
1078
setattr(options, action, value)
1080
options.remove = True
1081
with self.assertParseError():
1082
self.check_option_syntax(options)
1085
class Test_get_mandos_dbus_object(TestCaseWithAssertLogs):
    """Tests for get_mandos_dbus_object()."""

    def test_calls_and_returns_get_object_on_bus(self):
        """get_object() on the bus is called with the expected bus
        name and path, and its result is handed back."""
        class MockBus(object):
            called = False

            def get_object(mockbus_self, busname, dbus_path):
                # Note that "self" is still the testcase instance,
                # this MockBus instance is in "mockbus_self".
                self.assertEqual(busname, dbus_busname)
                self.assertEqual(dbus_path, server_dbus_path)
                mockbus_self.called = True
                return mockbus_self

        mockbus = get_mandos_dbus_object(bus=MockBus())
        self.assertIsInstance(mockbus, MockBus)
        self.assertTrue(mockbus.called)

    def test_logs_and_exits_on_dbus_error(self):
        """A DBusException from the bus is logged at CRITICAL level
        and turned into a SystemExit with a failure exit code."""
        class MockBusFailing(object):
            def get_object(self, busname, dbus_path):
                raise dbus.exceptions.DBusException("Test")

        with self.assertLogs(log, logging.CRITICAL):
            with self.assertRaises(SystemExit) as e:
                # The call never returns normally, so its result is
                # deliberately not bound to a name.
                get_mandos_dbus_object(bus=MockBusFailing())

        # SystemExit.code is either an integer exit status or some
        # other object; only None and 0 mean success.
        if isinstance(e.exception.code, int):
            self.assertNotEqual(e.exception.code, 0)
        else:
            self.assertIsNotNone(e.exception.code)
1116
class Test_get_managed_objects(TestCaseWithAssertLogs):
    """Tests for get_managed_objects()."""

    def test_calls_and_returns_GetManagedObjects(self):
        """The dictionary returned by GetManagedObjects() is passed
        back to the caller."""
        managed_objects = {"/clients/foo": {"Name": "foo"}}

        class MockObjectManager(object):
            def GetManagedObjects(self):
                return managed_objects

        retval = get_managed_objects(MockObjectManager())
        self.assertDictEqual(managed_objects, retval)

    def test_logs_and_exits_on_dbus_error(self):
        """A DBusException is logged at CRITICAL level and turned
        into a SystemExit with a failure exit code, while messages
        sent to the "dbus.proxies" logger during the call are
        suppressed."""
        dbus_logger = logging.getLogger("dbus.proxies")

        class MockObjectManagerFailing(object):
            def GetManagedObjects(self):
                dbus_logger.error("Test")
                raise dbus.exceptions.DBusException("Test")

        class CountingHandler(logging.Handler):
            """Handler which only counts the records it receives"""
            count = 0

            def emit(self, record):
                self.count += 1

        counting_handler = CountingHandler()

        dbus_logger.addHandler(counting_handler)

        try:
            with self.assertLogs(log, logging.CRITICAL):
                with self.assertRaises(SystemExit) as e:
                    get_managed_objects(MockObjectManagerFailing())
        finally:
            # The handler was installed with addHandler(), so it
            # must be taken off with removeHandler(); removeFilter()
            # would leave it attached to the logger for the rest of
            # the test run.
            dbus_logger.removeHandler(counting_handler)

        # Make sure the dbus logger was suppressed
        self.assertEqual(counting_handler.count, 0)

        # Test that the dbus_logger still works
        with self.assertLogs(dbus_logger, logging.ERROR):
            dbus_logger.error("Test")

        # SystemExit.code is either an integer exit status or some
        # other object; only None and 0 mean success.
        if isinstance(e.exception.code, int):
            self.assertNotEqual(e.exception.code, 0)
        else:
            self.assertIsNotNone(e.exception.code)
1162
class Test_commands_from_options(unittest.TestCase):
1164
self.parser = argparse.ArgumentParser()
1165
add_command_line_options(self.parser)
1167
def test_is_enabled(self):
1168
self.assert_command_from_args(["--is-enabled", "foo"],
1171
def assert_command_from_args(self, args, command_cls,
1173
"""Assert that parsing ARGS should result in an instance of
1174
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1175
options = self.parser.parse_args(args)
1176
check_option_syntax(self.parser, options)
1177
commands = commands_from_options(options)
1178
self.assertEqual(len(commands), 1)
1179
command = commands[0]
1180
self.assertIsInstance(command, command_cls)
1181
for key, value in cmd_attrs.items():
1182
self.assertEqual(getattr(command, key), value)
1184
def test_is_enabled_short(self):
1185
self.assert_command_from_args(["-V", "foo"],
1188
def test_approve(self):
1189
self.assert_command_from_args(["--approve", "foo"],
1192
def test_approve_short(self):
    """-A must yield exactly one command.Approve."""
    argv = ["-A", "foo"]
    self.assert_command_from_args(argv, command.Approve)
1195
def test_deny(self):
    """--deny must yield exactly one command.Deny."""
    argv = ["--deny", "foo"]
    self.assert_command_from_args(argv, command.Deny)
1198
def test_deny_short(self):
    """-D must yield exactly one command.Deny."""
    argv = ["-D", "foo"]
    self.assert_command_from_args(argv, command.Deny)
1201
def test_remove(self):
1202
self.assert_command_from_args(["--remove", "foo"],
1205
def test_deny_before_remove(self):
1206
options = self.parser.parse_args(["--deny", "--remove",
1208
check_option_syntax(self.parser, options)
1209
commands = commands_from_options(options)
1210
self.assertEqual(len(commands), 2)
1211
self.assertIsInstance(commands[0], command.Deny)
1212
self.assertIsInstance(commands[1], command.Remove)
1214
def test_deny_before_remove_reversed(self):
1215
options = self.parser.parse_args(["--remove", "--deny",
1217
check_option_syntax(self.parser, options)
1218
commands = commands_from_options(options)
1219
self.assertEqual(len(commands), 2)
1220
self.assertIsInstance(commands[0], command.Deny)
1221
self.assertIsInstance(commands[1], command.Remove)
1223
def test_remove_short(self):
    """-r must yield exactly one command.Remove."""
    argv = ["-r", "foo"]
    self.assert_command_from_args(argv, command.Remove)
1226
def test_dump_json(self):
1227
self.assert_command_from_args(["--dump-json"],
1230
def test_enable(self):
1231
self.assert_command_from_args(["--enable", "foo"],
1234
def test_enable_short(self):
    """-e must yield exactly one command.Enable."""
    argv = ["-e", "foo"]
    self.assert_command_from_args(argv, command.Enable)
1237
def test_disable(self):
1238
self.assert_command_from_args(["--disable", "foo"],
1241
def test_disable_short(self):
    """-d must yield exactly one command.Disable."""
    argv = ["-d", "foo"]
    self.assert_command_from_args(argv, command.Disable)
1244
def test_bump_timeout(self):
    """--bump-timeout must yield exactly one command.BumpTimeout."""
    argv = ["--bump-timeout", "foo"]
    self.assert_command_from_args(argv, command.BumpTimeout)
1248
def test_bump_timeout_short(self):
    """-b must yield exactly one command.BumpTimeout."""
    argv = ["-b", "foo"]
    self.assert_command_from_args(argv, command.BumpTimeout)
1252
def test_start_checker(self):
    """--start-checker must yield exactly one
    command.StartChecker."""
    argv = ["--start-checker", "foo"]
    self.assert_command_from_args(argv, command.StartChecker)
1256
def test_stop_checker(self):
    """--stop-checker must yield exactly one command.StopChecker."""
    argv = ["--stop-checker", "foo"]
    self.assert_command_from_args(argv, command.StopChecker)
1260
def test_approve_by_default(self):
    """--approve-by-default must yield exactly one
    command.ApproveByDefault."""
    argv = ["--approve-by-default", "foo"]
    self.assert_command_from_args(argv, command.ApproveByDefault)
1264
def test_deny_by_default(self):
    """--deny-by-default must yield exactly one
    command.DenyByDefault."""
    argv = ["--deny-by-default", "foo"]
    self.assert_command_from_args(argv, command.DenyByDefault)
1268
def test_checker(self):
1269
self.assert_command_from_args(["--checker", ":", "foo"],
1273
def test_checker_empty(self):
1274
self.assert_command_from_args(["--checker", "", "foo"],
1278
def test_checker_short(self):
1279
self.assert_command_from_args(["-c", ":", "foo"],
1283
def test_host(self):
    """--host must yield one command.SetHost whose value_to_set is
    the host name given on the command line."""
    argv = ["--host", "foo.example.org", "foo"]
    self.assert_command_from_args(argv, command.SetHost,
                                  value_to_set="foo.example.org")
1288
def test_host_short(self):
    """-H must yield one command.SetHost whose value_to_set is the
    host name given on the command line."""
    argv = ["-H", "foo.example.org", "foo"]
    self.assert_command_from_args(argv, command.SetHost,
                                  value_to_set="foo.example.org")
1293
def test_secret_devnull(self):
1294
self.assert_command_from_args(["--secret", os.path.devnull,
1295
"foo"], command.SetSecret,
1298
def test_secret_tempfile(self):
1299
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1300
value = b"secret\0xyzzy\nbar"
1303
self.assert_command_from_args(["--secret", f.name,
1304
"foo"], command.SetSecret,
1307
def test_secret_devnull_short(self):
1308
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1312
def test_secret_tempfile_short(self):
1313
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1314
value = b"secret\0xyzzy\nbar"
1317
self.assert_command_from_args(["-s", f.name, "foo"],
1321
def test_timeout(self):
1322
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1324
value_to_set=300000)
1326
def test_timeout_short(self):
1327
self.assert_command_from_args(["-t", "PT5M", "foo"],
1329
value_to_set=300000)
1331
def test_extended_timeout(self):
1332
self.assert_command_from_args(["--extended-timeout", "PT15M",
1334
command.SetExtendedTimeout,
1335
value_to_set=900000)
1337
def test_interval(self):
    """--interval PT2M must yield one command.SetInterval whose
    value_to_set is 120000."""
    argv = ["--interval", "PT2M", "foo"]
    self.assert_command_from_args(argv, command.SetInterval,
                                  value_to_set=120000)
1342
def test_interval_short(self):
    """-i PT2M must yield one command.SetInterval whose
    value_to_set is 120000."""
    argv = ["-i", "PT2M", "foo"]
    self.assert_command_from_args(argv, command.SetInterval,
                                  value_to_set=120000)
1347
def test_approval_delay(self):
1348
self.assert_command_from_args(["--approval-delay", "PT30S",
1350
command.SetApprovalDelay,
1353
def test_approval_duration(self):
1354
self.assert_command_from_args(["--approval-duration", "PT1S",
1356
command.SetApprovalDuration,
1359
def test_print_table(self):
1360
self.assert_command_from_args([], command.PrintTable,
1363
def test_print_table_verbose(self):
1364
self.assert_command_from_args(["--verbose"],
1368
def test_print_table_verbose_short(self):
1369
self.assert_command_from_args(["-v"], command.PrintTable,
1373
class TestCommand(unittest.TestCase):
1374
"""Abstract class for tests of command classes"""
1378
class MockClient(object):
1379
def __init__(self, name, **attributes):
1380
self.__dbus_object_path__ = "/clients/{}".format(name)
1381
self.attributes = attributes
1382
self.attributes["Name"] = name
1384
def Set(self, interface, propname, value, dbus_interface):
1385
testcase.assertEqual(interface, client_dbus_interface)
1386
testcase.assertEqual(dbus_interface,
1387
dbus.PROPERTIES_IFACE)
1388
self.attributes[propname] = value
1389
def Approve(self, approve, dbus_interface):
1390
testcase.assertEqual(dbus_interface,
1391
client_dbus_interface)
1392
self.calls.append(("Approve", (approve,
1394
self.client = MockClient(
1396
KeyID=("92ed150794387c03ce684574b1139a65"
1397
"94a34f895daaaf09fd8ea90a27cddb12"),
1399
Host="foo.example.org",
1400
Enabled=dbus.Boolean(True),
1402
LastCheckedOK="2019-02-03T00:00:00",
1403
Created="2019-01-02T00:00:00",
1405
Fingerprint=("778827225BA7DE539C5A"
1406
"7CFA59CFF7CDBD9A5920"),
1407
CheckerRunning=dbus.Boolean(False),
1408
LastEnabled="2019-01-03T00:00:00",
1409
ApprovalPending=dbus.Boolean(False),
1410
ApprovedByDefault=dbus.Boolean(True),
1411
LastApprovalRequest="",
1413
ApprovalDuration=1000,
1414
Checker="fping -q -- %(host)s",
1415
ExtendedTimeout=900000,
1416
Expires="2019-02-04T00:00:00",
1417
LastCheckerStatus=0)
1418
self.other_client = MockClient(
1420
KeyID=("0558568eedd67d622f5c83b35a115f79"
1421
"6ab612cff5ad227247e46c2b020f441c"),
1422
Secret=b"secretbar",
1424
Enabled=dbus.Boolean(True),
1426
LastCheckedOK="2019-02-04T00:00:00",
1427
Created="2019-01-03T00:00:00",
1429
Fingerprint=("3E393AEAEFB84C7E89E2"
1430
"F547B3A107558FCA3A27"),
1431
CheckerRunning=dbus.Boolean(True),
1432
LastEnabled="2019-01-04T00:00:00",
1433
ApprovalPending=dbus.Boolean(False),
1434
ApprovedByDefault=dbus.Boolean(False),
1435
LastApprovalRequest="2019-01-03T00:00:00",
1436
ApprovalDelay=30000,
1437
ApprovalDuration=93785000,
1439
ExtendedTimeout=900000,
1440
Expires="2019-02-05T00:00:00",
1441
LastCheckerStatus=-2)
1442
self.clients = collections.OrderedDict(
1444
("/clients/foo", self.client.attributes),
1445
("/clients/barbar", self.other_client.attributes),
1447
self.one_client = {"/clients/foo": self.client.attributes}
1453
def get_object(client_bus_name, path):
1454
self.assertEqual(client_bus_name, dbus_busname)
1456
# Note: "self" here is the TestCmd instance, not
1457
# the Bus instance, since this is a static method!
1458
"/clients/foo": self.client,
1459
"/clients/barbar": self.other_client,
1464
class TestBaseCommands(TestCommand):
1466
def test_IsEnabled_exits_successfully(self):
1467
with self.assertRaises(SystemExit) as e:
1468
command.IsEnabled().run(self.one_client)
1469
if e.exception.code is not None:
1470
self.assertEqual(e.exception.code, 0)
1472
self.assertIsNone(e.exception.code)
1474
def test_IsEnabled_exits_with_failure(self):
1475
self.client.attributes["Enabled"] = dbus.Boolean(False)
1476
with self.assertRaises(SystemExit) as e:
1477
command.IsEnabled().run(self.one_client)
1478
if isinstance(e.exception.code, int):
1479
self.assertNotEqual(e.exception.code, 0)
1481
self.assertIsNotNone(e.exception.code)
1483
def test_Approve(self):
1484
command.Approve().run(self.clients, self.bus)
1485
for clientpath in self.clients:
1486
client = self.bus.get_object(dbus_busname, clientpath)
1487
self.assertIn(("Approve", (True, client_dbus_interface)),
1490
def test_Deny(self):
1491
command.Deny().run(self.clients, self.bus)
1492
for clientpath in self.clients:
1493
client = self.bus.get_object(dbus_busname, clientpath)
1494
self.assertIn(("Approve", (False, client_dbus_interface)),
1497
def test_Remove(self):
1498
class MockMandos(object):
1501
def RemoveClient(self, dbus_path):
1502
self.calls.append(("RemoveClient", (dbus_path,)))
1503
mandos = MockMandos()
1504
command.Remove().run(self.clients, self.bus, mandos)
1505
for clientpath in self.clients:
1506
self.assertIn(("RemoveClient", (clientpath,)),
1512
"KeyID": ("92ed150794387c03ce684574b1139a65"
1513
"94a34f895daaaf09fd8ea90a27cddb12"),
1514
"Host": "foo.example.org",
1517
"LastCheckedOK": "2019-02-03T00:00:00",
1518
"Created": "2019-01-02T00:00:00",
1520
"Fingerprint": ("778827225BA7DE539C5A"
1521
"7CFA59CFF7CDBD9A5920"),
1522
"CheckerRunning": False,
1523
"LastEnabled": "2019-01-03T00:00:00",
1524
"ApprovalPending": False,
1525
"ApprovedByDefault": True,
1526
"LastApprovalRequest": "",
1528
"ApprovalDuration": 1000,
1529
"Checker": "fping -q -- %(host)s",
1530
"ExtendedTimeout": 900000,
1531
"Expires": "2019-02-04T00:00:00",
1532
"LastCheckerStatus": 0,
1536
"KeyID": ("0558568eedd67d622f5c83b35a115f79"
1537
"6ab612cff5ad227247e46c2b020f441c"),
1538
"Host": "192.0.2.3",
1541
"LastCheckedOK": "2019-02-04T00:00:00",
1542
"Created": "2019-01-03T00:00:00",
1544
"Fingerprint": ("3E393AEAEFB84C7E89E2"
1545
"F547B3A107558FCA3A27"),
1546
"CheckerRunning": True,
1547
"LastEnabled": "2019-01-04T00:00:00",
1548
"ApprovalPending": False,
1549
"ApprovedByDefault": False,
1550
"LastApprovalRequest": "2019-01-03T00:00:00",
1551
"ApprovalDelay": 30000,
1552
"ApprovalDuration": 93785000,
1554
"ExtendedTimeout": 900000,
1555
"Expires": "2019-02-05T00:00:00",
1556
"LastCheckerStatus": -2,
1560
def test_DumpJSON_normal(self):
    """The DumpJSON output for all clients must parse as JSON equal
    to self.expected_json."""
    dumped = command.DumpJSON().output(self.clients.values())
    self.assertDictEqual(json.loads(dumped), self.expected_json)
1565
def test_DumpJSON_one_client(self):
    """The DumpJSON output for self.one_client must parse as JSON
    holding only the "foo" entry of self.expected_json."""
    dumped = command.DumpJSON().output(self.one_client.values())
    parsed = json.loads(dumped)
    wanted = {"foo": self.expected_json["foo"]}
    self.assertDictEqual(parsed, wanted)
1571
def test_PrintTable_normal(self):
1572
output = command.PrintTable().output(self.clients.values())
1573
expected_output = "\n".join((
1574
"Name Enabled Timeout Last Successful Check",
1575
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1576
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1578
self.assertEqual(output, expected_output)
1580
def test_PrintTable_verbose(self):
1581
output = command.PrintTable(verbose=True).output(
1582
self.clients.values())
1597
"Last Successful Check ",
1598
"2019-02-03T00:00:00 ",
1599
"2019-02-04T00:00:00 ",
1602
"2019-01-02T00:00:00 ",
1603
"2019-01-03T00:00:00 ",
1615
("92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8"
1617
("0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e"
1621
"778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 ",
1622
"3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 ",
1624
"Check Is Running ",
1629
"2019-01-03T00:00:00 ",
1630
"2019-01-04T00:00:00 ",
1632
"Approval Is Pending ",
1636
"Approved By Default ",
1640
"Last Approval Request ",
1642
"2019-01-03T00:00:00 ",
1648
"Approval Duration ",
1653
"fping -q -- %(host)s ",
1656
"Extended Timeout ",
1661
"2019-02-04T00:00:00 ",
1662
"2019-02-05T00:00:00 ",
1664
"Last Checker Status",
1669
num_lines = max(len(rows) for rows in columns)
1670
expected_output = "\n".join("".join(rows[line]
1671
for rows in columns)
1672
for line in range(num_lines))
1673
self.assertEqual(output, expected_output)
1675
def test_PrintTable_one_client(self):
1676
output = command.PrintTable().output(self.one_client.values())
1677
expected_output = "\n".join((
1678
"Name Enabled Timeout Last Successful Check",
1679
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1681
self.assertEqual(output, expected_output)
1684
class TestPropertyCmd(TestCommand):
1685
"""Abstract class for tests of command.Property classes"""
1687
if not hasattr(self, "command"):
1689
values_to_get = getattr(self, "values_to_get",
1691
for value_to_set, value_to_get in zip(self.values_to_set,
1693
for clientpath in self.clients:
1694
client = self.bus.get_object(dbus_busname, clientpath)
1695
old_value = client.attributes[self.propname]
1696
client.attributes[self.propname] = self.Unique()
1697
self.run_command(value_to_set, self.clients)
1698
for clientpath in self.clients:
1699
client = self.bus.get_object(dbus_busname, clientpath)
1700
value = client.attributes[self.propname]
1701
self.assertNotIsInstance(value, self.Unique)
1702
self.assertEqual(value, value_to_get)
1704
class Unique(object):
1705
"""Class for objects which exist only to be unique objects,
1706
since unittest.mock.sentinel only exists in Python 3.3"""
1708
def run_command(self, value, clients):
1709
self.command().run(clients, self.bus)
1712
class TestEnableCmd(TestPropertyCmd):
    """Property test of command.Enable on the "Enabled" property"""
    propname = "Enabled"
    values_to_set = [dbus.Boolean(True)]
    command = command.Enable
1718
class TestDisableCmd(TestPropertyCmd):
    """Property test of command.Disable on the "Enabled" property"""
    propname = "Enabled"
    values_to_set = [dbus.Boolean(False)]
    command = command.Disable
1724
class TestBumpTimeoutCmd(TestPropertyCmd):
    """Property test of command.BumpTimeout on the "LastCheckedOK"
    property"""
    propname = "LastCheckedOK"
    values_to_set = [""]
    command = command.BumpTimeout
1730
class TestStartCheckerCmd(TestPropertyCmd):
    """Property test of command.StartChecker on the
    "CheckerRunning" property"""
    propname = "CheckerRunning"
    values_to_set = [dbus.Boolean(True)]
    command = command.StartChecker
1736
class TestStopCheckerCmd(TestPropertyCmd):
    """Property test of command.StopChecker on the
    "CheckerRunning" property"""
    propname = "CheckerRunning"
    values_to_set = [dbus.Boolean(False)]
    command = command.StopChecker
1742
class TestApproveByDefaultCmd(TestPropertyCmd):
    """Property test of command.ApproveByDefault on the
    "ApprovedByDefault" property"""
    propname = "ApprovedByDefault"
    values_to_set = [dbus.Boolean(True)]
    command = command.ApproveByDefault
1748
class TestDenyByDefaultCmd(TestPropertyCmd):
    """Property test of command.DenyByDefault on the
    "ApprovedByDefault" property"""
    propname = "ApprovedByDefault"
    values_to_set = [dbus.Boolean(False)]
    command = command.DenyByDefault
1754
class TestPropertyValueCmd(TestPropertyCmd):
1755
"""Abstract class for tests of PropertyValueCmd classes"""
1758
if type(self) is TestPropertyValueCmd:
1760
return super(TestPropertyValueCmd, self).runTest()
1762
def run_command(self, value, clients):
1763
self.command(value).run(clients, self.bus)
1766
class TestSetCheckerCmd(TestPropertyValueCmd):
    """Property test of command.SetChecker on the "Checker"
    property, using an empty, a minimal and a realistic checker
    command string"""
    propname = "Checker"
    values_to_set = [
        "",
        ":",
        "fping -q -- %s",
    ]
    command = command.SetChecker
1772
class TestSetHostCmd(TestPropertyValueCmd):
1773
command = command.SetHost
1775
values_to_set = ["192.0.2.3", "foo.example.org"]
1778
class TestSetSecretCmd(TestPropertyValueCmd):
1779
command = command.SetSecret
1781
values_to_set = [io.BytesIO(b""),
1782
io.BytesIO(b"secret\0xyzzy\nbar")]
1783
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1786
class TestSetTimeoutCmd(TestPropertyValueCmd):
    """Property test of command.SetTimeout on the "Timeout"
    property; values_to_get lists the durations in values_to_set as
    milliseconds"""
    propname = "Timeout"
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    values_to_get = [
        0,
        300000,
        1000,
        604800000,
        31449600000,
    ]
    command = command.SetTimeout
1797
class TestSetExtendedTimeoutCmd(TestPropertyValueCmd):
    """Property test of command.SetExtendedTimeout on the
    "ExtendedTimeout" property; values_to_get lists the durations in
    values_to_set as milliseconds"""
    propname = "ExtendedTimeout"
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    values_to_get = [
        0,
        300000,
        1000,
        604800000,
        31449600000,
    ]
    command = command.SetExtendedTimeout
1808
class TestSetIntervalCmd(TestPropertyValueCmd):
    """Property test of command.SetInterval on the "Interval"
    property; values_to_get lists the durations in values_to_set as
    milliseconds"""
    propname = "Interval"
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    values_to_get = [
        0,
        300000,
        1000,
        604800000,
        31449600000,
    ]
    command = command.SetInterval
1819
class TestSetApprovalDelayCmd(TestPropertyValueCmd):
    """Property test of command.SetApprovalDelay on the
    "ApprovalDelay" property; values_to_get lists the durations in
    values_to_set as milliseconds"""
    propname = "ApprovalDelay"
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    values_to_get = [
        0,
        300000,
        1000,
        604800000,
        31449600000,
    ]
    command = command.SetApprovalDelay
1830
class TestSetApprovalDurationCmd(TestPropertyValueCmd):
    """Property test of command.SetApprovalDuration on the
    "ApprovalDuration" property; values_to_get lists the durations
    in values_to_set as milliseconds"""
    propname = "ApprovalDuration"
    values_to_set = [
        datetime.timedelta(),
        datetime.timedelta(minutes=5),
        datetime.timedelta(seconds=1),
        datetime.timedelta(weeks=1),
        datetime.timedelta(weeks=52),
    ]
    values_to_get = [
        0,
        300000,
        1000,
        604800000,
        31449600000,
    ]
    command = command.SetApprovalDuration
766
class Test_TableOfClients(unittest.TestCase):
768
self.tableheaders = {
772
"Bool": "A D-BUS Boolean",
773
"NonDbusBoolean": "A Non-D-BUS Boolean",
774
"Integer": "An Integer",
775
"Timeout": "Timedelta 1",
776
"Interval": "Timedelta 2",
777
"ApprovalDelay": "Timedelta 3",
778
"ApprovalDuration": "Timedelta 4",
779
"ExtendedTimeout": "Timedelta 5",
780
"String": "A String",
782
self.keywords = ["Attr1", "AttrTwo"]
788
"Bool": dbus.Boolean(False),
789
"NonDbusBoolean": False,
793
"ApprovalDelay": 2000,
794
"ApprovalDuration": 3000,
795
"ExtendedTimeout": 4000,
802
"Bool": dbus.Boolean(True),
803
"NonDbusBoolean": True,
806
"Interval": 93786000,
807
"ApprovalDelay": 93787000,
808
"ApprovalDuration": 93788000,
809
"ExtendedTimeout": 93789000,
810
"String": "A huge string which will not fit," * 10,
813
def test_short_header(self):
814
text = str(TableOfClients(self.clients, self.keywords,
821
self.assertEqual(text, expected_text)
822
def test_booleans(self):
823
keywords = ["Bool", "NonDbusBoolean"]
824
text = str(TableOfClients(self.clients, keywords,
827
A D-BUS Boolean A Non-D-BUS Boolean
831
self.assertEqual(text, expected_text)
832
def test_milliseconds_detection(self):
833
keywords = ["Integer", "Timeout", "Interval", "ApprovalDelay",
834
"ApprovalDuration", "ExtendedTimeout"]
835
text = str(TableOfClients(self.clients, keywords,
838
An Integer Timedelta 1 Timedelta 2 Timedelta 3 Timedelta 4 Timedelta 5
839
0 00:00:00 00:00:01 00:00:02 00:00:03 00:00:04
840
1 1T02:03:05 1T02:03:06 1T02:03:07 1T02:03:08 1T02:03:09
842
self.assertEqual(text, expected_text)
843
def test_empty_and_long_string_values(self):
844
keywords = ["String"]
845
text = str(TableOfClients(self.clients, keywords,
850
A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,
852
self.assertEqual(text, expected_text)