271
class TableOfClients(object):
274
"Enabled": "Enabled",
275
"Timeout": "Timeout",
276
"LastCheckedOK": "Last Successful Check",
277
"LastApprovalRequest": "Last Approval Request",
278
"Created": "Created",
279
"Interval": "Interval",
281
"Fingerprint": "Fingerprint",
283
"CheckerRunning": "Check Is Running",
284
"LastEnabled": "Last Enabled",
285
"ApprovalPending": "Approval Is Pending",
286
"ApprovedByDefault": "Approved By Default",
287
"ApprovalDelay": "Approval Delay",
288
"ApprovalDuration": "Approval Duration",
289
"Checker": "Checker",
290
"ExtendedTimeout": "Extended Timeout",
291
"Expires": "Expires",
292
"LastCheckerStatus": "Last Checker Status",
295
def __init__(self, clients, keywords, tablewords=None):
296
self.clients = clients
297
self.keywords = keywords
298
if tablewords is not None:
299
self.tablewords = tablewords
302
return "\n".join(self.rows())
304
if sys.version_info.major == 2:
305
__unicode__ = __str__
307
return str(self).encode(locale.getpreferredencoding())
310
format_string = self.row_formatting_string()
311
rows = [self.header_line(format_string)]
312
rows.extend(self.client_line(client, format_string)
313
for client in self.clients)
316
def row_formatting_string(self):
317
"Format string used to format table rows"
318
return " ".join("{{{key}:{width}}}".format(
319
width=max(len(self.tablewords[key]),
320
*(len(self.string_from_client(client, key))
321
for client in self.clients)),
323
for key in self.keywords)
325
def string_from_client(self, client, key):
326
return self.valuetostring(client[key], key)
329
def valuetostring(value, keyword):
330
if isinstance(value, dbus.Boolean):
331
return "Yes" if value else "No"
332
if keyword in ("Timeout", "Interval", "ApprovalDelay",
333
"ApprovalDuration", "ExtendedTimeout"):
334
return milliseconds_to_string(value)
337
def header_line(self, format_string):
338
return format_string.format(**self.tablewords)
340
def client_line(self, client, format_string):
341
return format_string.format(
342
**{key: self.string_from_client(client, key)
343
for key in self.keywords})
346
## Classes for commands.
348
# Abstract classes first
349
class Command(object):
350
"""Abstract class for commands"""
351
def run(self, clients):
352
"""Normal commands should implement run_on_one_client(), but
353
commands which want to operate on all clients at the same time
354
can override this run() method instead."""
355
for client in clients:
356
self.run_on_one_client(client)
358
class PrintCmd(Command):
359
"""Abstract class for commands printing client details"""
360
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
361
"Created", "Interval", "Host", "KeyID",
362
"Fingerprint", "CheckerRunning", "LastEnabled",
363
"ApprovalPending", "ApprovedByDefault",
364
"LastApprovalRequest", "ApprovalDelay",
365
"ApprovalDuration", "Checker", "ExtendedTimeout",
366
"Expires", "LastCheckerStatus")
367
def run(self, clients):
368
print(self.output(clients))
370
class PropertyCmd(Command):
371
"""Abstract class for Actions for setting one client property"""
372
def run_on_one_client(self, client):
373
"""Set the Client's D-Bus property"""
374
client.Set(client_interface, self.property, self.value_to_set,
375
dbus_interface=dbus.PROPERTIES_IFACE)
377
class ValueArgumentMixIn(object):
378
"""Mixin class for commands taking a value as argument"""
379
def __init__(self, value):
380
self.value_to_set = value
382
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
383
"""Mixin class for commands taking a value argument as
386
def value_to_set(self):
389
def value_to_set(self, value):
390
"""When setting, convert value to a datetime.timedelta"""
391
self._vts = string_to_delta(value).total_seconds() * 1000
393
# Actual (non-abstract) command classes
395
class PrintTableCmd(PrintCmd):
396
def __init__(self, verbose=False):
397
self.verbose = verbose
398
def output(self, clients):
400
keywords = self.all_keywords
402
keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
403
return str(TableOfClients(clients.values(), keywords))
405
class DumpJSONCmd(PrintCmd):
406
def output(self, clients):
407
data = {client["Name"]:
408
{key: self.dbus_boolean_to_bool(client[key])
409
for key in self.all_keywords}
410
for client in clients.values()}
411
return json.dumps(data, indent=4, separators=(',', ': '))
413
def dbus_boolean_to_bool(value):
414
if isinstance(value, dbus.Boolean):
418
class IsEnabledCmd(Command):
419
def run_on_one_client(self, client):
420
if self.is_enabled(client):
423
def is_enabled(self, client):
424
return client.Get(client_interface, "Enabled",
425
dbus_interface=dbus.PROPERTIES_IFACE)
427
class RemoveCmd(Command):
428
def __init__(self, mandos):
430
def run_on_one_client(self, client):
431
self.mandos.RemoveClient(client.__dbus_object_path__)
433
class ApproveCmd(Command):
434
def run_on_one_client(self, client):
435
client.Approve(dbus.Boolean(True),
436
dbus_interface=client_interface)
438
class DenyCmd(Command):
439
def run_on_one_client(self, client):
440
client.Approve(dbus.Boolean(False),
441
dbus_interface=client_interface)
443
class EnableCmd(PropertyCmd):
445
value_to_set = dbus.Boolean(True)
447
class DisableCmd(PropertyCmd):
449
value_to_set = dbus.Boolean(False)
451
class BumpTimeoutCmd(PropertyCmd):
452
property = "LastCheckedOK"
455
class StartCheckerCmd(PropertyCmd):
456
property = "CheckerRunning"
457
value_to_set = dbus.Boolean(True)
459
class StopCheckerCmd(PropertyCmd):
460
property = "CheckerRunning"
461
value_to_set = dbus.Boolean(False)
463
class ApproveByDefaultCmd(PropertyCmd):
464
property = "ApprovedByDefault"
465
value_to_set = dbus.Boolean(True)
467
class DenyByDefaultCmd(PropertyCmd):
468
property = "ApprovedByDefault"
469
value_to_set = dbus.Boolean(False)
471
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
474
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
477
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
480
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
483
class SetExtendedTimeoutCmd(PropertyCmd,
484
MillisecondsValueArgumentMixIn):
485
property = "ExtendedTimeout"
487
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
488
property = "Interval"
490
class SetApprovalDelayCmd(PropertyCmd,
491
MillisecondsValueArgumentMixIn):
492
property = "ApprovalDelay"
494
class SetApprovalDurationCmd(PropertyCmd,
495
MillisecondsValueArgumentMixIn):
496
property = "ApprovalDuration"
498
def has_actions(options):
499
return any((options.enable,
501
options.bump_timeout,
502
options.start_checker,
503
options.stop_checker,
506
options.checker is not None,
507
options.timeout is not None,
508
options.extended_timeout is not None,
509
options.interval is not None,
510
options.approved_by_default is not None,
511
options.approval_delay is not None,
512
options.approval_duration is not None,
513
options.host is not None,
514
options.secret is not None,
520
parser = argparse.ArgumentParser()
521
parser.add_argument("--version", action="version",
522
version="%(prog)s {}".format(version),
523
help="show version number and exit")
524
parser.add_argument("-a", "--all", action="store_true",
525
help="Select all clients")
526
parser.add_argument("-v", "--verbose", action="store_true",
527
help="Print all fields")
528
parser.add_argument("-j", "--dump-json", action="store_true",
529
help="Dump client data in JSON format")
530
enable_disable = parser.add_mutually_exclusive_group()
531
enable_disable.add_argument("-e", "--enable", action="store_true",
532
help="Enable client")
533
enable_disable.add_argument("-d", "--disable",
535
help="disable client")
536
parser.add_argument("-b", "--bump-timeout", action="store_true",
537
help="Bump timeout for client")
538
start_stop_checker = parser.add_mutually_exclusive_group()
539
start_stop_checker.add_argument("--start-checker",
541
help="Start checker for client")
542
start_stop_checker.add_argument("--stop-checker",
544
help="Stop checker for client")
545
parser.add_argument("-V", "--is-enabled", action="store_true",
546
help="Check if client is enabled")
547
parser.add_argument("-r", "--remove", action="store_true",
548
help="Remove client")
549
parser.add_argument("-c", "--checker",
550
help="Set checker command for client")
551
parser.add_argument("-t", "--timeout",
552
help="Set timeout for client")
553
parser.add_argument("--extended-timeout",
554
help="Set extended timeout for client")
555
parser.add_argument("-i", "--interval",
556
help="Set checker interval for client")
557
approve_deny_default = parser.add_mutually_exclusive_group()
558
approve_deny_default.add_argument(
559
"--approve-by-default", action="store_true",
560
default=None, dest="approved_by_default",
561
help="Set client to be approved by default")
562
approve_deny_default.add_argument(
563
"--deny-by-default", action="store_false",
564
dest="approved_by_default",
565
help="Set client to be denied by default")
566
parser.add_argument("--approval-delay",
567
help="Set delay before client approve/deny")
568
parser.add_argument("--approval-duration",
569
help="Set duration of one client approval")
570
parser.add_argument("-H", "--host", help="Set host for client")
571
parser.add_argument("-s", "--secret",
572
type=argparse.FileType(mode="rb"),
573
help="Set password blob (file) for client")
574
approve_deny = parser.add_mutually_exclusive_group()
575
approve_deny.add_argument(
576
"-A", "--approve", action="store_true",
577
help="Approve any current client request")
578
approve_deny.add_argument("-D", "--deny", action="store_true",
579
help="Deny any current client request")
580
parser.add_argument("--check", action="store_true",
581
help="Run self-test")
582
parser.add_argument("client", nargs="*", help="Client name")
583
options = parser.parse_args()
385
def check_option_syntax(parser, options):
386
"""Apply additional restrictions on options, not expressible in
389
def has_actions(options):
390
return any((options.enable,
392
options.bump_timeout,
393
options.start_checker,
394
options.stop_checker,
397
options.checker is not None,
398
options.timeout is not None,
399
options.extended_timeout is not None,
400
options.interval is not None,
401
options.approved_by_default is not None,
402
options.approval_delay is not None,
403
options.approval_duration is not None,
404
options.host is not None,
405
options.secret is not None,
585
409
if has_actions(options) and not (options.client or options.all):
586
410
parser.error("Options require clients names or --all.")
592
416
if options.all and not has_actions(options):
593
417
parser.error("--all requires an action.")
594
418
if options.is_enabled and len(options.client) > 1:
595
parser.error("--is-enabled requires exactly one client")
419
parser.error("--is-enabled requires exactly one client")
421
options.remove = False
422
if has_actions(options) and not options.deny:
423
parser.error("--remove can only be combined with --deny")
424
options.remove = True
427
def get_mandos_dbus_object(bus):
428
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
429
dbus_busname, server_dbus_path)
430
with if_dbus_exception_log_with_exception_and_exit(
431
"Could not connect to Mandos server: %s"):
432
mandos_dbus_object = bus.get_object(dbus_busname,
434
return mandos_dbus_object
437
@contextlib.contextmanager
438
def if_dbus_exception_log_with_exception_and_exit(*args, **kwargs):
598
bus = dbus.SystemBus()
599
mandos_dbus_objc = bus.get_object(busname, server_path)
600
except dbus.exceptions.DBusException:
601
log.critical("Could not connect to Mandos server")
441
except dbus.exceptions.DBusException as e:
442
log.critical(*(args + (e,)), **kwargs)
604
mandos_serv = dbus.Interface(mandos_dbus_objc,
605
dbus_interface=server_interface)
606
mandos_serv_object_manager = dbus.Interface(
607
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
446
def get_managed_objects(object_manager):
447
log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
448
server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
449
with if_dbus_exception_log_with_exception_and_exit(
450
"Failed to access Mandos server through D-Bus:\n%s"):
451
with SilenceLogger("dbus.proxies"):
452
managed_objects = object_manager.GetManagedObjects()
453
return managed_objects
456
class SilenceLogger(object):
457
"Simple context manager to silence a particular logger"
458
def __init__(self, loggername):
459
self.logger = logging.getLogger(loggername)
462
self.logger.addFilter(self.nullfilter)
465
class NullFilter(logging.Filter):
466
def filter(self, record):
469
nullfilter = NullFilter()
471
def __exit__(self, exc_type, exc_val, exc_tb):
472
self.logger.removeFilter(self.nullfilter)
475
def commands_from_options(options):
479
if options.is_enabled:
480
commands.append(command.IsEnabled())
483
commands.append(command.Approve())
486
commands.append(command.Deny())
489
commands.append(command.Remove())
611
491
if options.dump_json:
612
commands.append(DumpJSONCmd())
492
commands.append(command.DumpJSON())
614
494
if options.enable:
615
commands.append(EnableCmd())
495
commands.append(command.Enable())
617
497
if options.disable:
618
commands.append(DisableCmd())
498
commands.append(command.Disable())
620
500
if options.bump_timeout:
621
commands.append(BumpTimeoutCmd(options.bump_timeout))
501
commands.append(command.BumpTimeout())
623
503
if options.start_checker:
624
commands.append(StartCheckerCmd())
504
commands.append(command.StartChecker())
626
506
if options.stop_checker:
627
commands.append(StopCheckerCmd())
629
if options.is_enabled:
630
commands.append(IsEnabledCmd())
633
commands.append(RemoveCmd(mandos_serv))
507
commands.append(command.StopChecker())
509
if options.approved_by_default is not None:
510
if options.approved_by_default:
511
commands.append(command.ApproveByDefault())
513
commands.append(command.DenyByDefault())
635
515
if options.checker is not None:
636
commands.append(SetCheckerCmd())
516
commands.append(command.SetChecker(options.checker))
518
if options.host is not None:
519
commands.append(command.SetHost(options.host))
521
if options.secret is not None:
522
commands.append(command.SetSecret(options.secret))
638
524
if options.timeout is not None:
639
commands.append(SetTimeoutCmd(options.timeout))
525
commands.append(command.SetTimeout(options.timeout))
641
527
if options.extended_timeout:
643
SetExtendedTimeoutCmd(options.extended_timeout))
529
command.SetExtendedTimeout(options.extended_timeout))
645
531
if options.interval is not None:
646
command.append(SetIntervalCmd(options.interval))
648
if options.approved_by_default is not None:
649
if options.approved_by_default:
650
command.append(ApproveByDefaultCmd())
652
command.append(DenyByDefaultCmd())
532
commands.append(command.SetInterval(options.interval))
654
534
if options.approval_delay is not None:
655
command.append(SetApprovalDelayCmd(options.approval_delay))
536
command.SetApprovalDelay(options.approval_delay))
657
538
if options.approval_duration is not None:
659
SetApprovalDurationCmd(options.approval_duration))
661
if options.host is not None:
662
command.append(SetHostCmd(options.host))
664
if options.secret is not None:
665
command.append(SetSecretCmd(options.secret))
668
commands.append(ApproveCmd())
671
commands.append(DenyCmd())
540
command.SetApprovalDuration(options.approval_duration))
673
542
# If no command option has been given, show table of clients,
674
543
# optionally verbosely
676
commands.append(PrintTableCmd(verbose=options.verbose))
678
# block stderr since dbus library prints to stderr
679
null = os.open(os.path.devnull, os.O_RDWR)
680
stderrcopy = os.dup(sys.stderr.fileno())
681
os.dup2(null, sys.stderr.fileno())
685
mandos_clients = {path: ifs_and_props[client_interface]
686
for path, ifs_and_props in
687
mandos_serv_object_manager
688
.GetManagedObjects().items()
689
if client_interface in ifs_and_props}
692
os.dup2(stderrcopy, sys.stderr.fileno())
694
except dbus.exceptions.DBusException as e:
695
log.critical("Failed to access Mandos server through D-Bus:"
699
# Compile dict of (clients: properties) to process
702
if options.all or not options.client:
703
clients = {bus.get_object(busname, path): properties
704
for path, properties in mandos_clients.items()}
706
for name in options.client:
707
for path, client in mandos_clients.items():
708
if client["Name"] == name:
709
client_objc = bus.get_object(busname, path)
710
clients[client_objc] = client
713
log.critical("Client not found on server: %r", name)
716
# Run all commands on clients
717
for command in commands:
545
commands.append(command.PrintTable(verbose=options.verbose))
550
class command(object):
551
"""A namespace for command classes"""
554
"""Abstract base class for commands"""
555
def run(self, clients, bus=None, mandos=None):
556
"""Normal commands should implement run_on_one_client(),
557
but commands which want to operate on all clients at the same time can
558
override this run() method instead.
561
for clientpath, properties in clients.items():
562
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
563
dbus_busname, str(clientpath))
564
client = bus.get_object(dbus_busname, clientpath)
565
self.run_on_one_client(client, properties)
568
class IsEnabled(Base):
569
def run(self, clients, bus=None, mandos=None):
570
client, properties = next(iter(clients.items()))
571
if self.is_enabled(client, properties):
574
def is_enabled(self, client, properties):
575
return properties["Enabled"]
579
def run_on_one_client(self, client, properties):
580
log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
581
client.__dbus_object_path__,
582
client_dbus_interface)
583
client.Approve(dbus.Boolean(True),
584
dbus_interface=client_dbus_interface)
588
def run_on_one_client(self, client, properties):
589
log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
590
client.__dbus_object_path__,
591
client_dbus_interface)
592
client.Approve(dbus.Boolean(False),
593
dbus_interface=client_dbus_interface)
597
def run_on_one_client(self, client, properties):
598
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)",
599
dbus_busname, server_dbus_path,
600
server_dbus_interface,
601
str(client.__dbus_object_path__))
602
self.mandos.RemoveClient(client.__dbus_object_path__)
606
"""Abstract class for commands outputting client details"""
607
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
608
"Created", "Interval", "Host", "KeyID",
609
"Fingerprint", "CheckerRunning",
610
"LastEnabled", "ApprovalPending",
611
"ApprovedByDefault", "LastApprovalRequest",
612
"ApprovalDelay", "ApprovalDuration",
613
"Checker", "ExtendedTimeout", "Expires",
616
def run(self, clients, bus=None, mandos=None):
617
print(self.output(clients.values()))
619
def output(self, clients):
620
raise NotImplementedError()
623
class DumpJSON(Output):
624
def output(self, clients):
625
data = {client["Name"]:
626
{key: self.dbus_boolean_to_bool(client[key])
627
for key in self.all_keywords}
628
for client in clients}
629
return json.dumps(data, indent=4, separators=(',', ': '))
632
def dbus_boolean_to_bool(value):
633
if isinstance(value, dbus.Boolean):
638
class PrintTable(Output):
639
def __init__(self, verbose=False):
640
self.verbose = verbose
642
def output(self, clients):
643
default_keywords = ("Name", "Enabled", "Timeout",
645
keywords = default_keywords
647
keywords = self.all_keywords
648
return str(self.TableOfClients(clients, keywords))
650
class TableOfClients(object):
653
"Enabled": "Enabled",
654
"Timeout": "Timeout",
655
"LastCheckedOK": "Last Successful Check",
656
"LastApprovalRequest": "Last Approval Request",
657
"Created": "Created",
658
"Interval": "Interval",
660
"Fingerprint": "Fingerprint",
662
"CheckerRunning": "Check Is Running",
663
"LastEnabled": "Last Enabled",
664
"ApprovalPending": "Approval Is Pending",
665
"ApprovedByDefault": "Approved By Default",
666
"ApprovalDelay": "Approval Delay",
667
"ApprovalDuration": "Approval Duration",
668
"Checker": "Checker",
669
"ExtendedTimeout": "Extended Timeout",
670
"Expires": "Expires",
671
"LastCheckerStatus": "Last Checker Status",
674
def __init__(self, clients, keywords):
675
self.clients = clients
676
self.keywords = keywords
679
return "\n".join(self.rows())
681
if sys.version_info.major == 2:
682
__unicode__ = __str__
684
return str(self).encode(
685
locale.getpreferredencoding())
688
format_string = self.row_formatting_string()
689
rows = [self.header_line(format_string)]
690
rows.extend(self.client_line(client, format_string)
691
for client in self.clients)
694
def row_formatting_string(self):
695
"Format string used to format table rows"
696
return " ".join("{{{key}:{width}}}".format(
697
width=max(len(self.tableheaders[key]),
698
*(len(self.string_from_client(client,
700
for client in self.clients)),
702
for key in self.keywords)
704
def string_from_client(self, client, key):
705
return self.valuetostring(client[key], key)
708
def valuetostring(cls, value, keyword):
709
if isinstance(value, dbus.Boolean):
710
return "Yes" if value else "No"
711
if keyword in ("Timeout", "Interval", "ApprovalDelay",
712
"ApprovalDuration", "ExtendedTimeout"):
713
return cls.milliseconds_to_string(value)
716
def header_line(self, format_string):
717
return format_string.format(**self.tableheaders)
719
def client_line(self, client, format_string):
720
return format_string.format(
721
**{key: self.string_from_client(client, key)
722
for key in self.keywords})
725
def milliseconds_to_string(ms):
726
td = datetime.timedelta(0, 0, 0, ms)
727
return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
728
.format(days="{}T".format(td.days)
730
hours=td.seconds // 3600,
731
minutes=(td.seconds % 3600) // 60,
732
seconds=td.seconds % 60))
735
class Property(Base):
736
"Abstract class for Actions for setting one client property"
738
def run_on_one_client(self, client, properties):
739
"""Set the Client's D-Bus property"""
740
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
741
client.__dbus_object_path__,
742
dbus.PROPERTIES_IFACE, client_dbus_interface,
743
self.propname, self.value_to_set
744
if not isinstance(self.value_to_set,
746
else bool(self.value_to_set))
747
client.Set(client_dbus_interface, self.propname,
749
dbus_interface=dbus.PROPERTIES_IFACE)
753
raise NotImplementedError()
756
class Enable(Property):
758
value_to_set = dbus.Boolean(True)
761
class Disable(Property):
763
value_to_set = dbus.Boolean(False)
766
class BumpTimeout(Property):
767
propname = "LastCheckedOK"
771
class StartChecker(Property):
772
propname = "CheckerRunning"
773
value_to_set = dbus.Boolean(True)
776
class StopChecker(Property):
777
propname = "CheckerRunning"
778
value_to_set = dbus.Boolean(False)
781
class ApproveByDefault(Property):
782
propname = "ApprovedByDefault"
783
value_to_set = dbus.Boolean(True)
786
class DenyByDefault(Property):
787
propname = "ApprovedByDefault"
788
value_to_set = dbus.Boolean(False)
791
class PropertyValue(Property):
792
"Abstract class for Property recieving a value as argument"
793
def __init__(self, value):
794
self.value_to_set = value
797
class SetChecker(PropertyValue):
801
class SetHost(PropertyValue):
805
class SetSecret(PropertyValue):
809
def value_to_set(self):
813
def value_to_set(self, value):
814
"""When setting, read data from supplied file object"""
815
self._vts = value.read()
819
class MillisecondsPropertyValueArgument(PropertyValue):
820
"""Abstract class for PropertyValue taking a value argument as
821
a datetime.timedelta() but should store it as milliseconds."""
824
def value_to_set(self):
828
def value_to_set(self, value):
829
"When setting, convert value from a datetime.timedelta"
830
self._vts = int(round(value.total_seconds() * 1000))
833
class SetTimeout(MillisecondsPropertyValueArgument):
837
class SetExtendedTimeout(MillisecondsPropertyValueArgument):
838
propname = "ExtendedTimeout"
841
class SetInterval(MillisecondsPropertyValueArgument):
842
propname = "Interval"
845
class SetApprovalDelay(MillisecondsPropertyValueArgument):
846
propname = "ApprovalDelay"
849
class SetApprovalDuration(MillisecondsPropertyValueArgument):
850
propname = "ApprovalDuration"
721
class Test_milliseconds_to_string(unittest.TestCase):
723
self.assertEqual(milliseconds_to_string(93785000),
725
def test_no_days(self):
726
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
727
def test_all_zero(self):
728
self.assertEqual(milliseconds_to_string(0), "00:00:00")
729
def test_no_fractional_seconds(self):
730
self.assertEqual(milliseconds_to_string(400), "00:00:00")
731
self.assertEqual(milliseconds_to_string(900), "00:00:00")
732
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
734
class Test_string_to_delta(unittest.TestCase):
735
def test_handles_basic_rfc3339(self):
854
class TestCaseWithAssertLogs(unittest.TestCase):
855
"""unittest.TestCase.assertLogs only exists in Python 3.4"""
857
if not hasattr(unittest.TestCase, "assertLogs"):
858
@contextlib.contextmanager
859
def assertLogs(self, logger, level=logging.INFO):
860
capturing_handler = self.CapturingLevelHandler(level)
861
old_level = logger.level
862
old_propagate = logger.propagate
863
logger.addHandler(capturing_handler)
864
logger.setLevel(level)
865
logger.propagate = False
867
yield capturing_handler.watcher
869
logger.propagate = old_propagate
870
logger.removeHandler(capturing_handler)
871
logger.setLevel(old_level)
872
self.assertGreater(len(capturing_handler.watcher.records),
875
class CapturingLevelHandler(logging.Handler):
876
def __init__(self, level, *args, **kwargs):
877
logging.Handler.__init__(self, *args, **kwargs)
878
self.watcher = self.LoggingWatcher([], [])
879
def emit(self, record):
880
self.watcher.records.append(record)
881
self.watcher.output.append(self.format(record))
883
LoggingWatcher = collections.namedtuple("LoggingWatcher",
888
class Test_string_to_delta(TestCaseWithAssertLogs):
889
# Just test basic RFC 3339 functionality here, the doc string for
890
# rfc3339_duration_to_delta() already has more comprehensive
891
# tests, which is run by doctest.
893
def test_rfc3339_zero_seconds(self):
894
self.assertEqual(string_to_delta("PT0S"),
895
datetime.timedelta())
897
def test_rfc3339_zero_days(self):
898
self.assertEqual(string_to_delta("P0D"),
899
datetime.timedelta())
901
def test_rfc3339_one_second(self):
902
self.assertEqual(string_to_delta("PT1S"),
903
datetime.timedelta(0, 1))
905
def test_rfc3339_two_hours(self):
736
906
self.assertEqual(string_to_delta("PT2H"),
737
907
datetime.timedelta(0, 7200))
738
909
def test_falls_back_to_pre_1_6_1_with_warning(self):
739
# assertLogs only exists in Python 3.4
740
if hasattr(self, "assertLogs"):
741
with self.assertLogs(log, logging.WARNING):
742
value = string_to_delta("2h")
910
with self.assertLogs(log, logging.WARNING):
744
911
value = string_to_delta("2h")
745
912
self.assertEqual(value, datetime.timedelta(0, 7200))
747
class Test_TableOfClients(unittest.TestCase):
753
"Bool": "A D-BUS Boolean",
754
"NonDbusBoolean": "A Non-D-BUS Boolean",
755
"Integer": "An Integer",
756
"Timeout": "Timedelta 1",
757
"Interval": "Timedelta 2",
758
"ApprovalDelay": "Timedelta 3",
759
"ApprovalDuration": "Timedelta 4",
760
"ExtendedTimeout": "Timedelta 5",
761
"String": "A String",
763
self.keywords = ["Attr1", "AttrTwo"]
769
"Bool": dbus.Boolean(False),
770
"NonDbusBoolean": False,
774
"ApprovalDelay": 2000,
775
"ApprovalDuration": 3000,
776
"ExtendedTimeout": 4000,
783
"Bool": dbus.Boolean(True),
784
"NonDbusBoolean": True,
787
"Interval": 93786000,
788
"ApprovalDelay": 93787000,
789
"ApprovalDuration": 93788000,
790
"ExtendedTimeout": 93789000,
791
"String": "A huge string which will not fit," * 10,
794
def test_short_header(self):
795
text = str(TableOfClients(self.clients, self.keywords,
802
self.assertEqual(text, expected_text)
803
def test_booleans(self):
804
keywords = ["Bool", "NonDbusBoolean"]
805
text = str(TableOfClients(self.clients, keywords,
808
A D-BUS Boolean A Non-D-BUS Boolean
812
self.assertEqual(text, expected_text)
813
def test_milliseconds_detection(self):
814
keywords = ["Integer", "Timeout", "Interval", "ApprovalDelay",
815
"ApprovalDuration", "ExtendedTimeout"]
816
text = str(TableOfClients(self.clients, keywords,
819
An Integer Timedelta 1 Timedelta 2 Timedelta 3 Timedelta 4 Timedelta 5
820
0 00:00:00 00:00:01 00:00:02 00:00:03 00:00:04
821
1 1T02:03:05 1T02:03:06 1T02:03:07 1T02:03:08 1T02:03:09
823
self.assertEqual(text, expected_text)
824
def test_empty_and_long_string_values(self):
825
keywords = ["String"]
826
text = str(TableOfClients(self.clients, keywords,
831
A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,A huge string which will not fit,
833
self.assertEqual(text, expected_text)
915
class Test_check_option_syntax(unittest.TestCase):
917
self.parser = argparse.ArgumentParser()
918
add_command_line_options(self.parser)
920
def test_actions_requires_client_or_all(self):
921
for action, value in self.actions.items():
922
options = self.parser.parse_args()
923
setattr(options, action, value)
924
with self.assertParseError():
925
self.check_option_syntax(options)
927
# This mostly corresponds to the definition from has_actions() in
928
# check_option_syntax()
930
# The actual values set here are not that important, but we do
931
# at least stick to the correct types, even though they are
935
"bump_timeout": True,
936
"start_checker": True,
937
"stop_checker": True,
941
"timeout": datetime.timedelta(),
942
"extended_timeout": datetime.timedelta(),
943
"interval": datetime.timedelta(),
944
"approved_by_default": True,
945
"approval_delay": datetime.timedelta(),
946
"approval_duration": datetime.timedelta(),
948
"secret": io.BytesIO(b"x"),
953
@contextlib.contextmanager
954
def assertParseError(self):
955
with self.assertRaises(SystemExit) as e:
956
with self.redirect_stderr_to_devnull():
958
# Exit code from argparse is guaranteed to be "2". Reference:
959
# https://docs.python.org/3/library
960
# /argparse.html#exiting-methods
961
self.assertEqual(e.exception.code, 2)
964
@contextlib.contextmanager
965
def redirect_stderr_to_devnull():
966
null = os.open(os.path.devnull, os.O_RDWR)
967
stderrcopy = os.dup(sys.stderr.fileno())
968
os.dup2(null, sys.stderr.fileno())
974
os.dup2(stderrcopy, sys.stderr.fileno())
977
def check_option_syntax(self, options):
978
check_option_syntax(self.parser, options)
980
def test_actions_all_conflicts_with_verbose(self):
981
for action, value in self.actions.items():
982
options = self.parser.parse_args()
983
setattr(options, action, value)
985
options.verbose = True
986
with self.assertParseError():
987
self.check_option_syntax(options)
989
def test_actions_with_client_conflicts_with_verbose(self):
990
for action, value in self.actions.items():
991
options = self.parser.parse_args()
992
setattr(options, action, value)
993
options.verbose = True
994
options.client = ["foo"]
995
with self.assertParseError():
996
self.check_option_syntax(options)
998
def test_dump_json_conflicts_with_verbose(self):
999
options = self.parser.parse_args()
1000
options.dump_json = True
1001
options.verbose = True
1002
with self.assertParseError():
1003
self.check_option_syntax(options)
1005
def test_dump_json_conflicts_with_action(self):
1006
for action, value in self.actions.items():
1007
options = self.parser.parse_args()
1008
setattr(options, action, value)
1009
options.dump_json = True
1010
with self.assertParseError():
1011
self.check_option_syntax(options)
1013
def test_all_can_not_be_alone(self):
1014
options = self.parser.parse_args()
1016
with self.assertParseError():
1017
self.check_option_syntax(options)
1019
def test_all_is_ok_with_any_action(self):
1020
for action, value in self.actions.items():
1021
options = self.parser.parse_args()
1022
setattr(options, action, value)
1024
self.check_option_syntax(options)
1026
def test_any_action_is_ok_with_one_client(self):
1027
for action, value in self.actions.items():
1028
options = self.parser.parse_args()
1029
setattr(options, action, value)
1030
options.client = ["foo"]
1031
self.check_option_syntax(options)
1033
def test_one_client_with_all_actions_except_is_enabled(self):
1034
options = self.parser.parse_args()
1035
for action, value in self.actions.items():
1036
if action == "is_enabled":
1038
setattr(options, action, value)
1039
options.client = ["foo"]
1040
self.check_option_syntax(options)
1042
def test_two_clients_with_all_actions_except_is_enabled(self):
1043
options = self.parser.parse_args()
1044
for action, value in self.actions.items():
1045
if action == "is_enabled":
1047
setattr(options, action, value)
1048
options.client = ["foo", "barbar"]
1049
self.check_option_syntax(options)
1051
def test_two_clients_are_ok_with_actions_except_is_enabled(self):
1052
for action, value in self.actions.items():
1053
if action == "is_enabled":
1055
options = self.parser.parse_args()
1056
setattr(options, action, value)
1057
options.client = ["foo", "barbar"]
1058
self.check_option_syntax(options)
1060
def test_is_enabled_fails_without_client(self):
1061
options = self.parser.parse_args()
1062
options.is_enabled = True
1063
with self.assertParseError():
1064
self.check_option_syntax(options)
1066
def test_is_enabled_fails_with_two_clients(self):
1067
options = self.parser.parse_args()
1068
options.is_enabled = True
1069
options.client = ["foo", "barbar"]
1070
with self.assertParseError():
1071
self.check_option_syntax(options)
1073
def test_remove_can_only_be_combined_with_action_deny(self):
1074
for action, value in self.actions.items():
1075
if action in {"remove", "deny"}:
1077
options = self.parser.parse_args()
1078
setattr(options, action, value)
1080
options.remove = True
1081
with self.assertParseError():
1082
self.check_option_syntax(options)
1085
class Test_get_mandos_dbus_object(TestCaseWithAssertLogs):
1086
def test_calls_and_returns_get_object_on_bus(self):
1087
class MockBus(object):
1089
def get_object(mockbus_self, busname, dbus_path):
1090
# Note that "self" is still the testcase instance,
1091
# this MockBus instance is in "mockbus_self".
1092
self.assertEqual(busname, dbus_busname)
1093
self.assertEqual(dbus_path, server_dbus_path)
1094
mockbus_self.called = True
1097
mockbus = get_mandos_dbus_object(bus=MockBus())
1098
self.assertIsInstance(mockbus, MockBus)
1099
self.assertTrue(mockbus.called)
1101
def test_logs_and_exits_on_dbus_error(self):
1102
class MockBusFailing(object):
1103
def get_object(self, busname, dbus_path):
1104
raise dbus.exceptions.DBusException("Test")
1106
with self.assertLogs(log, logging.CRITICAL):
1107
with self.assertRaises(SystemExit) as e:
1108
bus = get_mandos_dbus_object(bus=MockBusFailing())
1110
if isinstance(e.exception.code, int):
1111
self.assertNotEqual(e.exception.code, 0)
1113
self.assertIsNotNone(e.exception.code)
1116
class Test_get_managed_objects(TestCaseWithAssertLogs):
1117
def test_calls_and_returns_GetManagedObjects(self):
1118
managed_objects = {"/clients/foo": { "Name": "foo"}}
1119
class MockObjectManager(object):
1120
def GetManagedObjects(self):
1121
return managed_objects
1122
retval = get_managed_objects(MockObjectManager())
1123
self.assertDictEqual(managed_objects, retval)
1125
def test_logs_and_exits_on_dbus_error(self):
1126
dbus_logger = logging.getLogger("dbus.proxies")
1128
class MockObjectManagerFailing(object):
1129
def GetManagedObjects(self):
1130
dbus_logger.error("Test")
1131
raise dbus.exceptions.DBusException("Test")
1133
class CountingHandler(logging.Handler):
1135
def emit(self, record):
1138
counting_handler = CountingHandler()
1140
dbus_logger.addHandler(counting_handler)
1143
with self.assertLogs(log, logging.CRITICAL) as watcher:
1144
with self.assertRaises(SystemExit) as e:
1145
get_managed_objects(MockObjectManagerFailing())
1147
dbus_logger.removeFilter(counting_handler)
1149
# Make sure the dbus logger was suppressed
1150
self.assertEqual(counting_handler.count, 0)
1152
# Test that the dbus_logger still works
1153
with self.assertLogs(dbus_logger, logging.ERROR):
1154
dbus_logger.error("Test")
1156
if isinstance(e.exception.code, int):
1157
self.assertNotEqual(e.exception.code, 0)
1159
self.assertIsNotNone(e.exception.code)
1162
class Test_commands_from_options(unittest.TestCase):
1164
self.parser = argparse.ArgumentParser()
1165
add_command_line_options(self.parser)
1167
def test_is_enabled(self):
1168
self.assert_command_from_args(["--is-enabled", "foo"],
1171
def assert_command_from_args(self, args, command_cls,
1173
"""Assert that parsing ARGS should result in an instance of
1174
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1175
options = self.parser.parse_args(args)
1176
check_option_syntax(self.parser, options)
1177
commands = commands_from_options(options)
1178
self.assertEqual(len(commands), 1)
1179
command = commands[0]
1180
self.assertIsInstance(command, command_cls)
1181
for key, value in cmd_attrs.items():
1182
self.assertEqual(getattr(command, key), value)
1184
def test_is_enabled_short(self):
1185
self.assert_command_from_args(["-V", "foo"],
1188
def test_approve(self):
1189
self.assert_command_from_args(["--approve", "foo"],
1192
def test_approve_short(self):
1193
self.assert_command_from_args(["-A", "foo"], command.Approve)
1195
def test_deny(self):
1196
self.assert_command_from_args(["--deny", "foo"], command.Deny)
1198
def test_deny_short(self):
1199
self.assert_command_from_args(["-D", "foo"], command.Deny)
1201
def test_remove(self):
1202
self.assert_command_from_args(["--remove", "foo"],
1205
def test_deny_before_remove(self):
1206
options = self.parser.parse_args(["--deny", "--remove",
1208
check_option_syntax(self.parser, options)
1209
commands = commands_from_options(options)
1210
self.assertEqual(len(commands), 2)
1211
self.assertIsInstance(commands[0], command.Deny)
1212
self.assertIsInstance(commands[1], command.Remove)
1214
def test_deny_before_remove_reversed(self):
1215
options = self.parser.parse_args(["--remove", "--deny",
1217
check_option_syntax(self.parser, options)
1218
commands = commands_from_options(options)
1219
self.assertEqual(len(commands), 2)
1220
self.assertIsInstance(commands[0], command.Deny)
1221
self.assertIsInstance(commands[1], command.Remove)
1223
def test_remove_short(self):
1224
self.assert_command_from_args(["-r", "foo"], command.Remove)
1226
def test_dump_json(self):
1227
self.assert_command_from_args(["--dump-json"],
1230
def test_enable(self):
1231
self.assert_command_from_args(["--enable", "foo"],
1234
def test_enable_short(self):
1235
self.assert_command_from_args(["-e", "foo"], command.Enable)
1237
def test_disable(self):
1238
self.assert_command_from_args(["--disable", "foo"],
1241
def test_disable_short(self):
1242
self.assert_command_from_args(["-d", "foo"], command.Disable)
1244
def test_bump_timeout(self):
1245
self.assert_command_from_args(["--bump-timeout", "foo"],
1246
command.BumpTimeout)
1248
def test_bump_timeout_short(self):
1249
self.assert_command_from_args(["-b", "foo"],
1250
command.BumpTimeout)
1252
def test_start_checker(self):
1253
self.assert_command_from_args(["--start-checker", "foo"],
1254
command.StartChecker)
1256
def test_stop_checker(self):
1257
self.assert_command_from_args(["--stop-checker", "foo"],
1258
command.StopChecker)
1260
def test_approve_by_default(self):
1261
self.assert_command_from_args(["--approve-by-default", "foo"],
1262
command.ApproveByDefault)
1264
def test_deny_by_default(self):
1265
self.assert_command_from_args(["--deny-by-default", "foo"],
1266
command.DenyByDefault)
1268
def test_checker(self):
1269
self.assert_command_from_args(["--checker", ":", "foo"],
1273
def test_checker_empty(self):
1274
self.assert_command_from_args(["--checker", "", "foo"],
1278
def test_checker_short(self):
1279
self.assert_command_from_args(["-c", ":", "foo"],
1283
def test_host(self):
1284
self.assert_command_from_args(["--host", "foo.example.org",
1285
"foo"], command.SetHost,
1286
value_to_set="foo.example.org")
1288
def test_host_short(self):
1289
self.assert_command_from_args(["-H", "foo.example.org",
1290
"foo"], command.SetHost,
1291
value_to_set="foo.example.org")
1293
def test_secret_devnull(self):
1294
self.assert_command_from_args(["--secret", os.path.devnull,
1295
"foo"], command.SetSecret,
1298
def test_secret_tempfile(self):
1299
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1300
value = b"secret\0xyzzy\nbar"
1303
self.assert_command_from_args(["--secret", f.name,
1304
"foo"], command.SetSecret,
1307
def test_secret_devnull_short(self):
1308
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1312
def test_secret_tempfile_short(self):
1313
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1314
value = b"secret\0xyzzy\nbar"
1317
self.assert_command_from_args(["-s", f.name, "foo"],
1321
def test_timeout(self):
1322
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1324
value_to_set=300000)
1326
def test_timeout_short(self):
1327
self.assert_command_from_args(["-t", "PT5M", "foo"],
1329
value_to_set=300000)
1331
def test_extended_timeout(self):
1332
self.assert_command_from_args(["--extended-timeout", "PT15M",
1334
command.SetExtendedTimeout,
1335
value_to_set=900000)
1337
def test_interval(self):
1338
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1339
command.SetInterval,
1340
value_to_set=120000)
1342
def test_interval_short(self):
1343
self.assert_command_from_args(["-i", "PT2M", "foo"],
1344
command.SetInterval,
1345
value_to_set=120000)
1347
def test_approval_delay(self):
1348
self.assert_command_from_args(["--approval-delay", "PT30S",
1350
command.SetApprovalDelay,
1353
def test_approval_duration(self):
1354
self.assert_command_from_args(["--approval-duration", "PT1S",
1356
command.SetApprovalDuration,
1359
def test_print_table(self):
1360
self.assert_command_from_args([], command.PrintTable,
1363
def test_print_table_verbose(self):
1364
self.assert_command_from_args(["--verbose"],
1368
def test_print_table_verbose_short(self):
1369
self.assert_command_from_args(["-v"], command.PrintTable,
1373
class TestCommand(unittest.TestCase):
1374
"""Abstract class for tests of command classes"""
1378
class MockClient(object):
1379
def __init__(self, name, **attributes):
1380
self.__dbus_object_path__ = "/clients/{}".format(name)
1381
self.attributes = attributes
1382
self.attributes["Name"] = name
1384
def Set(self, interface, propname, value, dbus_interface):
1385
testcase.assertEqual(interface, client_dbus_interface)
1386
testcase.assertEqual(dbus_interface,
1387
dbus.PROPERTIES_IFACE)
1388
self.attributes[propname] = value
1389
def Approve(self, approve, dbus_interface):
1390
testcase.assertEqual(dbus_interface,
1391
client_dbus_interface)
1392
self.calls.append(("Approve", (approve,
1394
self.client = MockClient(
1396
KeyID=("92ed150794387c03ce684574b1139a65"
1397
"94a34f895daaaf09fd8ea90a27cddb12"),
1399
Host="foo.example.org",
1400
Enabled=dbus.Boolean(True),
1402
LastCheckedOK="2019-02-03T00:00:00",
1403
Created="2019-01-02T00:00:00",
1405
Fingerprint=("778827225BA7DE539C5A"
1406
"7CFA59CFF7CDBD9A5920"),
1407
CheckerRunning=dbus.Boolean(False),
1408
LastEnabled="2019-01-03T00:00:00",
1409
ApprovalPending=dbus.Boolean(False),
1410
ApprovedByDefault=dbus.Boolean(True),
1411
LastApprovalRequest="",
1413
ApprovalDuration=1000,
1414
Checker="fping -q -- %(host)s",
1415
ExtendedTimeout=900000,
1416
Expires="2019-02-04T00:00:00",
1417
LastCheckerStatus=0)
1418
self.other_client = MockClient(
1420
KeyID=("0558568eedd67d622f5c83b35a115f79"
1421
"6ab612cff5ad227247e46c2b020f441c"),
1422
Secret=b"secretbar",
1424
Enabled=dbus.Boolean(True),
1426
LastCheckedOK="2019-02-04T00:00:00",
1427
Created="2019-01-03T00:00:00",
1429
Fingerprint=("3E393AEAEFB84C7E89E2"
1430
"F547B3A107558FCA3A27"),
1431
CheckerRunning=dbus.Boolean(True),
1432
LastEnabled="2019-01-04T00:00:00",
1433
ApprovalPending=dbus.Boolean(False),
1434
ApprovedByDefault=dbus.Boolean(False),
1435
LastApprovalRequest="2019-01-03T00:00:00",
1436
ApprovalDelay=30000,
1437
ApprovalDuration=93785000,
1439
ExtendedTimeout=900000,
1440
Expires="2019-02-05T00:00:00",
1441
LastCheckerStatus=-2)
1442
self.clients = collections.OrderedDict(
1444
("/clients/foo", self.client.attributes),
1445
("/clients/barbar", self.other_client.attributes),
1447
self.one_client = {"/clients/foo": self.client.attributes}
1453
def get_object(client_bus_name, path):
1454
self.assertEqual(client_bus_name, dbus_busname)
1456
# Note: "self" here is the TestCmd instance, not
1457
# the Bus instance, since this is a static method!
1458
"/clients/foo": self.client,
1459
"/clients/barbar": self.other_client,
1464
class TestBaseCommands(TestCommand):
1466
def test_IsEnabled_exits_successfully(self):
1467
with self.assertRaises(SystemExit) as e:
1468
command.IsEnabled().run(self.one_client)
1469
if e.exception.code is not None:
1470
self.assertEqual(e.exception.code, 0)
1472
self.assertIsNone(e.exception.code)
1474
def test_IsEnabled_exits_with_failure(self):
1475
self.client.attributes["Enabled"] = dbus.Boolean(False)
1476
with self.assertRaises(SystemExit) as e:
1477
command.IsEnabled().run(self.one_client)
1478
if isinstance(e.exception.code, int):
1479
self.assertNotEqual(e.exception.code, 0)
1481
self.assertIsNotNone(e.exception.code)
1483
def test_Approve(self):
1484
command.Approve().run(self.clients, self.bus)
1485
for clientpath in self.clients:
1486
client = self.bus.get_object(dbus_busname, clientpath)
1487
self.assertIn(("Approve", (True, client_dbus_interface)),
1490
def test_Deny(self):
1491
command.Deny().run(self.clients, self.bus)
1492
for clientpath in self.clients:
1493
client = self.bus.get_object(dbus_busname, clientpath)
1494
self.assertIn(("Approve", (False, client_dbus_interface)),
1497
def test_Remove(self):
1498
class MockMandos(object):
1501
def RemoveClient(self, dbus_path):
1502
self.calls.append(("RemoveClient", (dbus_path,)))
1503
mandos = MockMandos()
1504
command.Remove().run(self.clients, self.bus, mandos)
1505
for clientpath in self.clients:
1506
self.assertIn(("RemoveClient", (clientpath,)),
1512
"KeyID": ("92ed150794387c03ce684574b1139a65"
1513
"94a34f895daaaf09fd8ea90a27cddb12"),
1514
"Host": "foo.example.org",
1517
"LastCheckedOK": "2019-02-03T00:00:00",
1518
"Created": "2019-01-02T00:00:00",
1520
"Fingerprint": ("778827225BA7DE539C5A"
1521
"7CFA59CFF7CDBD9A5920"),
1522
"CheckerRunning": False,
1523
"LastEnabled": "2019-01-03T00:00:00",
1524
"ApprovalPending": False,
1525
"ApprovedByDefault": True,
1526
"LastApprovalRequest": "",
1528
"ApprovalDuration": 1000,
1529
"Checker": "fping -q -- %(host)s",
1530
"ExtendedTimeout": 900000,
1531
"Expires": "2019-02-04T00:00:00",
1532
"LastCheckerStatus": 0,
1536
"KeyID": ("0558568eedd67d622f5c83b35a115f79"
1537
"6ab612cff5ad227247e46c2b020f441c"),
1538
"Host": "192.0.2.3",
1541
"LastCheckedOK": "2019-02-04T00:00:00",
1542
"Created": "2019-01-03T00:00:00",
1544
"Fingerprint": ("3E393AEAEFB84C7E89E2"
1545
"F547B3A107558FCA3A27"),
1546
"CheckerRunning": True,
1547
"LastEnabled": "2019-01-04T00:00:00",
1548
"ApprovalPending": False,
1549
"ApprovedByDefault": False,
1550
"LastApprovalRequest": "2019-01-03T00:00:00",
1551
"ApprovalDelay": 30000,
1552
"ApprovalDuration": 93785000,
1554
"ExtendedTimeout": 900000,
1555
"Expires": "2019-02-05T00:00:00",
1556
"LastCheckerStatus": -2,
1560
def test_DumpJSON_normal(self):
1561
output = command.DumpJSON().output(self.clients.values())
1562
json_data = json.loads(output)
1563
self.assertDictEqual(json_data, self.expected_json)
1565
def test_DumpJSON_one_client(self):
1566
output = command.DumpJSON().output(self.one_client.values())
1567
json_data = json.loads(output)
1568
expected_json = {"foo": self.expected_json["foo"]}
1569
self.assertDictEqual(json_data, expected_json)
1571
def test_PrintTable_normal(self):
1572
output = command.PrintTable().output(self.clients.values())
1573
expected_output = "\n".join((
1574
"Name Enabled Timeout Last Successful Check",
1575
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1576
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1578
self.assertEqual(output, expected_output)
1580
def test_PrintTable_verbose(self):
1581
output = command.PrintTable(verbose=True).output(
1582
self.clients.values())
1597
"Last Successful Check ",
1598
"2019-02-03T00:00:00 ",
1599
"2019-02-04T00:00:00 ",
1602
"2019-01-02T00:00:00 ",
1603
"2019-01-03T00:00:00 ",
1615
("92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8"
1617
("0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e"
1621
"778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 ",
1622
"3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 ",
1624
"Check Is Running ",
1629
"2019-01-03T00:00:00 ",
1630
"2019-01-04T00:00:00 ",
1632
"Approval Is Pending ",
1636
"Approved By Default ",
1640
"Last Approval Request ",
1642
"2019-01-03T00:00:00 ",
1648
"Approval Duration ",
1653
"fping -q -- %(host)s ",
1656
"Extended Timeout ",
1661
"2019-02-04T00:00:00 ",
1662
"2019-02-05T00:00:00 ",
1664
"Last Checker Status",
1669
num_lines = max(len(rows) for rows in columns)
1670
expected_output = "\n".join("".join(rows[line]
1671
for rows in columns)
1672
for line in range(num_lines))
1673
self.assertEqual(output, expected_output)
1675
def test_PrintTable_one_client(self):
1676
output = command.PrintTable().output(self.one_client.values())
1677
expected_output = "\n".join((
1678
"Name Enabled Timeout Last Successful Check",
1679
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1681
self.assertEqual(output, expected_output)
1684
class TestPropertyCmd(TestCommand):
1685
"""Abstract class for tests of command.Property classes"""
1687
if not hasattr(self, "command"):
1689
values_to_get = getattr(self, "values_to_get",
1691
for value_to_set, value_to_get in zip(self.values_to_set,
1693
for clientpath in self.clients:
1694
client = self.bus.get_object(dbus_busname, clientpath)
1695
old_value = client.attributes[self.propname]
1696
client.attributes[self.propname] = self.Unique()
1697
self.run_command(value_to_set, self.clients)
1698
for clientpath in self.clients:
1699
client = self.bus.get_object(dbus_busname, clientpath)
1700
value = client.attributes[self.propname]
1701
self.assertNotIsInstance(value, self.Unique)
1702
self.assertEqual(value, value_to_get)
1704
class Unique(object):
1705
"""Class for objects which exist only to be unique objects,
1706
since unittest.mock.sentinel only exists in Python 3.3"""
1708
def run_command(self, value, clients):
1709
self.command().run(clients, self.bus)
1712
class TestEnableCmd(TestPropertyCmd):
1713
command = command.Enable
1714
propname = "Enabled"
1715
values_to_set = [dbus.Boolean(True)]
1718
class TestDisableCmd(TestPropertyCmd):
1719
command = command.Disable
1720
propname = "Enabled"
1721
values_to_set = [dbus.Boolean(False)]
1724
class TestBumpTimeoutCmd(TestPropertyCmd):
1725
command = command.BumpTimeout
1726
propname = "LastCheckedOK"
1727
values_to_set = [""]
1730
class TestStartCheckerCmd(TestPropertyCmd):
1731
command = command.StartChecker
1732
propname = "CheckerRunning"
1733
values_to_set = [dbus.Boolean(True)]
1736
class TestStopCheckerCmd(TestPropertyCmd):
1737
command = command.StopChecker
1738
propname = "CheckerRunning"
1739
values_to_set = [dbus.Boolean(False)]
1742
class TestApproveByDefaultCmd(TestPropertyCmd):
1743
command = command.ApproveByDefault
1744
propname = "ApprovedByDefault"
1745
values_to_set = [dbus.Boolean(True)]
1748
class TestDenyByDefaultCmd(TestPropertyCmd):
1749
command = command.DenyByDefault
1750
propname = "ApprovedByDefault"
1751
values_to_set = [dbus.Boolean(False)]
1754
class TestPropertyValueCmd(TestPropertyCmd):
1755
"""Abstract class for tests of PropertyValueCmd classes"""
1758
if type(self) is TestPropertyValueCmd:
1760
return super(TestPropertyValueCmd, self).runTest()
1762
def run_command(self, value, clients):
1763
self.command(value).run(clients, self.bus)
1766
class TestSetCheckerCmd(TestPropertyValueCmd):
1767
command = command.SetChecker
1768
propname = "Checker"
1769
values_to_set = ["", ":", "fping -q -- %s"]
1772
class TestSetHostCmd(TestPropertyValueCmd):
1773
command = command.SetHost
1775
values_to_set = ["192.0.2.3", "foo.example.org"]
1778
class TestSetSecretCmd(TestPropertyValueCmd):
1779
command = command.SetSecret
1781
values_to_set = [io.BytesIO(b""),
1782
io.BytesIO(b"secret\0xyzzy\nbar")]
1783
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1786
class TestSetTimeoutCmd(TestPropertyValueCmd):
1787
command = command.SetTimeout
1788
propname = "Timeout"
1789
values_to_set = [datetime.timedelta(),
1790
datetime.timedelta(minutes=5),
1791
datetime.timedelta(seconds=1),
1792
datetime.timedelta(weeks=1),
1793
datetime.timedelta(weeks=52)]
1794
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1797
class TestSetExtendedTimeoutCmd(TestPropertyValueCmd):
1798
command = command.SetExtendedTimeout
1799
propname = "ExtendedTimeout"
1800
values_to_set = [datetime.timedelta(),
1801
datetime.timedelta(minutes=5),
1802
datetime.timedelta(seconds=1),
1803
datetime.timedelta(weeks=1),
1804
datetime.timedelta(weeks=52)]
1805
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1808
class TestSetIntervalCmd(TestPropertyValueCmd):
1809
command = command.SetInterval
1810
propname = "Interval"
1811
values_to_set = [datetime.timedelta(),
1812
datetime.timedelta(minutes=5),
1813
datetime.timedelta(seconds=1),
1814
datetime.timedelta(weeks=1),
1815
datetime.timedelta(weeks=52)]
1816
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1819
class TestSetApprovalDelayCmd(TestPropertyValueCmd):
1820
command = command.SetApprovalDelay
1821
propname = "ApprovalDelay"
1822
values_to_set = [datetime.timedelta(),
1823
datetime.timedelta(minutes=5),
1824
datetime.timedelta(seconds=1),
1825
datetime.timedelta(weeks=1),
1826
datetime.timedelta(weeks=52)]
1827
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1830
class TestSetApprovalDurationCmd(TestPropertyValueCmd):
1831
command = command.SetApprovalDuration
1832
propname = "ApprovalDuration"
1833
values_to_set = [datetime.timedelta(),
1834
datetime.timedelta(minutes=5),
1835
datetime.timedelta(seconds=1),
1836
datetime.timedelta(weeks=1),
1837
datetime.timedelta(weeks=52)]
1838
values_to_get = [0, 300000, 1000, 604800000, 31449600000]