79
77
dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"
83
parser = argparse.ArgumentParser()
85
add_command_line_options(parser)
87
options = parser.parse_args()
89
check_option_syntax(parser, options)
91
clientnames = options.client
94
log.setLevel(logging.DEBUG)
97
bus = dbus.SystemBus()
98
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
99
dbus_busname, server_dbus_path)
100
mandos_dbus_objc = bus.get_object(dbus_busname,
102
except dbus.exceptions.DBusException:
103
log.critical("Could not connect to Mandos server")
106
mandos_serv = dbus.Interface(mandos_dbus_objc,
107
dbus_interface=server_dbus_interface)
108
mandos_serv_object_manager = dbus.Interface(
109
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
112
log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
113
server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
114
with SilenceLogger("dbus.proxies"):
115
all_clients = {path: ifs_and_props[client_dbus_interface]
116
for path, ifs_and_props in
117
mandos_serv_object_manager
118
.GetManagedObjects().items()
119
if client_dbus_interface in ifs_and_props}
120
except dbus.exceptions.DBusException as e:
121
log.critical("Failed to access Mandos server through D-Bus:"
125
# Compile dict of (clients: properties) to process
129
clients = all_clients
131
for name in clientnames:
132
for objpath, properties in all_clients.items():
133
if properties["Name"] == name:
134
clients[objpath] = properties
137
log.critical("Client not found on server: %r", name)
140
# Run all commands on clients
141
commands = commands_from_options(options)
142
for command in commands:
143
command.run(clients, bus, mandos_serv)
146
def add_command_line_options(parser):
    """Register all command line options on PARSER.

    parser -- the argparse.ArgumentParser to add the options to

    Options which exclude each other (enable/disable, start/stop
    checker, approve/deny by default and approve/deny) are put in
    mutually exclusive groups, so argparse itself refuses to combine
    them.  Duration valued options are converted using
    string_to_delta().
    """
    parser.add_argument("--version", action="version",
                        version="%(prog)s {}".format(version),
                        help="show version number and exit")
    parser.add_argument("-a", "--all", action="store_true",
                        help="Select all clients")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Print all fields")
    parser.add_argument("-j", "--dump-json", action="store_true",
                        help="Dump client data in JSON format")
    enable_disable = parser.add_mutually_exclusive_group()
    enable_disable.add_argument("-e", "--enable", action="store_true",
                                help="Enable client")
    # Must be a plain flag; without action="store_true" it would
    # swallow the following client name as its own value.
    enable_disable.add_argument("-d", "--disable",
                                action="store_true",
                                help="disable client")
    parser.add_argument("-b", "--bump-timeout", action="store_true",
                        help="Bump timeout for client")
    start_stop_checker = parser.add_mutually_exclusive_group()
    # Both checker options are plain flags too, for the same reason.
    start_stop_checker.add_argument("--start-checker",
                                    action="store_true",
                                    help="Start checker for client")
    start_stop_checker.add_argument("--stop-checker",
                                    action="store_true",
                                    help="Stop checker for client")
    parser.add_argument("-V", "--is-enabled", action="store_true",
                        help="Check if client is enabled")
    parser.add_argument("-r", "--remove", action="store_true",
                        help="Remove client")
    parser.add_argument("-c", "--checker",
                        help="Set checker command for client")
    parser.add_argument("-t", "--timeout", type=string_to_delta,
                        help="Set timeout for client")
    parser.add_argument("--extended-timeout", type=string_to_delta,
                        help="Set extended timeout for client")
    parser.add_argument("-i", "--interval", type=string_to_delta,
                        help="Set checker interval for client")
    approve_deny_default = parser.add_mutually_exclusive_group()
    # Both options share one destination; the None default makes it
    # possible to tell "neither option given" apart from True/False.
    approve_deny_default.add_argument(
        "--approve-by-default", action="store_true",
        default=None, dest="approved_by_default",
        help="Set client to be approved by default")
    approve_deny_default.add_argument(
        "--deny-by-default", action="store_false",
        dest="approved_by_default",
        help="Set client to be denied by default")
    parser.add_argument("--approval-delay", type=string_to_delta,
                        help="Set delay before client approve/deny")
    parser.add_argument("--approval-duration", type=string_to_delta,
                        help="Set duration of one client approval")
    parser.add_argument("-H", "--host", help="Set host for client")
    # The secret is opened in binary mode by argparse itself.
    parser.add_argument("-s", "--secret",
                        type=argparse.FileType(mode="rb"),
                        help="Set password blob (file) for client")
    approve_deny = parser.add_mutually_exclusive_group()
    approve_deny.add_argument(
        "-A", "--approve", action="store_true",
        help="Approve any current client request")
    approve_deny.add_argument("-D", "--deny", action="store_true",
                              help="Deny any current client request")
    parser.add_argument("--debug", action="store_true",
                        help="Debug mode (show D-Bus commands)")
    parser.add_argument("--check", action="store_true",
                        help="Run self-test")
    parser.add_argument("client", nargs="*", help="Client name")
213
def string_to_delta(interval):
214
"""Parse a string and return a datetime.timedelta"""
217
return rfc3339_duration_to_delta(interval)
218
except ValueError as e:
219
log.warning("%s - Parsing as pre-1.6.1 interval instead",
221
return parse_pre_1_6_1_interval(interval)
80
def milliseconds_to_string(ms):
    """Format a number of milliseconds as "[<days>T]HH:MM:SS".

    The "<days>T" prefix is only present when the day count is
    non-zero, and fractions of a second are not shown.
    """
    delta = datetime.timedelta(milliseconds=ms)
    hours, remainder = divmod(delta.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    day_prefix = "{}T".format(delta.days) if delta.days else ""
    return "{}{:02}:{:02}:{:02}".format(day_prefix, hours, minutes,
                                        seconds)
224
89
def rfc3339_duration_to_delta(duration):
397
def check_option_syntax(parser, options):
398
"""Apply additional restrictions on options, not expressible in
401
def has_actions(options):
402
return any((options.enable,
404
options.bump_timeout,
405
options.start_checker,
406
options.stop_checker,
409
options.checker is not None,
410
options.timeout is not None,
411
options.extended_timeout is not None,
412
options.interval is not None,
413
options.approved_by_default is not None,
414
options.approval_delay is not None,
415
options.approval_duration is not None,
416
options.host is not None,
417
options.secret is not None,
421
if has_actions(options) and not (options.client or options.all):
422
parser.error("Options require clients names or --all.")
423
if options.verbose and has_actions(options):
424
parser.error("--verbose can only be used alone.")
425
if options.dump_json and (options.verbose
426
or has_actions(options)):
427
parser.error("--dump-json can only be used alone.")
428
if options.all and not has_actions(options):
429
parser.error("--all requires an action.")
430
if options.is_enabled and len(options.client) > 1:
431
parser.error("--is-enabled requires exactly one client")
433
options.remove = False
434
if has_actions(options) and not options.deny:
435
parser.error("--remove can only be combined with --deny")
436
options.remove = True
439
class SilenceLogger(object):
    "Simple context manager to silence a particular logger"

    def __init__(self, loggername):
        """Look up the logger named LOGGERNAME; nothing is silenced
        until the context is entered."""
        self.logger = logging.getLogger(loggername)

    def __enter__(self):
        """Start dropping all records logged on the logger"""
        self.logger.addFilter(self.nullfilter)

    class NullFilter(logging.Filter):
        """Filter which rejects every record"""
        def filter(self, record):
            # A false return value makes the logger drop the record
            return False

    # One filter instance shared by all SilenceLogger objects; this
    # same object is what __exit__() removes again.
    nullfilter = NullFilter()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop dropping records; exceptions are not suppressed"""
        self.logger.removeFilter(self.nullfilter)
458
def commands_from_options(options):
    """Return a list of Command objects selected by OPTIONS.

    options -- the namespace of parsed command line options

    The commands are listed in the fixed order of the tests below,
    not in the order the options were given on the command line; in
    particular a DenyCmd always precedes a RemoveCmd.  If no command
    option was given, the list holds a single PrintTableCmd.
    """
    commands = []

    if options.is_enabled:
        commands.append(IsEnabledCmd())

    if options.approve:
        commands.append(ApproveCmd())

    if options.deny:
        commands.append(DenyCmd())

    if options.remove:
        commands.append(RemoveCmd())

    if options.dump_json:
        commands.append(DumpJSONCmd())

    if options.enable:
        commands.append(EnableCmd())

    if options.disable:
        commands.append(DisableCmd())

    if options.bump_timeout:
        commands.append(BumpTimeoutCmd())

    if options.start_checker:
        commands.append(StartCheckerCmd())

    if options.stop_checker:
        commands.append(StopCheckerCmd())

    # None means neither --approve-by-default nor --deny-by-default
    if options.approved_by_default is not None:
        if options.approved_by_default:
            commands.append(ApproveByDefaultCmd())
        else:
            commands.append(DenyByDefaultCmd())

    if options.checker is not None:
        commands.append(SetCheckerCmd(options.checker))

    if options.host is not None:
        commands.append(SetHostCmd(options.host))

    if options.secret is not None:
        commands.append(SetSecretCmd(options.secret))

    if options.timeout is not None:
        commands.append(SetTimeoutCmd(options.timeout))

    # Compare with None like the other duration options do; a zero
    # length datetime.timedelta is false and would be ignored.
    if options.extended_timeout is not None:
        commands.append(
            SetExtendedTimeoutCmd(options.extended_timeout))

    if options.interval is not None:
        commands.append(SetIntervalCmd(options.interval))

    if options.approval_delay is not None:
        commands.append(SetApprovalDelayCmd(options.approval_delay))

    if options.approval_duration is not None:
        commands.append(
            SetApprovalDurationCmd(options.approval_duration))

    # If no command option has been given, show table of clients,
    # optionally verbosely
    if not commands:
        commands.append(PrintTableCmd(verbose=options.verbose))

    return commands
273
## Classes for commands.
275
# Abstract classes first
532
276
class Command(object):
533
277
"""Abstract class for commands"""
534
def run(self, clients, bus=None, mandos=None):
278
def run(self, mandos, clients):
535
279
"""Normal commands should implement run_on_one_client(), but
536
280
commands which want to operate on all clients at the same time
537
281
can override this run() method instead."""
538
282
self.mandos = mandos
539
for clientpath, properties in clients.items():
540
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
541
dbus_busname, str(clientpath))
542
client = bus.get_object(dbus_busname, clientpath)
283
for client, properties in clients.items():
543
284
self.run_on_one_client(client, properties)
546
class IsEnabledCmd(Command):
547
def run(self, clients, bus=None, mandos=None):
548
client, properties = next(iter(clients.items()))
549
if self.is_enabled(client, properties):
552
def is_enabled(self, client, properties):
553
return properties["Enabled"]
556
class ApproveCmd(Command):
    """Command which approves any current request of a client"""
    def run_on_one_client(self, client, properties):
        """Log the D-Bus call, then call Approve(True) on CLIENT"""
        object_path = client.__dbus_object_path__
        log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
                  object_path, client_dbus_interface)
        approved = dbus.Boolean(True)
        client.Approve(approved, dbus_interface=client_dbus_interface)
564
class DenyCmd(Command):
    """Command which denies any current request of a client"""
    def run_on_one_client(self, client, properties):
        """Log the D-Bus call, then call Approve(False) on CLIENT"""
        object_path = client.__dbus_object_path__
        log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
                  object_path, client_dbus_interface)
        denied = dbus.Boolean(False)
        client.Approve(denied, dbus_interface=client_dbus_interface)
572
class RemoveCmd(Command):
    """Command which asks the server object to remove a client"""
    def run_on_one_client(self, client, properties):
        """Log the D-Bus call, then pass the object path of CLIENT
        to RemoveClient() on self.mandos"""
        printable_path = str(client.__dbus_object_path__)
        log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname,
                  server_dbus_path, server_dbus_interface,
                  printable_path)
        remove_client = self.mandos.RemoveClient
        remove_client(client.__dbus_object_path__)
580
class OutputCmd(Command):
581
"""Abstract class for commands outputting client details"""
286
class PrintCmd(Command):
287
"""Abstract class for commands printing client details"""
582
288
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
583
289
"Created", "Interval", "Host", "KeyID",
584
290
"Fingerprint", "CheckerRunning", "LastEnabled",
693
405
**{key: self.string_from_client(client, key)
694
406
for key in self.keywords})
697
def milliseconds_to_string(ms):
    """Return MS, a number of milliseconds, formatted as a string
    of the form "[<days>T]HH:MM:SS".

    The "<days>T" prefix is left out when the day count is zero,
    and fractions of a second are not shown.
    """
    td = datetime.timedelta(milliseconds=ms)
    return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
            # Only show the day count when there is one to show
            .format(days="{}T".format(td.days) if td.days else "",
                    hours=td.seconds // 3600,
                    minutes=(td.seconds % 3600) // 60,
                    seconds=td.seconds % 60))
