223
267
value += datetime.timedelta(0, 0, 0, int(num))
226
def print_clients(clients, keywords):
227
def valuetostring(value, keyword):
228
if type(value) is dbus.Boolean:
229
return "Yes" if value else "No"
230
if keyword in ("Timeout", "Interval", "ApprovalDelay",
231
"ApprovalDuration", "ExtendedTimeout"):
232
return milliseconds_to_string(value)
233
return unicode(value)
235
# Create format string to print table rows
236
format_string = " ".join("{{{key}:{width}}}".format(
237
width = max(len(tablewords[key]),
238
max(len(valuetostring(client[key],
242
key = key) for key in keywords)
244
print(format_string.format(**tablewords))
245
for client in clients:
246
print(format_string.format(**dict((key,
247
valuetostring(client[key],
249
for key in keywords)))
271
## Classes for commands.
273
# Abstract classes first
274
class Command(object):
275
"""Abstract class for commands"""
276
def run(self, mandos, clients):
277
"""Normal commands should implement run_on_one_client(), but
278
commands which want to operate on all clients at the same time
279
can override this run() method instead."""
281
for client, properties in clients.items():
282
self.run_on_one_client(client, properties)
284
class PrintCmd(Command):
    """Abstract base for commands which print details about clients.

    Concrete subclasses provide an output(clients) method; run()
    prints whatever that method returns.
    """
    # Names of all client properties a printing command may show
    all_keywords = (
        "Name",
        "Enabled",
        "Timeout",
        "LastCheckedOK",
        "Created",
        "Interval",
        "Host",
        "KeyID",
        "Fingerprint",
        "CheckerRunning",
        "LastEnabled",
        "ApprovalPending",
        "ApprovedByDefault",
        "LastApprovalRequest",
        "ApprovalDelay",
        "ApprovalDuration",
        "Checker",
        "ExtendedTimeout",
        "Expires",
        "LastCheckerStatus",
    )

    def run(self, mandos, clients):
        """Print the result of self.output(clients).

        The "mandos" argument is accepted but not used by this
        method."""
        text = self.output(clients)
        print(text)
296
class PropertyCmd(Command):
    """Abstract base for commands which set one client property.

    The instance must provide two attributes: "property" (the name
    of the D-Bus property) and "value_to_set" (the value to write).
    """
    def run_on_one_client(self, client, properties):
        """Write self.value_to_set to the client's D-Bus property
        named by self.property.

        The "properties" argument is not consulted here."""
        set_arguments = (client_interface, self.property,
                         self.value_to_set)
        client.Set(*set_arguments,
                   dbus_interface=dbus.PROPERTIES_IFACE)
303
class ValueArgumentMixIn(object):
    """Mixin for commands which are constructed with a value.

    The value passed to the constructor is stored in the
    "value_to_set" attribute.
    """
    def __init__(self, value):
        """Remember "value" as the value this command should set"""
        setattr(self, "value_to_set", value)
308
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
309
"""Mixin class for commands taking a value argument as
312
def value_to_set(self):
315
def value_to_set(self, value):
316
"""When setting, convert value to a datetime.timedelta"""
317
self._vts = string_to_delta(value).total_seconds() * 1000
319
# Actual (non-abstract) command classes
321
class PrintTableCmd(PrintCmd):
322
def __init__(self, verbose=False):
323
self.verbose = verbose
325
def output(self, clients):
326
default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
327
keywords = default_keywords
329
keywords = self.all_keywords
330
return str(self.TableOfClients(clients.values(), keywords))
332
class TableOfClients(object):
335
"Enabled": "Enabled",
336
"Timeout": "Timeout",
337
"LastCheckedOK": "Last Successful Check",
338
"LastApprovalRequest": "Last Approval Request",
339
"Created": "Created",
340
"Interval": "Interval",
342
"Fingerprint": "Fingerprint",
344
"CheckerRunning": "Check Is Running",
345
"LastEnabled": "Last Enabled",
346
"ApprovalPending": "Approval Is Pending",
347
"ApprovedByDefault": "Approved By Default",
348
"ApprovalDelay": "Approval Delay",
349
"ApprovalDuration": "Approval Duration",
350
"Checker": "Checker",
351
"ExtendedTimeout": "Extended Timeout",
352
"Expires": "Expires",
353
"LastCheckerStatus": "Last Checker Status",
356
def __init__(self, clients, keywords, tableheaders=None):
357
self.clients = clients
358
self.keywords = keywords
359
if tableheaders is not None:
360
self.tableheaders = tableheaders
363
return "\n".join(self.rows())
365
if sys.version_info.major == 2:
366
__unicode__ = __str__
368
return str(self).encode(locale.getpreferredencoding())
371
format_string = self.row_formatting_string()
372
rows = [self.header_line(format_string)]
373
rows.extend(self.client_line(client, format_string)
374
for client in self.clients)
377
def row_formatting_string(self):
378
"Format string used to format table rows"
379
return " ".join("{{{key}:{width}}}".format(
380
width=max(len(self.tableheaders[key]),
381
*(len(self.string_from_client(client, key))
382
for client in self.clients)),
384
for key in self.keywords)
386
def string_from_client(self, client, key):
387
return self.valuetostring(client[key], key)
390
def valuetostring(value, keyword):
391
if isinstance(value, dbus.Boolean):
392
return "Yes" if value else "No"
393
if keyword in ("Timeout", "Interval", "ApprovalDelay",
394
"ApprovalDuration", "ExtendedTimeout"):
395
return milliseconds_to_string(value)
398
def header_line(self, format_string):
399
return format_string.format(**self.tableheaders)
401
def client_line(self, client, format_string):
402
return format_string.format(
403
**{key: self.string_from_client(client, key)
404
for key in self.keywords})
408
class DumpJSONCmd(PrintCmd):
409
def output(self, clients):
410
data = {client["Name"]:
411
{key: self.dbus_boolean_to_bool(client[key])
412
for key in self.all_keywords}
413
for client in clients.values()}
414
return json.dumps(data, indent=4, separators=(',', ': '))
416
def dbus_boolean_to_bool(value):
417
if isinstance(value, dbus.Boolean):
421
class IsEnabledCmd(Command):
422
def run_on_one_client(self, client, properties):
423
if self.is_enabled(client, properties):
426
def is_enabled(self, client, properties):
427
return bool(properties["Enabled"])
429
class RemoveCmd(Command):
    """Command which removes clients from the server"""
    def run_on_one_client(self, client, properties):
        """Call RemoveClient() on self.mandos with the client's
        D-Bus object path.

        NOTE(review): self.mandos is not assigned in this class; it
        is presumably set before this method runs -- confirm."""
        object_path = client.__dbus_object_path__
        self.mandos.RemoveClient(object_path)
433
class ApproveCmd(Command):
    """Command which approves clients"""
    def run_on_one_client(self, client, properties):
        """Call the client's Approve() D-Bus method with a true
        value; "properties" is not consulted."""
        approved = dbus.Boolean(True)
        client.Approve(approved, dbus_interface=client_interface)
438
class DenyCmd(Command):
    """Command which denies clients"""
    def run_on_one_client(self, client, properties):
        """Call the client's Approve() D-Bus method with a false
        value; "properties" is not consulted."""
        denied = dbus.Boolean(False)
        client.Approve(denied, dbus_interface=client_interface)
443
class EnableCmd(PropertyCmd):
445
value_to_set = dbus.Boolean(True)
447
class DisableCmd(PropertyCmd):
449
value_to_set = dbus.Boolean(False)
451
class BumpTimeoutCmd(PropertyCmd):
452
property = "LastCheckedOK"
455
class StartCheckerCmd(PropertyCmd):
    """Command setting the "CheckerRunning" client property to true"""
    property = "CheckerRunning"
    value_to_set = dbus.Boolean(True)
459
class StopCheckerCmd(PropertyCmd):
    """Command setting the "CheckerRunning" client property to false"""
    property = "CheckerRunning"
    value_to_set = dbus.Boolean(False)
463
class ApproveByDefaultCmd(PropertyCmd):
    """Command setting the "ApprovedByDefault" client property to
    true"""
    property = "ApprovedByDefault"
    value_to_set = dbus.Boolean(True)
467
class DenyByDefaultCmd(PropertyCmd):
    """Command setting the "ApprovedByDefault" client property to
    false"""
    property = "ApprovedByDefault"
    value_to_set = dbus.Boolean(False)
471
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
474
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
477
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
480
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
483
class SetExtendedTimeoutCmd(
        PropertyCmd, MillisecondsValueArgumentMixIn):
    """Command setting the "ExtendedTimeout" client property from a
    value argument"""
    property = "ExtendedTimeout"
487
class SetIntervalCmd(
        PropertyCmd, MillisecondsValueArgumentMixIn):
    """Command setting the "Interval" client property from a value
    argument"""
    property = "Interval"
490
class SetApprovalDelayCmd(
        PropertyCmd, MillisecondsValueArgumentMixIn):
    """Command setting the "ApprovalDelay" client property from a
    value argument"""
    property = "ApprovalDelay"
494
class SetApprovalDurationCmd(
        PropertyCmd, MillisecondsValueArgumentMixIn):
    """Command setting the "ApprovalDuration" client property from
    a value argument"""
    property = "ApprovalDuration"
251
498
def has_actions(options):
252
499
return any((options.enable,
299
552
help="Set extended timeout for client")
300
553
parser.add_argument("-i", "--interval",
301
554
help="Set checker interval for client")
302
parser.add_argument("--approve-by-default", action="store_true",
303
default=None, dest="approved_by_default",
304
help="Set client to be approved by default")
305
parser.add_argument("--deny-by-default", action="store_false",
306
dest="approved_by_default",
307
help="Set client to be denied by default")
555
approve_deny_default = parser.add_mutually_exclusive_group()
556
approve_deny_default.add_argument(
557
"--approve-by-default", action="store_true",
558
default=None, dest="approved_by_default",
559
help="Set client to be approved by default")
560
approve_deny_default.add_argument(
561
"--deny-by-default", action="store_false",
562
dest="approved_by_default",
563
help="Set client to be denied by default")
308
564
parser.add_argument("--approval-delay",
309
565
help="Set delay before client approve/deny")
310
566
parser.add_argument("--approval-duration",
311
567
help="Set duration of one client approval")
312
568
parser.add_argument("-H", "--host", help="Set host for client")
313
parser.add_argument("-s", "--secret", type=file,
569
parser.add_argument("-s", "--secret",
570
type=argparse.FileType(mode="rb"),
314
571
help="Set password blob (file) for client")
315
parser.add_argument("-A", "--approve", action="store_true",
316
help="Approve any current client request")
317
parser.add_argument("-D", "--deny", action="store_true",
318
help="Deny any current client request")
572
approve_deny = parser.add_mutually_exclusive_group()
573
approve_deny.add_argument(
574
"-A", "--approve", action="store_true",
575
help="Approve any current client request")
576
approve_deny.add_argument("-D", "--deny", action="store_true",
577
help="Deny any current client request")
319
578
parser.add_argument("--check", action="store_true",
320
579
help="Run self-test")
321
580
parser.add_argument("client", nargs="*", help="Client name")
583
def commands_from_options(options):
587
if options.dump_json:
588
commands.append(DumpJSONCmd())
591
commands.append(EnableCmd())
594
commands.append(DisableCmd())
596
if options.bump_timeout:
597
commands.append(BumpTimeoutCmd())
599
if options.start_checker:
600
commands.append(StartCheckerCmd())
602
if options.stop_checker:
603
commands.append(StopCheckerCmd())
605
if options.is_enabled:
606
commands.append(IsEnabledCmd())
609
commands.append(RemoveCmd())
611
if options.checker is not None:
612
commands.append(SetCheckerCmd())
614
if options.timeout is not None:
615
commands.append(SetTimeoutCmd(options.timeout))
617
if options.extended_timeout:
619
SetExtendedTimeoutCmd(options.extended_timeout))
621
if options.interval is not None:
622
command.append(SetIntervalCmd(options.interval))
624
if options.approved_by_default is not None:
625
if options.approved_by_default:
626
command.append(ApproveByDefaultCmd())
628
command.append(DenyByDefaultCmd())
630
if options.approval_delay is not None:
631
command.append(SetApprovalDelayCmd(options.approval_delay))
633
if options.approval_duration is not None:
635
SetApprovalDurationCmd(options.approval_duration))
637
if options.host is not None:
638
command.append(SetHostCmd(options.host))
640
if options.secret is not None:
641
command.append(SetSecretCmd(options.secret))
644
commands.append(ApproveCmd())
647
commands.append(DenyCmd())
649
# If no command option has been given, show table of clients,
650
# optionally verbosely
652
commands.append(PrintTableCmd(verbose=options.verbose))
658
parser = argparse.ArgumentParser()
660
add_command_line_options(parser)
322
662
options = parser.parse_args()
324
664
if has_actions(options) and not (options.client or options.all):
325
665
parser.error("Options require clients names or --all.")
326
666
if options.verbose and has_actions(options):
327
parser.error("--verbose can only be used alone or with"
667
parser.error("--verbose can only be used alone.")
668
if options.dump_json and (options.verbose
669
or has_actions(options)):
670
parser.error("--dump-json can only be used alone.")
329
671
if options.all and not has_actions(options):
330
672
parser.error("--all requires an action.")
333
fail_count, test_count = doctest.testmod()
334
sys.exit(os.EX_OK if fail_count == 0 else 1)
673
if options.is_enabled and len(options.client) > 1:
674
parser.error("--is-enabled requires exactly one client")
676
clientnames = options.client
337
679
bus = dbus.SystemBus()
338
680
mandos_dbus_objc = bus.get_object(busname, server_path)
339
681
except dbus.exceptions.DBusException:
340
print("Could not connect to Mandos server",
682
log.critical("Could not connect to Mandos server")
344
685
mandos_serv = dbus.Interface(mandos_dbus_objc,
345
dbus_interface = server_interface)
347
#block stderr since dbus library prints to stderr
348
null = os.open(os.path.devnull, os.O_RDWR)
349
stderrcopy = os.dup(sys.stderr.fileno())
350
os.dup2(null, sys.stderr.fileno())
686
dbus_interface=server_interface)
687
mandos_serv_object_manager = dbus.Interface(
688
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
690
# Filter out log message from dbus module
691
dbus_logger = logging.getLogger("dbus.proxies")
692
class NullFilter(logging.Filter):
693
def filter(self, record):
695
dbus_filter = NullFilter()
354
mandos_clients = mandos_serv.GetAllClientsWithProperties()
357
os.dup2(stderrcopy, sys.stderr.fileno())
359
except dbus.exceptions.DBusException:
360
print("Access denied: Accessing mandos server through dbus.",
697
dbus_logger.addFilter(dbus_filter)
698
mandos_clients = {path: ifs_and_props[client_interface]
699
for path, ifs_and_props in
700
mandos_serv_object_manager
701
.GetManagedObjects().items()
702
if client_interface in ifs_and_props}
703
except dbus.exceptions.DBusException as e:
704
log.critical("Failed to access Mandos server through D-Bus:"
708
# restore dbus logger
709
dbus_logger.removeFilter(dbus_filter)
364
711
# Compile dict of (clients: properties) to process
367
if options.all or not options.client:
368
clients = dict((bus.get_object(busname, path), properties)
369
for path, properties in
370
mandos_clients.iteritems())
715
clients = {bus.get_object(busname, path): properties
716
for path, properties in mandos_clients.items()}
372
for name in options.client:
373
for path, client in mandos_clients.iteritems():
718
for name in clientnames:
719
for path, client in mandos_clients.items():
374
720
if client["Name"] == name:
375
721
client_objc = bus.get_object(busname, path)
376
722
clients[client_objc] = client
379
print("Client not found on server: {0!r}"
380
.format(name), file=sys.stderr)
725
log.critical("Client not found on server: %r", name)
383
if not has_actions(options) and clients:
385
keywords = ("Name", "Enabled", "Timeout",
386
"LastCheckedOK", "Created", "Interval",
387
"Host", "Fingerprint", "CheckerRunning",
388
"LastEnabled", "ApprovalPending",
390
"LastApprovalRequest", "ApprovalDelay",
391
"ApprovalDuration", "Checker",
394
keywords = defaultkeywords
396
print_clients(clients.values(), keywords)
398
# Process each client in the list by all selected options
399
for client in clients:
400
def set_client_prop(prop, value):
401
"""Set a Client D-Bus property"""
402
client.Set(client_interface, prop, value,
403
dbus_interface=dbus.PROPERTIES_IFACE)
404
def set_client_prop_ms(prop, value):
405
"""Set a Client D-Bus property, converted
406
from a string to milliseconds."""
407
set_client_prop(prop,
408
timedelta_to_milliseconds
409
(string_to_delta(value)))
411
mandos_serv.RemoveClient(client.__dbus_object_path__)
413
set_client_prop("Enabled", dbus.Boolean(True))
415
set_client_prop("Enabled", dbus.Boolean(False))
416
if options.bump_timeout:
417
set_client_prop("LastCheckedOK", "")
418
if options.start_checker:
419
set_client_prop("CheckerRunning", dbus.Boolean(True))
420
if options.stop_checker:
421
set_client_prop("CheckerRunning", dbus.Boolean(False))
422
if options.is_enabled:
423
sys.exit(0 if client.Get(client_interface,
426
dbus.PROPERTIES_IFACE)
428
if options.checker is not None:
429
set_client_prop("Checker", options.checker)
430
if options.host is not None:
431
set_client_prop("Host", options.host)
432
if options.interval is not None:
433
set_client_prop_ms("Interval", options.interval)
434
if options.approval_delay is not None:
435
set_client_prop_ms("ApprovalDelay",
436
options.approval_delay)
437
if options.approval_duration is not None:
438
set_client_prop_ms("ApprovalDuration",
439
options.approval_duration)
440
if options.timeout is not None:
441
set_client_prop_ms("Timeout", options.timeout)
442
if options.extended_timeout is not None:
443
set_client_prop_ms("ExtendedTimeout",
444
options.extended_timeout)
445
if options.secret is not None:
446
set_client_prop("Secret",
447
dbus.ByteArray(options.secret.read()))
448
if options.approved_by_default is not None:
449
set_client_prop("ApprovedByDefault",
451
.approved_by_default))
453
client.Approve(dbus.Boolean(True),
454
dbus_interface=client_interface)
456
client.Approve(dbus.Boolean(False),
457
dbus_interface=client_interface)
728
# Run all commands on clients
729
commands = commands_from_options(options)
730
for command in commands:
731
command.run(mandos_serv, clients)
734
class Test_milliseconds_to_string(unittest.TestCase):
736
self.assertEqual(milliseconds_to_string(93785000),
738
def test_no_days(self):
739
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
740
def test_all_zero(self):
741
self.assertEqual(milliseconds_to_string(0), "00:00:00")
742
def test_no_fractional_seconds(self):
743
self.assertEqual(milliseconds_to_string(400), "00:00:00")
744
self.assertEqual(milliseconds_to_string(900), "00:00:00")
745
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
747
class Test_string_to_delta(unittest.TestCase):
748
def test_handles_basic_rfc3339(self):
749
self.assertEqual(string_to_delta("PT2H"),
750
datetime.timedelta(0, 7200))
751
def test_falls_back_to_pre_1_6_1_with_warning(self):
752
# assertLogs only exists in Python 3.4
753
if hasattr(self, "assertLogs"):
754
with self.assertLogs(log, logging.WARNING):
755
value = string_to_delta("2h")
757
class WarningFilter(logging.Filter):
758
"""Don't show, but record the presence of, warnings"""
759
def filter(self, record):
760
is_warning = record.levelno >= logging.WARNING
761
self.found = is_warning or getattr(self, "found",
763
return not is_warning
764
warning_filter = WarningFilter()
765
log.addFilter(warning_filter)
767
value = string_to_delta("2h")
769
log.removeFilter(warning_filter)
770
self.assertTrue(getattr(warning_filter, "found", False))
771
self.assertEqual(value, datetime.timedelta(0, 7200))
774
class TestCmd(unittest.TestCase):
775
"""Abstract class for tests of command classes"""
778
class MockClient(object):
779
def __init__(self, name, **attributes):
780
self.__dbus_object_path__ = "objpath_{}".format(name)
781
self.attributes = attributes
782
self.attributes["Name"] = name
784
def Set(self, interface, property, value, dbus_interface):
785
testcase.assertEqual(interface, client_interface)
786
testcase.assertEqual(dbus_interface,
787
dbus.PROPERTIES_IFACE)
788
self.attributes[property] = value
789
self.calls.append(("Set", (interface, property, value,
791
def Get(self, interface, property, dbus_interface):
792
testcase.assertEqual(interface, client_interface)
793
testcase.assertEqual(dbus_interface,
794
dbus.PROPERTIES_IFACE)
795
self.calls.append(("Get", (interface, property,
797
return self.attributes[property]
798
def Approve(self, approve, dbus_interface):
799
testcase.assertEqual(dbus_interface, client_interface)
800
self.calls.append(("Approve", (approve,
802
self.client = MockClient(
804
KeyID=("92ed150794387c03ce684574b1139a65"
805
"94a34f895daaaf09fd8ea90a27cddb12"),
807
Host="foo.example.org",
808
Enabled=dbus.Boolean(True),
810
LastCheckedOK="2019-02-03T00:00:00",
811
Created="2019-01-02T00:00:00",
813
Fingerprint=("778827225BA7DE539C5A"
814
"7CFA59CFF7CDBD9A5920"),
815
CheckerRunning=dbus.Boolean(False),
816
LastEnabled="2019-01-03T00:00:00",
817
ApprovalPending=dbus.Boolean(False),
818
ApprovedByDefault=dbus.Boolean(True),
819
LastApprovalRequest="",
821
ApprovalDuration=1000,
822
Checker="fping -q -- %(host)s",
823
ExtendedTimeout=900000,
824
Expires="2019-02-04T00:00:00",
826
self.other_client = MockClient(
828
KeyID=("0558568eedd67d622f5c83b35a115f79"
829
"6ab612cff5ad227247e46c2b020f441c"),
832
Enabled=dbus.Boolean(True),
834
LastCheckedOK="2019-02-04T00:00:00",
835
Created="2019-01-03T00:00:00",
837
Fingerprint=("3E393AEAEFB84C7E89E2"
838
"F547B3A107558FCA3A27"),
839
CheckerRunning=dbus.Boolean(True),
840
LastEnabled="2019-01-04T00:00:00",
841
ApprovalPending=dbus.Boolean(False),
842
ApprovedByDefault=dbus.Boolean(False),
843
LastApprovalRequest="2019-01-03T00:00:00",
845
ApprovalDuration=1000,
847
ExtendedTimeout=900000,
848
Expires="2019-02-05T00:00:00",
849
LastCheckerStatus=-2)
850
self.clients = collections.OrderedDict(
852
(self.client, self.client.attributes),
853
(self.other_client, self.other_client.attributes),
855
self.one_client = {self.client: self.client.attributes}
857
class TestPrintTableCmd(TestCmd):
858
def test_normal(self):
859
output = PrintTableCmd().output(self.clients)
860
expected_output = """
861
Name Enabled Timeout Last Successful Check
862
foo Yes 00:05:00 2019-02-03T00:00:00
863
barbar Yes 00:05:00 2019-02-04T00:00:00
865
self.assertEqual(output, expected_output)
866
def test_verbose(self):
867
output = PrintTableCmd(verbose=True).output(self.clients)
868
expected_output = """
869
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
870
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
871
barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04T00:00:00 No No 2019-01-03T00:00:00 00:00:30 00:00:01 : 00:15:00 2019-02-05T00:00:00 -2
873
self.assertEqual(output, expected_output)
874
def test_one_client(self):
875
output = PrintTableCmd().output(self.one_client)
876
expected_output = """
877
Name Enabled Timeout Last Successful Check
878
foo Yes 00:05:00 2019-02-03T00:00:00
880
self.assertEqual(output, expected_output)
882
class TestDumpJSONCmd(TestCmd):
884
self.expected_json = {
887
"KeyID": ("92ed150794387c03ce684574b1139a65"
888
"94a34f895daaaf09fd8ea90a27cddb12"),
889
"Host": "foo.example.org",
892
"LastCheckedOK": "2019-02-03T00:00:00",
893
"Created": "2019-01-02T00:00:00",
895
"Fingerprint": ("778827225BA7DE539C5A"
896
"7CFA59CFF7CDBD9A5920"),
897
"CheckerRunning": False,
898
"LastEnabled": "2019-01-03T00:00:00",
899
"ApprovalPending": False,
900
"ApprovedByDefault": True,
901
"LastApprovalRequest": "",
903
"ApprovalDuration": 1000,
904
"Checker": "fping -q -- %(host)s",
905
"ExtendedTimeout": 900000,
906
"Expires": "2019-02-04T00:00:00",
907
"LastCheckerStatus": 0,
911
"KeyID": ("0558568eedd67d622f5c83b35a115f79"
912
"6ab612cff5ad227247e46c2b020f441c"),
916
"LastCheckedOK": "2019-02-04T00:00:00",
917
"Created": "2019-01-03T00:00:00",
919
"Fingerprint": ("3E393AEAEFB84C7E89E2"
920
"F547B3A107558FCA3A27"),
921
"CheckerRunning": True,
922
"LastEnabled": "2019-01-04T00:00:00",
923
"ApprovalPending": False,
924
"ApprovedByDefault": False,
925
"LastApprovalRequest": "2019-01-03T00:00:00",
926
"ApprovalDelay": 30000,
927
"ApprovalDuration": 1000,
929
"ExtendedTimeout": 900000,
930
"Expires": "2019-02-05T00:00:00",
931
"LastCheckerStatus": -2,
934
return super(TestDumpJSONCmd, self).setUp()
935
def test_normal(self):
936
json_data = json.loads(DumpJSONCmd().output(self.clients))
937
self.assertDictEqual(json_data, self.expected_json)
938
def test_one_client(self):
939
clients = self.one_client
940
json_data = json.loads(DumpJSONCmd().output(clients))
941
expected_json = {"foo": self.expected_json["foo"]}
942
self.assertDictEqual(json_data, expected_json)
944
class TestIsEnabledCmd(TestCmd):
945
def test_is_enabled(self):
946
self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
947
for client, properties in self.clients.items()))
948
def test_is_enabled_run_exits_successfully(self):
949
with self.assertRaises(SystemExit) as e:
950
IsEnabledCmd().run(None, self.one_client)
951
if e.exception.code is not None:
952
self.assertEqual(e.exception.code, 0)
954
self.assertIsNone(e.exception.code)
955
def test_is_enabled_run_exits_with_failure(self):
956
self.client.attributes["Enabled"] = dbus.Boolean(False)
957
with self.assertRaises(SystemExit) as e:
958
IsEnabledCmd().run(None, self.one_client)
959
if isinstance(e.exception.code, int):
960
self.assertNotEqual(e.exception.code, 0)
962
self.assertIsNotNone(e.exception.code)
964
class TestRemoveCmd(TestCmd):
965
def test_remove(self):
966
class MockMandos(object):
969
def RemoveClient(self, dbus_path):
970
self.calls.append(("RemoveClient", (dbus_path,)))
971
mandos = MockMandos()
972
super(TestRemoveCmd, self).setUp()
973
RemoveCmd().run(mandos, self.clients)
974
self.assertEqual(len(mandos.calls), 2)
975
for client in self.clients:
976
self.assertIn(("RemoveClient",
977
(client.__dbus_object_path__,)),
980
class TestApproveCmd(TestCmd):
981
def test_approve(self):
982
ApproveCmd().run(None, self.clients)
983
for client in self.clients:
984
self.assertIn(("Approve", (True, client_interface)),
987
class TestDenyCmd(TestCmd):
989
DenyCmd().run(None, self.clients)
990
for client in self.clients:
991
self.assertIn(("Approve", (False, client_interface)),
994
class TestEnableCmd(TestCmd):
995
def test_enable(self):
996
for client in self.clients:
997
client.attributes["Enabled"] = False
999
EnableCmd().run(None, self.clients)
1001
for client in self.clients:
1002
self.assertTrue(client.attributes["Enabled"])
1004
class TestDisableCmd(TestCmd):
1005
def test_disable(self):
1006
DisableCmd().run(None, self.clients)
1008
for client in self.clients:
1009
self.assertFalse(client.attributes["Enabled"])
1013
def should_only_run_tests():
1014
parser = argparse.ArgumentParser(add_help=False)
1015
parser.add_argument("--check", action='store_true')
1016
args, unknown_args = parser.parse_known_args()
1017
run_tests = args.check
1019
# Remove --check argument from sys.argv
1020
sys.argv[1:] = unknown_args
1023
# Add all tests from doctest strings
1024
def load_tests(loader, tests, none):
1026
tests.addTests(doctest.DocTestSuite())
459
1029
if __name__ == "__main__":
1030
if should_only_run_tests():
1031
# Call using ./tdd-python-script --check [--verbose]