/mandos/trunk

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/trunk

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: Teddy Hogeborn
  • Date: 2019-03-07 20:28:17 UTC
  • Revision ID: teddy@recompile.se-20190307202817-avhha20s3pl14j6y
mandos-ctl: Refactor; move parsing of intervals into argument parsing

* mandos-ctl (MillisecondsValueArgumentMixIn.value_to_set): Assume
  that an incoming value is datetime.timedelta(), not a string.
  (add_command_line_options): Add "type=string_to_delta" to --timeout,
  --extended-timeout, --interval, --approval-delay, and
  --approval-duration.
  (TestSetTimeoutCmd, TestSetExtendedTimeoutCmd, TestSetIntervalCmd,
  TestSetApprovalDelayCmd, TestSetApprovalDurationCmd): Change
  values_to_set to datetime.timedelta() values, choosing values more
  appropriate for testing.  Also adjust values_to_get accordingly.

Show diffs side-by-side

added added

removed removed

Lines of Context:
44
44
import logging
45
45
import io
46
46
import tempfile
47
 
import contextlib
48
47
 
49
48
import dbus
50
49
 
300
299
    """Abstract class for Actions for setting one client property"""
301
300
    def run_on_one_client(self, client, properties):
302
301
        """Set the Client's D-Bus property"""
303
 
        log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
304
 
                  client.__dbus_object_path__,
305
 
                  dbus.PROPERTIES_IFACE, client_interface,
306
 
                  self.property, self.value_to_set
307
 
                  if not isinstance(self.value_to_set, dbus.Boolean)
308
 
                  else bool(self.value_to_set))
309
302
        client.Set(client_interface, self.property, self.value_to_set,
310
303
                   dbus_interface=dbus.PROPERTIES_IFACE)
311
304
 
437
430
 
438
431
class RemoveCmd(Command):
439
432
    def run_on_one_client(self, client, properties):
440
 
        log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
441
 
                  server_path, server_interface,
442
 
                  str(client.__dbus_object_path__))
443
433
        self.mandos.RemoveClient(client.__dbus_object_path__)
444
434
 
445
435
class ApproveCmd(Command):
446
436
    def run_on_one_client(self, client, properties):
447
 
        log.debug("D-Bus: %s:%s.Approve(True)",
448
 
                  client.__dbus_object_path__, client_interface)
449
437
        client.Approve(dbus.Boolean(True),
450
438
                       dbus_interface=client_interface)
451
439
 
452
440
class DenyCmd(Command):
453
441
    def run_on_one_client(self, client, properties):
454
 
        log.debug("D-Bus: %s:%s.Approve(False)",
455
 
                  client.__dbus_object_path__, client_interface)
456
442
        client.Approve(dbus.Boolean(False),
457
443
                       dbus_interface=client_interface)
458
444
 
579
565
        help="Approve any current client request")
580
566
    approve_deny.add_argument("-D", "--deny", action="store_true",
581
567
                              help="Deny any current client request")
582
 
    parser.add_argument("--debug", action="store_true",
583
 
                        help="Debug mode (show D-Bus commands)")
584
568
    parser.add_argument("--check", action="store_true",
585
569
                        help="Run self-test")
586
570
    parser.add_argument("client", nargs="*", help="Client name")
611
595
    if options.is_enabled:
612
596
        commands.append(IsEnabledCmd())
613
597
 
 
598
    if options.remove:
 
599
        commands.append(RemoveCmd())
 
600
 
614
601
    if options.checker is not None:
615
602
        commands.append(SetCheckerCmd(options.checker))
616
603
 
649
636
    if options.deny:
650
637
        commands.append(DenyCmd())
651
638
 
652
 
    if options.remove:
653
 
        commands.append(RemoveCmd())
654
 
 
655
639
    # If no command option has been given, show table of clients,
656
640
    # optionally verbosely
657
641
    if not commands:
660
644
    return commands
661
645
 
662
646
 
663
 
def check_option_syntax(parser, options):
664
 
    """Apply additional restrictions on options, not expressible in
665
 
argparse"""
 
647
def main():
 
648
    parser = argparse.ArgumentParser()
 
649
 
 
650
    add_command_line_options(parser)
 
651
 
 
652
    options = parser.parse_args()
666
653
 
667
654
    def has_actions(options):