707
class PropertyCmd(Command):
708
"""Abstract class for Actions for setting one client property"""
710
def run_on_one_client(self, client, properties):
711
"""Set the Client's D-Bus property"""
712
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
713
client.__dbus_object_path__,
714
dbus.PROPERTIES_IFACE, client_dbus_interface,
715
self.propname, self.value_to_set
716
if not isinstance(self.value_to_set, dbus.Boolean)
717
else bool(self.value_to_set))
718
client.Set(client_dbus_interface, self.propname,
720
dbus_interface=dbus.PROPERTIES_IFACE)
724
raise NotImplementedError()
410
class DumpJSONCmd(PrintCmd):
411
def output(self, clients):
412
data = {client["Name"]:
413
{key: self.dbus_boolean_to_bool(client[key])
414
for key in self.all_keywords}
415
for client in clients.values()}
416
return json.dumps(data, indent=4, separators=(',', ': '))
418
def dbus_boolean_to_bool(value):
419
if isinstance(value, dbus.Boolean):
423
class IsEnabledCmd(Command):
424
def run_on_one_client(self, client, properties):
425
if self.is_enabled(client, properties):
428
def is_enabled(self, client, properties):
429
return bool(properties["Enabled"])
431
class RemoveCmd(Command):
    """Command which asks the server object to remove a client"""
    def run_on_one_client(self, client, properties):
        """Pass the object path of CLIENT to RemoveClient() on
        self.mandos"""
        object_path = client.__dbus_object_path__
        self.mandos.RemoveClient(object_path)
