297
292
"LastApprovalRequest", "ApprovalDelay",
298
293
"ApprovalDuration", "Checker", "ExtendedTimeout",
299
294
"Expires", "LastCheckerStatus")
300
def run(self, clients, bus=None, mandos=None):
301
print(self.output(clients.values()))
302
def output(self, clients):
303
raise NotImplementedError()
295
def run(self, mandos, clients):
296
print(self.output(clients))
305
298
class PropertyCmd(Command):
306
299
"""Abstract class for Actions for setting one client property"""
307
300
def run_on_one_client(self, client, properties):
308
301
"""Set the Client's D-Bus property"""
309
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
310
client.__dbus_object_path__,
311
dbus.PROPERTIES_IFACE, client_dbus_interface,
312
self.propname, self.value_to_set
313
if not isinstance(self.value_to_set, dbus.Boolean)
314
else bool(self.value_to_set))
315
client.Set(client_dbus_interface, self.propname,
302
client.Set(client_interface, self.property, self.value_to_set,
317
303
dbus_interface=dbus.PROPERTIES_IFACE)
320
raise NotImplementedError()
322
class PropertyValueCmd(PropertyCmd):
323
"""Abstract class for PropertyCmd receiving a value as argument"""
305
class ValueArgumentMixIn(object):
306
"""Mixin class for commands taking a value as argument"""
324
307
def __init__(self, value):
325
308
self.value_to_set = value
327
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
328
"""Abstract class for PropertyValueCmd taking a value argument as
329
a datetime.timedelta() but should store it as milliseconds."""
310
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
311
"""Mixin class for commands taking a value argument as
331
314
def value_to_set(self):
333
316
@value_to_set.setter
334
317
def value_to_set(self, value):
335
"""When setting, convert value from a datetime.timedelta"""
318
"""When setting, convert value from a datetime.timedelta"""
336
319
self._vts = int(round(value.total_seconds() * 1000))
338
321
# Actual (non-abstract) command classes
441
423
class IsEnabledCmd(Command):
442
def run(self, clients, bus=None, mandos=None):
443
client, properties = next(iter(clients.items()))
424
def run_on_one_client(self, client, properties):
444
425
if self.is_enabled(client, properties):
447
428
def is_enabled(self, client, properties):
448
return properties["Enabled"]
429
return bool(properties["Enabled"])
450
431
class RemoveCmd(Command):
451
432
def run_on_one_client(self, client, properties):
452
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname,
453
server_dbus_path, server_dbus_interface,
454
str(client.__dbus_object_path__))
455
433
self.mandos.RemoveClient(client.__dbus_object_path__)
457
435
class ApproveCmd(Command):
458
436
def run_on_one_client(self, client, properties):
459
log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
460
client.__dbus_object_path__, client_dbus_interface)
461
437
client.Approve(dbus.Boolean(True),
462
dbus_interface=client_dbus_interface)
438
dbus_interface=client_interface)
464
440
class DenyCmd(Command):
465
441
def run_on_one_client(self, client, properties):
466
log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
467
client.__dbus_object_path__, client_dbus_interface)
468
442
client.Approve(dbus.Boolean(False),
469
dbus_interface=client_dbus_interface)
443
dbus_interface=client_interface)
471
445
class EnableCmd(PropertyCmd):
473
447
value_to_set = dbus.Boolean(True)
475
449
class DisableCmd(PropertyCmd):
477
451
value_to_set = dbus.Boolean(False)
479
453
class BumpTimeoutCmd(PropertyCmd):
480
propname = "LastCheckedOK"
454
property = "LastCheckedOK"
481
455
value_to_set = ""
483
457
class StartCheckerCmd(PropertyCmd):
484
propname = "CheckerRunning"
458
property = "CheckerRunning"
485
459
value_to_set = dbus.Boolean(True)
487
461
class StopCheckerCmd(PropertyCmd):
488
propname = "CheckerRunning"
462
property = "CheckerRunning"
489
463
value_to_set = dbus.Boolean(False)
491
465
class ApproveByDefaultCmd(PropertyCmd):
492
propname = "ApprovedByDefault"
466
property = "ApprovedByDefault"
493
467
value_to_set = dbus.Boolean(True)
495
469
class DenyByDefaultCmd(PropertyCmd):
496
propname = "ApprovedByDefault"
470
property = "ApprovedByDefault"
497
471
value_to_set = dbus.Boolean(False)
499
class SetCheckerCmd(PropertyValueCmd):
502
class SetHostCmd(PropertyValueCmd):
505
class SetSecretCmd(PropertyValueCmd):
473
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
476
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
479
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
508
481
def value_to_set(self):
512
485
"""When setting, read data from supplied file object"""
513
486
self._vts = value.read()
516
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
519
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
520
propname = "ExtendedTimeout"
522
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
523
propname = "Interval"
525
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
526
propname = "ApprovalDelay"
528
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
529
propname = "ApprovalDuration"
490
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
493
class SetExtendedTimeoutCmd(PropertyCmd,
494
MillisecondsValueArgumentMixIn):
495
property = "ExtendedTimeout"
497
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
498
property = "Interval"
500
class SetApprovalDelayCmd(PropertyCmd,
501
MillisecondsValueArgumentMixIn):
502
property = "ApprovalDelay"
504
class SetApprovalDurationCmd(PropertyCmd,
505
MillisecondsValueArgumentMixIn):
506
property = "ApprovalDuration"
531
508
def add_command_line_options(parser):
532
509
parser.add_argument("--version", action="version",
704
682
parser.error("--all requires an action.")
705
683
if options.is_enabled and len(options.client) > 1:
706
684
parser.error("--is-enabled requires exactly one client")
708
options.remove = False
709
if has_actions(options) and not options.deny:
710
parser.error("--remove can only be combined with --deny")
711
options.remove = True
715
parser = argparse.ArgumentParser()
717
add_command_line_options(parser)
719
options = parser.parse_args()
721
check_option_syntax(parser, options)
723
686
clientnames = options.client
726
log.setLevel(logging.DEBUG)
729
689
bus = dbus.SystemBus()
730
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
731
dbus_busname, server_dbus_path)
732
mandos_dbus_objc = bus.get_object(dbus_busname,
690
mandos_dbus_objc = bus.get_object(busname, server_path)
734
691
except dbus.exceptions.DBusException:
735
692
log.critical("Could not connect to Mandos server")
738
695
mandos_serv = dbus.Interface(mandos_dbus_objc,
739
dbus_interface=server_dbus_interface)
696
dbus_interface=server_interface)
740
697
mandos_serv_object_manager = dbus.Interface(
741
698
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
838
794
class MockClient(object):
839
795
def __init__(self, name, **attributes):
840
self.__dbus_object_path__ = "/clients/{}".format(name)
796
self.__dbus_object_path__ = "objpath_{}".format(name)
841
797
self.attributes = attributes
842
798
self.attributes["Name"] = name
844
def Set(self, interface, propname, value, dbus_interface):
845
testcase.assertEqual(interface, client_dbus_interface)
846
testcase.assertEqual(dbus_interface,
847
dbus.PROPERTIES_IFACE)
848
self.attributes[propname] = value
849
def Get(self, interface, propname, dbus_interface):
850
testcase.assertEqual(interface, client_dbus_interface)
851
testcase.assertEqual(dbus_interface,
852
dbus.PROPERTIES_IFACE)
853
return self.attributes[propname]
800
def Set(self, interface, property, value, dbus_interface):
801
testcase.assertEqual(interface, client_interface)
802
testcase.assertEqual(dbus_interface,
803
dbus.PROPERTIES_IFACE)
804
self.attributes[property] = value
805
def Get(self, interface, property, dbus_interface):
806
testcase.assertEqual(interface, client_interface)
807
testcase.assertEqual(dbus_interface,
808
dbus.PROPERTIES_IFACE)
809
return self.attributes[property]
854
810
def Approve(self, approve, dbus_interface):
855
testcase.assertEqual(dbus_interface,
856
client_dbus_interface)
811
testcase.assertEqual(dbus_interface, client_interface)
857
812
self.calls.append(("Approve", (approve,
858
813
dbus_interface)))
859
814
self.client = MockClient(
906
861
LastCheckerStatus=-2)
907
862
self.clients = collections.OrderedDict(
909
("/clients/foo", self.client.attributes),
910
("/clients/barbar", self.other_client.attributes),
864
(self.client, self.client.attributes),
865
(self.other_client, self.other_client.attributes),
912
self.one_client = {"/clients/foo": self.client.attributes}
917
def get_object(client_bus_name, path):
918
self.assertEqual(client_bus_name, dbus_busname)
920
"/clients/foo": self.client,
921
"/clients/barbar": self.other_client,
867
self.one_client = {self.client: self.client.attributes}
925
869
class TestPrintTableCmd(TestCmd):
926
870
def test_normal(self):
927
output = PrintTableCmd().output(self.clients.values())
928
expected_output = "\n".join((
929
"Name Enabled Timeout Last Successful Check",
930
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
931
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
871
output = PrintTableCmd().output(self.clients)
872
expected_output = """
873
Name Enabled Timeout Last Successful Check
874
foo Yes 00:05:00 2019-02-03T00:00:00
875
barbar Yes 00:05:00 2019-02-04T00:00:00
933
877
self.assertEqual(output, expected_output)
934
878
def test_verbose(self):
935
output = PrintTableCmd(verbose=True).output(
936
self.clients.values())
937
expected_output = "\n".join((
938
# First line (headers)
939
"Name Enabled Timeout Last Successful Check Created "
940
" Interval Host Key ID "
942
" Check Is Running Last Enabl"
943
"ed Approval Is Pending Approved By Default Last A"
944
"pproval Request Approval Delay Approval Duration Checker"
945
" Extended Timeout Expires Last "
947
# Second line (client "foo")
948
"foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02"
949
"T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684"
950
"574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7"
951
"DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03"
953
" 00:00:00 00:00:01 fping -"
954
"q -- %(host)s 00:15:00 2019-02-04T00:00:00 0 "
956
# Third line (client "barbar")
957
"barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03"
958
"T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c8"
959
"3b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB8"
960
"4C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04"
961
"T00:00:00 No No 2019-0"
962
"1-03T00:00:00 00:00:30 00:00:01 : "
963
" 00:15:00 2019-02-05T00:00:00 -2 "
879
output = PrintTableCmd(verbose=True).output(self.clients)
880
expected_output = """
881
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
882
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
883
barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04T00:00:00 No No 2019-01-03T00:00:00 00:00:30 00:00:01 : 00:15:00 2019-02-05T00:00:00 -2
966
885
self.assertEqual(output, expected_output)
967
886
def test_one_client(self):
968
output = PrintTableCmd().output(self.one_client.values())
887
output = PrintTableCmd().output(self.one_client)
969
888
expected_output = """
970
889
Name Enabled Timeout Last Successful Check
971
890
foo Yes 00:05:00 2019-02-03T00:00:00
1065
982
self.calls.append(("RemoveClient", (dbus_path,)))
1066
983
mandos = MockMandos()
1067
984
super(TestRemoveCmd, self).setUp()
1068
RemoveCmd().run(self.clients, self.bus, mandos)
985
RemoveCmd().run(mandos, self.clients)
1069
986
self.assertEqual(len(mandos.calls), 2)
1070
for clientpath in self.clients:
1071
self.assertIn(("RemoveClient", (clientpath,)),
987
for client in self.clients:
988
self.assertIn(("RemoveClient",
989
(client.__dbus_object_path__,)),
1074
992
class TestApproveCmd(TestCmd):
1075
993
def test_approve(self):
1076
ApproveCmd().run(self.clients, self.bus)
1077
for clientpath in self.clients:
1078
client = self.bus.get_object(dbus_busname, clientpath)
1079
self.assertIn(("Approve", (True, client_dbus_interface)),
994
ApproveCmd().run(None, self.clients)
995
for client in self.clients:
996
self.assertIn(("Approve", (True, client_interface)),
1082
999
class TestDenyCmd(TestCmd):
1083
1000
def test_deny(self):
1084
DenyCmd().run(self.clients, self.bus)
1085
for clientpath in self.clients:
1086
client = self.bus.get_object(dbus_busname, clientpath)
1087
self.assertIn(("Approve", (False, client_dbus_interface)),
1001
DenyCmd().run(None, self.clients)
1002
for client in self.clients:
1003
self.assertIn(("Approve", (False, client_interface)),
1090
1006
class TestEnableCmd(TestCmd):
1091
1007
def test_enable(self):
1092
for clientpath in self.clients:
1093
client = self.bus.get_object(dbus_busname, clientpath)
1008
for client in self.clients:
1094
1009
client.attributes["Enabled"] = False
1096
EnableCmd().run(self.clients, self.bus)
1011
EnableCmd().run(None, self.clients)
1098
for clientpath in self.clients:
1099
client = self.bus.get_object(dbus_busname, clientpath)
1013
for client in self.clients:
1100
1014
self.assertTrue(client.attributes["Enabled"])
1102
1016
class TestDisableCmd(TestCmd):
1103
1017
def test_disable(self):
1104
DisableCmd().run(self.clients, self.bus)
1105
for clientpath in self.clients:
1106
client = self.bus.get_object(dbus_busname, clientpath)
1018
DisableCmd().run(None, self.clients)
1020
for client in self.clients:
1107
1021
self.assertFalse(client.attributes["Enabled"])
1109
1023
class Unique(object):
1119
1033
self.values_to_set)
1120
1034
for value_to_set, value_to_get in zip(self.values_to_set,
1121
1035
values_to_get):
1122
for clientpath in self.clients:
1123
client = self.bus.get_object(dbus_busname, clientpath)
1124
old_value = client.attributes[self.propname]
1036
for client in self.clients:
1037
old_value = client.attributes[self.property]
1125
1038
self.assertNotIsInstance(old_value, Unique)
1126
client.attributes[self.propname] = Unique()
1039
client.attributes[self.property] = Unique()
1127
1040
self.run_command(value_to_set, self.clients)
1128
for clientpath in self.clients:
1129
client = self.bus.get_object(dbus_busname, clientpath)
1130
value = client.attributes[self.propname]
1041
for client in self.clients:
1042
value = client.attributes[self.property]
1131
1043
self.assertNotIsInstance(value, Unique)
1132
1044
self.assertEqual(value, value_to_get)
1133
1045
def run_command(self, value, clients):
1134
self.command().run(clients, self.bus)
1046
self.command().run(None, clients)
1136
1048
class TestBumpTimeoutCmd(TestPropertyCmd):
1137
1049
command = BumpTimeoutCmd
1138
propname = "LastCheckedOK"
1050
property = "LastCheckedOK"
1139
1051
values_to_set = [""]
1141
1053
class TestStartCheckerCmd(TestPropertyCmd):
1142
1054
command = StartCheckerCmd
1143
propname = "CheckerRunning"
1055
property = "CheckerRunning"
1144
1056
values_to_set = [dbus.Boolean(True)]
1146
1058
class TestStopCheckerCmd(TestPropertyCmd):
1147
1059
command = StopCheckerCmd
1148
propname = "CheckerRunning"
1060
property = "CheckerRunning"
1149
1061
values_to_set = [dbus.Boolean(False)]
1151
1063
class TestApproveByDefaultCmd(TestPropertyCmd):
1152
1064
command = ApproveByDefaultCmd
1153
propname = "ApprovedByDefault"
1065
property = "ApprovedByDefault"
1154
1066
values_to_set = [dbus.Boolean(True)]
1156
1068
class TestDenyByDefaultCmd(TestPropertyCmd):
1157
1069
command = DenyByDefaultCmd
1158
propname = "ApprovedByDefault"
1070
property = "ApprovedByDefault"
1159
1071
values_to_set = [dbus.Boolean(False)]
1161
class TestPropertyValueCmd(TestPropertyCmd):
1162
"""Abstract class for tests of PropertyValueCmd classes"""
1073
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1074
"""Abstract class for tests of PropertyCmd classes using the
1075
ValueArgumentMixIn"""
1163
1076
def runTest(self):
1164
if type(self) is TestPropertyValueCmd:
1077
if type(self) is TestValueArgumentPropertyCmd:
1166
return super(TestPropertyValueCmd, self).runTest()
1079
return super(TestValueArgumentPropertyCmd, self).runTest()
1167
1080
def run_command(self, value, clients):
1168
self.command(value).run(clients, self.bus)
1081
self.command(value).run(None, clients)
1170
class TestSetCheckerCmd(TestPropertyValueCmd):
1083
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1171
1084
command = SetCheckerCmd
1172
propname = "Checker"
1085
property = "Checker"
1173
1086
values_to_set = ["", ":", "fping -q -- %s"]
1175
class TestSetHostCmd(TestPropertyValueCmd):
1088
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1176
1089
command = SetHostCmd
1178
1091
values_to_set = ["192.0.2.3", "foo.example.org"]
1180
class TestSetSecretCmd(TestPropertyValueCmd):
1093
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1181
1094
command = SetSecretCmd
1183
values_to_set = [io.BytesIO(b""),
1096
values_to_set = [open("/dev/null", "rb"),
1184
1097
io.BytesIO(b"secret\0xyzzy\nbar")]
1185
1098
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1187
class TestSetTimeoutCmd(TestPropertyValueCmd):
1100
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1188
1101
command = SetTimeoutCmd
1189
propname = "Timeout"
1102
property = "Timeout"
1190
1103
values_to_set = [datetime.timedelta(),
1191
1104
datetime.timedelta(minutes=5),
1192
1105
datetime.timedelta(seconds=1),
1258
1169
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1261
def test_print_table_verbose_short(self):
1262
self.assert_command_from_args(["-v"], PrintTableCmd,
1265
1172
def test_enable(self):
1266
1173
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1268
def test_enable_short(self):
    # The short option "-e" followed by a client name is expected to
    # produce an EnableCmd.
    argv = ["-e", "foo"]
    self.assert_command_from_args(argv, EnableCmd)
1271
1175
def test_disable(self):
1272
1176
self.assert_command_from_args(["--disable", "foo"],
1275
def test_disable_short(self):
    # The short option "-d" followed by a client name is expected to
    # produce a DisableCmd.
    argv = ["-d", "foo"]
    self.assert_command_from_args(argv, DisableCmd)
1278
1179
def test_bump_timeout(self):
1279
1180
self.assert_command_from_args(["--bump-timeout", "foo"],
1280
1181
BumpTimeoutCmd)
1282
def test_bump_timeout_short(self):
    # The short option "-b" followed by a client name is expected to
    # produce a BumpTimeoutCmd.
    argv = ["-b", "foo"]
    self.assert_command_from_args(argv, BumpTimeoutCmd)
1285
1183
def test_start_checker(self):
1286
1184
self.assert_command_from_args(["--start-checker", "foo"],
1287
1185
StartCheckerCmd)
1377
1253
"foo"], SetSecretCmd,
1378
1254
value_to_set=value)
1380
def test_secret_devnull_short(self):
    # "-s" pointed at the null device is expected to produce a
    # SetSecretCmd whose value_to_set is the empty byte string.
    argv = ["-s", os.path.devnull, "foo"]
    self.assert_command_from_args(argv, SetSecretCmd,
                                  value_to_set=b"")
1384
def test_secret_tempfile_short(self):
1385
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1386
value = b"secret\0xyzzy\nbar"
1389
self.assert_command_from_args(["-s", f.name, "foo"],
1393
1256
def test_approve(self):
1394
1257
self.assert_command_from_args(["--approve", "foo"],
1397
def test_approve_short(self):
    # The short option "-A" followed by a client name is expected to
    # produce an ApproveCmd.
    argv = ["-A", "foo"]
    self.assert_command_from_args(argv, ApproveCmd)
1400
1260
def test_deny(self):
1401
1261
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1403
def test_deny_short(self):
    # The short option "-D" followed by a client name is expected to
    # produce a DenyCmd.
    argv = ["-D", "foo"]
    self.assert_command_from_args(argv, DenyCmd)
1406
1263
def test_dump_json(self):
1407
1264
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1410
1267
self.assert_command_from_args(["--is-enabled", "foo"],
1413
def test_is_enabled_short(self):
    # The short option "-V" followed by a client name is expected to
    # produce an IsEnabledCmd.
    argv = ["-V", "foo"]
    self.assert_command_from_args(argv, IsEnabledCmd)
1416
def test_deny_before_remove(self):
1417
options = self.parser.parse_args(["--deny", "--remove",
1419
check_option_syntax(self.parser, options)
1420
commands = commands_from_options(options)
1421
self.assertEqual(len(commands), 2)
1422
self.assertIsInstance(commands[0], DenyCmd)
1423
self.assertIsInstance(commands[1], RemoveCmd)
1425
def test_deny_before_remove_reversed(self):
1426
options = self.parser.parse_args(["--remove", "--deny",
1428
check_option_syntax(self.parser, options)
1429
commands = commands_from_options(options)
1430
self.assertEqual(len(commands), 2)
1431
self.assertIsInstance(commands[0], DenyCmd)
1432
self.assertIsInstance(commands[1], RemoveCmd)
1435
class Test_check_option_syntax(unittest.TestCase):
1436
# This mostly corresponds to the definition from has_actions() in
1437
# check_option_syntax()
1439
# The actual values set here are not that important, but we do
1440
# at least stick to the correct types, even though they are
1444
"bump_timeout": True,
1445
"start_checker": True,
1446
"stop_checker": True,
1450
"timeout": datetime.timedelta(),
1451
"extended_timeout": datetime.timedelta(),
1452
"interval": datetime.timedelta(),
1453
"approved_by_default": True,
1454
"approval_delay": datetime.timedelta(),
1455
"approval_duration": datetime.timedelta(),
1457
"secret": io.BytesIO(b"x"),
1463
self.parser = argparse.ArgumentParser()
1464
add_command_line_options(self.parser)
1466
@contextlib.contextmanager
1467
def assertParseError(self):
1468
with self.assertRaises(SystemExit) as e:
1469
with self.temporarily_suppress_stderr():
1471
# Exit code from argparse is guaranteed to be "2". Reference:
1472
# https://docs.python.org/3/library
1473
# /argparse.html#exiting-methods
1474
self.assertEqual(e.exception.code, 2)
1477
@contextlib.contextmanager
1478
def temporarily_suppress_stderr():
1479
null = os.open(os.path.devnull, os.O_RDWR)
1480
stderrcopy = os.dup(sys.stderr.fileno())
1481
os.dup2(null, sys.stderr.fileno())
1487
os.dup2(stderrcopy, sys.stderr.fileno())
1488
os.close(stderrcopy)
1490
def check_option_syntax(self, options):
    # Convenience wrapper: run the check_option_syntax() function
    # under test against this test case's own parser.
    parser = self.parser
    check_option_syntax(parser, options)
1493
def test_actions_requires_client_or_all(self):
    # Each action on its own (no client name given) is expected to
    # make check_option_syntax() exit with a parse error.
    for action, value in self.actions.items():
        # Parse an explicit empty argument list.  A bare parse_args()
        # falls back to sys.argv, so options meant for the test runner
        # (e.g. "-v") would leak into the options under test.
        options = self.parser.parse_args([])
        setattr(options, action, value)
        with self.assertParseError():
            self.check_option_syntax(options)
1500
def test_actions_conflicts_with_verbose(self):
    # Each action with the verbose flag also set is expected to make
    # check_option_syntax() exit with a parse error.
    for action, value in self.actions.items():
        # Parse an explicit empty argument list.  A bare parse_args()
        # falls back to sys.argv, so options meant for the test runner
        # (e.g. "-v") would leak into the options under test.
        options = self.parser.parse_args([])
        setattr(options, action, value)
        options.verbose = True
        with self.assertParseError():
            self.check_option_syntax(options)
1508
def test_dump_json_conflicts_with_verbose(self):
    # Setting both dump_json and verbose is expected to make
    # check_option_syntax() exit with a parse error.
    #
    # Parse an explicit empty argument list.  A bare parse_args()
    # falls back to sys.argv, so options meant for the test runner
    # (e.g. "-v") would leak into the options under test.
    options = self.parser.parse_args([])
    options.dump_json = True
    options.verbose = True
    with self.assertParseError():
        self.check_option_syntax(options)
1515
def test_dump_json_conflicts_with_action(self):
    # Each action with dump_json also set is expected to make
    # check_option_syntax() exit with a parse error.
    for action, value in self.actions.items():
        # Parse an explicit empty argument list.  A bare parse_args()
        # falls back to sys.argv, so options meant for the test runner
        # (e.g. "-v") would leak into the options under test.
        options = self.parser.parse_args([])
        setattr(options, action, value)
        options.dump_json = True
        with self.assertParseError():
            self.check_option_syntax(options)
1523
def test_all_can_not_be_alone(self):
1524
options = self.parser.parse_args()
1526
with self.assertParseError():
1527
self.check_option_syntax(options)
1529
def test_all_is_ok_with_any_action(self):
1530
for action, value in self.actions.items():
1531
options = self.parser.parse_args()
1532
setattr(options, action, value)
1534
self.check_option_syntax(options)
1536
def test_is_enabled_fails_without_client(self):
    # is_enabled with no client name is expected to make
    # check_option_syntax() exit with a parse error.
    #
    # Parse an explicit empty argument list.  A bare parse_args()
    # falls back to sys.argv, so a stray positional argument on the
    # test runner's command line would be taken as a client name and
    # defeat the purpose of this test.
    options = self.parser.parse_args([])
    options.is_enabled = True
    with self.assertParseError():
        self.check_option_syntax(options)
1542
def test_is_enabled_works_with_one_client(self):
    # is_enabled with exactly one client name must pass
    # check_option_syntax() without raising.
    #
    # Parse an explicit empty argument list.  A bare parse_args()
    # falls back to sys.argv, so options meant for the test runner
    # (e.g. "-v") would leak into the options under test and could
    # turn this positive test into a spurious parse error.
    options = self.parser.parse_args([])
    options.is_enabled = True
    options.client = ["foo"]
    self.check_option_syntax(options)
1548
def test_is_enabled_fails_with_two_clients(self):
    # is_enabled with more than one client name is expected to make
    # check_option_syntax() exit with a parse error.
    #
    # Parse an explicit empty argument list.  A bare parse_args()
    # falls back to sys.argv, so options meant for the test runner
    # (e.g. "-v") would leak into the options under test.
    options = self.parser.parse_args([])
    options.is_enabled = True
    options.client = ["foo", "barbar"]
    with self.assertParseError():
        self.check_option_syntax(options)
1555
def test_remove_can_only_be_combined_with_action_deny(self):
1556
for action, value in self.actions.items():
1557
if action in {"remove", "deny"}:
1559
options = self.parser.parse_args()
1560
setattr(options, action, value)
1562
options.remove = True
1563
with self.assertParseError():
1564
self.check_option_syntax(options)
1568
1272
def should_only_run_tests():