77
79
dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"
80
def milliseconds_to_string(ms):
    """Format a duration in milliseconds as "[<days>T]HH:MM:SS".

    The "<days>T" prefix is emitted only when the duration contains at
    least one whole day.  Sub-second precision is discarded (truncated,
    not rounded), so 900 ms formats as "00:00:00".
    """
    td = datetime.timedelta(milliseconds=ms)
    # td.seconds holds only the sub-day remainder (0..86399); whole
    # days live in td.days and are rendered separately.
    minutes, seconds = divmod(td.seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
            .format(days="{}T".format(td.days) if td.days else "",
                    hours=hours,
                    minutes=minutes,
                    seconds=seconds))
83
parser = argparse.ArgumentParser()
85
add_command_line_options(parser)
87
options = parser.parse_args()
89
check_option_syntax(parser, options)
91
clientnames = options.client
94
log.setLevel(logging.DEBUG)
96
bus = dbus.SystemBus()
98
mandos_dbus_object = get_mandos_dbus_object(bus)
100
mandos_serv = dbus.Interface(
101
mandos_dbus_object, dbus_interface=server_dbus_interface)
102
mandos_serv_object_manager = dbus.Interface(
103
mandos_dbus_object, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
106
log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
107
server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
108
with SilenceLogger("dbus.proxies"):
109
all_clients = {path: ifs_and_props[client_dbus_interface]
110
for path, ifs_and_props in
111
mandos_serv_object_manager
112
.GetManagedObjects().items()
113
if client_dbus_interface in ifs_and_props}
114
except dbus.exceptions.DBusException as e:
115
log.critical("Failed to access Mandos server through D-Bus:"
119
# Compile dict of (clients: properties) to process
123
clients = all_clients
125
for name in clientnames:
126
for objpath, properties in all_clients.items():
127
if properties["Name"] == name:
128
clients[objpath] = properties
131
log.critical("Client not found on server: %r", name)
134
# Run all commands on clients
135
commands = commands_from_options(options)
136
for command in commands:
137
command.run(clients, bus, mandos_serv)
140
def add_command_line_options(parser):
    """Register every mandos-ctl command line option on PARSER.

    PARSER is an argparse.ArgumentParser (or compatible) object; it is
    modified in place and nothing is returned.  Options which
    contradict each other (enable/disable, start/stop checker,
    approve/deny by default, approve/deny) are placed in mutually
    exclusive groups so argparse itself rejects the combination.
    """
    parser.add_argument("--version", action="version",
                        version="%(prog)s {}".format(version),
                        help="show version number and exit")
    parser.add_argument("-a", "--all", action="store_true",
                        help="Select all clients")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Print all fields")
    parser.add_argument("-j", "--dump-json", action="store_true",
                        help="Dump client data in JSON format")
    enable_disable = parser.add_mutually_exclusive_group()
    enable_disable.add_argument("-e", "--enable", action="store_true",
                                help="Enable client")
    enable_disable.add_argument("-d", "--disable",
                                action="store_true",
                                help="disable client")
    parser.add_argument("-b", "--bump-timeout", action="store_true",
                        help="Bump timeout for client")
    start_stop_checker = parser.add_mutually_exclusive_group()
    start_stop_checker.add_argument("--start-checker",
                                    action="store_true",
                                    help="Start checker for client")
    start_stop_checker.add_argument("--stop-checker",
                                    action="store_true",
                                    help="Stop checker for client")
    parser.add_argument("-V", "--is-enabled", action="store_true",
                        help="Check if client is enabled")
    parser.add_argument("-r", "--remove", action="store_true",
                        help="Remove client")
    parser.add_argument("-c", "--checker",
                        help="Set checker command for client")
    parser.add_argument("-t", "--timeout", type=string_to_delta,
                        help="Set timeout for client")
    parser.add_argument("--extended-timeout", type=string_to_delta,
                        help="Set extended timeout for client")
    parser.add_argument("-i", "--interval", type=string_to_delta,
                        help="Set checker interval for client")
    approve_deny_default = parser.add_mutually_exclusive_group()
    # Both options share one destination; default=None lets callers
    # tell "neither option given" apart from an explicit True/False.
    approve_deny_default.add_argument(
        "--approve-by-default", action="store_true",
        default=None, dest="approved_by_default",
        help="Set client to be approved by default")
    approve_deny_default.add_argument(
        "--deny-by-default", action="store_false",
        dest="approved_by_default",
        help="Set client to be denied by default")
    parser.add_argument("--approval-delay", type=string_to_delta,
                        help="Set delay before client approve/deny")
    parser.add_argument("--approval-duration", type=string_to_delta,
                        help="Set duration of one client approval")
    parser.add_argument("-H", "--host", help="Set host for client")
    parser.add_argument("-s", "--secret",
                        type=argparse.FileType(mode="rb"),
                        help="Set password blob (file) for client")
    approve_deny = parser.add_mutually_exclusive_group()
    approve_deny.add_argument(
        "-A", "--approve", action="store_true",
        help="Approve any current client request")
    approve_deny.add_argument("-D", "--deny", action="store_true",
                              help="Deny any current client request")
    parser.add_argument("--debug", action="store_true",
                        help="Debug mode (show D-Bus commands)")
    parser.add_argument("--check", action="store_true",
                        help="Run self-test")
    parser.add_argument("client", nargs="*", help="Client name")
207
def string_to_delta(interval):
208
"""Parse a string and return a datetime.timedelta"""
211
return rfc3339_duration_to_delta(interval)
212
except ValueError as e:
213
log.warning("%s - Parsing as pre-1.6.1 interval instead",
215
return parse_pre_1_6_1_interval(interval)
89
218
def rfc3339_duration_to_delta(duration):
273
## Classes for commands.
275
# Abstract classes first
391
def check_option_syntax(parser, options):
392
"""Apply additional restrictions on options, not expressible in
395
def has_actions(options):
396
return any((options.enable,
398
options.bump_timeout,
399
options.start_checker,
400
options.stop_checker,
403
options.checker is not None,
404
options.timeout is not None,
405
options.extended_timeout is not None,
406
options.interval is not None,
407
options.approved_by_default is not None,
408
options.approval_delay is not None,
409
options.approval_duration is not None,
410
options.host is not None,
411
options.secret is not None,
415
if has_actions(options) and not (options.client or options.all):
416
parser.error("Options require clients names or --all.")
417
if options.verbose and has_actions(options):
418
parser.error("--verbose can only be used alone.")
419
if options.dump_json and (options.verbose
420
or has_actions(options)):
421
parser.error("--dump-json can only be used alone.")
422
if options.all and not has_actions(options):
423
parser.error("--all requires an action.")
424
if options.is_enabled and len(options.client) > 1:
425
parser.error("--is-enabled requires exactly one client")
427
options.remove = False
428
if has_actions(options) and not options.deny:
429
parser.error("--remove can only be combined with --deny")
430
options.remove = True
433
def get_mandos_dbus_object(bus):
435
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
436
dbus_busname, server_dbus_path)
437
mandos_dbus_object = bus.get_object(dbus_busname,
439
except dbus.exceptions.DBusException:
440
log.critical("Could not connect to Mandos server")
443
return mandos_dbus_object
446
class SilenceLogger(object):
    """Simple context manager to silence a particular logger

    While the "with" block is active, a filter rejecting every record
    is attached to the logger named LOGGERNAME; the filter is removed
    again on exit, whether or not the block raised an exception.
    """

    def __init__(self, loggername):
        self.logger = logging.getLogger(loggername)

    def __enter__(self):
        self.logger.addFilter(self.nullfilter)
        return self

    class NullFilter(logging.Filter):
        """Filter which rejects every log record"""

        def filter(self, record):
            return False

    # One filter object shared by all instances; it carries no state.
    nullfilter = NullFilter()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.logger.removeFilter(self.nullfilter)
465
def commands_from_options(options):
469
if options.is_enabled:
470
commands.append(IsEnabledCmd())
473
commands.append(ApproveCmd())
476
commands.append(DenyCmd())
479
commands.append(RemoveCmd())
481
if options.dump_json:
482
commands.append(DumpJSONCmd())
485
commands.append(EnableCmd())
488
commands.append(DisableCmd())
490
if options.bump_timeout:
491
commands.append(BumpTimeoutCmd())
493
if options.start_checker:
494
commands.append(StartCheckerCmd())
496
if options.stop_checker:
497
commands.append(StopCheckerCmd())
499
if options.approved_by_default is not None:
500
if options.approved_by_default:
501
commands.append(ApproveByDefaultCmd())
503
commands.append(DenyByDefaultCmd())
505
if options.checker is not None:
506
commands.append(SetCheckerCmd(options.checker))
508
if options.host is not None:
509
commands.append(SetHostCmd(options.host))
511
if options.secret is not None:
512
commands.append(SetSecretCmd(options.secret))
514
if options.timeout is not None:
515
commands.append(SetTimeoutCmd(options.timeout))
517
if options.extended_timeout:
519
SetExtendedTimeoutCmd(options.extended_timeout))
521
if options.interval is not None:
522
commands.append(SetIntervalCmd(options.interval))
524
if options.approval_delay is not None:
525
commands.append(SetApprovalDelayCmd(options.approval_delay))
527
if options.approval_duration is not None:
529
SetApprovalDurationCmd(options.approval_duration))
531
# If no command option has been given, show table of clients,
532
# optionally verbosely
534
commands.append(PrintTableCmd(verbose=options.verbose))
276
539
class Command(object):
277
540
"""Abstract class for commands"""
278
def run(self, mandos, clients):
541
def run(self, clients, bus=None, mandos=None):
279
542
"""Normal commands should implement run_on_one_client(), but
280
543
commands which want to operate on all clients at the same time
281
544
can override this run() method instead."""
282
545
self.mandos = mandos
283
for client, properties in clients.items():
546
for clientpath, properties in clients.items():
547
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
548
dbus_busname, str(clientpath))
549
client = bus.get_object(dbus_busname, clientpath)
284
550
self.run_on_one_client(client, properties)
286
class PrintCmd(Command):
287
"""Abstract class for commands printing client details"""
553
class IsEnabledCmd(Command):
554
def run(self, clients, bus=None, mandos=None):
555
client, properties = next(iter(clients.items()))
556
if self.is_enabled(client, properties):
559
def is_enabled(self, client, properties):
560
return properties["Enabled"]
563
class ApproveCmd(Command):
    """Command approving any current request of each selected client"""

    def run_on_one_client(self, client, properties):
        """Call Approve(True) on CLIENT over D-Bus.

        CLIENT is the D-Bus proxy object of the client; PROPERTIES is
        accepted for interface compatibility and not used.
        """
        log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
                  client.__dbus_object_path__, client_dbus_interface)
        client.Approve(dbus.Boolean(True),
                       dbus_interface=client_dbus_interface)
571
class DenyCmd(Command):
    """Command denying any current request of each selected client"""

    def run_on_one_client(self, client, properties):
        """Call Approve(False) on CLIENT over D-Bus.

        CLIENT is the D-Bus proxy object of the client; PROPERTIES is
        accepted for interface compatibility and not used.
        """
        log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
                  client.__dbus_object_path__, client_dbus_interface)
        client.Approve(dbus.Boolean(False),
                       dbus_interface=client_dbus_interface)
579
class RemoveCmd(Command):
    """Command removing each selected client from the server"""

    def run_on_one_client(self, client, properties):
        """Ask the server to remove CLIENT.

        The removal is requested from the server object stored in
        self.mandos, passing the D-Bus object path of CLIENT.
        PROPERTIES is accepted for interface compatibility and not
        used.
        """
        log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname,
                  server_dbus_path, server_dbus_interface,
                  str(client.__dbus_object_path__))
        self.mandos.RemoveClient(client.__dbus_object_path__)
587
class OutputCmd(Command):
588
"""Abstract class for commands outputting client details"""
288
589
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
289
590
"Created", "Interval", "Host", "KeyID",
290
591
"Fingerprint", "CheckerRunning", "LastEnabled",
405
700
**{key: self.string_from_client(client, key)
406
701
for key in self.keywords})
410
class DumpJSONCmd(PrintCmd):
411
def output(self, clients):
412
data = {client["Name"]:
413
{key: self.dbus_boolean_to_bool(client[key])
414
for key in self.all_keywords}
415
for client in clients.values()}
416
return json.dumps(data, indent=4, separators=(',', ': '))
418
def dbus_boolean_to_bool(value):
419
if isinstance(value, dbus.Boolean):
423
class IsEnabledCmd(Command):
424
def run_on_one_client(self, client, properties):
425
if self.is_enabled(client, properties):
428
def is_enabled(self, client, properties):
429
return bool(properties["Enabled"])
431
class RemoveCmd(Command):
432
def run_on_one_client(self, client, properties):
433
self.mandos.RemoveClient(client.__dbus_object_path__)
435
class ApproveCmd(Command):
436
def run_on_one_client(self, client, properties):
437
client.Approve(dbus.Boolean(True),
438
dbus_interface=client_interface)
440
class DenyCmd(Command):
441
def run_on_one_client(self, client, properties):
442
client.Approve(dbus.Boolean(False),
443
dbus_interface=client_interface)
704
def milliseconds_to_string(ms):
705
td = datetime.timedelta(0, 0, 0, ms)
706
return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
707
.format(days="{}T".format(td.days)
709
hours=td.seconds // 3600,
710
minutes=(td.seconds % 3600) // 60,
711
seconds=td.seconds % 60))
714
class PropertyCmd(Command):
715
"""Abstract class for Actions for setting one client property"""
717
def run_on_one_client(self, client, properties):
718
"""Set the Client's D-Bus property"""
719
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
720
client.__dbus_object_path__,
721
dbus.PROPERTIES_IFACE, client_dbus_interface,
722
self.propname, self.value_to_set
723
if not isinstance(self.value_to_set, dbus.Boolean)
724
else bool(self.value_to_set))
725
client.Set(client_dbus_interface, self.propname,
727
dbus_interface=dbus.PROPERTIES_IFACE)
731
raise NotImplementedError()
445
734
class EnableCmd(PropertyCmd):
447
736
value_to_set = dbus.Boolean(True)
449
739
class DisableCmd(PropertyCmd):
451
741
value_to_set = dbus.Boolean(False)
453
744
class BumpTimeoutCmd(PropertyCmd):
454
property = "LastCheckedOK"
745
propname = "LastCheckedOK"
455
746
value_to_set = ""
457
749
class StartCheckerCmd(PropertyCmd):
458
property = "CheckerRunning"
750
propname = "CheckerRunning"
459
751
value_to_set = dbus.Boolean(True)
461
754
class StopCheckerCmd(PropertyCmd):
462
property = "CheckerRunning"
755
propname = "CheckerRunning"
463
756
value_to_set = dbus.Boolean(False)
465
759
class ApproveByDefaultCmd(PropertyCmd):
466
property = "ApprovedByDefault"
760
propname = "ApprovedByDefault"
467
761
value_to_set = dbus.Boolean(True)
469
764
class DenyByDefaultCmd(PropertyCmd):
470
property = "ApprovedByDefault"
765
propname = "ApprovedByDefault"
471
766
value_to_set = dbus.Boolean(False)
473
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
476
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
479
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
769
class PropertyValueCmd(PropertyCmd):
    """Abstract class for PropertyCmd receiving a value as argument"""

    def __init__(self, value):
        # Subclasses may define "value_to_set" as a property in order
        # to convert the supplied value when it is assigned here.
        self.value_to_set = value
775
class SetCheckerCmd(PropertyValueCmd):
779
class SetHostCmd(PropertyValueCmd):
783
class SetSecretCmd(PropertyValueCmd):
481
787
def value_to_set(self):
483
790
@value_to_set.setter
484
791
def value_to_set(self, value):
485
792
"""When setting, read data from supplied file object"""
486
793
self._vts = value.read()
490
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
493
class SetExtendedTimeoutCmd(PropertyCmd,
494
MillisecondsValueArgumentMixIn):
495
property = "ExtendedTimeout"
497
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
498
property = "Interval"
500
class SetApprovalDelayCmd(PropertyCmd,
501
MillisecondsValueArgumentMixIn):
502
property = "ApprovalDelay"
504
class SetApprovalDurationCmd(PropertyCmd,
505
MillisecondsValueArgumentMixIn):
506
property = "ApprovalDuration"
508
def add_command_line_options(parser):
509
parser.add_argument("--version", action="version",
510
version="%(prog)s {}".format(version),
511
help="show version number and exit")
512
parser.add_argument("-a", "--all", action="store_true",
513
help="Select all clients")
514
parser.add_argument("-v", "--verbose", action="store_true",
515
help="Print all fields")
516
parser.add_argument("-j", "--dump-json", action="store_true",
517
help="Dump client data in JSON format")
518
enable_disable = parser.add_mutually_exclusive_group()
519
enable_disable.add_argument("-e", "--enable", action="store_true",
520
help="Enable client")
521
enable_disable.add_argument("-d", "--disable",
523
help="disable client")
524
parser.add_argument("-b", "--bump-timeout", action="store_true",
525
help="Bump timeout for client")
526
start_stop_checker = parser.add_mutually_exclusive_group()
527
start_stop_checker.add_argument("--start-checker",
529
help="Start checker for client")
530
start_stop_checker.add_argument("--stop-checker",
532
help="Stop checker for client")
533
parser.add_argument("-V", "--is-enabled", action="store_true",
534
help="Check if client is enabled")
535
parser.add_argument("-r", "--remove", action="store_true",
536
help="Remove client")
537
parser.add_argument("-c", "--checker",
538
help="Set checker command for client")
539
parser.add_argument("-t", "--timeout", type=string_to_delta,
540
help="Set timeout for client")
541
parser.add_argument("--extended-timeout", type=string_to_delta,
542
help="Set extended timeout for client")
543
parser.add_argument("-i", "--interval", type=string_to_delta,
544
help="Set checker interval for client")
545
approve_deny_default = parser.add_mutually_exclusive_group()
546
approve_deny_default.add_argument(
547
"--approve-by-default", action="store_true",
548
default=None, dest="approved_by_default",
549
help="Set client to be approved by default")
550
approve_deny_default.add_argument(
551
"--deny-by-default", action="store_false",
552
dest="approved_by_default",
553
help="Set client to be denied by default")
554
parser.add_argument("--approval-delay", type=string_to_delta,
555
help="Set delay before client approve/deny")
556
parser.add_argument("--approval-duration", type=string_to_delta,
557
help="Set duration of one client approval")
558
parser.add_argument("-H", "--host", help="Set host for client")
559
parser.add_argument("-s", "--secret",
560
type=argparse.FileType(mode="rb"),
561
help="Set password blob (file) for client")
562
approve_deny = parser.add_mutually_exclusive_group()
563
approve_deny.add_argument(
564
"-A", "--approve", action="store_true",
565
help="Approve any current client request")
566
approve_deny.add_argument("-D", "--deny", action="store_true",
567
help="Deny any current client request")
568
parser.add_argument("--check", action="store_true",
569
help="Run self-test")
570
parser.add_argument("client", nargs="*", help="Client name")
573
def commands_from_options(options):
577
if options.dump_json:
578
commands.append(DumpJSONCmd())
581
commands.append(EnableCmd())
584
commands.append(DisableCmd())
586
if options.bump_timeout:
587
commands.append(BumpTimeoutCmd())
589
if options.start_checker:
590
commands.append(StartCheckerCmd())
592
if options.stop_checker:
593
commands.append(StopCheckerCmd())
595
if options.is_enabled:
596
commands.append(IsEnabledCmd())
599
commands.append(RemoveCmd())
601
if options.checker is not None:
602
commands.append(SetCheckerCmd(options.checker))
604
if options.timeout is not None:
605
commands.append(SetTimeoutCmd(options.timeout))
607
if options.extended_timeout:
609
SetExtendedTimeoutCmd(options.extended_timeout))
611
if options.interval is not None:
612
commands.append(SetIntervalCmd(options.interval))
614
if options.approved_by_default is not None:
615
if options.approved_by_default:
616
commands.append(ApproveByDefaultCmd())
618
commands.append(DenyByDefaultCmd())
620
if options.approval_delay is not None:
621
commands.append(SetApprovalDelayCmd(options.approval_delay))
623
if options.approval_duration is not None:
625
SetApprovalDurationCmd(options.approval_duration))
627
if options.host is not None:
628
commands.append(SetHostCmd(options.host))
630
if options.secret is not None:
631
commands.append(SetSecretCmd(options.secret))
634
commands.append(ApproveCmd())
637
commands.append(DenyCmd())
639
# If no command option has been given, show table of clients,
640
# optionally verbosely
642
commands.append(PrintTableCmd(verbose=options.verbose))
647
def check_option_syntax(parser, options):
649
def has_actions(options):
650
return any((options.enable,
652
options.bump_timeout,
653
options.start_checker,
654
options.stop_checker,
657
options.checker is not None,
658
options.timeout is not None,
659
options.extended_timeout is not None,
660
options.interval is not None,
661
options.approved_by_default is not None,
662
options.approval_delay is not None,
663
options.approval_duration is not None,
664
options.host is not None,
665
options.secret is not None,
669
if has_actions(options) and not (options.client or options.all):
670
parser.error("Options require clients names or --all.")
671
if options.verbose and has_actions(options):
672
parser.error("--verbose can only be used alone.")
673
if options.dump_json and (options.verbose
674
or has_actions(options)):
675
parser.error("--dump-json can only be used alone.")
676
if options.all and not has_actions(options):
677
parser.error("--all requires an action.")
678
if options.is_enabled and len(options.client) > 1:
679
parser.error("--is-enabled requires exactly one client")
683
parser = argparse.ArgumentParser()
685
add_command_line_options(parser)
687
options = parser.parse_args()
689
check_option_syntax(parser, options)
691
clientnames = options.client
694
bus = dbus.SystemBus()
695
mandos_dbus_objc = bus.get_object(busname, server_path)
696
except dbus.exceptions.DBusException:
697
log.critical("Could not connect to Mandos server")
700
mandos_serv = dbus.Interface(mandos_dbus_objc,
701
dbus_interface=server_interface)
702
mandos_serv_object_manager = dbus.Interface(
703
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
705
# Filter out log message from dbus module
706
dbus_logger = logging.getLogger("dbus.proxies")
707
class NullFilter(logging.Filter):
708
def filter(self, record):
710
dbus_filter = NullFilter()
712
dbus_logger.addFilter(dbus_filter)
713
mandos_clients = {path: ifs_and_props[client_interface]
714
for path, ifs_and_props in
715
mandos_serv_object_manager
716
.GetManagedObjects().items()
717
if client_interface in ifs_and_props}
718
except dbus.exceptions.DBusException as e:
719
log.critical("Failed to access Mandos server through D-Bus:"
723
# restore dbus logger
724
dbus_logger.removeFilter(dbus_filter)
726
# Compile dict of (clients: properties) to process
730
clients = {bus.get_object(busname, path): properties
731
for path, properties in mandos_clients.items()}
733
for name in clientnames:
734
for path, client in mandos_clients.items():
735
if client["Name"] == name:
736
client_objc = bus.get_object(busname, path)
737
clients[client_objc] = client
740
log.critical("Client not found on server: %r", name)
743
# Run all commands on clients
744
commands = commands_from_options(options)
745
for command in commands:
746
command.run(mandos_serv, clients)
797
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
798
"""Abstract class for PropertyValueCmd taking a value argument as
799
a datetime.timedelta() but should store it as milliseconds."""
802
def value_to_set(self):
806
def value_to_set(self, value):
807
"""When setting, convert value from a datetime.timedelta"""
808
self._vts = int(round(value.total_seconds() * 1000))
811
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
815
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
816
propname = "ExtendedTimeout"
819
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
820
propname = "Interval"
823
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
824
propname = "ApprovalDelay"
827
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
828
propname = "ApprovalDuration"
749
class Test_milliseconds_to_string(unittest.TestCase):
751
self.assertEqual(milliseconds_to_string(93785000),
753
def test_no_days(self):
754
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
755
def test_all_zero(self):
756
self.assertEqual(milliseconds_to_string(0), "00:00:00")
757
def test_no_fractional_seconds(self):
758
self.assertEqual(milliseconds_to_string(400), "00:00:00")
759
self.assertEqual(milliseconds_to_string(900), "00:00:00")
760
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
762
832
class Test_string_to_delta(unittest.TestCase):
763
833
def test_handles_basic_rfc3339(self):
764
834
self.assertEqual(string_to_delta("PT0S"),
792
863
self.assertEqual(value, datetime.timedelta(0, 7200))
866
class Test_check_option_syntax(unittest.TestCase):
868
self.parser = argparse.ArgumentParser()
869
add_command_line_options(self.parser)
871
def test_actions_requires_client_or_all(self):
872
for action, value in self.actions.items():
873
options = self.parser.parse_args()
874
setattr(options, action, value)
875
with self.assertParseError():
876
self.check_option_syntax(options)
878
# This mostly corresponds to the definition from has_actions() in
879
# check_option_syntax()
881
# The actual values set here are not that important, but we do
882
# at least stick to the correct types, even though they are
886
"bump_timeout": True,
887
"start_checker": True,
888
"stop_checker": True,
892
"timeout": datetime.timedelta(),
893
"extended_timeout": datetime.timedelta(),
894
"interval": datetime.timedelta(),
895
"approved_by_default": True,
896
"approval_delay": datetime.timedelta(),
897
"approval_duration": datetime.timedelta(),
899
"secret": io.BytesIO(b"x"),
904
@contextlib.contextmanager
905
def assertParseError(self):
906
with self.assertRaises(SystemExit) as e:
907
with self.temporarily_suppress_stderr():
909
# Exit code from argparse is guaranteed to be "2". Reference:
910
# https://docs.python.org/3/library
911
# /argparse.html#exiting-methods
912
self.assertEqual(e.exception.code, 2)
915
@contextlib.contextmanager
916
def temporarily_suppress_stderr():
917
null = os.open(os.path.devnull, os.O_RDWR)
918
stderrcopy = os.dup(sys.stderr.fileno())
919
os.dup2(null, sys.stderr.fileno())
925
os.dup2(stderrcopy, sys.stderr.fileno())
928
def check_option_syntax(self, options):
929
check_option_syntax(self.parser, options)
931
def test_actions_conflicts_with_verbose(self):
932
for action, value in self.actions.items():
933
options = self.parser.parse_args()
934
setattr(options, action, value)
935
options.verbose = True
936
with self.assertParseError():
937
self.check_option_syntax(options)
939
def test_dump_json_conflicts_with_verbose(self):
940
options = self.parser.parse_args()
941
options.dump_json = True
942
options.verbose = True
943
with self.assertParseError():
944
self.check_option_syntax(options)
946
def test_dump_json_conflicts_with_action(self):
947
for action, value in self.actions.items():
948
options = self.parser.parse_args()
949
setattr(options, action, value)
950
options.dump_json = True
951
with self.assertParseError():
952
self.check_option_syntax(options)
954
def test_all_can_not_be_alone(self):
955
options = self.parser.parse_args()
957
with self.assertParseError():
958
self.check_option_syntax(options)
960
def test_all_is_ok_with_any_action(self):
961
for action, value in self.actions.items():
962
options = self.parser.parse_args()
963
setattr(options, action, value)
965
self.check_option_syntax(options)
967
def test_is_enabled_fails_without_client(self):
968
options = self.parser.parse_args()
969
options.is_enabled = True
970
with self.assertParseError():
971
self.check_option_syntax(options)
973
def test_is_enabled_works_with_one_client(self):
974
options = self.parser.parse_args()
975
options.is_enabled = True
976
options.client = ["foo"]
977
self.check_option_syntax(options)
979
def test_is_enabled_fails_with_two_clients(self):
980
options = self.parser.parse_args()
981
options.is_enabled = True
982
options.client = ["foo", "barbar"]
983
with self.assertParseError():
984
self.check_option_syntax(options)
986
def test_remove_can_only_be_combined_with_action_deny(self):
987
for action, value in self.actions.items():
988
if action in {"remove", "deny"}:
990
options = self.parser.parse_args()
991
setattr(options, action, value)
993
options.remove = True
994
with self.assertParseError():
995
self.check_option_syntax(options)
998
class Test_get_mandos_dbus_object(unittest.TestCase):
    """Tests for get_mandos_dbus_object() using mock bus objects"""

    def test_calls_and_returns_get_object_on_bus(self):
        class MockBus(object):
            called = False

            def get_object(mockbus_self, busname, dbus_path):
                # Note that "self" is still the testcase instance,
                # this MockBus instance is in "mockbus_self".
                self.assertEqual(busname, dbus_busname)
                self.assertEqual(dbus_path, server_dbus_path)
                mockbus_self.called = True
                return mockbus_self

        mockbus = get_mandos_dbus_object(bus=MockBus())
        self.assertIsInstance(mockbus, MockBus)
        self.assertTrue(mockbus.called)

    def test_logs_and_exits_on_dbus_error(self):
        class MockBusFailing(object):
            def get_object(self, busname, dbus_path):
                raise dbus.exceptions.DBusException("Test")

        # assertLogs only exists in Python 3.4
        if hasattr(self, "assertLogs"):
            with self.assertLogs(log, logging.CRITICAL):
                with self.assertRaises(SystemExit) as e:
                    # Must be the failing mock; no other mock bus
                    # class is defined in this method.
                    get_mandos_dbus_object(bus=MockBusFailing())
        else:
            critical_filter = self.CriticalFilter()
            log.addFilter(critical_filter)
            try:
                with self.assertRaises(SystemExit) as e:
                    get_mandos_dbus_object(bus=MockBusFailing())
            finally:
                log.removeFilter(critical_filter)
            self.assertTrue(critical_filter.found)
        # The exit code must signal failure, whatever its type
        if isinstance(e.exception.code, int):
            self.assertNotEqual(e.exception.code, 0)
        else:
            self.assertIsNotNone(e.exception.code)

    class CriticalFilter(logging.Filter):
        """Don't show, but register, critical messages"""

        found = False

        def filter(self, record):
            is_critical = record.levelno >= logging.CRITICAL
            self.found = is_critical or self.found
            return not is_critical
1047
class Test_SilenceLogger(unittest.TestCase):
1048
loggername = "mandos-ctl.Test_SilenceLogger"
1049
log = logging.getLogger(loggername)
1050
log.propagate = False
1051
log.addHandler(logging.NullHandler())
1054
self.counting_filter = self.CountingFilter()
1056
class CountingFilter(logging.Filter):
1057
"Count number of records"
1059
def filter(self, record):
1063
def test_should_filter_records_only_when_active(self):
1065
with SilenceLogger(self.loggername):
1066
self.log.addFilter(self.counting_filter)
1067
self.log.info("Filtered log message 1")
1068
self.log.info("Non-filtered message 2")
1069
self.log.info("Non-filtered message 3")
1071
self.log.removeFilter(self.counting_filter)
1072
self.assertEqual(self.counting_filter.count, 2)
1075
class Test_commands_from_options(unittest.TestCase):
1077
self.parser = argparse.ArgumentParser()
1078
add_command_line_options(self.parser)
1080
def test_is_enabled(self):
1081
self.assert_command_from_args(["--is-enabled", "foo"],
1084
def assert_command_from_args(self, args, command_cls,
1086
"""Assert that parsing ARGS should result in an instance of
1087
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1088
options = self.parser.parse_args(args)
1089
check_option_syntax(self.parser, options)
1090
commands = commands_from_options(options)
1091
self.assertEqual(len(commands), 1)
1092
command = commands[0]
1093
self.assertIsInstance(command, command_cls)
1094
for key, value in cmd_attrs.items():
1095
self.assertEqual(getattr(command, key), value)
1097
def test_is_enabled_short(self):
1098
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1100
def test_approve(self):
1101
self.assert_command_from_args(["--approve", "foo"],
1104
def test_approve_short(self):
1105
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1107
def test_deny(self):
1108
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1110
def test_deny_short(self):
1111
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1113
def test_remove(self):
1114
self.assert_command_from_args(["--remove", "foo"],
1117
def test_deny_before_remove(self):
1118
options = self.parser.parse_args(["--deny", "--remove",
1120
check_option_syntax(self.parser, options)
1121
commands = commands_from_options(options)
1122
self.assertEqual(len(commands), 2)
1123
self.assertIsInstance(commands[0], DenyCmd)
1124
self.assertIsInstance(commands[1], RemoveCmd)
1126
def test_deny_before_remove_reversed(self):
1127
options = self.parser.parse_args(["--remove", "--deny",
1129
check_option_syntax(self.parser, options)
1130
commands = commands_from_options(options)
1131
self.assertEqual(len(commands), 2)
1132
self.assertIsInstance(commands[0], DenyCmd)
1133
self.assertIsInstance(commands[1], RemoveCmd)
1135
def test_remove_short(self):
    # "-r foo" must parse to exactly one RemoveCmd.
    argv = ["-r", "foo"]
    self.assert_command_from_args(argv, RemoveCmd)
1138
def test_dump_json(self):
    # "--dump-json" (no client name) must parse to exactly one
    # DumpJSONCmd.
    argv = ["--dump-json"]
    self.assert_command_from_args(argv, DumpJSONCmd)
1141
def test_enable(self):
    # "--enable foo" must parse to exactly one EnableCmd.
    argv = ["--enable", "foo"]
    self.assert_command_from_args(argv, EnableCmd)
1144
def test_enable_short(self):
    # "-e foo" must parse to exactly one EnableCmd.
    argv = ["-e", "foo"]
    self.assert_command_from_args(argv, EnableCmd)
1147
def test_disable(self):
1148
self.assert_command_from_args(["--disable", "foo"],
1151
def test_disable_short(self):
    # "-d foo" must parse to exactly one DisableCmd.
    argv = ["-d", "foo"]
    self.assert_command_from_args(argv, DisableCmd)
1154
def test_bump_timeout(self):
1155
self.assert_command_from_args(["--bump-timeout", "foo"],
1158
def test_bump_timeout_short(self):
    # "-b foo" must parse to exactly one BumpTimeoutCmd.
    argv = ["-b", "foo"]
    self.assert_command_from_args(argv, BumpTimeoutCmd)
1161
def test_start_checker(self):
1162
self.assert_command_from_args(["--start-checker", "foo"],
1165
def test_stop_checker(self):
1166
self.assert_command_from_args(["--stop-checker", "foo"],
1169
def test_approve_by_default(self):
    # "--approve-by-default foo" must parse to exactly one
    # ApproveByDefaultCmd.
    argv = ["--approve-by-default", "foo"]
    self.assert_command_from_args(argv, ApproveByDefaultCmd)
1173
def test_deny_by_default(self):
1174
self.assert_command_from_args(["--deny-by-default", "foo"],
1177
def test_checker(self):
    # "--checker : foo" must parse to exactly one SetCheckerCmd
    # carrying ":" as the value to set.
    argv = ["--checker", ":", "foo"]
    expected_attrs = {"value_to_set": ":"}
    self.assert_command_from_args(argv, SetCheckerCmd,
                                  **expected_attrs)
1181
def test_checker_empty(self):
    # An empty string is an acceptable checker value: it must reach
    # the SetCheckerCmd unchanged as value_to_set.
    argv = ["--checker", "", "foo"]
    expected_attrs = {"value_to_set": ""}
    self.assert_command_from_args(argv, SetCheckerCmd,
                                  **expected_attrs)
1185
def test_checker_short(self):
    # "-c : foo" must parse to exactly one SetCheckerCmd carrying
    # ":" as the value to set.
    argv = ["-c", ":", "foo"]
    expected_attrs = {"value_to_set": ":"}
    self.assert_command_from_args(argv, SetCheckerCmd,
                                  **expected_attrs)
1189
def test_host(self):
1190
self.assert_command_from_args(["--host", "foo.example.org",
1192
value_to_set="foo.example.org")
1194
def test_host_short(self):
1195
self.assert_command_from_args(["-H", "foo.example.org",
1197
value_to_set="foo.example.org")
1199
def test_secret_devnull(self):
1200
self.assert_command_from_args(["--secret", os.path.devnull,
1201
"foo"], SetSecretCmd,
1204
def test_secret_tempfile(self):
1205
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1206
value = b"secret\0xyzzy\nbar"
1209
self.assert_command_from_args(["--secret", f.name,
1210
"foo"], SetSecretCmd,
1213
def test_secret_devnull_short(self):
    # "-s" pointing at the null device must parse to exactly one
    # SetSecretCmd whose value_to_set is the empty byte string.
    argv = ["-s", os.path.devnull, "foo"]
    expected_attrs = {"value_to_set": b""}
    self.assert_command_from_args(argv, SetSecretCmd,
                                  **expected_attrs)
1217
def test_secret_tempfile_short(self):
1218
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1219
value = b"secret\0xyzzy\nbar"
1222
self.assert_command_from_args(["-s", f.name, "foo"],
1226
def test_timeout(self):
1227
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1229
value_to_set=300000)
1231
def test_timeout_short(self):
1232
self.assert_command_from_args(["-t", "PT5M", "foo"],
1234
value_to_set=300000)
1236
def test_extended_timeout(self):
1237
self.assert_command_from_args(["--extended-timeout", "PT15M",
1239
SetExtendedTimeoutCmd,
1240
value_to_set=900000)
1242
def test_interval(self):
1243
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1245
value_to_set=120000)
1247
def test_interval_short(self):
1248
self.assert_command_from_args(["-i", "PT2M", "foo"],
1250
value_to_set=120000)
1252
def test_approval_delay(self):
1253
self.assert_command_from_args(["--approval-delay", "PT30S",
1254
"foo"], SetApprovalDelayCmd,
1257
def test_approval_duration(self):
1258
self.assert_command_from_args(["--approval-duration", "PT1S",
1259
"foo"], SetApprovalDurationCmd,
1262
def test_print_table(self):
1263
self.assert_command_from_args([], PrintTableCmd,
1266
def test_print_table_verbose(self):
1267
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1270
def test_print_table_verbose_short(self):
1271
self.assert_command_from_args(["-v"], PrintTableCmd,
795
1275
class TestCmd(unittest.TestCase):
796
1276
"""Abstract class for tests of command classes"""
797
1278
def setUp(self):
799
1280
class MockClient(object):
800
1281
def __init__(self, name, **attributes):
801
self.__dbus_object_path__ = "objpath_{}".format(name)
1282
self.__dbus_object_path__ = "/clients/{}".format(name)
802
1283
self.attributes = attributes
803
1284
self.attributes["Name"] = name
805
def Set(self, interface, property, value, dbus_interface):
806
testcase.assertEqual(interface, client_interface)
807
testcase.assertEqual(dbus_interface,
808
dbus.PROPERTIES_IFACE)
809
self.attributes[property] = value
810
def Get(self, interface, property, dbus_interface):
811
testcase.assertEqual(interface, client_interface)
812
testcase.assertEqual(dbus_interface,
813
dbus.PROPERTIES_IFACE)
814
return self.attributes[property]
1286
def Set(self, interface, propname, value, dbus_interface):
    # Fake property setter: check how the call was addressed, then
    # remember the value in this client's attribute dict.
    # NOTE(review): "testcase" is not defined in this method; it has
    # to come from an enclosing scope not visible here.
    testcase.assertEqual(interface, client_dbus_interface)
    testcase.assertEqual(dbus_interface, dbus.PROPERTIES_IFACE)
    self.attributes.update({propname: value})
1291
def Get(self, interface, propname, dbus_interface):
    # Fake property getter: check how the call was addressed, then
    # hand back the stored attribute (KeyError if it was never set).
    # NOTE(review): "testcase" is not defined in this method; it has
    # to come from an enclosing scope not visible here.
    testcase.assertEqual(interface, client_dbus_interface)
    testcase.assertEqual(dbus_interface, dbus.PROPERTIES_IFACE)
    stored = self.attributes[propname]
    return stored
815
1296
def Approve(self, approve, dbus_interface):
816
testcase.assertEqual(dbus_interface, client_interface)
1297
testcase.assertEqual(dbus_interface,
1298
client_dbus_interface)
817
1299
self.calls.append(("Approve", (approve,
818
1300
dbus_interface)))
819
1301
self.client = MockClient(
859
1341
ApprovedByDefault=dbus.Boolean(False),
860
1342
LastApprovalRequest="2019-01-03T00:00:00",
861
1343
ApprovalDelay=30000,
862
ApprovalDuration=1000,
1344
ApprovalDuration=93785000,
864
1346
ExtendedTimeout=900000,
865
1347
Expires="2019-02-05T00:00:00",
866
1348
LastCheckerStatus=-2)
867
1349
self.clients = collections.OrderedDict(
869
(self.client, self.client.attributes),
870
(self.other_client, self.other_client.attributes),
1351
("/clients/foo", self.client.attributes),
1352
("/clients/barbar", self.other_client.attributes),
872
self.one_client = {self.client: self.client.attributes}
874
class TestPrintTableCmd(TestCmd):
875
def test_normal(self):
876
output = PrintTableCmd().output(self.clients)
877
expected_output = """
878
Name Enabled Timeout Last Successful Check
879
foo Yes 00:05:00 2019-02-03T00:00:00
880
barbar Yes 00:05:00 2019-02-04T00:00:00
882
self.assertEqual(output, expected_output)
883
def test_verbose(self):
884
output = PrintTableCmd(verbose=True).output(self.clients)
885
expected_output = """
886
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
887
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
888
barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04T00:00:00 No No 2019-01-03T00:00:00 00:00:30 00:00:01 : 00:15:00 2019-02-05T00:00:00 -2
890
self.assertEqual(output, expected_output)
891
def test_one_client(self):
892
output = PrintTableCmd().output(self.one_client)
893
expected_output = """
894
Name Enabled Timeout Last Successful Check
895
foo Yes 00:05:00 2019-02-03T00:00:00
897
self.assertEqual(output, expected_output)
1354
self.one_client = {"/clients/foo": self.client.attributes}
1360
def get_object(client_bus_name, path):
1361
self.assertEqual(client_bus_name, dbus_busname)
1363
# Note: "self" here is the TestCmd instance, not
1364
# the Bus instance, since this is a static method!
1365
"/clients/foo": self.client,
1366
"/clients/barbar": self.other_client,
1371
class TestIsEnabledCmd(TestCmd):
1372
def test_is_enabled(self):
1373
self.assertTrue(all(IsEnabledCmd().is_enabled(client,
1375
for client, properties
1376
in self.clients.items()))
1378
def test_is_enabled_run_exits_successfully(self):
1379
with self.assertRaises(SystemExit) as e:
1380
IsEnabledCmd().run(self.one_client)
1381
if e.exception.code is not None:
1382
self.assertEqual(e.exception.code, 0)
1384
self.assertIsNone(e.exception.code)
1386
def test_is_enabled_run_exits_with_failure(self):
1387
self.client.attributes["Enabled"] = dbus.Boolean(False)
1388
with self.assertRaises(SystemExit) as e:
1389
IsEnabledCmd().run(self.one_client)
1390
if isinstance(e.exception.code, int):
1391
self.assertNotEqual(e.exception.code, 0)
1393
self.assertIsNotNone(e.exception.code)
1396
class TestApproveCmd(TestCmd):
1397
def test_approve(self):
1398
ApproveCmd().run(self.clients, self.bus)
1399
for clientpath in self.clients:
1400
client = self.bus.get_object(dbus_busname, clientpath)
1401
self.assertIn(("Approve", (True, client_dbus_interface)),
1405
class TestDenyCmd(TestCmd):
1406
def test_deny(self):
1407
DenyCmd().run(self.clients, self.bus)
1408
for clientpath in self.clients:
1409
client = self.bus.get_object(dbus_busname, clientpath)
1410
self.assertIn(("Approve", (False, client_dbus_interface)),
1414
class TestRemoveCmd(TestCmd):
1415
def test_remove(self):
1416
class MockMandos(object):
1419
def RemoveClient(self, dbus_path):
    # Do not remove anything; just log the call and its argument
    # tuple so the test can inspect self.calls afterwards.
    call_record = ("RemoveClient", (dbus_path,))
    self.calls.append(call_record)
1421
mandos = MockMandos()
1422
super(TestRemoveCmd, self).setUp()
1423
RemoveCmd().run(self.clients, self.bus, mandos)
1424
self.assertEqual(len(mandos.calls), 2)
1425
for clientpath in self.clients:
1426
self.assertIn(("RemoveClient", (clientpath,)),
899
1430
class TestDumpJSONCmd(TestCmd):
900
1431
def setUp(self):
951
1482
return super(TestDumpJSONCmd, self).setUp()
952
1484
def test_normal(self):
953
json_data = json.loads(DumpJSONCmd().output(self.clients))
1485
output = DumpJSONCmd().output(self.clients.values())
1486
json_data = json.loads(output)
954
1487
self.assertDictEqual(json_data, self.expected_json)
955
1489
def test_one_client(self):
956
clients = self.one_client
957
json_data = json.loads(DumpJSONCmd().output(clients))
1490
output = DumpJSONCmd().output(self.one_client.values())
1491
json_data = json.loads(output)
958
1492
expected_json = {"foo": self.expected_json["foo"]}
959
1493
self.assertDictEqual(json_data, expected_json)
961
class TestIsEnabledCmd(TestCmd):
962
def test_is_enabled(self):
963
self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
964
for client, properties in self.clients.items()))
965
def test_is_enabled_run_exits_successfully(self):
966
with self.assertRaises(SystemExit) as e:
967
IsEnabledCmd().run(None, self.one_client)
968
if e.exception.code is not None:
969
self.assertEqual(e.exception.code, 0)
971
self.assertIsNone(e.exception.code)
972
def test_is_enabled_run_exits_with_failure(self):
973
self.client.attributes["Enabled"] = dbus.Boolean(False)
974
with self.assertRaises(SystemExit) as e:
975
IsEnabledCmd().run(None, self.one_client)
976
if isinstance(e.exception.code, int):
977
self.assertNotEqual(e.exception.code, 0)
979
self.assertIsNotNone(e.exception.code)
981
class TestRemoveCmd(TestCmd):
982
def test_remove(self):
983
class MockMandos(object):
986
def RemoveClient(self, dbus_path):
987
self.calls.append(("RemoveClient", (dbus_path,)))
988
mandos = MockMandos()
989
super(TestRemoveCmd, self).setUp()
990
RemoveCmd().run(mandos, self.clients)
991
self.assertEqual(len(mandos.calls), 2)
992
for client in self.clients:
993
self.assertIn(("RemoveClient",
994
(client.__dbus_object_path__,)),
997
class TestApproveCmd(TestCmd):
998
def test_approve(self):
999
ApproveCmd().run(None, self.clients)
1000
for client in self.clients:
1001
self.assertIn(("Approve", (True, client_interface)),
1004
class TestDenyCmd(TestCmd):
1005
def test_deny(self):
1006
DenyCmd().run(None, self.clients)
1007
for client in self.clients:
1008
self.assertIn(("Approve", (False, client_interface)),
1011
class TestEnableCmd(TestCmd):
1012
def test_enable(self):
1013
for client in self.clients:
1014
client.attributes["Enabled"] = False
1016
EnableCmd().run(None, self.clients)
1018
for client in self.clients:
1019
self.assertTrue(client.attributes["Enabled"])
1021
class TestDisableCmd(TestCmd):
1022
def test_disable(self):
1023
DisableCmd().run(None, self.clients)
1025
for client in self.clients:
1026
self.assertFalse(client.attributes["Enabled"])
1028
class Unique(object):
1029
"""Class for objects which exist only to be unique objects, since
1030
unittest.mock.sentinel only exists in Python 3.3"""
1496
class TestPrintTableCmd(TestCmd):
1497
def test_normal(self):
1498
output = PrintTableCmd().output(self.clients.values())
1499
expected_output = "\n".join((
1500
"Name Enabled Timeout Last Successful Check",
1501
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1502
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1504
self.assertEqual(output, expected_output)
1506
def test_verbose(self):
1507
output = PrintTableCmd(verbose=True).output(
1508
self.clients.values())
1523
"Last Successful Check ",
1524
"2019-02-03T00:00:00 ",
1525
"2019-02-04T00:00:00 ",
1528
"2019-01-02T00:00:00 ",
1529
"2019-01-03T00:00:00 ",
1541
("92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8"
1543
("0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e"
1547
"778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 ",
1548
"3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 ",
1550
"Check Is Running ",
1555
"2019-01-03T00:00:00 ",
1556
"2019-01-04T00:00:00 ",
1558
"Approval Is Pending ",
1562
"Approved By Default ",
1566
"Last Approval Request ",
1568
"2019-01-03T00:00:00 ",
1574
"Approval Duration ",
1579
"fping -q -- %(host)s ",
1582
"Extended Timeout ",
1587
"2019-02-04T00:00:00 ",
1588
"2019-02-05T00:00:00 ",
1590
"Last Checker Status",
1595
num_lines = max(len(rows) for rows in columns)
1596
expected_output = "\n".join("".join(rows[line]
1597
for rows in columns)
1598
for line in range(num_lines))
1599
self.assertEqual(output, expected_output)
1601
def test_one_client(self):
1602
output = PrintTableCmd().output(self.one_client.values())
1603
expected_output = "\n".join((
1604
"Name Enabled Timeout Last Successful Check",
1605
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1607
self.assertEqual(output, expected_output)
1032
1610
class TestPropertyCmd(TestCmd):
1033
1611
"""Abstract class for tests of PropertyCmd classes"""
1038
1616
self.values_to_set)
1039
1617
for value_to_set, value_to_get in zip(self.values_to_set,
1040
1618
values_to_get):
1041
for client in self.clients:
1042
old_value = client.attributes[self.property]
1043
self.assertNotIsInstance(old_value, Unique)
1044
client.attributes[self.property] = Unique()
1619
for clientpath in self.clients:
1620
client = self.bus.get_object(dbus_busname, clientpath)
1621
old_value = client.attributes[self.propname]
1622
self.assertNotIsInstance(old_value, self.Unique)
1623
client.attributes[self.propname] = self.Unique()
1045
1624
self.run_command(value_to_set, self.clients)
1046
for client in self.clients:
1047
value = client.attributes[self.property]
1048
self.assertNotIsInstance(value, Unique)
1625
for clientpath in self.clients:
1626
client = self.bus.get_object(dbus_busname, clientpath)
1627
value = client.attributes[self.propname]
1628
self.assertNotIsInstance(value, self.Unique)
1049
1629
self.assertEqual(value, value_to_get)
1631
class Unique(object):
    """Marker type whose instances serve only as distinct objects.

    Stands in for unittest.mock.sentinel, which is only available
    from Python 3.3 on.
    """
1050
1635
def run_command(self, value, clients):
1051
self.command().run(None, clients)
1636
self.command().run(clients, self.bus)
1639
class TestEnableCmd(TestPropertyCmd):
1641
propname = "Enabled"
1642
values_to_set = [dbus.Boolean(True)]
1645
class TestDisableCmd(TestPropertyCmd):
    """Property test for DisableCmd: the "Enabled" property is
    expected to end up as a false dbus.Boolean."""
    propname = "Enabled"
    values_to_set = [dbus.Boolean(False)]
    command = DisableCmd
1053
1651
class TestBumpTimeoutCmd(TestPropertyCmd):
1054
1652
command = BumpTimeoutCmd
1055
property = "LastCheckedOK"
1653
propname = "LastCheckedOK"
1056
1654
values_to_set = [""]
1058
1657
class TestStartCheckerCmd(TestPropertyCmd):
1059
1658
command = StartCheckerCmd
1060
property = "CheckerRunning"
1659
propname = "CheckerRunning"
1061
1660
values_to_set = [dbus.Boolean(True)]
1063
1663
class TestStopCheckerCmd(TestPropertyCmd):
1064
1664
command = StopCheckerCmd
1065
property = "CheckerRunning"
1665
propname = "CheckerRunning"
1066
1666
values_to_set = [dbus.Boolean(False)]
1068
1669
class TestApproveByDefaultCmd(TestPropertyCmd):
1069
1670
command = ApproveByDefaultCmd
1070
property = "ApprovedByDefault"
1671
propname = "ApprovedByDefault"
1071
1672
values_to_set = [dbus.Boolean(True)]
1073
1675
class TestDenyByDefaultCmd(TestPropertyCmd):
1074
1676
command = DenyByDefaultCmd
1075
property = "ApprovedByDefault"
1677
propname = "ApprovedByDefault"
1076
1678
values_to_set = [dbus.Boolean(False)]
1078
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1079
"""Abstract class for tests of PropertyCmd classes using the
1080
ValueArgumentMixIn"""
1681
class TestPropertyValueCmd(TestPropertyCmd):
1682
"""Abstract class for tests of PropertyValueCmd classes"""
1081
1684
def runTest(self):
1082
if type(self) is TestValueArgumentPropertyCmd:
1685
if type(self) is TestPropertyValueCmd:
1084
return super(TestValueArgumentPropertyCmd, self).runTest()
1687
return super(TestPropertyValueCmd, self).runTest()
1085
1689
def run_command(self, value, clients):
1086
self.command(value).run(None, clients)
1088
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1690
self.command(value).run(clients, self.bus)
1693
class TestSetCheckerCmd(TestPropertyValueCmd):
1089
1694
command = SetCheckerCmd
1090
property = "Checker"
1695
propname = "Checker"
1091
1696
values_to_set = ["", ":", "fping -q -- %s"]
1093
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1699
class TestSetHostCmd(TestPropertyValueCmd):
1094
1700
command = SetHostCmd
1096
1702
values_to_set = ["192.0.2.3", "foo.example.org"]
1098
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1705
class TestSetSecretCmd(TestPropertyValueCmd):
1099
1706
command = SetSecretCmd
1101
values_to_set = [open("/dev/null", "rb"),
1708
values_to_set = [io.BytesIO(b""),
1102
1709
io.BytesIO(b"secret\0xyzzy\nbar")]
1103
1710
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1105
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1713
class TestSetTimeoutCmd(TestPropertyValueCmd):
1106
1714
command = SetTimeoutCmd
1107
property = "Timeout"
1715
propname = "Timeout"
1108
1716
values_to_set = [datetime.timedelta(),
1109
1717
datetime.timedelta(minutes=5),
1110
1718
datetime.timedelta(seconds=1),
1152
1764
datetime.timedelta(weeks=52)]
1153
1765
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1155
class Test_command_from_options(unittest.TestCase):
1157
self.parser = argparse.ArgumentParser()
1158
add_command_line_options(self.parser)
1159
def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1160
"""Assert that parsing ARGS should result in an instance of
1161
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1162
options = self.parser.parse_args(args)
1163
check_option_syntax(self.parser, options)
1164
commands = commands_from_options(options)
1165
self.assertEqual(len(commands), 1)
1166
command = commands[0]
1167
self.assertIsInstance(command, command_cls)
1168
for key, value in cmd_attrs.items():
1169
self.assertEqual(getattr(command, key), value)
1170
def test_print_table(self):
1171
self.assert_command_from_args([], PrintTableCmd,
1174
def test_print_table_verbose(self):
1175
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1178
def test_print_table_verbose_short(self):
1179
self.assert_command_from_args(["-v"], PrintTableCmd,
1182
def test_enable(self):
1183
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1185
def test_enable_short(self):
1186
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1188
def test_disable(self):
1189
self.assert_command_from_args(["--disable", "foo"],
1192
def test_disable_short(self):
1193
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1195
def test_bump_timeout(self):
1196
self.assert_command_from_args(["--bump-timeout", "foo"],
1199
def test_bump_timeout_short(self):
1200
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1202
def test_start_checker(self):
1203
self.assert_command_from_args(["--start-checker", "foo"],
1206
def test_stop_checker(self):
1207
self.assert_command_from_args(["--stop-checker", "foo"],
1210
def test_remove(self):
1211
self.assert_command_from_args(["--remove", "foo"],
1214
def test_remove_short(self):
1215
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1217
def test_checker(self):
1218
self.assert_command_from_args(["--checker", ":", "foo"],
1219
SetCheckerCmd, value_to_set=":")
1221
def test_checker_empty(self):
1222
self.assert_command_from_args(["--checker", "", "foo"],
1223
SetCheckerCmd, value_to_set="")
1225
def test_checker_short(self):
1226
self.assert_command_from_args(["-c", ":", "foo"],
1227
SetCheckerCmd, value_to_set=":")
1229
def test_timeout(self):
1230
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1232
value_to_set=300000)
1234
def test_timeout_short(self):
1235
self.assert_command_from_args(["-t", "PT5M", "foo"],
1237
value_to_set=300000)
1239
def test_extended_timeout(self):
1240
self.assert_command_from_args(["--extended-timeout", "PT15M",
1242
SetExtendedTimeoutCmd,
1243
value_to_set=900000)
1245
def test_interval(self):
1246
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1248
value_to_set=120000)
1250
def test_interval_short(self):
1251
self.assert_command_from_args(["-i", "PT2M", "foo"],
1253
value_to_set=120000)
1255
def test_approve_by_default(self):
1256
self.assert_command_from_args(["--approve-by-default", "foo"],
1257
ApproveByDefaultCmd)
1259
def test_deny_by_default(self):
1260
self.assert_command_from_args(["--deny-by-default", "foo"],
1263
def test_approval_delay(self):
1264
self.assert_command_from_args(["--approval-delay", "PT30S",
1265
"foo"], SetApprovalDelayCmd,
1268
def test_approval_duration(self):
1269
self.assert_command_from_args(["--approval-duration", "PT1S",
1270
"foo"], SetApprovalDurationCmd,
1273
def test_host(self):
1274
self.assert_command_from_args(["--host", "foo.example.org",
1276
value_to_set="foo.example.org")
1278
def test_host_short(self):
1279
self.assert_command_from_args(["-H", "foo.example.org",
1281
value_to_set="foo.example.org")
1283
def test_secret_devnull(self):
1284
self.assert_command_from_args(["--secret", os.path.devnull,
1285
"foo"], SetSecretCmd,
1288
def test_secret_tempfile(self):
1289
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1290
value = b"secret\0xyzzy\nbar"
1293
self.assert_command_from_args(["--secret", f.name,
1294
"foo"], SetSecretCmd,
1297
def test_secret_devnull_short(self):
1298
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1299
SetSecretCmd, value_to_set=b"")
1301
def test_secret_tempfile_short(self):
1302
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1303
value = b"secret\0xyzzy\nbar"
1306
self.assert_command_from_args(["-s", f.name, "foo"],
1310
def test_approve(self):
1311
self.assert_command_from_args(["--approve", "foo"],
1314
def test_approve_short(self):
1315
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1317
def test_deny(self):
1318
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1320
def test_deny_short(self):
1321
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1323
def test_dump_json(self):
1324
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1326
def test_is_enabled(self):
1327
self.assert_command_from_args(["--is-enabled", "foo"],
1330
def test_is_enabled_short(self):
1331
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1335
1769
def should_only_run_tests():