435
class ApproveCmd(Command):
    """Command which calls Approve(True) on a client"""
    def run_on_one_client(self, client, properties):
        """Call Approve() on CLIENT with a true D-Bus boolean"""
        approved = dbus.Boolean(True)
        client.Approve(approved, dbus_interface=client_interface)
440
class DenyCmd(Command):
    """Command which calls Approve(False) on a client"""
    def run_on_one_client(self, client, properties):
        """Call Approve() on CLIENT with a false D-Bus boolean"""
        denied = dbus.Boolean(False)
        client.Approve(denied, dbus_interface=client_interface)
727
445
class EnableCmd(PropertyCmd):
729
447
value_to_set = dbus.Boolean(True)
732
449
class DisableCmd(PropertyCmd):
734
451
value_to_set = dbus.Boolean(False)
737
453
class BumpTimeoutCmd(PropertyCmd):
738
propname = "LastCheckedOK"
454
property = "LastCheckedOK"
739
455
value_to_set = ""
742
457
class StartCheckerCmd(PropertyCmd):
743
propname = "CheckerRunning"
458
property = "CheckerRunning"
744
459
value_to_set = dbus.Boolean(True)
747
461
class StopCheckerCmd(PropertyCmd):
748
propname = "CheckerRunning"
462
property = "CheckerRunning"
749
463
value_to_set = dbus.Boolean(False)
752
465
class ApproveByDefaultCmd(PropertyCmd):
753
propname = "ApprovedByDefault"
466
property = "ApprovedByDefault"
754
467
value_to_set = dbus.Boolean(True)
757
469
class DenyByDefaultCmd(PropertyCmd):
758
propname = "ApprovedByDefault"
470
property = "ApprovedByDefault"
759
471
value_to_set = dbus.Boolean(False)
762
class PropertyValueCmd(PropertyCmd):
    """Abstract class for PropertyCmd receiving a value as argument"""

    def __init__(self, value):
        """Store VALUE as the value_to_set attribute"""
        self.value_to_set = value
768
class SetCheckerCmd(PropertyValueCmd):
772
class SetHostCmd(PropertyValueCmd):
776
class SetSecretCmd(PropertyValueCmd):
473
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
476
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
479
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
780
481
def value_to_set(self):
783
483
@value_to_set.setter
784
484
def value_to_set(self, value):
785
485
"""When setting, read data from supplied file object"""
786
486
self._vts = value.read()
790
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
791
"""Abstract class for PropertyValueCmd taking a value argument as
792
a datetime.timedelta() but should store it as milliseconds."""
795
def value_to_set(self):
799
def value_to_set(self, value):
800
"""When setting, convert value from a datetime.timedelta"""
801
self._vts = int(round(value.total_seconds() * 1000))
804
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
808
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
    """Command for the "ExtendedTimeout" client property"""

    propname = "ExtendedTimeout"
812
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
    """Command for the "Interval" client property"""

    propname = "Interval"
816
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
    """Command for the "ApprovalDelay" client property"""

    propname = "ApprovalDelay"
820
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
    """Command for the "ApprovalDuration" client property"""

    propname = "ApprovalDuration"
490
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
493
class SetExtendedTimeoutCmd(PropertyCmd,
494
MillisecondsValueArgumentMixIn):
495
property = "ExtendedTimeout"
497
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
498
property = "Interval"
500
class SetApprovalDelayCmd(PropertyCmd,
501
MillisecondsValueArgumentMixIn):
502
property = "ApprovalDelay"
504
class SetApprovalDurationCmd(PropertyCmd,
505
MillisecondsValueArgumentMixIn):
506
property = "ApprovalDuration"
508
def add_command_line_options(parser):
    """Register all command line options on PARSER.

    parser -- the argparse.ArgumentParser to add the options to

    Options which exclude each other (enable/disable, start/stop
    checker, approve/deny by default and approve/deny) are put in
    mutually exclusive groups, so argparse itself refuses to combine
    them.  Duration valued options are converted using
    string_to_delta().
    """
    parser.add_argument("--version", action="version",
                        version="%(prog)s {}".format(version),
                        help="show version number and exit")
    parser.add_argument("-a", "--all", action="store_true",
                        help="Select all clients")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Print all fields")
    parser.add_argument("-j", "--dump-json", action="store_true",
                        help="Dump client data in JSON format")
    enable_disable = parser.add_mutually_exclusive_group()
    enable_disable.add_argument("-e", "--enable", action="store_true",
                                help="Enable client")
    # Must be a plain flag; without action="store_true" it would
    # swallow the following client name as its own value.
    enable_disable.add_argument("-d", "--disable",
                                action="store_true",
                                help="disable client")
    parser.add_argument("-b", "--bump-timeout", action="store_true",
                        help="Bump timeout for client")
    start_stop_checker = parser.add_mutually_exclusive_group()
    # Both checker options are plain flags too, for the same reason.
    start_stop_checker.add_argument("--start-checker",
                                    action="store_true",
                                    help="Start checker for client")
    start_stop_checker.add_argument("--stop-checker",
                                    action="store_true",
                                    help="Stop checker for client")
    parser.add_argument("-V", "--is-enabled", action="store_true",
                        help="Check if client is enabled")
    parser.add_argument("-r", "--remove", action="store_true",
                        help="Remove client")
    parser.add_argument("-c", "--checker",
                        help="Set checker command for client")
    parser.add_argument("-t", "--timeout", type=string_to_delta,
                        help="Set timeout for client")
    parser.add_argument("--extended-timeout", type=string_to_delta,
                        help="Set extended timeout for client")
    parser.add_argument("-i", "--interval", type=string_to_delta,
                        help="Set checker interval for client")
    approve_deny_default = parser.add_mutually_exclusive_group()
    # Both options share one destination; the None default makes it
    # possible to tell "neither option given" apart from True/False.
    approve_deny_default.add_argument(
        "--approve-by-default", action="store_true",
        default=None, dest="approved_by_default",
        help="Set client to be approved by default")
    approve_deny_default.add_argument(
        "--deny-by-default", action="store_false",
        dest="approved_by_default",
        help="Set client to be denied by default")
    parser.add_argument("--approval-delay", type=string_to_delta,
                        help="Set delay before client approve/deny")
    parser.add_argument("--approval-duration", type=string_to_delta,
                        help="Set duration of one client approval")
    parser.add_argument("-H", "--host", help="Set host for client")
    # The secret is opened in binary mode by argparse itself.
    parser.add_argument("-s", "--secret",
                        type=argparse.FileType(mode="rb"),
                        help="Set password blob (file) for client")
    approve_deny = parser.add_mutually_exclusive_group()
    approve_deny.add_argument(
        "-A", "--approve", action="store_true",
        help="Approve any current client request")
    approve_deny.add_argument("-D", "--deny", action="store_true",
                              help="Deny any current client request")
    parser.add_argument("--check", action="store_true",
                        help="Run self-test")
    parser.add_argument("client", nargs="*", help="Client name")
