300
299
"""Abstract class for Actions for setting one client property"""
301
300
def run_on_one_client(self, client, properties):
302
301
"""Set the Client's D-Bus property"""
303
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
304
client.__dbus_object_path__,
305
dbus.PROPERTIES_IFACE, client_interface,
306
self.property, self.value_to_set
307
if not isinstance(self.value_to_set, dbus.Boolean)
308
else bool(self.value_to_set))
309
302
client.Set(client_interface, self.property, self.value_to_set,
310
303
dbus_interface=dbus.PROPERTIES_IFACE)
438
431
class RemoveCmd(Command):
439
432
def run_on_one_client(self, client, properties):
440
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
441
server_path, server_interface,
442
str(client.__dbus_object_path__))
443
433
self.mandos.RemoveClient(client.__dbus_object_path__)
445
435
class ApproveCmd(Command):
446
436
def run_on_one_client(self, client, properties):
447
log.debug("D-Bus: %s:%s.Approve(True)",
448
client.__dbus_object_path__, client_interface)
449
437
client.Approve(dbus.Boolean(True),
450
438
dbus_interface=client_interface)
452
440
class DenyCmd(Command):
453
441
def run_on_one_client(self, client, properties):
454
log.debug("D-Bus: %s:%s.Approve(False)",
455
client.__dbus_object_path__, client_interface)
456
442
client.Approve(dbus.Boolean(False),
457
443
dbus_interface=client_interface)
519
505
MillisecondsValueArgumentMixIn):
520
506
property = "ApprovalDuration"
508
def has_actions(options):
509
return any((options.enable,
511
options.bump_timeout,
512
options.start_checker,
513
options.stop_checker,
516
options.checker is not None,
517
options.timeout is not None,
518
options.extended_timeout is not None,
519
options.interval is not None,
520
options.approved_by_default is not None,
521
options.approval_delay is not None,
522
options.approval_duration is not None,
523
options.host is not None,
524
options.secret is not None,
522
528
def add_command_line_options(parser):
523
529
parser.add_argument("--version", action="version",
524
530
version="%(prog)s {}".format(version),
550
556
help="Remove client")
551
557
parser.add_argument("-c", "--checker",
552
558
help="Set checker command for client")
553
parser.add_argument("-t", "--timeout", type=string_to_delta,
559
parser.add_argument("-t", "--timeout",
554
560
help="Set timeout for client")
555
parser.add_argument("--extended-timeout", type=string_to_delta,
561
parser.add_argument("--extended-timeout",
556
562
help="Set extended timeout for client")
557
parser.add_argument("-i", "--interval", type=string_to_delta,
563
parser.add_argument("-i", "--interval",
558
564
help="Set checker interval for client")
559
565
approve_deny_default = parser.add_mutually_exclusive_group()
560
566
approve_deny_default.add_argument(
565
571
"--deny-by-default", action="store_false",
566
572
dest="approved_by_default",
567
573
help="Set client to be denied by default")
568
parser.add_argument("--approval-delay", type=string_to_delta,
574
parser.add_argument("--approval-delay",
569
575
help="Set delay before client approve/deny")
570
parser.add_argument("--approval-duration", type=string_to_delta,
576
parser.add_argument("--approval-duration",
571
577
help="Set duration of one client approval")
572
578
parser.add_argument("-H", "--host", help="Set host for client")
573
579
parser.add_argument("-s", "--secret",
663
def check_option_syntax(parser, options):
664
"""Apply additional restrictions on options, not expressible in
667
def has_actions(options):
668
return any((options.enable,
670
options.bump_timeout,
671
options.start_checker,
672
options.stop_checker,
675
options.checker is not None,
676
options.timeout is not None,
677
options.extended_timeout is not None,
678
options.interval is not None,
679
options.approved_by_default is not None,
680
options.approval_delay is not None,
681
options.approval_duration is not None,
682
options.host is not None,
683
options.secret is not None,
668
parser = argparse.ArgumentParser()
670
add_command_line_options(parser)
672
options = parser.parse_args()
687
674
if has_actions(options) and not (options.client or options.all):
688
675
parser.error("Options require clients names or --all.")
1123
1093
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1124
1094
command = SetSecretCmd
1125
1095
property = "Secret"
1126
values_to_set = [io.BytesIO(b""),
1096
values_to_set = [open("/dev/null", "rb"),
1127
1097
io.BytesIO(b"secret\0xyzzy\nbar")]
1128
1098
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1130
1100
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1131
1101
command = SetTimeoutCmd
1132
1102
property = "Timeout"
1133
values_to_set = [datetime.timedelta(),
1134
datetime.timedelta(minutes=5),
1135
datetime.timedelta(seconds=1),
1136
datetime.timedelta(weeks=1),
1137
datetime.timedelta(weeks=52)]
1138
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1103
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1104
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1140
1106
class TestSetExtendedTimeoutCmd(TestValueArgumentPropertyCmd):
1141
1107
command = SetExtendedTimeoutCmd
1142
1108
property = "ExtendedTimeout"
1143
values_to_set = [datetime.timedelta(),
1144
datetime.timedelta(minutes=5),
1145
datetime.timedelta(seconds=1),
1146
datetime.timedelta(weeks=1),
1147
datetime.timedelta(weeks=52)]
1148
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1109
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1110
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1150
1112
class TestSetIntervalCmd(TestValueArgumentPropertyCmd):
1151
1113
command = SetIntervalCmd
1152
1114
property = "Interval"
1153
values_to_set = [datetime.timedelta(),
1154
datetime.timedelta(minutes=5),
1155
datetime.timedelta(seconds=1),
1156
datetime.timedelta(weeks=1),
1157
datetime.timedelta(weeks=52)]
1158
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1115
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1116
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1160
1118
class TestSetApprovalDelayCmd(TestValueArgumentPropertyCmd):
1161
1119
command = SetApprovalDelayCmd
1162
1120
property = "ApprovalDelay"
1163
values_to_set = [datetime.timedelta(),
1164
datetime.timedelta(minutes=5),
1165
datetime.timedelta(seconds=1),
1166
datetime.timedelta(weeks=1),
1167
datetime.timedelta(weeks=52)]
1168
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1121
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1122
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1170
1124
class TestSetApprovalDurationCmd(TestValueArgumentPropertyCmd):
1171
1125
command = SetApprovalDurationCmd
1172
1126
property = "ApprovalDuration"
1173
values_to_set = [datetime.timedelta(),
1174
datetime.timedelta(minutes=5),
1175
datetime.timedelta(seconds=1),
1176
datetime.timedelta(weeks=1),
1177
datetime.timedelta(weeks=52)]
1178
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1127
values_to_set = ["P0D", "PT5M", "PT1S", "PT120S", "P1Y"]
1128
values_to_get = [0, 300000, 1000, 120000, 31449600000]
1180
1130
class Test_command_from_options(unittest.TestCase):
1181
1131
def setUp(self):
1200
1149
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1203
def test_print_table_verbose_short(self):
1204
self.assert_command_from_args(["-v"], PrintTableCmd,
1207
1152
def test_enable(self):
1208
1153
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1210
def test_enable_short(self):
1211
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1213
1155
def test_disable(self):
1214
1156
self.assert_command_from_args(["--disable", "foo"],
1217
def test_disable_short(self):
1218
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1220
1159
def test_bump_timeout(self):
1221
1160
self.assert_command_from_args(["--bump-timeout", "foo"],
1222
1161
BumpTimeoutCmd)
1224
def test_bump_timeout_short(self):
1225
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1227
1163
def test_start_checker(self):
1228
1164
self.assert_command_from_args(["--start-checker", "foo"],
1229
1165
StartCheckerCmd)
1236
1172
self.assert_command_from_args(["--remove", "foo"],
1239
def test_remove_short(self):
1240
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1242
1175
def test_checker(self):
1243
1176
self.assert_command_from_args(["--checker", ":", "foo"],
1244
1177
SetCheckerCmd, value_to_set=":")
1246
def test_checker_empty(self):
1247
self.assert_command_from_args(["--checker", "", "foo"],
1248
SetCheckerCmd, value_to_set="")
1250
def test_checker_short(self):
1251
self.assert_command_from_args(["-c", ":", "foo"],
1252
SetCheckerCmd, value_to_set=":")
1254
1179
def test_timeout(self):
1255
1180
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1257
1182
value_to_set=300000)
1259
def test_timeout_short(self):
1260
self.assert_command_from_args(["-t", "PT5M", "foo"],
1262
value_to_set=300000)
1264
1184
def test_extended_timeout(self):
1265
1185
self.assert_command_from_args(["--extended-timeout", "PT15M",
1319
1229
"foo"], SetSecretCmd,
1320
1230
value_to_set=value)
1322
def test_secret_devnull_short(self):
1323
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1324
SetSecretCmd, value_to_set=b"")
1326
def test_secret_tempfile_short(self):
1327
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1328
value = b"secret\0xyzzy\nbar"
1331
self.assert_command_from_args(["-s", f.name, "foo"],
1335
1232
def test_approve(self):
1336
1233
self.assert_command_from_args(["--approve", "foo"],
1339
def test_approve_short(self):
1340
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1342
1236
def test_deny(self):
1343
1237
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1345
def test_deny_short(self):
1346
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1348
1239
def test_dump_json(self):
1349
1240
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1352
1243
self.assert_command_from_args(["--is-enabled", "foo"],
1355
def test_is_enabled_short(self):
1356
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1359
class Test_check_option_syntax(unittest.TestCase):
1360
# This mostly corresponds to the definition from has_actions() in
1361
# check_option_syntax()
1363
# The actual values set here are not that important, but we do
1364
# at least stick to the correct types, even though they are
1368
"bump_timeout": True,
1369
"start_checker": True,
1370
"stop_checker": True,
1374
"timeout": datetime.timedelta(),
1375
"extended_timeout": datetime.timedelta(),
1376
"interval": datetime.timedelta(),
1377
"approved_by_default": True,
1378
"approval_delay": datetime.timedelta(),
1379
"approval_duration": datetime.timedelta(),
1381
"secret": io.BytesIO(b"x"),
1387
self.parser = argparse.ArgumentParser()
1388
add_command_line_options(self.parser)
1390
@contextlib.contextmanager
1391
def assertParseError(self):
1392
with self.assertRaises(SystemExit) as e:
1393
with self.temporarily_suppress_stderr():
1395
# Exit code from argparse is guaranteed to be "2". Reference:
1396
# https://docs.python.org/3/library/argparse.html#exiting-methods
1397
self.assertEqual(e.exception.code, 2)
1400
@contextlib.contextmanager
1401
def temporarily_suppress_stderr():
1402
null = os.open(os.path.devnull, os.O_RDWR)
1403
stderrcopy = os.dup(sys.stderr.fileno())
1404
os.dup2(null, sys.stderr.fileno())
1410
os.dup2(stderrcopy, sys.stderr.fileno())
1411
os.close(stderrcopy)
1413
def check_option_syntax(self, options):
1414
check_option_syntax(self.parser, options)
1416
def test_actions_requires_client_or_all(self):
1417
for action, value in self.actions.items():
1418
options = self.parser.parse_args()
1419
setattr(options, action, value)
1420
with self.assertParseError():
1421
self.check_option_syntax(options)
1423
def test_actions_conflicts_with_verbose(self):
1424
for action, value in self.actions.items():
1425
options = self.parser.parse_args()
1426
setattr(options, action, value)
1427
options.verbose = True
1428
with self.assertParseError():
1429
self.check_option_syntax(options)
1431
def test_dump_json_conflicts_with_verbose(self):
1432
options = self.parser.parse_args()
1433
options.dump_json = True
1434
options.verbose = True
1435
with self.assertParseError():
1436
self.check_option_syntax(options)
1438
def test_dump_json_conflicts_with_action(self):
1439
for action, value in self.actions.items():
1440
options = self.parser.parse_args()
1441
setattr(options, action, value)
1442
options.dump_json = True
1443
with self.assertParseError():
1444
self.check_option_syntax(options)
1446
def test_all_can_not_be_alone(self):
1447
options = self.parser.parse_args()
1449
with self.assertParseError():
1450
self.check_option_syntax(options)
1452
def test_all_is_ok_with_any_action(self):
1453
for action, value in self.actions.items():
1454
options = self.parser.parse_args()
1455
setattr(options, action, value)
1457
self.check_option_syntax(options)
1459
def test_is_enabled_fails_without_client(self):
1460
options = self.parser.parse_args()
1461
options.is_enabled = True
1462
with self.assertParseError():
1463
self.check_option_syntax(options)
1465
def test_is_enabled_works_with_one_client(self):
1466
options = self.parser.parse_args()
1467
options.is_enabled = True
1468
options.client = ["foo"]
1469
self.check_option_syntax(options)
1471
def test_is_enabled_fails_with_two_clients(self):
1472
options = self.parser.parse_args()
1473
options.is_enabled = True
1474
options.client = ["foo", "barbar"]
1475
with self.assertParseError():
1476
self.check_option_syntax(options)
1480
1248
def should_only_run_tests():