267
223
value += datetime.timedelta(0, 0, 0, int(num))
271
## Classes for commands.
273
# Abstract classes first
274
class Command(object):
275
"""Abstract class for commands"""
276
def run(self, mandos, clients):
277
"""Normal commands should implement run_on_one_client(), but
278
commands which want to operate on all clients at the same time
279
can override this run() method instead."""
281
for client in clients:
282
self.run_on_one_client(client)
284
class PrintCmd(Command):
285
"""Abstract class for commands printing client details"""
286
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
287
"Created", "Interval", "Host", "KeyID",
288
"Fingerprint", "CheckerRunning", "LastEnabled",
289
"ApprovalPending", "ApprovedByDefault",
290
"LastApprovalRequest", "ApprovalDelay",
291
"ApprovalDuration", "Checker", "ExtendedTimeout",
292
"Expires", "LastCheckerStatus")
293
def run(self, mandos, clients):
294
print(self.output(clients))
296
class PropertyCmd(Command):
297
"""Abstract class for Actions for setting one client property"""
298
def run_on_one_client(self, client):
299
"""Set the Client's D-Bus property"""
300
client.Set(client_interface, self.property, self.value_to_set,
301
dbus_interface=dbus.PROPERTIES_IFACE)
303
class ValueArgumentMixIn(object):
304
"""Mixin class for commands taking a value as argument"""
305
def __init__(self, value):
306
self.value_to_set = value
308
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
309
"""Mixin class for commands taking a value argument as
312
def value_to_set(self):
315
def value_to_set(self, value):
316
"""When setting, convert value to a datetime.timedelta"""
317
self._vts = string_to_delta(value).total_seconds() * 1000
319
# Actual (non-abstract) command classes
321
class PrintTableCmd(PrintCmd):
322
def __init__(self, verbose=False):
323
self.verbose = verbose
325
def output(self, clients):
327
keywords = self.all_keywords
329
keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
330
return str(self.TableOfClients(clients.values(), keywords))
332
class TableOfClients(object):
335
"Enabled": "Enabled",
336
"Timeout": "Timeout",
337
"LastCheckedOK": "Last Successful Check",
338
"LastApprovalRequest": "Last Approval Request",
339
"Created": "Created",
340
"Interval": "Interval",
342
"Fingerprint": "Fingerprint",
344
"CheckerRunning": "Check Is Running",
345
"LastEnabled": "Last Enabled",
346
"ApprovalPending": "Approval Is Pending",
347
"ApprovedByDefault": "Approved By Default",
348
"ApprovalDelay": "Approval Delay",
349
"ApprovalDuration": "Approval Duration",
350
"Checker": "Checker",
351
"ExtendedTimeout": "Extended Timeout",
352
"Expires": "Expires",
353
"LastCheckerStatus": "Last Checker Status",
356
def __init__(self, clients, keywords, tableheaders=None):
357
self.clients = clients
358
self.keywords = keywords
359
if tableheaders is not None:
360
self.tableheaders = tableheaders
363
return "\n".join(self.rows())
365
if sys.version_info.major == 2:
366
__unicode__ = __str__
368
return str(self).encode(locale.getpreferredencoding())
371
format_string = self.row_formatting_string()
372
rows = [self.header_line(format_string)]
373
rows.extend(self.client_line(client, format_string)
374
for client in self.clients)
377
def row_formatting_string(self):
378
"Format string used to format table rows"
379
return " ".join("{{{key}:{width}}}".format(
380
width=max(len(self.tableheaders[key]),
381
*(len(self.string_from_client(client, key))
382
for client in self.clients)),
384
for key in self.keywords)
386
def string_from_client(self, client, key):
387
return self.valuetostring(client[key], key)
390
def valuetostring(value, keyword):
391
if isinstance(value, dbus.Boolean):
392
return "Yes" if value else "No"
393
if keyword in ("Timeout", "Interval", "ApprovalDelay",
394
"ApprovalDuration", "ExtendedTimeout"):
395
return milliseconds_to_string(value)
398
def header_line(self, format_string):
399
return format_string.format(**self.tableheaders)
401
def client_line(self, client, format_string):
402
return format_string.format(
403
**{key: self.string_from_client(client, key)
404
for key in self.keywords})
408
class DumpJSONCmd(PrintCmd):
409
def output(self, clients):
410
data = {client["Name"]:
411
{key: self.dbus_boolean_to_bool(client[key])
412
for key in self.all_keywords}
413
for client in clients.values()}
414
return json.dumps(data, indent=4, separators=(',', ': '))
416
def dbus_boolean_to_bool(value):
417
if isinstance(value, dbus.Boolean):
421
class IsEnabledCmd(Command):
422
def run_on_one_client(self, client):
423
if self.is_enabled(client):
426
def is_enabled(self, client):
427
return client.Get(client_interface, "Enabled",
428
dbus_interface=dbus.PROPERTIES_IFACE)
430
class RemoveCmd(Command):
431
def run_on_one_client(self, client):
432
self.mandos.RemoveClient(client.__dbus_object_path__)
434
class ApproveCmd(Command):
435
def run_on_one_client(self, client):
436
client.Approve(dbus.Boolean(True),
437
dbus_interface=client_interface)
439
class DenyCmd(Command):
440
def run_on_one_client(self, client):
441
client.Approve(dbus.Boolean(False),
442
dbus_interface=client_interface)
444
class EnableCmd(PropertyCmd):
446
value_to_set = dbus.Boolean(True)
448
class DisableCmd(PropertyCmd):
450
value_to_set = dbus.Boolean(False)
452
class BumpTimeoutCmd(PropertyCmd):
453
property = "LastCheckedOK"
456
class StartCheckerCmd(PropertyCmd):
457
property = "CheckerRunning"
458
value_to_set = dbus.Boolean(True)
460
class StopCheckerCmd(PropertyCmd):
461
property = "CheckerRunning"
462
value_to_set = dbus.Boolean(False)
464
class ApproveByDefaultCmd(PropertyCmd):
465
property = "ApprovedByDefault"
466
value_to_set = dbus.Boolean(True)
468
class DenyByDefaultCmd(PropertyCmd):
469
property = "ApprovedByDefault"
470
value_to_set = dbus.Boolean(False)
472
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
475
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
478
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
481
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
484
class SetExtendedTimeoutCmd(PropertyCmd,
485
MillisecondsValueArgumentMixIn):
486
property = "ExtendedTimeout"
488
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
489
property = "Interval"
491
class SetApprovalDelayCmd(PropertyCmd,
492
MillisecondsValueArgumentMixIn):
493
property = "ApprovalDelay"
495
class SetApprovalDurationCmd(PropertyCmd,
496
MillisecondsValueArgumentMixIn):
497
property = "ApprovalDuration"
226
def print_clients(clients, keywords):
227
def valuetostring(value, keyword):
228
if type(value) is dbus.Boolean:
229
return "Yes" if value else "No"
230
if keyword in ("Timeout", "Interval", "ApprovalDelay",
231
"ApprovalDuration", "ExtendedTimeout"):
232
return milliseconds_to_string(value)
233
return unicode(value)
235
# Create format string to print table rows
236
format_string = " ".join("{{{key}:{width}}}".format(
237
width = max(len(tablewords[key]),
238
max(len(valuetostring(client[key],
242
key = key) for key in keywords)
244
print(format_string.format(**tablewords))
245
for client in clients:
246
print(format_string.format(**dict((key,
247
valuetostring(client[key],
249
for key in keywords)))
499
251
def has_actions(options):
500
252
return any((options.enable,
553
299
help="Set extended timeout for client")
554
300
parser.add_argument("-i", "--interval",
555
301
help="Set checker interval for client")
556
approve_deny_default = parser.add_mutually_exclusive_group()
557
approve_deny_default.add_argument(
558
"--approve-by-default", action="store_true",
559
default=None, dest="approved_by_default",
560
help="Set client to be approved by default")
561
approve_deny_default.add_argument(
562
"--deny-by-default", action="store_false",
563
dest="approved_by_default",
564
help="Set client to be denied by default")
302
parser.add_argument("--approve-by-default", action="store_true",
303
default=None, dest="approved_by_default",
304
help="Set client to be approved by default")
305
parser.add_argument("--deny-by-default", action="store_false",
306
dest="approved_by_default",
307
help="Set client to be denied by default")
565
308
parser.add_argument("--approval-delay",
566
309
help="Set delay before client approve/deny")
567
310
parser.add_argument("--approval-duration",
568
311
help="Set duration of one client approval")
569
312
parser.add_argument("-H", "--host", help="Set host for client")
570
parser.add_argument("-s", "--secret",
571
type=argparse.FileType(mode="rb"),
313
parser.add_argument("-s", "--secret", type=file,
572
314
help="Set password blob (file) for client")
573
approve_deny = parser.add_mutually_exclusive_group()
574
approve_deny.add_argument(
575
"-A", "--approve", action="store_true",
576
help="Approve any current client request")
577
approve_deny.add_argument("-D", "--deny", action="store_true",
578
help="Deny any current client request")
315
parser.add_argument("-A", "--approve", action="store_true",
316
help="Approve any current client request")
317
parser.add_argument("-D", "--deny", action="store_true",
318
help="Deny any current client request")
579
319
parser.add_argument("--check", action="store_true",
580
320
help="Run self-test")
581
321
parser.add_argument("client", nargs="*", help="Client name")
584
def commands_and_clients_from_options(options):
588
if options.dump_json:
589
commands.append(DumpJSONCmd())
592
commands.append(EnableCmd())
595
commands.append(DisableCmd())
597
if options.bump_timeout:
598
commands.append(BumpTimeoutCmd(options.bump_timeout))
600
if options.start_checker:
601
commands.append(StartCheckerCmd())
603
if options.stop_checker:
604
commands.append(StopCheckerCmd())
606
if options.is_enabled:
607
commands.append(IsEnabledCmd())
610
commands.append(RemoveCmd())
612
if options.checker is not None:
613
commands.append(SetCheckerCmd())
615
if options.timeout is not None:
616
commands.append(SetTimeoutCmd(options.timeout))
618
if options.extended_timeout:
620
SetExtendedTimeoutCmd(options.extended_timeout))
622
if options.interval is not None:
623
command.append(SetIntervalCmd(options.interval))
625
if options.approved_by_default is not None:
626
if options.approved_by_default:
627
command.append(ApproveByDefaultCmd())
629
command.append(DenyByDefaultCmd())
631
if options.approval_delay is not None:
632
command.append(SetApprovalDelayCmd(options.approval_delay))
634
if options.approval_duration is not None:
636
SetApprovalDurationCmd(options.approval_duration))
638
if options.host is not None:
639
command.append(SetHostCmd(options.host))
641
if options.secret is not None:
642
command.append(SetSecretCmd(options.secret))
645
commands.append(ApproveCmd())
648
commands.append(DenyCmd())
650
# If no command option has been given, show table of clients,
651
# optionally verbosely
653
commands.append(PrintTableCmd(verbose=options.verbose))
655
return commands, options.client
659
parser = argparse.ArgumentParser()
661
add_command_line_options(parser)
663
322
options = parser.parse_args()
665
324
if has_actions(options) and not (options.client or options.all):
666
325
parser.error("Options require clients names or --all.")
667
326
if options.verbose and has_actions(options):
668
parser.error("--verbose can only be used alone.")
669
if options.dump_json and (options.verbose
670
or has_actions(options)):
671
parser.error("--dump-json can only be used alone.")
327
parser.error("--verbose can only be used alone or with"
672
329
if options.all and not has_actions(options):
673
330
parser.error("--all requires an action.")
674
if options.is_enabled and len(options.client) > 1:
675
parser.error("--is-enabled requires exactly one client")
677
commands, clientnames = commands_and_clients_from_options(options)
333
fail_count, test_count = doctest.testmod()
334
sys.exit(0 if fail_count == 0 else 1)
680
337
bus = dbus.SystemBus()
681
338
mandos_dbus_objc = bus.get_object(busname, server_path)
682
339
except dbus.exceptions.DBusException:
683
log.critical("Could not connect to Mandos server")
340
print("Could not connect to Mandos server",
686
344
mandos_serv = dbus.Interface(mandos_dbus_objc,
687
dbus_interface=server_interface)
688
mandos_serv_object_manager = dbus.Interface(
689
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
691
# Filter out log message from dbus module
692
dbus_logger = logging.getLogger("dbus.proxies")
693
class NullFilter(logging.Filter):
694
def filter(self, record):
696
dbus_filter = NullFilter()
345
dbus_interface = server_interface)
347
#block stderr since dbus library prints to stderr
348
null = os.open(os.path.devnull, os.O_RDWR)
349
stderrcopy = os.dup(sys.stderr.fileno())
350
os.dup2(null, sys.stderr.fileno())
698
dbus_logger.addFilter(dbus_filter)
699
mandos_clients = {path: ifs_and_props[client_interface]
700
for path, ifs_and_props in
701
mandos_serv_object_manager
702
.GetManagedObjects().items()
703
if client_interface in ifs_and_props}
704
except dbus.exceptions.DBusException as e:
705
log.critical("Failed to access Mandos server through D-Bus:"
354
mandos_clients = mandos_serv.GetAllClientsWithProperties()
357
os.dup2(stderrcopy, sys.stderr.fileno())
359
except dbus.exceptions.DBusException:
360
print("Access denied: Accessing mandos server through dbus.",
709
# restore dbus logger
710
dbus_logger.removeFilter(dbus_filter)
712
364
# Compile dict of (clients: properties) to process
716
clients = {bus.get_object(busname, path): properties
717
for path, properties in mandos_clients.items()}
367
if options.all or not options.client:
368
clients = dict((bus.get_object(busname, path), properties)
369
for path, properties in
370
mandos_clients.iteritems())
719
for name in clientnames:
720
for path, client in mandos_clients.items():
372
for name in options.client:
373
for path, client in mandos_clients.iteritems():
721
374
if client["Name"] == name:
722
375
client_objc = bus.get_object(busname, path)
723
376
clients[client_objc] = client
726
log.critical("Client not found on server: %r", name)
379
print("Client not found on server: {0!r}"
380
.format(name), file=sys.stderr)
729
# Run all commands on clients
730
for command in commands:
731
command.run(mandos_serv, clients)
734
class Test_milliseconds_to_string(unittest.TestCase):
736
self.assertEqual(milliseconds_to_string(93785000),
738
def test_no_days(self):
739
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
740
def test_all_zero(self):
741
self.assertEqual(milliseconds_to_string(0), "00:00:00")
742
def test_no_fractional_seconds(self):
743
self.assertEqual(milliseconds_to_string(400), "00:00:00")
744
self.assertEqual(milliseconds_to_string(900), "00:00:00")
745
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
747
class Test_string_to_delta(unittest.TestCase):
748
def test_handles_basic_rfc3339(self):
749
self.assertEqual(string_to_delta("PT2H"),
750
datetime.timedelta(0, 7200))
751
def test_falls_back_to_pre_1_6_1_with_warning(self):
752
# assertLogs only exists in Python 3.4
753
if hasattr(self, "assertLogs"):
754
with self.assertLogs(log, logging.WARNING):
755
value = string_to_delta("2h")
757
class WarningFilter(logging.Filter):
758
"""Don't show, but record the presence of, warnings"""
759
def filter(self, record):
760
is_warning = record.levelno >= logging.WARNING
761
self.found = is_warning or getattr(self, "found",
763
return not is_warning
764
warning_filter = WarningFilter()
765
log.addFilter(warning_filter)
767
value = string_to_delta("2h")
769
log.removeFilter(warning_filter)
770
self.assertTrue(getattr(warning_filter, "found", False))
771
self.assertEqual(value, datetime.timedelta(0, 7200))
774
class TestCmd(unittest.TestCase):
775
"""Abstract class for tests of command classes"""
778
class MockClient(object):
779
def __init__(self, name, **attributes):
780
self.__dbus_object_path__ = "objpath_{}".format(name)
781
self.attributes = attributes
782
self.attributes["Name"] = name
784
def Set(self, interface, property, value, dbus_interface):
785
testcase.assertEqual(interface, client_interface)
786
testcase.assertEqual(dbus_interface,
787
dbus.PROPERTIES_IFACE)
788
self.attributes[property] = value
789
self.calls.append(("Set", (interface, property, value,
791
def Get(self, interface, property, dbus_interface):
792
testcase.assertEqual(interface, client_interface)
793
testcase.assertEqual(dbus_interface,
794
dbus.PROPERTIES_IFACE)
795
self.calls.append(("Get", (interface, property,
797
return self.attributes[property]
798
def Approve(self, approve, dbus_interface):
799
testcase.assertEqual(dbus_interface, client_interface)
800
self.calls.append(("Approve", (approve,
802
def __getitem__(self, key):
803
return self.attributes[key]
804
def __setitem__(self, key, value):
805
self.attributes[key] = value
806
self.clients = collections.OrderedDict([
810
KeyID=("92ed150794387c03ce684574b1139a65"
811
"94a34f895daaaf09fd8ea90a27cddb12"),
813
Host="foo.example.org",
814
Enabled=dbus.Boolean(True),
816
LastCheckedOK="2019-02-03T00:00:00",
817
Created="2019-01-02T00:00:00",
819
Fingerprint=("778827225BA7DE539C5A"
820
"7CFA59CFF7CDBD9A5920"),
821
CheckerRunning=dbus.Boolean(False),
822
LastEnabled="2019-01-03T00:00:00",
823
ApprovalPending=dbus.Boolean(False),
824
ApprovedByDefault=dbus.Boolean(True),
825
LastApprovalRequest="",
827
ApprovalDuration=1000,
828
Checker="fping -q -- %(host)s",
829
ExtendedTimeout=900000,
830
Expires="2019-02-04T00:00:00",
831
LastCheckerStatus=0)),
835
KeyID=("0558568eedd67d622f5c83b35a115f79"
836
"6ab612cff5ad227247e46c2b020f441c"),
839
Enabled=dbus.Boolean(True),
841
LastCheckedOK="2019-02-04T00:00:00",
842
Created="2019-01-03T00:00:00",
844
Fingerprint=("3E393AEAEFB84C7E89E2"
845
"F547B3A107558FCA3A27"),
846
CheckerRunning=dbus.Boolean(True),
847
LastEnabled="2019-01-04T00:00:00",
848
ApprovalPending=dbus.Boolean(False),
849
ApprovedByDefault=dbus.Boolean(False),
850
LastApprovalRequest="2019-01-03T00:00:00",
852
ApprovalDuration=1000,
854
ExtendedTimeout=900000,
855
Expires="2019-02-05T00:00:00",
856
LastCheckerStatus=-2)),
858
self.client = self.clients["foo"]
860
class TestPrintTableCmd(TestCmd):
861
def test_normal(self):
862
output = PrintTableCmd().output(self.clients)
863
expected_output = """
864
Name Enabled Timeout Last Successful Check
865
foo Yes 00:05:00 2019-02-03T00:00:00
866
barbar Yes 00:05:00 2019-02-04T00:00:00
868
self.assertEqual(output, expected_output)
869
def test_verbose(self):
870
output = PrintTableCmd(verbose=True).output(self.clients)
871
expected_output = """
872
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
873
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
874
barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04T00:00:00 No No 2019-01-03T00:00:00 00:00:30 00:00:01 : 00:15:00 2019-02-05T00:00:00 -2
876
self.assertEqual(output, expected_output)
877
def test_one_client(self):
878
output = PrintTableCmd().output({"foo": self.client})
879
expected_output = """
880
Name Enabled Timeout Last Successful Check
881
foo Yes 00:05:00 2019-02-03T00:00:00
883
self.assertEqual(output, expected_output)
885
class TestDumpJSONCmd(TestCmd):
887
self.expected_json = {
890
"KeyID": ("92ed150794387c03ce684574b1139a65"
891
"94a34f895daaaf09fd8ea90a27cddb12"),
892
"Host": "foo.example.org",
895
"LastCheckedOK": "2019-02-03T00:00:00",
896
"Created": "2019-01-02T00:00:00",
898
"Fingerprint": ("778827225BA7DE539C5A"
899
"7CFA59CFF7CDBD9A5920"),
900
"CheckerRunning": False,
901
"LastEnabled": "2019-01-03T00:00:00",
902
"ApprovalPending": False,
903
"ApprovedByDefault": True,
904
"LastApprovalRequest": "",
906
"ApprovalDuration": 1000,
907
"Checker": "fping -q -- %(host)s",
908
"ExtendedTimeout": 900000,
909
"Expires": "2019-02-04T00:00:00",
910
"LastCheckerStatus": 0,
914
"KeyID": ("0558568eedd67d622f5c83b35a115f79"
915
"6ab612cff5ad227247e46c2b020f441c"),
919
"LastCheckedOK": "2019-02-04T00:00:00",
920
"Created": "2019-01-03T00:00:00",
922
"Fingerprint": ("3E393AEAEFB84C7E89E2"
923
"F547B3A107558FCA3A27"),
924
"CheckerRunning": True,
925
"LastEnabled": "2019-01-04T00:00:00",
926
"ApprovalPending": False,
927
"ApprovedByDefault": False,
928
"LastApprovalRequest": "2019-01-03T00:00:00",
929
"ApprovalDelay": 30000,
930
"ApprovalDuration": 1000,
932
"ExtendedTimeout": 900000,
933
"Expires": "2019-02-05T00:00:00",
934
"LastCheckerStatus": -2,
937
return super(TestDumpJSONCmd, self).setUp()
938
def test_normal(self):
939
json_data = json.loads(DumpJSONCmd().output(self.clients))
940
self.assertDictEqual(json_data, self.expected_json)
941
def test_one_client(self):
942
clients = {"foo": self.client}
943
json_data = json.loads(DumpJSONCmd().output(clients))
944
expected_json = {"foo": self.expected_json["foo"]}
945
self.assertDictEqual(json_data, expected_json)
947
class TestIsEnabledCmd(TestCmd):
948
def test_is_enabled(self):
949
self.assertTrue(all(IsEnabledCmd().is_enabled(client)
950
for client in self.clients.values()))
951
def test_is_enabled_does_get_attribute(self):
952
self.assertTrue(IsEnabledCmd().is_enabled(self.client))
953
self.assertListEqual(self.client.calls,
955
("se.recompile.Mandos.Client",
957
"org.freedesktop.DBus.Properties"))])
958
def test_is_enabled_run_exits_successfully(self):
959
with self.assertRaises(SystemExit) as e:
960
IsEnabledCmd().run(None, [self.client])
961
if e.exception.code is not None:
962
self.assertEqual(e.exception.code, 0)
964
self.assertIsNone(e.exception.code)
965
def test_is_enabled_run_exits_with_failure(self):
966
self.client["Enabled"] = dbus.Boolean(False)
967
with self.assertRaises(SystemExit) as e:
968
IsEnabledCmd().run(None, [self.client])
969
if isinstance(e.exception.code, int):
970
self.assertNotEqual(e.exception.code, 0)
972
self.assertIsNotNone(e.exception.code)
974
class TestRemoveCmd(TestCmd):
975
def test_remove(self):
976
class MockMandos(object):
979
def RemoveClient(self, dbus_path):
980
self.calls.append(("RemoveClient", (dbus_path,)))
981
mandos = MockMandos()
982
RemoveCmd().run(mandos, [self.client])
983
self.assertEqual(len(mandos.calls), 1)
984
self.assertListEqual(mandos.calls,
986
(self.client.__dbus_object_path__,))])
988
class TestApproveCmd(TestCmd):
989
def test_approve(self):
990
ApproveCmd().run(None, [self.client])
991
self.assertListEqual(self.client.calls,
992
[("Approve", (True, client_interface))])
993
class TestDenyCmd(TestCmd):
994
def test_approve(self):
995
DenyCmd().run(None, [self.client])
996
self.assertListEqual(self.client.calls,
997
[("Approve", (False, client_interface))])
1001
def should_only_run_tests():
1002
parser = argparse.ArgumentParser(add_help=False)
1003
parser.add_argument("--check", action='store_true')
1004
args, unknown_args = parser.parse_known_args()
1005
run_tests = args.check
1007
# Remove --check argument from sys.argv
1008
sys.argv[1:] = unknown_args
1011
# Add all tests from doctest strings
1012
def load_tests(loader, tests, none):
1014
tests.addTests(doctest.DocTestSuite())
383
if not has_actions(options) and clients:
385
keywords = ("Name", "Enabled", "Timeout",
386
"LastCheckedOK", "Created", "Interval",
387
"Host", "Fingerprint", "CheckerRunning",
388
"LastEnabled", "ApprovalPending",
390
"LastApprovalRequest", "ApprovalDelay",
391
"ApprovalDuration", "Checker",
394
keywords = defaultkeywords
396
print_clients(clients.values(), keywords)
398
# Process each client in the list by all selected options
399
for client in clients:
400
def set_client_prop(prop, value):
401
"""Set a Client D-Bus property"""
402
client.Set(client_interface, prop, value,
403
dbus_interface=dbus.PROPERTIES_IFACE)
404
def set_client_prop_ms(prop, value):
405
"""Set a Client D-Bus property, converted
406
from a string to milliseconds."""
407
set_client_prop(prop,
408
timedelta_to_milliseconds
409
(string_to_delta(value)))
411
mandos_serv.RemoveClient(client.__dbus_object_path__)
413
set_client_prop("Enabled", dbus.Boolean(True))
415
set_client_prop("Enabled", dbus.Boolean(False))
416
if options.bump_timeout:
417
set_client_prop("LastCheckedOK", "")
418
if options.start_checker:
419
set_client_prop("CheckerRunning", dbus.Boolean(True))
420
if options.stop_checker:
421
set_client_prop("CheckerRunning", dbus.Boolean(False))
422
if options.is_enabled:
423
sys.exit(0 if client.Get(client_interface,
426
dbus.PROPERTIES_IFACE)
428
if options.checker is not None:
429
set_client_prop("Checker", options.checker)
430
if options.host is not None:
431
set_client_prop("Host", options.host)
432
if options.interval is not None:
433
set_client_prop_ms("Interval", options.interval)
434
if options.approval_delay is not None:
435
set_client_prop_ms("ApprovalDelay",
436
options.approval_delay)
437
if options.approval_duration is not None:
438
set_client_prop_ms("ApprovalDuration",
439
options.approval_duration)
440
if options.timeout is not None:
441
set_client_prop_ms("Timeout", options.timeout)
442
if options.extended_timeout is not None:
443
set_client_prop_ms("ExtendedTimeout",
444
options.extended_timeout)
445
if options.secret is not None:
446
set_client_prop("Secret",
447
dbus.ByteArray(options.secret.read()))
448
if options.approved_by_default is not None:
449
set_client_prop("ApprovedByDefault",
451
.approved_by_default))
453
client.Approve(dbus.Boolean(True),
454
dbus_interface=client_interface)
456
client.Approve(dbus.Boolean(False),
457
dbus_interface=client_interface)
1017
459
if __name__ == "__main__":
1018
if should_only_run_tests():
1019
# Call using ./tdd-python-script --check [--verbose]