573
def commands_from_options(options):
    """Return a list of Command objects selected by OPTIONS.

    options -- the namespace of parsed command line options

    The commands are listed in the fixed order of the tests below,
    not in the order the options were given on the command line.  If
    no command option was given, the list holds a single
    PrintTableCmd.
    """
    commands = []

    if options.dump_json:
        commands.append(DumpJSONCmd())

    if options.enable:
        commands.append(EnableCmd())

    if options.disable:
        commands.append(DisableCmd())

    if options.bump_timeout:
        commands.append(BumpTimeoutCmd())

    if options.start_checker:
        commands.append(StartCheckerCmd())

    if options.stop_checker:
        commands.append(StopCheckerCmd())

    if options.is_enabled:
        commands.append(IsEnabledCmd())

    if options.remove:
        commands.append(RemoveCmd())

    if options.checker is not None:
        commands.append(SetCheckerCmd(options.checker))

    if options.timeout is not None:
        commands.append(SetTimeoutCmd(options.timeout))

    # Compare with None like the other duration options do; a zero
    # length datetime.timedelta is false and would be ignored.
    if options.extended_timeout is not None:
        commands.append(
            SetExtendedTimeoutCmd(options.extended_timeout))

    if options.interval is not None:
        commands.append(SetIntervalCmd(options.interval))

    # None means neither --approve-by-default nor --deny-by-default
    if options.approved_by_default is not None:
        if options.approved_by_default:
            commands.append(ApproveByDefaultCmd())
        else:
            commands.append(DenyByDefaultCmd())

    if options.approval_delay is not None:
        commands.append(SetApprovalDelayCmd(options.approval_delay))

    if options.approval_duration is not None:
        commands.append(
            SetApprovalDurationCmd(options.approval_duration))

    if options.host is not None:
        commands.append(SetHostCmd(options.host))

    if options.secret is not None:
        commands.append(SetSecretCmd(options.secret))

    if options.approve:
        commands.append(ApproveCmd())

    if options.deny:
        commands.append(DenyCmd())

    # If no command option has been given, show table of clients,
    # optionally verbosely
    if not commands:
        commands.append(PrintTableCmd(verbose=options.verbose))

    return commands
