519
def add_command_line_options(parser):
520
def commands_and_clients_from_options(args=None):
523
parser = argparse.ArgumentParser()
520
524
parser.add_argument("--version", action="version",
521
525
version="%(prog)s {}".format(version),
522
526
help="show version number and exit")
579
583
parser.add_argument("--check", action="store_true",
580
584
help="Run self-test")
581
585
parser.add_argument("client", nargs="*", help="Client name")
584
def commands_and_clients_from_options(options):
586
options = parser.parse_args(args=args)
588
if has_actions(options) and not (options.client or options.all):
589
parser.error("Options require clients names or --all.")
590
if options.verbose and has_actions(options):
591
parser.error("--verbose can only be used alone.")
592
if options.dump_json and (options.verbose
593
or has_actions(options)):
594
parser.error("--dump-json can only be used alone.")
595
if options.all and not has_actions(options):
596
parser.error("--all requires an action.")
597
if options.is_enabled and len(options.client) > 1:
598
parser.error("--is-enabled requires exactly one client")
659
parser = argparse.ArgumentParser()
661
add_command_line_options(parser)
663
options = parser.parse_args()
665
if has_actions(options) and not (options.client or options.all):
666
parser.error("Options require clients names or --all.")
667
if options.verbose and has_actions(options):
668
parser.error("--verbose can only be used alone.")
669
if options.dump_json and (options.verbose
670
or has_actions(options)):
671
parser.error("--dump-json can only be used alone.")
672
if options.all and not has_actions(options):
673
parser.error("--all requires an action.")
674
if options.is_enabled and len(options.client) > 1:
675
parser.error("--is-enabled requires exactly one client")
677
commands, clientnames = commands_and_clients_from_options(options)
673
commands, clientnames = commands_and_clients_from_options()
680
676
bus = dbus.SystemBus()
694
690
def filter(self, record):
696
692
dbus_filter = NullFilter()
693
dbus_logger.addFilter(dbus_filter)
698
dbus_logger.addFilter(dbus_filter)
699
mandos_clients = {path: ifs_and_props[client_interface]
700
for path, ifs_and_props in
701
mandos_serv_object_manager
702
.GetManagedObjects().items()
703
if client_interface in ifs_and_props}
696
mandos_clients = {path: ifs_and_props[client_interface]
697
for path, ifs_and_props in
698
mandos_serv_object_manager
699
.GetManagedObjects().items()
700
if client_interface in ifs_and_props}
702
# restore dbus logger
703
dbus_logger.removeFilter(dbus_filter)
704
704
except dbus.exceptions.DBusException as e:
705
705
log.critical("Failed to access Mandos server through D-Bus:"
709
# restore dbus logger
710
dbus_logger.removeFilter(dbus_filter)
712
709
# Compile dict of (clients: properties) to process
780
777
self.__dbus_object_path__ = "objpath_{}".format(name)
781
778
self.attributes = attributes
782
779
self.attributes["Name"] = name
784
def Set(self, interface, property, value, dbus_interface):
780
def Set(interface, property, value,
781
properties_interface):
785
782
testcase.assertEqual(interface, client_interface)
786
testcase.assertEqual(dbus_interface,
783
testcase.assertEqual(properties_interface,
787
784
dbus.PROPERTIES_IFACE)
788
785
self.attributes[property] = value
789
self.calls.append(("Set", (interface, property, value,
791
def Get(self, interface, property, dbus_interface):
786
def Get(interface, property, properties_interface):
792
787
testcase.assertEqual(interface, client_interface)
793
testcase.assertEqual(dbus_interface,
788
testcase.assertEqual(properties_interface,
794
789
dbus.PROPERTIES_IFACE)
795
self.calls.append(("Get", (interface, property,
797
790
return self.attributes[property]
798
791
def __getitem__(self, key):
799
792
return self.attributes[key]
800
def __setitem__(self, key, value):
801
self.attributes[key] = value
802
793
self.clients = collections.OrderedDict([
939
930
expected_json = {"foo": self.expected_json["foo"]}
940
931
self.assertDictEqual(json_data, expected_json)
942
class TestIsEnabledCmd(TestCmd):
943
def test_is_enabled(self):
944
self.assertTrue(all(IsEnabledCmd().is_enabled(client)
945
for client in self.clients.values()))
946
def test_is_enabled_does_get_attribute(self):
947
client = self.clients["foo"]
948
self.assertTrue(IsEnabledCmd().is_enabled(client))
949
self.assertListEqual(client.calls,
951
("se.recompile.Mandos.Client",
953
"org.freedesktop.DBus.Properties"))])
954
def test_is_enabled_run_exits_successfully(self):
955
client = self.clients["foo"]
956
with self.assertRaises(SystemExit) as e:
957
IsEnabledCmd().run(None, [client])
958
if e.exception.code is not None:
959
self.assertEqual(e.exception.code, 0)
961
self.assertIsNone(e.exception.code)
962
def test_is_enabled_run_exits_with_failure(self):
963
client = self.clients["foo"]
964
client["Enabled"] = dbus.Boolean(False)
965
with self.assertRaises(SystemExit) as e:
966
IsEnabledCmd().run(None, [client])
967
if isinstance(e.exception.code, int):
968
self.assertNotEqual(e.exception.code, 0)
970
self.assertIsNotNone(e.exception.code)
973
class TestRemoveCmd(TestCmd):
974
def test_remove(self):
975
client = self.clients["foo"]
976
class MockMandos(object):
979
def RemoveClient(self, dbus_path):
980
self.calls.append(("RemoveClient", (dbus_path,)))
981
mandos = MockMandos()
982
RemoveCmd().run(mandos, [client])
983
self.assertEqual(len(mandos.calls), 1)
984
self.assertListEqual(mandos.calls,
986
(client.__dbus_object_path__,))])
990
934
def should_only_run_tests():
991
935
parser = argparse.ArgumentParser(add_help=False)