668
655
        return any((options.enable,
696
683
    if options.is_enabled and len(options.client) > 1:
697
684
        parser.error("--is-enabled requires exactly one client")
698
685
 
699
 
 
700
 
def main():
701
 
    parser = argparse.ArgumentParser()
702
 
 
703
 
    add_command_line_options(parser)
704
 
 
705
 
    options = parser.parse_args()
706
 
 
707
 
    check_option_syntax(parser, options)
708
 
 
709
686
    clientnames = options.client
710
687
 
711
 
    if options.debug:
712
 
        log.setLevel(logging.DEBUG)
713
 
 
714
688
    try:
715
689
        bus = dbus.SystemBus()
716
 
        log.debug("D-Bus: Connect to: (name=%r, path=%r)", busname,
717
 
                  server_path)
718
690
        mandos_dbus_objc = bus.get_object(busname, server_path)
719
691
    except dbus.exceptions.DBusException:
720
692
        log.critical("Could not connect to Mandos server")
733
705
    dbus_filter = NullFilter()
734
706
    try:
735
707
        dbus_logger.addFilter(dbus_filter)
736
 
        log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", busname,
737
 
                  server_path, dbus.OBJECT_MANAGER_IFACE)
738
708
        mandos_clients = {path: ifs_and_props[client_interface]
739
709
                          for path, ifs_and_props in
740
710
                          mandos_serv_object_manager
1123
1093
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1124
1094
    command = SetSecretCmd
1125
1095
    property = "Secret"
1126
 
    values_to_set = [io.BytesIO(b""),
 
1096
    values_to_set = [open("/dev/null", "rb"),
1127
1097
                     io.BytesIO(b"secret\0xyzzy\nbar")]
1128
1098
    values_to_get = [b"", b"secret\0xyzzy\nbar"]
1129
1099
 
1185
1155
        """Assert that parsing ARGS should result in an instance of
1186
1156
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1187
1157
        options = self.parser.parse_args(args)
1188
 
        check_option_syntax(self.parser, options)
1189
1158
        commands = commands_from_options(options)
1190
1159
        self.assertEqual(len(commands), 1)
1191
1160
        command = commands[0]
1200
1169
        self.assert_command_from_args(["--verbose"], PrintTableCmd,
1201
1170
                                      verbose=True)
1202
1171
 
1203
 
    def test_print_table_verbose_short(self):
1204
 
        self.assert_command_from_args(["-v"], PrintTableCmd,
1205
 
                                      verbose=True)
1206
 
 
1207
1172
    def test_enable(self):
1208
1173
        self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1209
1174
 
1210
 
    def test_enable_short(self):
1211
 
        self.assert_command_from_args(["-e", "foo"], EnableCmd)
1212
 
 
1213
1175
    def test_disable(self):
1214
1176
        self.assert_command_from_args(["--disable", "foo"],
1215
1177
                                      DisableCmd)
1216
1178
 
1217
 
    def test_disable_short(self):
1218
 
        self.assert_command_from_args(["-d", "foo"], DisableCmd)
1219
 
 
1220
1179
    def test_bump_timeout(self):
1221
1180
        self.assert_command_from_args(["--bump-timeout", "foo"],
1222
1181
                                      BumpTimeoutCmd)
1223
1182
 
1224
 
    def test_bump_timeout_short(self):
1225
 
        self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1226
 
 
1227
1183
    def test_start_checker(self):
1228
1184
        self.assert_command_from_args(["--start-checker", "foo"],
1229
1185
                                      StartCheckerCmd)
1236
1192
        self.assert_command_from_args(["--remove", "foo"],
1237
1193
                                      RemoveCmd)
1238
1194
 
1239
 
    def test_remove_short(self):
1240
 
        self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1241
 
 
1242
1195
    def test_checker(self):
1243
1196
        self.assert_command_from_args(["--checker", ":", "foo"],
1244
1197
                                      SetCheckerCmd, value_to_set=":")
1247
1200
        self.assert_command_from_args(["--checker", "", "foo"],
1248
1201
                                      SetCheckerCmd, value_to_set="")
1249
1202
 
1250
 
    def test_checker_short(self):
1251
 
        self.assert_command_from_args(["-c", ":", "foo"],
1252
 
                                      SetCheckerCmd, value_to_set=":")
1253
 
 
1254
1203
    def test_timeout(self):
1255
1204
        self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1256
1205
                                      SetTimeoutCmd,
1257
1206
                                      value_to_set=300000)
1258
1207
 
1259
 
    def test_timeout_short(self):
1260
 
        self.assert_command_from_args(["-t", "PT5M", "foo"],
1261
 
                                      SetTimeoutCmd,
1262
 
                                      value_to_set=300000)
1263
 
 
1264
1208
    def test_extended_timeout(self):
1265
1209
        self.assert_command_from_args(["--extended-timeout", "PT15M",
1266
1210
                                       "foo"],
1272
1216
                                      SetIntervalCmd,
1273
1217
                                      value_to_set=120000)
1274
1218
 
1275
 
    def test_interval_short(self):
1276
 
        self.assert_command_from_args(["-i", "PT2M", "foo"],
1277
 
                                      SetIntervalCmd,
1278
 
                                      value_to_set=120000)
1279
 
 
1280
1219
    def test_approve_by_default(self):
1281
1220
        self.assert_command_from_args(["--approve-by-default", "foo"],
1282
1221
                                      ApproveByDefaultCmd)
1300
1239
                                       "foo"], SetHostCmd,
1301
1240
                                      value_to_set="foo.example.org")
1302
1241
 
1303
 
    def test_host_short(self):
1304
 
        self.assert_command_from_args(["-H", "foo.example.org",
1305
 
                                       "foo"], SetHostCmd,
1306
 
                                      value_to_set="foo.example.org")
1307
 
 
1308
1242
    def test_secret_devnull(self):
1309
1243
        self.assert_command_from_args(["--secret", os.path.devnull,
1310
1244
                                       "foo"], SetSecretCmd,
1319
1253
                                           "foo"], SetSecretCmd,
1320
1254
                                          value_to_set=value)
1321
1255
 
1322
 
    def test_secret_devnull_short(self):
1323
 
        self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1324
 
                                      SetSecretCmd, value_to_set=b"")
1325
 
 
1326
 
    def test_secret_tempfile_short(self):
1327
 
        with tempfile.NamedTemporaryFile(mode="r+b") as f:
1328
 
            value = b"secret\0xyzzy\nbar"
1329
 
            f.write(value)
1330
 
            f.seek(0)
1331
 
            self.assert_command_from_args(["-s", f.name, "foo"],
1332
 
                                          SetSecretCmd,
1333
 
                                          value_to_set=value)
1334
 
 
1335
1256
    def test_approve(self):
1336
1257
        self.assert_command_from_args(["--approve", "foo"],
1337
1258
                                      ApproveCmd)
1338
1259
 
1339
 
    def test_approve_short(self):
1340
 
        self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1341
 
 
1342
1260
    def test_deny(self):
1343
1261
        self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1344
1262
 
1345
 
    def test_deny_short(self):
1346
 
        self.assert_command_from_args(["-D", "foo"], DenyCmd)
1347
 
 
1348
1263
    def test_dump_json(self):
1349
1264
        self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1350
1265
 
1352
1267
        self.assert_command_from_args(["--is-enabled", "foo"],
1353
1268
                                      IsEnabledCmd)
1354
1269
 
1355
 
    def test_is_enabled_short(self):
1356
 
        self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1357
 
 
1358
 
    def test_deny_before_remove(self):
1359
 
        options = self.parser.parse_args(["--deny", "--remove", "foo"])
1360
 
        check_option_syntax(self.parser, options)
1361
 
        commands = commands_from_options(options)
1362
 
        self.assertEqual(len(commands), 2)
1363
 
        self.assertIsInstance(commands[0], DenyCmd)
1364
 
        self.assertIsInstance(commands[1], RemoveCmd)
1365
 
 
1366
 
    def test_deny_before_remove_reversed(self):
1367
 
        options = self.parser.parse_args(["--remove", "--deny", "--all"])
1368
 
        check_option_syntax(self.parser, options)
1369
 
        commands = commands_from_options(options)
1370
 
        self.assertEqual(len(commands), 2)
1371
 
        self.assertIsInstance(commands[0], DenyCmd)
1372
 
        self.assertIsInstance(commands[1], RemoveCmd)
1373
 
 
1374
 
 
1375
 
class Test_check_option_syntax(unittest.TestCase):
1376
 
    # This mostly corresponds to the definition from has_actions() in
1377
 
    # check_option_syntax()
1378
 
    actions = {
1379
 
        # The actual values set here are not that important, but we do
1380
 
        # at least stick to the correct types, even though they are
1381
 
        # never used
1382
 
        "enable": True,
1383
 
        "disable": True,
1384
 
        "bump_timeout": True,
1385
 
        "start_checker": True,
1386
 
        "stop_checker": True,
1387
 
        "is_enabled": True,
1388
 
        "remove": True,
1389
 
        "checker": "x",
1390
 
        "timeout": datetime.timedelta(),
1391
 
        "extended_timeout": datetime.timedelta(),
1392
 
        "interval": datetime.timedelta(),
1393
 
        "approved_by_default": True,
1394
 
        "approval_delay": datetime.timedelta(),
1395
 
        "approval_duration": datetime.timedelta(),
1396
 
        "host": "x",
1397
 
        "secret": io.BytesIO(b"x"),
1398
 
        "approve": True,
1399
 
        "deny": True,
1400
 
    }
1401
 
 
1402
 
    def setUp(self):
1403
 
        self.parser = argparse.ArgumentParser()
1404
 
        add_command_line_options(self.parser)
1405
 
 
1406
 
    @contextlib.contextmanager
1407
 
    def assertParseError(self):
1408
 
        with self.assertRaises(SystemExit) as e:
1409
 
            with self.temporarily_suppress_stderr():
1410
 
                yield
1411
 
        # Exit code from argparse is guaranteed to be "2".  Reference:
1412
 
        # https://docs.python.org/3/library/argparse.html#exiting-methods
1413
 
        self.assertEqual(e.exception.code, 2)
1414
 
 
1415
 
    @staticmethod
1416
 
    @contextlib.contextmanager
1417
 
    def temporarily_suppress_stderr():
1418
 
        null = os.open(os.path.devnull, os.O_RDWR)
1419
 
        stderrcopy = os.dup(sys.stderr.fileno())
1420
 
        os.dup2(null, sys.stderr.fileno())
1421
 
        os.close(null)
1422
 
        try:
1423
 
            yield
1424
 
        finally:
1425
 
            # restore stderr
1426
 
            os.dup2(stderrcopy, sys.stderr.fileno())
1427
 
            os.close(stderrcopy)
1428
 
 
1429
 
    def check_option_syntax(self, options):
1430
 
        check_option_syntax(self.parser, options)
1431
 
 
1432
 
    def test_actions_requires_client_or_all(self):
1433
 
        for action, value in self.actions.items():
1434
 
            options = self.parser.parse_args()
1435
 
            setattr(options, action, value)
1436
 
            with self.assertParseError():
1437
 
                self.check_option_syntax(options)
1438
 
 
1439
 
    def test_actions_conflicts_with_verbose(self):
1440
 
        for action, value in self.actions.items():
1441
 
            options = self.parser.parse_args()
1442
 
            setattr(options, action, value)
1443
 
            options.verbose = True
1444
 
            with self.assertParseError():
1445
 
                self.check_option_syntax(options)
1446
 
 
1447
 
    def test_dump_json_conflicts_with_verbose(self):
1448
 
        options = self.parser.parse_args()
1449
 
        options.dump_json = True
1450
 
        options.verbose = True
1451
 
        with self.assertParseError():
1452
 
            self.check_option_syntax(options)
1453
 
 
1454
 
    def test_dump_json_conflicts_with_action(self):
1455
 
        for action, value in self.actions.items():
1456
 
            options = self.parser.parse_args()
1457
 
            setattr(options, action, value)
1458
 
            options.dump_json = True
1459
 
            with self.assertParseError():
1460
 
                self.check_option_syntax(options)
1461
 
 
1462
 
    def test_all_can_not_be_alone(self):
1463
 
        options = self.parser.parse_args()
1464
 
        options.all = True
1465
 
        with self.assertParseError():
1466
 
            self.check_option_syntax(options)
1467
 
 
1468
 
    def test_all_is_ok_with_any_action(self):
1469
 
        for action, value in self.actions.items():
1470
 
            options = self.parser.parse_args()
1471
 
            setattr(options, action, value)
1472
 
            options.all = True
1473
 
            self.check_option_syntax(options)
1474
 
 
1475
 
    def test_is_enabled_fails_without_client(self):
1476
 
        options = self.parser.parse_args()
1477
 
        options.is_enabled = True
1478
 
        with self.assertParseError():
1479
 
            self.check_option_syntax(options)
1480
 
 
1481
 
    def test_is_enabled_works_with_one_client(self):
1482
 
        options = self.parser.parse_args()
1483
 
        options.is_enabled = True
1484
 
        options.client = ["foo"]
1485
 
        self.check_option_syntax(options)
1486
 
 
1487
 
    def test_is_enabled_fails_with_two_clients(self):
1488
 
        options = self.parser.parse_args()
1489
 
        options.is_enabled = True
1490
 
        options.client = ["foo", "barbar"]
1491
 
        with self.assertParseError():
1492
 
            self.check_option_syntax(options)
1493
 
 
1494
1270
 
1495
1271
 
1496
1272
def should_only_run_tests():