647
def check_option_syntax(parser, options):
649
def has_actions(options):
650
return any((options.enable,
652
options.bump_timeout,
653
options.start_checker,
654
options.stop_checker,
657
options.checker is not None,
658
options.timeout is not None,
659
options.extended_timeout is not None,
660
options.interval is not None,
661
options.approved_by_default is not None,
662
options.approval_delay is not None,
663
options.approval_duration is not None,
664
options.host is not None,
665
options.secret is not None,
669
if has_actions(options) and not (options.client or options.all):
670
parser.error("Options require clients names or --all.")
671
if options.verbose and has_actions(options):
672
parser.error("--verbose can only be used alone.")
673
if options.dump_json and (options.verbose
674
or has_actions(options)):
675
parser.error("--dump-json can only be used alone.")
676
if options.all and not has_actions(options):
677
parser.error("--all requires an action.")
678
if options.is_enabled and len(options.client) > 1:
679
parser.error("--is-enabled requires exactly one client")
683
parser = argparse.ArgumentParser()
685
add_command_line_options(parser)
687
options = parser.parse_args()
689
check_option_syntax(parser, options)
691
clientnames = options.client
694
bus = dbus.SystemBus()
695
mandos_dbus_objc = bus.get_object(busname, server_path)
696
except dbus.exceptions.DBusException:
697
log.critical("Could not connect to Mandos server")
700
mandos_serv = dbus.Interface(mandos_dbus_objc,
701
dbus_interface=server_interface)
702
mandos_serv_object_manager = dbus.Interface(
703
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
705
# Filter out log message from dbus module
706
dbus_logger = logging.getLogger("dbus.proxies")
707
class NullFilter(logging.Filter):
708
def filter(self, record):
710
dbus_filter = NullFilter()
712
dbus_logger.addFilter(dbus_filter)
713
mandos_clients = {path: ifs_and_props[client_interface]
714
for path, ifs_and_props in
715
mandos_serv_object_manager
716
.GetManagedObjects().items()
717
if client_interface in ifs_and_props}
718
except dbus.exceptions.DBusException as e:
719
log.critical("Failed to access Mandos server through D-Bus:"
723
# restore dbus logger
724
dbus_logger.removeFilter(dbus_filter)
726
# Compile dict of (clients: properties) to process
730
clients = {bus.get_object(busname, path): properties
731
for path, properties in mandos_clients.items()}
733
for name in clientnames:
734
for path, client in mandos_clients.items():
735
if client["Name"] == name:
736
client_objc = bus.get_object(busname, path)
737
clients[client_objc] = client
740
log.critical("Client not found on server: %r", name)
743
# Run all commands on clients
744
commands = commands_from_options(options)
745
for command in commands:
746
command.run(mandos_serv, clients)
749
class Test_milliseconds_to_string(unittest.TestCase):
751
self.assertEqual(milliseconds_to_string(93785000),
753
def test_no_days(self):
754
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
755
def test_all_zero(self):
756
self.assertEqual(milliseconds_to_string(0), "00:00:00")
757
def test_no_fractional_seconds(self):
758
self.assertEqual(milliseconds_to_string(400), "00:00:00")
759
self.assertEqual(milliseconds_to_string(900), "00:00:00")
760
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
825
762
class Test_string_to_delta(unittest.TestCase):
826
763
def test_handles_basic_rfc3339(self):
827
764
self.assertEqual(string_to_delta("PT0S"),
856
792
self.assertEqual(value, datetime.timedelta(0, 7200))
859
class Test_check_option_syntax(unittest.TestCase):
861
self.parser = argparse.ArgumentParser()
862
add_command_line_options(self.parser)
864
def test_actions_requires_client_or_all(self):
865
for action, value in self.actions.items():
866
options = self.parser.parse_args()
867
setattr(options, action, value)
868
with self.assertParseError():
869
self.check_option_syntax(options)
871
# This mostly corresponds to the definition from has_actions() in
872
# check_option_syntax()
874
# The actual values set here are not that important, but we do
875
# at least stick to the correct types, even though they are
879
"bump_timeout": True,
880
"start_checker": True,
881
"stop_checker": True,
885
"timeout": datetime.timedelta(),
886
"extended_timeout": datetime.timedelta(),
887
"interval": datetime.timedelta(),
888
"approved_by_default": True,
889
"approval_delay": datetime.timedelta(),
890
"approval_duration": datetime.timedelta(),
892
"secret": io.BytesIO(b"x"),
897
@contextlib.contextmanager
898
def assertParseError(self):
899
with self.assertRaises(SystemExit) as e:
900
with self.temporarily_suppress_stderr():
902
# Exit code from argparse is guaranteed to be "2". Reference:
903
# https://docs.python.org/3/library
904
# /argparse.html#exiting-methods
905
self.assertEqual(e.exception.code, 2)
908
@contextlib.contextmanager
909
def temporarily_suppress_stderr():
910
null = os.open(os.path.devnull, os.O_RDWR)
911
stderrcopy = os.dup(sys.stderr.fileno())
912
os.dup2(null, sys.stderr.fileno())
918
os.dup2(stderrcopy, sys.stderr.fileno())
921
def check_option_syntax(self, options):
922
check_option_syntax(self.parser, options)
924
def test_actions_conflicts_with_verbose(self):
925
for action, value in self.actions.items():
926
options = self.parser.parse_args()
927
setattr(options, action, value)
928
options.verbose = True
929
with self.assertParseError():
930
self.check_option_syntax(options)
932
def test_dump_json_conflicts_with_verbose(self):
933
options = self.parser.parse_args()
934
options.dump_json = True
935
options.verbose = True
936
with self.assertParseError():
937
self.check_option_syntax(options)
939
def test_dump_json_conflicts_with_action(self):
940
for action, value in self.actions.items():
941
options = self.parser.parse_args()
942
setattr(options, action, value)
943
options.dump_json = True
944
with self.assertParseError():
945
self.check_option_syntax(options)
947
def test_all_can_not_be_alone(self):
948
options = self.parser.parse_args()
950
with self.assertParseError():
951
self.check_option_syntax(options)
953
def test_all_is_ok_with_any_action(self):
954
for action, value in self.actions.items():
955
options = self.parser.parse_args()
956
setattr(options, action, value)
958
self.check_option_syntax(options)
960
def test_is_enabled_fails_without_client(self):
961
options = self.parser.parse_args()
962
options.is_enabled = True
963
with self.assertParseError():
964
self.check_option_syntax(options)
966
def test_is_enabled_works_with_one_client(self):
967
options = self.parser.parse_args()
968
options.is_enabled = True
969
options.client = ["foo"]
970
self.check_option_syntax(options)
972
def test_is_enabled_fails_with_two_clients(self):
973
options = self.parser.parse_args()
974
options.is_enabled = True
975
options.client = ["foo", "barbar"]
976
with self.assertParseError():
977
self.check_option_syntax(options)
979
def test_remove_can_only_be_combined_with_action_deny(self):
980
for action, value in self.actions.items():
981
if action in {"remove", "deny"}:
983
options = self.parser.parse_args()
984
setattr(options, action, value)
986
options.remove = True
987
with self.assertParseError():
988
self.check_option_syntax(options)
991
class Test_SilenceLogger(unittest.TestCase):
992
loggername = "mandos-ctl.Test_SilenceLogger"
993
log = logging.getLogger(loggername)
994
log.propagate = False
995
log.addHandler(logging.NullHandler())
998
self.counting_filter = self.CountingFilter()
1000
class CountingFilter(logging.Filter):
1001
"Count number of records"
1003
def filter(self, record):
1007
def test_should_filter_records_only_when_active(self):
1009
with SilenceLogger(self.loggername):
1010
self.log.addFilter(self.counting_filter)
1011
self.log.info("Filtered log message 1")
1012
self.log.info("Non-filtered message 2")
1013
self.log.info("Non-filtered message 3")
1015
self.log.removeFilter(self.counting_filter)
1016
self.assertEqual(self.counting_filter.count, 2)
1019
class Test_commands_from_options(unittest.TestCase):
1021
self.parser = argparse.ArgumentParser()
1022
add_command_line_options(self.parser)
1024
def test_is_enabled(self):
1025
self.assert_command_from_args(["--is-enabled", "foo"],
1028
def assert_command_from_args(self, args, command_cls,
1030
"""Assert that parsing ARGS should result in an instance of
1031
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1032
options = self.parser.parse_args(args)
1033
check_option_syntax(self.parser, options)
1034
commands = commands_from_options(options)
1035
self.assertEqual(len(commands), 1)
1036
command = commands[0]
1037
self.assertIsInstance(command, command_cls)
1038
for key, value in cmd_attrs.items():
1039
self.assertEqual(getattr(command, key), value)
1041
def test_is_enabled_short(self):
1042
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1044
def test_approve(self):
1045
self.assert_command_from_args(["--approve", "foo"],
1048
def test_approve_short(self):
1049
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1051
def test_deny(self):
1052
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1054
def test_deny_short(self):
1055
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1057
def test_remove(self):
1058
self.assert_command_from_args(["--remove", "foo"],
1061
def test_deny_before_remove(self):
1062
options = self.parser.parse_args(["--deny", "--remove",
1064
check_option_syntax(self.parser, options)
1065
commands = commands_from_options(options)
1066
self.assertEqual(len(commands), 2)
1067
self.assertIsInstance(commands[0], DenyCmd)
1068
self.assertIsInstance(commands[1], RemoveCmd)
1070
def test_deny_before_remove_reversed(self):
1071
options = self.parser.parse_args(["--remove", "--deny",
1073
check_option_syntax(self.parser, options)
1074
commands = commands_from_options(options)
1075
self.assertEqual(len(commands), 2)
1076
self.assertIsInstance(commands[0], DenyCmd)
1077
self.assertIsInstance(commands[1], RemoveCmd)
1079
def test_remove_short(self):
1080
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1082
def test_dump_json(self):
1083
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1085
def test_enable(self):
1086
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1088
def test_enable_short(self):
1089
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1091
def test_disable(self):
1092
self.assert_command_from_args(["--disable", "foo"],
1095
def test_disable_short(self):
1096
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1098
def test_bump_timeout(self):
1099
self.assert_command_from_args(["--bump-timeout", "foo"],
1102
def test_bump_timeout_short(self):
1103
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1105
def test_start_checker(self):
1106
self.assert_command_from_args(["--start-checker", "foo"],
1109
def test_stop_checker(self):
1110
self.assert_command_from_args(["--stop-checker", "foo"],
1113
def test_approve_by_default(self):
1114
self.assert_command_from_args(["--approve-by-default", "foo"],
1115
ApproveByDefaultCmd)
1117
def test_deny_by_default(self):
1118
self.assert_command_from_args(["--deny-by-default", "foo"],
1121
def test_checker(self):
1122
self.assert_command_from_args(["--checker", ":", "foo"],
1123
SetCheckerCmd, value_to_set=":")
1125
def test_checker_empty(self):
1126
self.assert_command_from_args(["--checker", "", "foo"],
1127
SetCheckerCmd, value_to_set="")
1129
def test_checker_short(self):
1130
self.assert_command_from_args(["-c", ":", "foo"],
1131
SetCheckerCmd, value_to_set=":")
1133
def test_host(self):
1134
self.assert_command_from_args(["--host", "foo.example.org",
1136
value_to_set="foo.example.org")
1138
def test_host_short(self):
1139
self.assert_command_from_args(["-H", "foo.example.org",
1141
value_to_set="foo.example.org")
1143
def test_secret_devnull(self):
1144
self.assert_command_from_args(["--secret", os.path.devnull,
1145
"foo"], SetSecretCmd,
1148
def test_secret_tempfile(self):
1149
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1150
value = b"secret\0xyzzy\nbar"
1153
self.assert_command_from_args(["--secret", f.name,
1154
"foo"], SetSecretCmd,
1157
def test_secret_devnull_short(self):
1158
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1159
SetSecretCmd, value_to_set=b"")
1161
def test_secret_tempfile_short(self):
1162
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1163
value = b"secret\0xyzzy\nbar"
1166
self.assert_command_from_args(["-s", f.name, "foo"],
1170
def test_timeout(self):
1171
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1173
value_to_set=300000)
1175
def test_timeout_short(self):
1176
self.assert_command_from_args(["-t", "PT5M", "foo"],
1178
value_to_set=300000)
1180
def test_extended_timeout(self):
1181
self.assert_command_from_args(["--extended-timeout", "PT15M",
1183
SetExtendedTimeoutCmd,
1184
value_to_set=900000)
1186
def test_interval(self):
1187
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1189
value_to_set=120000)
1191
def test_interval_short(self):
1192
self.assert_command_from_args(["-i", "PT2M", "foo"],
1194
value_to_set=120000)
1196
def test_approval_delay(self):
1197
self.assert_command_from_args(["--approval-delay", "PT30S",
1198
"foo"], SetApprovalDelayCmd,
1201
def test_approval_duration(self):
1202
self.assert_command_from_args(["--approval-duration", "PT1S",
1203
"foo"], SetApprovalDurationCmd,
1206
def test_print_table(self):
1207
self.assert_command_from_args([], PrintTableCmd,
1210
def test_print_table_verbose(self):
1211
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1214
def test_print_table_verbose_short(self):
1215
self.assert_command_from_args(["-v"], PrintTableCmd,
1219
795
class TestCmd(unittest.TestCase):
1220
796
"""Abstract class for tests of command classes"""
1222
797
def setUp(self):
1224
799
class MockClient(object):
1225
800
def __init__(self, name, **attributes):
1226
self.__dbus_object_path__ = "/clients/{}".format(name)
801
self.__dbus_object_path__ = "objpath_{}".format(name)
1227
802
self.attributes = attributes
1228
803
self.attributes["Name"] = name
1230
def Set(self, interface, propname, value, dbus_interface):
1231
testcase.assertEqual(interface, client_dbus_interface)
1232
testcase.assertEqual(dbus_interface,
1233
dbus.PROPERTIES_IFACE)
1234
self.attributes[propname] = value
1235
def Get(self, interface, propname, dbus_interface):
1236
testcase.assertEqual(interface, client_dbus_interface)
1237
testcase.assertEqual(dbus_interface,
1238
dbus.PROPERTIES_IFACE)
1239
return self.attributes[propname]
805
def Set(self, interface, property, value, dbus_interface):
806
testcase.assertEqual(interface, client_interface)
807
testcase.assertEqual(dbus_interface,
808
dbus.PROPERTIES_IFACE)
809
self.attributes[property] = value
810
def Get(self, interface, property, dbus_interface):
811
testcase.assertEqual(interface, client_interface)
812
testcase.assertEqual(dbus_interface,
813
dbus.PROPERTIES_IFACE)
814
return self.attributes[property]
1240
815
def Approve(self, approve, dbus_interface):
1241
testcase.assertEqual(dbus_interface,
1242
client_dbus_interface)
816
testcase.assertEqual(dbus_interface, client_interface)
1243
817
self.calls.append(("Approve", (approve,
1244
818
dbus_interface)))
1245
819
self.client = MockClient(
1285
859
ApprovedByDefault=dbus.Boolean(False),
1286
860
LastApprovalRequest="2019-01-03T00:00:00",
1287
861
ApprovalDelay=30000,
1288
ApprovalDuration=93785000,
862
ApprovalDuration=1000,
1290
864
ExtendedTimeout=900000,
1291
865
Expires="2019-02-05T00:00:00",
1292
866
LastCheckerStatus=-2)
1293
867
self.clients = collections.OrderedDict(
1295
("/clients/foo", self.client.attributes),
1296
("/clients/barbar", self.other_client.attributes),
869
(self.client, self.client.attributes),
870
(self.other_client, self.other_client.attributes),
1298
self.one_client = {"/clients/foo": self.client.attributes}
1304
def get_object(client_bus_name, path):
1305
self.assertEqual(client_bus_name, dbus_busname)
1307
# Note: "self" here is the TestCmd instance, not
1308
# the Bus instance, since this is a static method!
1309
"/clients/foo": self.client,
1310
"/clients/barbar": self.other_client,
1315
class TestIsEnabledCmd(TestCmd):
1316
def test_is_enabled(self):
1317
self.assertTrue(all(IsEnabledCmd().is_enabled(client,
1319
for client, properties
1320
in self.clients.items()))
1322
def test_is_enabled_run_exits_successfully(self):
1323
with self.assertRaises(SystemExit) as e:
1324
IsEnabledCmd().run(self.one_client)
1325
if e.exception.code is not None:
1326
self.assertEqual(e.exception.code, 0)
1328
self.assertIsNone(e.exception.code)
1330
def test_is_enabled_run_exits_with_failure(self):
1331
self.client.attributes["Enabled"] = dbus.Boolean(False)
1332
with self.assertRaises(SystemExit) as e:
1333
IsEnabledCmd().run(self.one_client)
1334
if isinstance(e.exception.code, int):
1335
self.assertNotEqual(e.exception.code, 0)
1337
self.assertIsNotNone(e.exception.code)
1340
class TestApproveCmd(TestCmd):
1341
def test_approve(self):
1342
ApproveCmd().run(self.clients, self.bus)
1343
for clientpath in self.clients:
1344
client = self.bus.get_object(dbus_busname, clientpath)
1345
self.assertIn(("Approve", (True, client_dbus_interface)),
1349
class TestDenyCmd(TestCmd):
1350
def test_deny(self):
1351
DenyCmd().run(self.clients, self.bus)
1352
for clientpath in self.clients:
1353
client = self.bus.get_object(dbus_busname, clientpath)
1354
self.assertIn(("Approve", (False, client_dbus_interface)),
1358
class TestRemoveCmd(TestCmd):
1359
def test_remove(self):
1360
class MockMandos(object):
1363
def RemoveClient(self, dbus_path):
1364
self.calls.append(("RemoveClient", (dbus_path,)))
1365
mandos = MockMandos()
1366
super(TestRemoveCmd, self).setUp()
1367
RemoveCmd().run(self.clients, self.bus, mandos)
1368
self.assertEqual(len(mandos.calls), 2)
1369
for clientpath in self.clients:
1370
self.assertIn(("RemoveClient", (clientpath,)),
872
self.one_client = {self.client: self.client.attributes}
874
class TestPrintTableCmd(TestCmd):
875
def test_normal(self):
876
output = PrintTableCmd().output(self.clients)
877
expected_output = """
878
Name Enabled Timeout Last Successful Check
879
foo Yes 00:05:00 2019-02-03T00:00:00
880
barbar Yes 00:05:00 2019-02-04T00:00:00
882
self.assertEqual(output, expected_output)
883
def test_verbose(self):
884
output = PrintTableCmd(verbose=True).output(self.clients)
885
expected_output = """
886
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
887
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
888
barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04T00:00:00 No No 2019-01-03T00:00:00 00:00:30 00:00:01 : 00:15:00 2019-02-05T00:00:00 -2
890
self.assertEqual(output, expected_output)
891
def test_one_client(self):
892
output = PrintTableCmd().output(self.one_client)
893
expected_output = """
894
Name Enabled Timeout Last Successful Check
895
foo Yes 00:05:00 2019-02-03T00:00:00
897
self.assertEqual(output, expected_output)
1374
899
class TestDumpJSONCmd(TestCmd):
1375
900
def setUp(self):
1426
951
return super(TestDumpJSONCmd, self).setUp()
1428
952
def test_normal(self):
1429
output = DumpJSONCmd().output(self.clients.values())
1430
json_data = json.loads(output)
953
json_data = json.loads(DumpJSONCmd().output(self.clients))
1431
954
self.assertDictEqual(json_data, self.expected_json)
1433
955
def test_one_client(self):
1434
output = DumpJSONCmd().output(self.one_client.values())
1435
json_data = json.loads(output)
956
clients = self.one_client
957
json_data = json.loads(DumpJSONCmd().output(clients))
1436
958
expected_json = {"foo": self.expected_json["foo"]}
1437
959
self.assertDictEqual(json_data, expected_json)
1440
class TestPrintTableCmd(TestCmd):
1441
def test_normal(self):
1442
output = PrintTableCmd().output(self.clients.values())
1443
expected_output = "\n".join((
1444
"Name Enabled Timeout Last Successful Check",
1445
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1446
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1448
self.assertEqual(output, expected_output)
1450
def test_verbose(self):
1451
output = PrintTableCmd(verbose=True).output(
1452
self.clients.values())
1467
"Last Successful Check ",
1468
"2019-02-03T00:00:00 ",
1469
"2019-02-04T00:00:00 ",
1472
"2019-01-02T00:00:00 ",
1473
"2019-01-03T00:00:00 ",
1485
("92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8"
1487
("0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e"
1491
"778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 ",
1492
"3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 ",
1494
"Check Is Running ",
1499
"2019-01-03T00:00:00 ",
1500
"2019-01-04T00:00:00 ",
1502
"Approval Is Pending ",
1506
"Approved By Default ",
1510
"Last Approval Request ",
1512
"2019-01-03T00:00:00 ",
1518
"Approval Duration ",
1523
"fping -q -- %(host)s ",
1526
"Extended Timeout ",
1531
"2019-02-04T00:00:00 ",
1532
"2019-02-05T00:00:00 ",
1534
"Last Checker Status",
1539
num_lines = max(len(rows) for rows in columns)
1540
expected_output = "\n".join("".join(rows[line]
1541
for rows in columns)
1542
for line in range(num_lines))
1543
self.assertEqual(output, expected_output)
1545
def test_one_client(self):
1546
output = PrintTableCmd().output(self.one_client.values())
1547
expected_output = "\n".join((
1548
"Name Enabled Timeout Last Successful Check",
1549
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1551
self.assertEqual(output, expected_output)
961
class TestIsEnabledCmd(TestCmd):
962
def test_is_enabled(self):
963
self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
964
for client, properties in self.clients.items()))
965
def test_is_enabled_run_exits_successfully(self):
966
with self.assertRaises(SystemExit) as e:
967
IsEnabledCmd().run(None, self.one_client)
968
if e.exception.code is not None:
969
self.assertEqual(e.exception.code, 0)
971
self.assertIsNone(e.exception.code)
972
def test_is_enabled_run_exits_with_failure(self):
973
self.client.attributes["Enabled"] = dbus.Boolean(False)
974
with self.assertRaises(SystemExit) as e:
975
IsEnabledCmd().run(None, self.one_client)
976
if isinstance(e.exception.code, int):
977
self.assertNotEqual(e.exception.code, 0)
979
self.assertIsNotNone(e.exception.code)
981
class TestRemoveCmd(TestCmd):
982
def test_remove(self):
983
class MockMandos(object):
986
def RemoveClient(self, dbus_path):
987
self.calls.append(("RemoveClient", (dbus_path,)))
988
mandos = MockMandos()
989
super(TestRemoveCmd, self).setUp()
990
RemoveCmd().run(mandos, self.clients)
991
self.assertEqual(len(mandos.calls), 2)
992
for client in self.clients:
993
self.assertIn(("RemoveClient",
994
(client.__dbus_object_path__,)),
997
class TestApproveCmd(TestCmd):
998
def test_approve(self):
999
ApproveCmd().run(None, self.clients)
1000
for client in self.clients:
1001
self.assertIn(("Approve", (True, client_interface)),
1004
class TestDenyCmd(TestCmd):
1005
def test_deny(self):
1006
DenyCmd().run(None, self.clients)
1007
for client in self.clients:
1008
self.assertIn(("Approve", (False, client_interface)),
1011
class TestEnableCmd(TestCmd):
1012
def test_enable(self):
1013
for client in self.clients:
1014
client.attributes["Enabled"] = False
1016
EnableCmd().run(None, self.clients)
1018
for client in self.clients:
1019
self.assertTrue(client.attributes["Enabled"])
1021
class TestDisableCmd(TestCmd):
1022
def test_disable(self):
1023
DisableCmd().run(None, self.clients)
1025
for client in self.clients:
1026
self.assertFalse(client.attributes["Enabled"])
1028
class Unique(object):
1029
"""Class for objects which exist only to be unique objects, since
1030
unittest.mock.sentinel only exists in Python 3.3"""
1554
1032
class TestPropertyCmd(TestCmd):
1555
1033
"""Abstract class for tests of PropertyCmd classes"""
1560
1038
self.values_to_set)
1561
1039
for value_to_set, value_to_get in zip(self.values_to_set,
1562
1040
values_to_get):
1563
for clientpath in self.clients:
1564
client = self.bus.get_object(dbus_busname, clientpath)
1565
old_value = client.attributes[self.propname]
1566
self.assertNotIsInstance(old_value, self.Unique)
1567
client.attributes[self.propname] = self.Unique()
1041
for client in self.clients:
1042
old_value = client.attributes[self.property]
1043
self.assertNotIsInstance(old_value, Unique)
1044
client.attributes[self.property] = Unique()
1568
1045
self.run_command(value_to_set, self.clients)
1569
for clientpath in self.clients:
1570
client = self.bus.get_object(dbus_busname, clientpath)
1571
value = client.attributes[self.propname]
1572
self.assertNotIsInstance(value, self.Unique)
1046
for client in self.clients:
1047
value = client.attributes[self.property]
1048
self.assertNotIsInstance(value, Unique)
1573
1049
self.assertEqual(value, value_to_get)
1575
class Unique(object):
1576
"""Class for objects which exist only to be unique objects,
1577
since unittest.mock.sentinel only exists in Python 3.3"""
1579
1050
def run_command(self, value, clients):
1580
self.command().run(clients, self.bus)
1583
class TestEnableCmd(TestPropertyCmd):
1585
propname = "Enabled"
1586
values_to_set = [dbus.Boolean(True)]
1589
class TestDisableCmd(TestPropertyCmd):
1590
command = DisableCmd
1591
propname = "Enabled"
1592
values_to_set = [dbus.Boolean(False)]
1051
self.command().run(None, clients)
1595
1053
class TestBumpTimeoutCmd(TestPropertyCmd):
1596
1054
command = BumpTimeoutCmd
1597
propname = "LastCheckedOK"
1055
property = "LastCheckedOK"
1598
1056
values_to_set = [""]
1601
1058
class TestStartCheckerCmd(TestPropertyCmd):
1602
1059
command = StartCheckerCmd
1603
propname = "CheckerRunning"
1060
property = "CheckerRunning"
1604
1061
values_to_set = [dbus.Boolean(True)]
1607
1063
class TestStopCheckerCmd(TestPropertyCmd):
1608
1064
command = StopCheckerCmd
1609
propname = "CheckerRunning"
1065
property = "CheckerRunning"
1610
1066
values_to_set = [dbus.Boolean(False)]
1613
1068
class TestApproveByDefaultCmd(TestPropertyCmd):
1614
1069
command = ApproveByDefaultCmd
1615
propname = "ApprovedByDefault"
1070
property = "ApprovedByDefault"
1616
1071
values_to_set = [dbus.Boolean(True)]
1619
1073
class TestDenyByDefaultCmd(TestPropertyCmd):
1620
1074
command = DenyByDefaultCmd
1621
propname = "ApprovedByDefault"
1075
property = "ApprovedByDefault"
1622
1076
values_to_set = [dbus.Boolean(False)]
1625
class TestPropertyValueCmd(TestPropertyCmd):
1626
"""Abstract class for tests of PropertyValueCmd classes"""
1078
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1079
"""Abstract class for tests of PropertyCmd classes using the
1080
ValueArgumentMixIn"""
1628
1081
def runTest(self):
1629
if type(self) is TestPropertyValueCmd:
1082
if type(self) is TestValueArgumentPropertyCmd:
1631
return super(TestPropertyValueCmd, self).runTest()
1084
return super(TestValueArgumentPropertyCmd, self).runTest()
1633
1085
def run_command(self, value, clients):
1634
self.command(value).run(clients, self.bus)
1637
class TestSetCheckerCmd(TestPropertyValueCmd):
1086
self.command(value).run(None, clients)
1088
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1638
1089
command = SetCheckerCmd
1639
propname = "Checker"
1090
property = "Checker"
1640
1091
values_to_set = ["", ":", "fping -q -- %s"]
1643
class TestSetHostCmd(TestPropertyValueCmd):
1093
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1644
1094
command = SetHostCmd
1646
1096
values_to_set = ["192.0.2.3", "foo.example.org"]
1649
class TestSetSecretCmd(TestPropertyValueCmd):
1098
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1650
1099
command = SetSecretCmd
1652
values_to_set = [io.BytesIO(b""),
1101
values_to_set = [open("/dev/null", "rb"),
1653
1102
io.BytesIO(b"secret\0xyzzy\nbar")]
1654
1103
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1657
class TestSetTimeoutCmd(TestPropertyValueCmd):
1105
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1658
1106
command = SetTimeoutCmd
1659
propname = "Timeout"
1107
property = "Timeout"
1660
1108
values_to_set = [datetime.timedelta(),
1661
1109
datetime.timedelta(minutes=5),
1662
1110
datetime.timedelta(seconds=1),
1708
1152
datetime.timedelta(weeks=52)]
1709
1153
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1155
class Test_command_from_options(unittest.TestCase):
1157
self.parser = argparse.ArgumentParser()
1158
add_command_line_options(self.parser)
1159
def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1160
"""Assert that parsing ARGS should result in an instance of
1161
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1162
options = self.parser.parse_args(args)
1163
check_option_syntax(self.parser, options)
1164
commands = commands_from_options(options)
1165
self.assertEqual(len(commands), 1)
1166
command = commands[0]
1167
self.assertIsInstance(command, command_cls)
1168
for key, value in cmd_attrs.items():
1169
self.assertEqual(getattr(command, key), value)
1170
def test_print_table(self):
1171
self.assert_command_from_args([], PrintTableCmd,
1174
def test_print_table_verbose(self):
1175
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1178
def test_print_table_verbose_short(self):
1179
self.assert_command_from_args(["-v"], PrintTableCmd,
1182
def test_enable(self):
1183
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1185
def test_enable_short(self):
1186
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1188
def test_disable(self):
1189
self.assert_command_from_args(["--disable", "foo"],
1192
def test_disable_short(self):
1193
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1195
def test_bump_timeout(self):
1196
self.assert_command_from_args(["--bump-timeout", "foo"],
1199
def test_bump_timeout_short(self):
1200
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1202
def test_start_checker(self):
1203
self.assert_command_from_args(["--start-checker", "foo"],
1206
def test_stop_checker(self):
1207
self.assert_command_from_args(["--stop-checker", "foo"],
1210
def test_remove(self):
1211
self.assert_command_from_args(["--remove", "foo"],
1214
def test_remove_short(self):
1215
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1217
def test_checker(self):
1218
self.assert_command_from_args(["--checker", ":", "foo"],
1219
SetCheckerCmd, value_to_set=":")
1221
def test_checker_empty(self):
1222
self.assert_command_from_args(["--checker", "", "foo"],
1223
SetCheckerCmd, value_to_set="")
1225
def test_checker_short(self):
1226
self.assert_command_from_args(["-c", ":", "foo"],
1227
SetCheckerCmd, value_to_set=":")
1229
def test_timeout(self):
1230
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1232
value_to_set=300000)
1234
def test_timeout_short(self):
1235
self.assert_command_from_args(["-t", "PT5M", "foo"],
1237
value_to_set=300000)
1239
def test_extended_timeout(self):
1240
self.assert_command_from_args(["--extended-timeout", "PT15M",
1242
SetExtendedTimeoutCmd,
1243
value_to_set=900000)
1245
def test_interval(self):
1246
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1248
value_to_set=120000)
1250
def test_interval_short(self):
1251
self.assert_command_from_args(["-i", "PT2M", "foo"],
1253
value_to_set=120000)
1255
def test_approve_by_default(self):
1256
self.assert_command_from_args(["--approve-by-default", "foo"],
1257
ApproveByDefaultCmd)
1259
def test_deny_by_default(self):
1260
self.assert_command_from_args(["--deny-by-default", "foo"],
1263
def test_approval_delay(self):
1264
self.assert_command_from_args(["--approval-delay", "PT30S",
1265
"foo"], SetApprovalDelayCmd,
1268
def test_approval_duration(self):
1269
self.assert_command_from_args(["--approval-duration", "PT1S",
1270
"foo"], SetApprovalDurationCmd,
1273
def test_host(self):
1274
self.assert_command_from_args(["--host", "foo.example.org",
1276
value_to_set="foo.example.org")
1278
def test_host_short(self):
1279
self.assert_command_from_args(["-H", "foo.example.org",
1281
value_to_set="foo.example.org")
1283
def test_secret_devnull(self):
1284
self.assert_command_from_args(["--secret", os.path.devnull,
1285
"foo"], SetSecretCmd,
1288
def test_secret_tempfile(self):
1289
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1290
value = b"secret\0xyzzy\nbar"
1293
self.assert_command_from_args(["--secret", f.name,
1294
"foo"], SetSecretCmd,
1297
def test_secret_devnull_short(self):
1298
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1299
SetSecretCmd, value_to_set=b"")
1301
def test_secret_tempfile_short(self):
1302
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1303
value = b"secret\0xyzzy\nbar"
1306
self.assert_command_from_args(["-s", f.name, "foo"],
1310
def test_approve(self):
1311
self.assert_command_from_args(["--approve", "foo"],
1314
def test_approve_short(self):
1315
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1317
def test_deny(self):
1318
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1320
def test_deny_short(self):
1321
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1323
def test_dump_json(self):
1324
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1326
def test_is_enabled(self):
1327
self.assert_command_from_args(["--is-enabled", "foo"],
1330
def test_is_enabled_short(self):
1331
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1713
1335
def should_only_run_tests():