274
## Classes for commands.
276
# Abstract classes first
277
class Command(object):
278
"""Abstract class for commands"""
279
def run(self, clients, bus=None, mandos=None):
280
"""Normal commands should implement run_on_one_client(), but
281
commands which want to operate on all clients at the same time
282
can override this run() method instead."""
284
for clientpath, properties in clients.items():
285
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
286
busname, str(clientpath))
287
client = bus.get_object(busname, clientpath)
288
self.run_on_one_client(client, properties)
290
class PrintCmd(Command):
291
"""Abstract class for commands printing client details"""
292
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
293
"Created", "Interval", "Host", "KeyID",
294
"Fingerprint", "CheckerRunning", "LastEnabled",
295
"ApprovalPending", "ApprovedByDefault",
296
"LastApprovalRequest", "ApprovalDelay",
297
"ApprovalDuration", "Checker", "ExtendedTimeout",
298
"Expires", "LastCheckerStatus")
299
def run(self, clients, bus=None, mandos=None):
300
print(self.output(clients.values()))
301
def output(self, clients):
302
raise NotImplementedError()
304
class PropertyCmd(Command):
305
"""Abstract class for Actions for setting one client property"""
306
def run_on_one_client(self, client, properties):
307
"""Set the Client's D-Bus property"""
308
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
309
client.__dbus_object_path__,
310
dbus.PROPERTIES_IFACE, client_interface,
311
self.propname, self.value_to_set
312
if not isinstance(self.value_to_set, dbus.Boolean)
313
else bool(self.value_to_set))
314
client.Set(client_interface, self.propname, self.value_to_set,
315
dbus_interface=dbus.PROPERTIES_IFACE)
318
raise NotImplementedError()
320
class PropertyValueCmd(PropertyCmd):
321
"""Abstract class for PropertyCmd recieving a value as argument"""
322
def __init__(self, value):
323
self.value_to_set = value
325
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
326
"""Abstract class for PropertyValueCmd taking a value argument as
327
a datetime.timedelta() but should store it as milliseconds."""
329
def value_to_set(self):
332
def value_to_set(self, value):
333
"""When setting, convert value from a datetime.timedelta"""
334
self._vts = int(round(value.total_seconds() * 1000))
336
# Actual (non-abstract) command classes
338
class PrintTableCmd(PrintCmd):
339
def __init__(self, verbose=False):
340
self.verbose = verbose
342
def output(self, clients):
343
default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
344
keywords = default_keywords
346
keywords = self.all_keywords
347
return str(self.TableOfClients(clients, keywords))
349
class TableOfClients(object):
352
"Enabled": "Enabled",
353
"Timeout": "Timeout",
354
"LastCheckedOK": "Last Successful Check",
355
"LastApprovalRequest": "Last Approval Request",
356
"Created": "Created",
357
"Interval": "Interval",
359
"Fingerprint": "Fingerprint",
361
"CheckerRunning": "Check Is Running",
362
"LastEnabled": "Last Enabled",
363
"ApprovalPending": "Approval Is Pending",
364
"ApprovedByDefault": "Approved By Default",
365
"ApprovalDelay": "Approval Delay",
366
"ApprovalDuration": "Approval Duration",
367
"Checker": "Checker",
368
"ExtendedTimeout": "Extended Timeout",
369
"Expires": "Expires",
370
"LastCheckerStatus": "Last Checker Status",
373
def __init__(self, clients, keywords, tableheaders=None):
374
self.clients = clients
375
self.keywords = keywords
376
if tableheaders is not None:
377
self.tableheaders = tableheaders
380
return "\n".join(self.rows())
382
if sys.version_info.major == 2:
383
__unicode__ = __str__
385
return str(self).encode(locale.getpreferredencoding())
388
format_string = self.row_formatting_string()
389
rows = [self.header_line(format_string)]
390
rows.extend(self.client_line(client, format_string)
391
for client in self.clients)
394
def row_formatting_string(self):
395
"Format string used to format table rows"
396
return " ".join("{{{key}:{width}}}".format(
397
width=max(len(self.tableheaders[key]),
398
*(len(self.string_from_client(client, key))
399
for client in self.clients)),
401
for key in self.keywords)
403
def string_from_client(self, client, key):
404
return self.valuetostring(client[key], key)
407
def valuetostring(value, keyword):
408
if isinstance(value, dbus.Boolean):
409
return "Yes" if value else "No"
410
if keyword in ("Timeout", "Interval", "ApprovalDelay",
411
"ApprovalDuration", "ExtendedTimeout"):
412
return milliseconds_to_string(value)
415
def header_line(self, format_string):
416
return format_string.format(**self.tableheaders)
418
def client_line(self, client, format_string):
419
return format_string.format(
420
**{key: self.string_from_client(client, key)
421
for key in self.keywords})
425
class DumpJSONCmd(PrintCmd):
426
def output(self, clients):
427
data = {client["Name"]:
428
{key: self.dbus_boolean_to_bool(client[key])
429
for key in self.all_keywords}
430
for client in clients.values()}
431
return json.dumps(data, indent=4, separators=(',', ': '))
433
def dbus_boolean_to_bool(value):
434
if isinstance(value, dbus.Boolean):
438
class IsEnabledCmd(Command):
439
def run(self, clients, bus=None, mandos=None):
440
client, properties = next(iter(clients.items()))
441
if self.is_enabled(client, properties):
444
def is_enabled(self, client, properties):
445
return properties["Enabled"]
447
class RemoveCmd(Command):
448
def run_on_one_client(self, client, properties):
449
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
450
server_path, server_interface,
451
str(client.__dbus_object_path__))
452
self.mandos.RemoveClient(client.__dbus_object_path__)
454
class ApproveCmd(Command):
455
def run_on_one_client(self, client, properties):
456
log.debug("D-Bus: %s:%s:%s.Approve(True)", busname,
457
client.__dbus_object_path__, client_interface)
458
client.Approve(dbus.Boolean(True),
459
dbus_interface=client_interface)
461
class DenyCmd(Command):
462
def run_on_one_client(self, client, properties):
463
log.debug("D-Bus: %s:%s:%s.Approve(False)", busname,
464
client.__dbus_object_path__, client_interface)
465
client.Approve(dbus.Boolean(False),
466
dbus_interface=client_interface)
468
class EnableCmd(PropertyCmd):
470
value_to_set = dbus.Boolean(True)
472
class DisableCmd(PropertyCmd):
474
value_to_set = dbus.Boolean(False)
476
class BumpTimeoutCmd(PropertyCmd):
477
propname = "LastCheckedOK"
480
class StartCheckerCmd(PropertyCmd):
481
propname = "CheckerRunning"
482
value_to_set = dbus.Boolean(True)
484
class StopCheckerCmd(PropertyCmd):
485
propname = "CheckerRunning"
486
value_to_set = dbus.Boolean(False)
488
class ApproveByDefaultCmd(PropertyCmd):
489
propname = "ApprovedByDefault"
490
value_to_set = dbus.Boolean(True)
492
class DenyByDefaultCmd(PropertyCmd):
493
propname = "ApprovedByDefault"
494
value_to_set = dbus.Boolean(False)
496
class SetCheckerCmd(PropertyValueCmd):
499
class SetHostCmd(PropertyValueCmd):
502
class SetSecretCmd(PropertyValueCmd):
505
def value_to_set(self):
508
def value_to_set(self, value):
509
"""When setting, read data from supplied file object"""
510
self._vts = value.read()
513
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
516
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
517
propname = "ExtendedTimeout"
519
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
520
propname = "Interval"
522
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
523
propname = "ApprovalDelay"
525
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
526
propname = "ApprovalDuration"
528
def add_command_line_options(parser):
236
def print_clients(clients, keywords):
237
def valuetostring(value, keyword):
238
if type(value) is dbus.Boolean:
239
return "Yes" if value else "No"
240
if keyword in ("Timeout", "Interval", "ApprovalDelay",
241
"ApprovalDuration", "ExtendedTimeout"):
242
return milliseconds_to_string(value)
245
# Create format string to print table rows
246
format_string = " ".join("{{{key}:{width}}}".format(
247
width=max(len(tablewords[key]),
248
max(len(valuetostring(client[key], key))
249
for client in clients)),
253
print(format_string.format(**tablewords))
254
for client in clients:
256
.format(**{key: valuetostring(client[key], key)
257
for key in keywords}))
260
def has_actions(options):
261
return any((options.enable,
263
options.bump_timeout,
264
options.start_checker,
265
options.stop_checker,
268
options.checker is not None,
269
options.timeout is not None,
270
options.extended_timeout is not None,
271
options.interval is not None,
272
options.approved_by_default is not None,
273
options.approval_delay is not None,
274
options.approval_duration is not None,
275
options.host is not None,
276
options.secret is not None,
282
parser = argparse.ArgumentParser()
529
283
parser.add_argument("--version", action="version",
530
284
version="%(prog)s {}".format(version),
531
285
help="show version number and exit")
535
289
help="Print all fields")
536
290
parser.add_argument("-j", "--dump-json", action="store_true",
537
291
help="Dump client data in JSON format")
538
enable_disable = parser.add_mutually_exclusive_group()
539
enable_disable.add_argument("-e", "--enable", action="store_true",
540
help="Enable client")
541
enable_disable.add_argument("-d", "--disable",
543
help="disable client")
292
parser.add_argument("-e", "--enable", action="store_true",
293
help="Enable client")
294
parser.add_argument("-d", "--disable", action="store_true",
295
help="disable client")
544
296
parser.add_argument("-b", "--bump-timeout", action="store_true",
545
297
help="Bump timeout for client")
546
start_stop_checker = parser.add_mutually_exclusive_group()
547
start_stop_checker.add_argument("--start-checker",
549
help="Start checker for client")
550
start_stop_checker.add_argument("--stop-checker",
552
help="Stop checker for client")
298
parser.add_argument("--start-checker", action="store_true",
299
help="Start checker for client")
300
parser.add_argument("--stop-checker", action="store_true",
301
help="Stop checker for client")
553
302
parser.add_argument("-V", "--is-enabled", action="store_true",
554
303
help="Check if client is enabled")
555
304
parser.add_argument("-r", "--remove", action="store_true",
556
305
help="Remove client")
557
306
parser.add_argument("-c", "--checker",
558
307
help="Set checker command for client")
559
parser.add_argument("-t", "--timeout", type=string_to_delta,
308
parser.add_argument("-t", "--timeout",
560
309
help="Set timeout for client")
561
parser.add_argument("--extended-timeout", type=string_to_delta,
310
parser.add_argument("--extended-timeout",
562
311
help="Set extended timeout for client")
563
parser.add_argument("-i", "--interval", type=string_to_delta,
312
parser.add_argument("-i", "--interval",
564
313
help="Set checker interval for client")
565
approve_deny_default = parser.add_mutually_exclusive_group()
566
approve_deny_default.add_argument(
567
"--approve-by-default", action="store_true",
568
default=None, dest="approved_by_default",
569
help="Set client to be approved by default")
570
approve_deny_default.add_argument(
571
"--deny-by-default", action="store_false",
572
dest="approved_by_default",
573
help="Set client to be denied by default")
574
parser.add_argument("--approval-delay", type=string_to_delta,
314
parser.add_argument("--approve-by-default", action="store_true",
315
default=None, dest="approved_by_default",
316
help="Set client to be approved by default")
317
parser.add_argument("--deny-by-default", action="store_false",
318
dest="approved_by_default",
319
help="Set client to be denied by default")
320
parser.add_argument("--approval-delay",
575
321
help="Set delay before client approve/deny")
576
parser.add_argument("--approval-duration", type=string_to_delta,
322
parser.add_argument("--approval-duration",
577
323
help="Set duration of one client approval")
578
324
parser.add_argument("-H", "--host", help="Set host for client")
579
325
parser.add_argument("-s", "--secret",
580
326
type=argparse.FileType(mode="rb"),
581
327
help="Set password blob (file) for client")
582
approve_deny = parser.add_mutually_exclusive_group()
583
approve_deny.add_argument(
584
"-A", "--approve", action="store_true",
585
help="Approve any current client request")
586
approve_deny.add_argument("-D", "--deny", action="store_true",
587
help="Deny any current client request")
588
parser.add_argument("--debug", action="store_true",
589
help="Debug mode (show D-Bus commands)")
328
parser.add_argument("-A", "--approve", action="store_true",
329
help="Approve any current client request")
330
parser.add_argument("-D", "--deny", action="store_true",
331
help="Deny any current client request")
590
332
parser.add_argument("--check", action="store_true",
591
333
help="Run self-test")
592
334
parser.add_argument("client", nargs="*", help="Client name")
595
def commands_from_options(options):
599
if options.dump_json:
600
commands.append(DumpJSONCmd())
603
commands.append(EnableCmd())
606
commands.append(DisableCmd())
608
if options.bump_timeout:
609
commands.append(BumpTimeoutCmd())
611
if options.start_checker:
612
commands.append(StartCheckerCmd())
614
if options.stop_checker:
615
commands.append(StopCheckerCmd())
617
if options.is_enabled:
618
commands.append(IsEnabledCmd())
620
if options.checker is not None:
621
commands.append(SetCheckerCmd(options.checker))
623
if options.timeout is not None:
624
commands.append(SetTimeoutCmd(options.timeout))
626
if options.extended_timeout:
628
SetExtendedTimeoutCmd(options.extended_timeout))
630
if options.interval is not None:
631
commands.append(SetIntervalCmd(options.interval))
633
if options.approved_by_default is not None:
634
if options.approved_by_default:
635
commands.append(ApproveByDefaultCmd())
637
commands.append(DenyByDefaultCmd())
639
if options.approval_delay is not None:
640
commands.append(SetApprovalDelayCmd(options.approval_delay))
642
if options.approval_duration is not None:
644
SetApprovalDurationCmd(options.approval_duration))
646
if options.host is not None:
647
commands.append(SetHostCmd(options.host))
649
if options.secret is not None:
650
commands.append(SetSecretCmd(options.secret))
653
commands.append(ApproveCmd())
656
commands.append(DenyCmd())
659
commands.append(RemoveCmd())
661
# If no command option has been given, show table of clients,
662
# optionally verbosely
664
commands.append(PrintTableCmd(verbose=options.verbose))
669
def check_option_syntax(parser, options):
670
"""Apply additional restrictions on options, not expressible in
673
def has_actions(options):
674
return any((options.enable,
676
options.bump_timeout,
677
options.start_checker,
678
options.stop_checker,
681
options.checker is not None,
682
options.timeout is not None,
683
options.extended_timeout is not None,
684
options.interval is not None,
685
options.approved_by_default is not None,
686
options.approval_delay is not None,
687
options.approval_duration is not None,
688
options.host is not None,
689
options.secret is not None,
335
options = parser.parse_args()
693
337
if has_actions(options) and not (options.client or options.all):
694
338
parser.error("Options require clients names or --all.")
736
361
mandos_serv_object_manager = dbus.Interface(
737
362
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
739
# Filter out log message from dbus module
740
dbus_logger = logging.getLogger("dbus.proxies")
741
class NullFilter(logging.Filter):
742
def filter(self, record):
744
dbus_filter = NullFilter()
364
# block stderr since dbus library prints to stderr
365
null = os.open(os.path.devnull, os.O_RDWR)
366
stderrcopy = os.dup(sys.stderr.fileno())
367
os.dup2(null, sys.stderr.fileno())
746
dbus_logger.addFilter(dbus_filter)
747
log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", busname,
748
server_path, dbus.OBJECT_MANAGER_IFACE)
749
mandos_clients = {path: ifs_and_props[client_interface]
750
for path, ifs_and_props in
751
mandos_serv_object_manager
752
.GetManagedObjects().items()
753
if client_interface in ifs_and_props}
371
mandos_clients = {path: ifs_and_props[client_interface]
372
for path, ifs_and_props in
373
mandos_serv_object_manager
374
.GetManagedObjects().items()
375
if client_interface in ifs_and_props}
378
os.dup2(stderrcopy, sys.stderr.fileno())
754
380
except dbus.exceptions.DBusException as e:
755
log.critical("Failed to access Mandos server through D-Bus:"
381
print("Access denied: "
382
"Accessing mandos server through D-Bus: {}".format(e),
759
# restore dbus logger
760
dbus_logger.removeFilter(dbus_filter)
762
386
# Compile dict of (clients: properties) to process
766
clients = {objpath: properties
767
for objpath, properties in mandos_clients.items()}
389
if options.all or not options.client:
390
clients = {bus.get_object(busname, path): properties
391
for path, properties in mandos_clients.items()}
769
for name in clientnames:
770
for objpath, properties in mandos_clients.items():
771
if properties["Name"] == name:
772
clients[objpath] = properties
393
for name in options.client:
394
for path, client in mandos_clients.items():
395
if client["Name"] == name:
396
client_objc = bus.get_object(busname, path)
397
clients[client_objc] = client
775
log.critical("Client not found on server: %r", name)
400
print("Client not found on server: {!r}"
401
.format(name), file=sys.stderr)
778
# Run all commands on clients
779
commands = commands_from_options(options)
780
for command in commands:
781
command.run(clients, bus, mandos_serv)
784
class Test_milliseconds_to_string(unittest.TestCase):
786
self.assertEqual(milliseconds_to_string(93785000),
788
def test_no_days(self):
789
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
790
def test_all_zero(self):
791
self.assertEqual(milliseconds_to_string(0), "00:00:00")
792
def test_no_fractional_seconds(self):
793
self.assertEqual(milliseconds_to_string(400), "00:00:00")
794
self.assertEqual(milliseconds_to_string(900), "00:00:00")
795
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
797
class Test_string_to_delta(unittest.TestCase):
798
def test_handles_basic_rfc3339(self):
799
self.assertEqual(string_to_delta("PT0S"),
800
datetime.timedelta())
801
self.assertEqual(string_to_delta("P0D"),
802
datetime.timedelta())
803
self.assertEqual(string_to_delta("PT1S"),
804
datetime.timedelta(0, 1))
805
self.assertEqual(string_to_delta("PT2H"),
806
datetime.timedelta(0, 7200))
807
def test_falls_back_to_pre_1_6_1_with_warning(self):
808
# assertLogs only exists in Python 3.4
809
if hasattr(self, "assertLogs"):
810
with self.assertLogs(log, logging.WARNING):
811
value = string_to_delta("2h")
813
class WarningFilter(logging.Filter):
814
"""Don't show, but record the presence of, warnings"""
815
def filter(self, record):
816
is_warning = record.levelno >= logging.WARNING
817
self.found = is_warning or getattr(self, "found",
819
return not is_warning
820
warning_filter = WarningFilter()
821
log.addFilter(warning_filter)
823
value = string_to_delta("2h")
825
log.removeFilter(warning_filter)
826
self.assertTrue(getattr(warning_filter, "found", False))
827
self.assertEqual(value, datetime.timedelta(0, 7200))
830
class TestCmd(unittest.TestCase):
831
"""Abstract class for tests of command classes"""
834
class MockClient(object):
835
def __init__(self, name, **attributes):
836
self.__dbus_object_path__ = "/clients/{}".format(name)
837
self.attributes = attributes
838
self.attributes["Name"] = name
840
def Set(self, interface, propname, value, dbus_interface):
841
testcase.assertEqual(interface, client_interface)
842
testcase.assertEqual(dbus_interface,
843
dbus.PROPERTIES_IFACE)
844
self.attributes[propname] = value
845
def Get(self, interface, propname, dbus_interface):
846
testcase.assertEqual(interface, client_interface)
847
testcase.assertEqual(dbus_interface,
848
dbus.PROPERTIES_IFACE)
849
return self.attributes[propname]
850
def Approve(self, approve, dbus_interface):
851
testcase.assertEqual(dbus_interface, client_interface)
852
self.calls.append(("Approve", (approve,
854
self.client = MockClient(
856
KeyID=("92ed150794387c03ce684574b1139a65"
857
"94a34f895daaaf09fd8ea90a27cddb12"),
859
Host="foo.example.org",
860
Enabled=dbus.Boolean(True),
862
LastCheckedOK="2019-02-03T00:00:00",
863
Created="2019-01-02T00:00:00",
865
Fingerprint=("778827225BA7DE539C5A"
866
"7CFA59CFF7CDBD9A5920"),
867
CheckerRunning=dbus.Boolean(False),
868
LastEnabled="2019-01-03T00:00:00",
869
ApprovalPending=dbus.Boolean(False),
870
ApprovedByDefault=dbus.Boolean(True),
871
LastApprovalRequest="",
873
ApprovalDuration=1000,
874
Checker="fping -q -- %(host)s",
875
ExtendedTimeout=900000,
876
Expires="2019-02-04T00:00:00",
878
self.other_client = MockClient(
880
KeyID=("0558568eedd67d622f5c83b35a115f79"
881
"6ab612cff5ad227247e46c2b020f441c"),
884
Enabled=dbus.Boolean(True),
886
LastCheckedOK="2019-02-04T00:00:00",
887
Created="2019-01-03T00:00:00",
889
Fingerprint=("3E393AEAEFB84C7E89E2"
890
"F547B3A107558FCA3A27"),
891
CheckerRunning=dbus.Boolean(True),
892
LastEnabled="2019-01-04T00:00:00",
893
ApprovalPending=dbus.Boolean(False),
894
ApprovedByDefault=dbus.Boolean(False),
895
LastApprovalRequest="2019-01-03T00:00:00",
897
ApprovalDuration=1000,
899
ExtendedTimeout=900000,
900
Expires="2019-02-05T00:00:00",
901
LastCheckerStatus=-2)
902
self.clients = collections.OrderedDict(
904
("/clients/foo", self.client.attributes),
905
("/clients/barbar", self.other_client.attributes),
907
self.one_client = {"/clients/foo": self.client.attributes}
912
def get_object(client_bus_name, path):
913
self.assertEqual(client_bus_name, busname)
915
"/clients/foo": self.client,
916
"/clients/barbar": self.other_client,
920
class TestPrintTableCmd(TestCmd):
921
def test_normal(self):
922
output = PrintTableCmd().output(self.clients.values())
923
expected_output = """
924
Name Enabled Timeout Last Successful Check
925
foo Yes 00:05:00 2019-02-03T00:00:00
926
barbar Yes 00:05:00 2019-02-04T00:00:00
928
self.assertEqual(output, expected_output)
929
def test_verbose(self):
930
output = PrintTableCmd(verbose=True).output(
931
self.clients.values())
932
expected_output = """
933
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
934
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
935
barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04T00:00:00 No No 2019-01-03T00:00:00 00:00:30 00:00:01 : 00:15:00 2019-02-05T00:00:00 -2
937
self.assertEqual(output, expected_output)
938
def test_one_client(self):
939
output = PrintTableCmd().output(self.one_client.values())
940
expected_output = """
941
Name Enabled Timeout Last Successful Check
942
foo Yes 00:05:00 2019-02-03T00:00:00
944
self.assertEqual(output, expected_output)
946
class TestDumpJSONCmd(TestCmd):
948
self.expected_json = {
951
"KeyID": ("92ed150794387c03ce684574b1139a65"
952
"94a34f895daaaf09fd8ea90a27cddb12"),
953
"Host": "foo.example.org",
956
"LastCheckedOK": "2019-02-03T00:00:00",
957
"Created": "2019-01-02T00:00:00",
959
"Fingerprint": ("778827225BA7DE539C5A"
960
"7CFA59CFF7CDBD9A5920"),
961
"CheckerRunning": False,
962
"LastEnabled": "2019-01-03T00:00:00",
963
"ApprovalPending": False,
964
"ApprovedByDefault": True,
965
"LastApprovalRequest": "",
967
"ApprovalDuration": 1000,
968
"Checker": "fping -q -- %(host)s",
969
"ExtendedTimeout": 900000,
970
"Expires": "2019-02-04T00:00:00",
971
"LastCheckerStatus": 0,
975
"KeyID": ("0558568eedd67d622f5c83b35a115f79"
976
"6ab612cff5ad227247e46c2b020f441c"),
980
"LastCheckedOK": "2019-02-04T00:00:00",
981
"Created": "2019-01-03T00:00:00",
983
"Fingerprint": ("3E393AEAEFB84C7E89E2"
984
"F547B3A107558FCA3A27"),
985
"CheckerRunning": True,
986
"LastEnabled": "2019-01-04T00:00:00",
987
"ApprovalPending": False,
988
"ApprovedByDefault": False,
989
"LastApprovalRequest": "2019-01-03T00:00:00",
990
"ApprovalDelay": 30000,
991
"ApprovalDuration": 1000,
993
"ExtendedTimeout": 900000,
994
"Expires": "2019-02-05T00:00:00",
995
"LastCheckerStatus": -2,
998
return super(TestDumpJSONCmd, self).setUp()
999
def test_normal(self):
1000
json_data = json.loads(DumpJSONCmd().output(self.clients))
1001
self.assertDictEqual(json_data, self.expected_json)
1002
def test_one_client(self):
1003
clients = self.one_client
1004
json_data = json.loads(DumpJSONCmd().output(clients))
1005
expected_json = {"foo": self.expected_json["foo"]}
1006
self.assertDictEqual(json_data, expected_json)
1008
class TestIsEnabledCmd(TestCmd):
1009
def test_is_enabled(self):
1010
self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
1011
for client, properties in self.clients.items()))
1012
def test_is_enabled_run_exits_successfully(self):
1013
with self.assertRaises(SystemExit) as e:
1014
IsEnabledCmd().run(self.one_client)
1015
if e.exception.code is not None:
1016
self.assertEqual(e.exception.code, 0)
1018
self.assertIsNone(e.exception.code)
1019
def test_is_enabled_run_exits_with_failure(self):
1020
self.client.attributes["Enabled"] = dbus.Boolean(False)
1021
with self.assertRaises(SystemExit) as e:
1022
IsEnabledCmd().run(self.one_client)
1023
if isinstance(e.exception.code, int):
1024
self.assertNotEqual(e.exception.code, 0)
1026
self.assertIsNotNone(e.exception.code)
1028
class TestRemoveCmd(TestCmd):
1029
def test_remove(self):
1030
class MockMandos(object):
1033
def RemoveClient(self, dbus_path):
1034
self.calls.append(("RemoveClient", (dbus_path,)))
1035
mandos = MockMandos()
1036
super(TestRemoveCmd, self).setUp()
1037
RemoveCmd().run(self.clients, self.bus, mandos)
1038
self.assertEqual(len(mandos.calls), 2)
1039
for clientpath in self.clients:
1040
self.assertIn(("RemoveClient", (clientpath,)),
1043
class TestApproveCmd(TestCmd):
1044
def test_approve(self):
1045
ApproveCmd().run(self.clients, self.bus)
1046
for clientpath in self.clients:
1047
client = self.bus.get_object(busname, clientpath)
1048
self.assertIn(("Approve", (True, client_interface)),
1051
class TestDenyCmd(TestCmd):
1052
def test_deny(self):
1053
DenyCmd().run(self.clients, self.bus)
1054
for clientpath in self.clients:
1055
client = self.bus.get_object(busname, clientpath)
1056
self.assertIn(("Approve", (False, client_interface)),
1059
class TestEnableCmd(TestCmd):
1060
def test_enable(self):
1061
for clientpath in self.clients:
1062
client = self.bus.get_object(busname, clientpath)
1063
client.attributes["Enabled"] = False
1065
EnableCmd().run(self.clients, self.bus)
1067
for clientpath in self.clients:
1068
client = self.bus.get_object(busname, clientpath)
1069
self.assertTrue(client.attributes["Enabled"])
1071
class TestDisableCmd(TestCmd):
1072
def test_disable(self):
1073
DisableCmd().run(self.clients, self.bus)
1074
for clientpath in self.clients:
1075
client = self.bus.get_object(busname, clientpath)
1076
self.assertFalse(client.attributes["Enabled"])
1078
class Unique(object):
1079
"""Class for objects which exist only to be unique objects, since
1080
unittest.mock.sentinel only exists in Python 3.3"""
1082
class TestPropertyCmd(TestCmd):
1083
"""Abstract class for tests of PropertyCmd classes"""
1085
if not hasattr(self, "command"):
1087
values_to_get = getattr(self, "values_to_get",
1089
for value_to_set, value_to_get in zip(self.values_to_set,
1091
for clientpath in self.clients:
1092
client = self.bus.get_object(busname, clientpath)
1093
old_value = client.attributes[self.propname]
1094
self.assertNotIsInstance(old_value, Unique)
1095
client.attributes[self.propname] = Unique()
1096
self.run_command(value_to_set, self.clients)
1097
for clientpath in self.clients:
1098
client = self.bus.get_object(busname, clientpath)
1099
value = client.attributes[self.propname]
1100
self.assertNotIsInstance(value, Unique)
1101
self.assertEqual(value, value_to_get)
1102
def run_command(self, value, clients):
1103
self.command().run(clients, self.bus)
1105
class TestBumpTimeoutCmd(TestPropertyCmd):
1106
command = BumpTimeoutCmd
1107
propname = "LastCheckedOK"
1108
values_to_set = [""]
1110
class TestStartCheckerCmd(TestPropertyCmd):
1111
command = StartCheckerCmd
1112
propname = "CheckerRunning"
1113
values_to_set = [dbus.Boolean(True)]
1115
class TestStopCheckerCmd(TestPropertyCmd):
1116
command = StopCheckerCmd
1117
propname = "CheckerRunning"
1118
values_to_set = [dbus.Boolean(False)]
1120
class TestApproveByDefaultCmd(TestPropertyCmd):
1121
command = ApproveByDefaultCmd
1122
propname = "ApprovedByDefault"
1123
values_to_set = [dbus.Boolean(True)]
1125
class TestDenyByDefaultCmd(TestPropertyCmd):
1126
command = DenyByDefaultCmd
1127
propname = "ApprovedByDefault"
1128
values_to_set = [dbus.Boolean(False)]
1130
class TestPropertyValueCmd(TestPropertyCmd):
1131
"""Abstract class for tests of PropertyValueCmd classes"""
1133
if type(self) is TestPropertyValueCmd:
1135
return super(TestPropertyValueCmd, self).runTest()
1136
def run_command(self, value, clients):
1137
self.command(value).run(clients, self.bus)
1139
class TestSetCheckerCmd(TestPropertyValueCmd):
1140
command = SetCheckerCmd
1141
propname = "Checker"
1142
values_to_set = ["", ":", "fping -q -- %s"]
1144
class TestSetHostCmd(TestPropertyValueCmd):
1145
command = SetHostCmd
1147
values_to_set = ["192.0.2.3", "foo.example.org"]
1149
class TestSetSecretCmd(TestPropertyValueCmd):
1150
command = SetSecretCmd
1152
values_to_set = [io.BytesIO(b""),
1153
io.BytesIO(b"secret\0xyzzy\nbar")]
1154
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1156
class TestSetTimeoutCmd(TestPropertyValueCmd):
1157
command = SetTimeoutCmd
1158
propname = "Timeout"
1159
values_to_set = [datetime.timedelta(),
1160
datetime.timedelta(minutes=5),
1161
datetime.timedelta(seconds=1),
1162
datetime.timedelta(weeks=1),
1163
datetime.timedelta(weeks=52)]
1164
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1166
class TestSetExtendedTimeoutCmd(TestPropertyValueCmd):
1167
command = SetExtendedTimeoutCmd
1168
propname = "ExtendedTimeout"
1169
values_to_set = [datetime.timedelta(),
1170
datetime.timedelta(minutes=5),
1171
datetime.timedelta(seconds=1),
1172
datetime.timedelta(weeks=1),
1173
datetime.timedelta(weeks=52)]
1174
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1176
class TestSetIntervalCmd(TestPropertyValueCmd):
1177
command = SetIntervalCmd
1178
propname = "Interval"
1179
values_to_set = [datetime.timedelta(),
1180
datetime.timedelta(minutes=5),
1181
datetime.timedelta(seconds=1),
1182
datetime.timedelta(weeks=1),
1183
datetime.timedelta(weeks=52)]
1184
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1186
class TestSetApprovalDelayCmd(TestPropertyValueCmd):
1187
command = SetApprovalDelayCmd
1188
propname = "ApprovalDelay"
1189
values_to_set = [datetime.timedelta(),
1190
datetime.timedelta(minutes=5),
1191
datetime.timedelta(seconds=1),
1192
datetime.timedelta(weeks=1),
1193
datetime.timedelta(weeks=52)]
1194
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1196
class TestSetApprovalDurationCmd(TestPropertyValueCmd):
1197
command = SetApprovalDurationCmd
1198
propname = "ApprovalDuration"
1199
values_to_set = [datetime.timedelta(),
1200
datetime.timedelta(minutes=5),
1201
datetime.timedelta(seconds=1),
1202
datetime.timedelta(weeks=1),
1203
datetime.timedelta(weeks=52)]
1204
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1206
class Test_command_from_options(unittest.TestCase):
1208
self.parser = argparse.ArgumentParser()
1209
add_command_line_options(self.parser)
1210
def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1211
"""Assert that parsing ARGS should result in an instance of
1212
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1213
options = self.parser.parse_args(args)
1214
check_option_syntax(self.parser, options)
1215
commands = commands_from_options(options)
1216
self.assertEqual(len(commands), 1)
1217
command = commands[0]
1218
self.assertIsInstance(command, command_cls)
1219
for key, value in cmd_attrs.items():
1220
self.assertEqual(getattr(command, key), value)
1221
def test_print_table(self):
1222
self.assert_command_from_args([], PrintTableCmd,
1225
def test_print_table_verbose(self):
1226
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1229
def test_print_table_verbose_short(self):
1230
self.assert_command_from_args(["-v"], PrintTableCmd,
1233
def test_enable(self):
1234
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1236
def test_enable_short(self):
1237
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1239
def test_disable(self):
1240
self.assert_command_from_args(["--disable", "foo"],
1243
def test_disable_short(self):
1244
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1246
def test_bump_timeout(self):
1247
self.assert_command_from_args(["--bump-timeout", "foo"],
1250
def test_bump_timeout_short(self):
1251
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1253
def test_start_checker(self):
1254
self.assert_command_from_args(["--start-checker", "foo"],
1257
def test_stop_checker(self):
1258
self.assert_command_from_args(["--stop-checker", "foo"],
1261
def test_remove(self):
1262
self.assert_command_from_args(["--remove", "foo"],
1265
def test_remove_short(self):
1266
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1268
def test_checker(self):
1269
self.assert_command_from_args(["--checker", ":", "foo"],
1270
SetCheckerCmd, value_to_set=":")
1272
def test_checker_empty(self):
1273
self.assert_command_from_args(["--checker", "", "foo"],
1274
SetCheckerCmd, value_to_set="")
1276
def test_checker_short(self):
1277
self.assert_command_from_args(["-c", ":", "foo"],
1278
SetCheckerCmd, value_to_set=":")
1280
def test_timeout(self):
1281
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1283
value_to_set=300000)
1285
def test_timeout_short(self):
1286
self.assert_command_from_args(["-t", "PT5M", "foo"],
1288
value_to_set=300000)
1290
def test_extended_timeout(self):
1291
self.assert_command_from_args(["--extended-timeout", "PT15M",
1293
SetExtendedTimeoutCmd,
1294
value_to_set=900000)
1296
def test_interval(self):
1297
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1299
value_to_set=120000)
1301
def test_interval_short(self):
1302
self.assert_command_from_args(["-i", "PT2M", "foo"],
1304
value_to_set=120000)
1306
def test_approve_by_default(self):
1307
self.assert_command_from_args(["--approve-by-default", "foo"],
1308
ApproveByDefaultCmd)
1310
def test_deny_by_default(self):
1311
self.assert_command_from_args(["--deny-by-default", "foo"],
1314
def test_approval_delay(self):
1315
self.assert_command_from_args(["--approval-delay", "PT30S",
1316
"foo"], SetApprovalDelayCmd,
1319
def test_approval_duration(self):
1320
self.assert_command_from_args(["--approval-duration", "PT1S",
1321
"foo"], SetApprovalDurationCmd,
1324
def test_host(self):
1325
self.assert_command_from_args(["--host", "foo.example.org",
1327
value_to_set="foo.example.org")
1329
def test_host_short(self):
1330
self.assert_command_from_args(["-H", "foo.example.org",
1332
value_to_set="foo.example.org")
1334
def test_secret_devnull(self):
1335
self.assert_command_from_args(["--secret", os.path.devnull,
1336
"foo"], SetSecretCmd,
1339
def test_secret_tempfile(self):
1340
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1341
value = b"secret\0xyzzy\nbar"
1344
self.assert_command_from_args(["--secret", f.name,
1345
"foo"], SetSecretCmd,
1348
def test_secret_devnull_short(self):
1349
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1350
SetSecretCmd, value_to_set=b"")
1352
def test_secret_tempfile_short(self):
1353
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1354
value = b"secret\0xyzzy\nbar"
1357
self.assert_command_from_args(["-s", f.name, "foo"],
1361
def test_approve(self):
1362
self.assert_command_from_args(["--approve", "foo"],
1365
def test_approve_short(self):
1366
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1368
def test_deny(self):
1369
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1371
def test_deny_short(self):
1372
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1374
def test_dump_json(self):
1375
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1377
def test_is_enabled(self):
1378
self.assert_command_from_args(["--is-enabled", "foo"],
1381
def test_is_enabled_short(self):
1382
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1384
def test_deny_before_remove(self):
1385
options = self.parser.parse_args(["--deny", "--remove", "foo"])
1386
check_option_syntax(self.parser, options)
1387
commands = commands_from_options(options)
1388
self.assertEqual(len(commands), 2)
1389
self.assertIsInstance(commands[0], DenyCmd)
1390
self.assertIsInstance(commands[1], RemoveCmd)
1392
def test_deny_before_remove_reversed(self):
1393
options = self.parser.parse_args(["--remove", "--deny", "--all"])
1394
check_option_syntax(self.parser, options)
1395
commands = commands_from_options(options)
1396
self.assertEqual(len(commands), 2)
1397
self.assertIsInstance(commands[0], DenyCmd)
1398
self.assertIsInstance(commands[1], RemoveCmd)
1401
class Test_check_option_syntax(unittest.TestCase):
1402
# This mostly corresponds to the definition from has_actions() in
1403
# check_option_syntax()
1405
# The actual values set here are not that important, but we do
1406
# at least stick to the correct types, even though they are
1410
"bump_timeout": True,
1411
"start_checker": True,
1412
"stop_checker": True,
1416
"timeout": datetime.timedelta(),
1417
"extended_timeout": datetime.timedelta(),
1418
"interval": datetime.timedelta(),
1419
"approved_by_default": True,
1420
"approval_delay": datetime.timedelta(),
1421
"approval_duration": datetime.timedelta(),
1423
"secret": io.BytesIO(b"x"),
1429
self.parser = argparse.ArgumentParser()
1430
add_command_line_options(self.parser)
1432
@contextlib.contextmanager
1433
def assertParseError(self):
1434
with self.assertRaises(SystemExit) as e:
1435
with self.temporarily_suppress_stderr():
1437
# Exit code from argparse is guaranteed to be "2". Reference:
1438
# https://docs.python.org/3/library/argparse.html#exiting-methods
1439
self.assertEqual(e.exception.code, 2)
1442
@contextlib.contextmanager
1443
def temporarily_suppress_stderr():
1444
null = os.open(os.path.devnull, os.O_RDWR)
1445
stderrcopy = os.dup(sys.stderr.fileno())
1446
os.dup2(null, sys.stderr.fileno())
1452
os.dup2(stderrcopy, sys.stderr.fileno())
1453
os.close(stderrcopy)
1455
def check_option_syntax(self, options):
1456
check_option_syntax(self.parser, options)
1458
def test_actions_requires_client_or_all(self):
1459
for action, value in self.actions.items():
1460
options = self.parser.parse_args()
1461
setattr(options, action, value)
1462
with self.assertParseError():
1463
self.check_option_syntax(options)
1465
def test_actions_conflicts_with_verbose(self):
1466
for action, value in self.actions.items():
1467
options = self.parser.parse_args()
1468
setattr(options, action, value)
1469
options.verbose = True
1470
with self.assertParseError():
1471
self.check_option_syntax(options)
1473
def test_dump_json_conflicts_with_verbose(self):
1474
options = self.parser.parse_args()
1475
options.dump_json = True
1476
options.verbose = True
1477
with self.assertParseError():
1478
self.check_option_syntax(options)
1480
def test_dump_json_conflicts_with_action(self):
1481
for action, value in self.actions.items():
1482
options = self.parser.parse_args()
1483
setattr(options, action, value)
1484
options.dump_json = True
1485
with self.assertParseError():
1486
self.check_option_syntax(options)
1488
def test_all_can_not_be_alone(self):
1489
options = self.parser.parse_args()
1491
with self.assertParseError():
1492
self.check_option_syntax(options)
1494
def test_all_is_ok_with_any_action(self):
1495
for action, value in self.actions.items():
1496
options = self.parser.parse_args()
1497
setattr(options, action, value)
1499
self.check_option_syntax(options)
1501
def test_is_enabled_fails_without_client(self):
1502
options = self.parser.parse_args()
1503
options.is_enabled = True
1504
with self.assertParseError():
1505
self.check_option_syntax(options)
1507
def test_is_enabled_works_with_one_client(self):
1508
options = self.parser.parse_args()
1509
options.is_enabled = True
1510
options.client = ["foo"]
1511
self.check_option_syntax(options)
1513
def test_is_enabled_fails_with_two_clients(self):
1514
options = self.parser.parse_args()
1515
options.is_enabled = True
1516
options.client = ["foo", "barbar"]
1517
with self.assertParseError():
1518
self.check_option_syntax(options)
1520
def test_remove_can_only_be_combined_with_action_deny(self):
1521
for action, value in self.actions.items():
1522
if action in {"remove", "deny"}:
1524
options = self.parser.parse_args()
1525
setattr(options, action, value)
1527
options.remove = True
1528
with self.assertParseError():
1529
self.check_option_syntax(options)
1533
def should_only_run_tests():
1534
parser = argparse.ArgumentParser(add_help=False)
1535
parser.add_argument("--check", action='store_true')
1536
args, unknown_args = parser.parse_known_args()
1537
run_tests = args.check
1539
# Remove --check argument from sys.argv
1540
sys.argv[1:] = unknown_args
1543
# Add all tests from doctest strings
1544
def load_tests(loader, tests, none):
1546
tests.addTests(doctest.DocTestSuite())
404
if not has_actions(options) and clients:
405
if options.verbose or options.dump_json:
406
keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
407
"Created", "Interval", "Host", "Fingerprint",
408
"CheckerRunning", "LastEnabled",
409
"ApprovalPending", "ApprovedByDefault",
410
"LastApprovalRequest", "ApprovalDelay",
411
"ApprovalDuration", "Checker",
412
"ExtendedTimeout", "Expires",
415
keywords = defaultkeywords
417
if options.dump_json:
418
json.dump({client["Name"]: {key:
420
if isinstance(client[key],
424
for client in clients.values()},
425
fp=sys.stdout, indent=4,
426
separators=(',', ': '))
429
print_clients(clients.values(), keywords)
431
# Process each client in the list by all selected options
432
for client in clients:
434
def set_client_prop(prop, value):
435
"""Set a Client D-Bus property"""
436
client.Set(client_interface, prop, value,
437
dbus_interface=dbus.PROPERTIES_IFACE)
439
def set_client_prop_ms(prop, value):
440
"""Set a Client D-Bus property, converted
441
from a string to milliseconds."""
442
set_client_prop(prop,
443
string_to_delta(value).total_seconds()
447
mandos_serv.RemoveClient(client.__dbus_object_path__)
449
set_client_prop("Enabled", dbus.Boolean(True))
451
set_client_prop("Enabled", dbus.Boolean(False))
452
if options.bump_timeout:
453
set_client_prop("LastCheckedOK", "")
454
if options.start_checker:
455
set_client_prop("CheckerRunning", dbus.Boolean(True))
456
if options.stop_checker:
457
set_client_prop("CheckerRunning", dbus.Boolean(False))
458
if options.is_enabled:
459
if client.Get(client_interface, "Enabled",
460
dbus_interface=dbus.PROPERTIES_IFACE):
464
if options.checker is not None:
465
set_client_prop("Checker", options.checker)
466
if options.host is not None:
467
set_client_prop("Host", options.host)
468
if options.interval is not None:
469
set_client_prop_ms("Interval", options.interval)
470
if options.approval_delay is not None:
471
set_client_prop_ms("ApprovalDelay",
472
options.approval_delay)
473
if options.approval_duration is not None:
474
set_client_prop_ms("ApprovalDuration",
475
options.approval_duration)
476
if options.timeout is not None:
477
set_client_prop_ms("Timeout", options.timeout)
478
if options.extended_timeout is not None:
479
set_client_prop_ms("ExtendedTimeout",
480
options.extended_timeout)
481
if options.secret is not None:
482
set_client_prop("Secret",
483
dbus.ByteArray(options.secret.read()))
484
if options.approved_by_default is not None:
485
set_client_prop("ApprovedByDefault",
487
.approved_by_default))
489
client.Approve(dbus.Boolean(True),
490
dbus_interface=client_interface)
492
client.Approve(dbus.Boolean(False),
493
dbus_interface=client_interface)
1549
496
if __name__ == "__main__":
1550
if should_only_run_tests():
1551
# Call using ./tdd-python-script --check [--verbose]