78
79
dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"
81
def milliseconds_to_string(ms):
    """Format a duration in milliseconds as "[<days>T]HH:MM:SS".

    The "<days>T" prefix is only emitted when the day count of the
    duration is non-zero.  Fractions of a second are dropped, not
    rounded.
    """
    td = datetime.timedelta(milliseconds=ms)
    # td.seconds only holds the part of the duration below one day.
    hours, remainder = divmod(td.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    day_prefix = "{}T".format(td.days) if td.days else ""
    return "{}{:02}:{:02}:{:02}".format(day_prefix, hours, minutes,
                                        seconds)
83
parser = argparse.ArgumentParser()
85
add_command_line_options(parser)
87
options = parser.parse_args()
89
check_option_syntax(parser, options)
91
clientnames = options.client
94
log.setLevel(logging.DEBUG)
96
bus = dbus.SystemBus()
98
mandos_dbus_object = get_mandos_dbus_object(bus)
100
mandos_serv = dbus.Interface(
101
mandos_dbus_object, dbus_interface=server_dbus_interface)
102
mandos_serv_object_manager = dbus.Interface(
103
mandos_dbus_object, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
105
managed_objects = get_managed_objects(mandos_serv_object_manager)
108
for path, ifs_and_props in managed_objects.items():
110
all_clients[path] = ifs_and_props[client_dbus_interface]
114
# Compile dict of (clientpath: properties) to process
116
clients = all_clients
119
for name in clientnames:
120
for objpath, properties in all_clients.items():
121
if properties["Name"] == name:
122
clients[objpath] = properties
125
log.critical("Client not found on server: %r", name)
128
# Run all commands on clients
129
commands = commands_from_options(options)
130
for command in commands:
131
command.run(clients, bus, mandos_serv)
134
def add_command_line_options(parser):
    """Register all command line options on PARSER.

    PARSER is an argparse.ArgumentParser (or compatible object); it
    is modified in place and nothing is returned.  Options which
    contradict each other (enable/disable, start/stop checker,
    approve/deny by default, approve/deny) are placed in mutually
    exclusive groups.  Duration options are converted with
    string_to_delta().
    """
    parser.add_argument("--version", action="version",
                        version="%(prog)s {}".format(version),
                        help="show version number and exit")
    parser.add_argument("-a", "--all", action="store_true",
                        help="Select all clients")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Print all fields")
    parser.add_argument("-j", "--dump-json", action="store_true",
                        help="Dump client data in JSON format")
    enable_disable = parser.add_mutually_exclusive_group()
    enable_disable.add_argument("-e", "--enable", action="store_true",
                                help="Enable client")
    # Must be a flag, like --enable; without action="store_true" it
    # would consume the following client name as its value.
    enable_disable.add_argument("-d", "--disable",
                                action="store_true",
                                help="disable client")
    parser.add_argument("-b", "--bump-timeout", action="store_true",
                        help="Bump timeout for client")
    start_stop_checker = parser.add_mutually_exclusive_group()
    start_stop_checker.add_argument("--start-checker",
                                    action="store_true",
                                    help="Start checker for client")
    start_stop_checker.add_argument("--stop-checker",
                                    action="store_true",
                                    help="Stop checker for client")
    parser.add_argument("-V", "--is-enabled", action="store_true",
                        help="Check if client is enabled")
    parser.add_argument("-r", "--remove", action="store_true",
                        help="Remove client")
    parser.add_argument("-c", "--checker",
                        help="Set checker command for client")
    parser.add_argument("-t", "--timeout", type=string_to_delta,
                        help="Set timeout for client")
    parser.add_argument("--extended-timeout", type=string_to_delta,
                        help="Set extended timeout for client")
    parser.add_argument("-i", "--interval", type=string_to_delta,
                        help="Set checker interval for client")
    approve_deny_default = parser.add_mutually_exclusive_group()
    # Both options share one destination; the default of None means
    # "neither option was given".
    approve_deny_default.add_argument(
        "--approve-by-default", action="store_true",
        default=None, dest="approved_by_default",
        help="Set client to be approved by default")
    approve_deny_default.add_argument(
        "--deny-by-default", action="store_false",
        dest="approved_by_default",
        help="Set client to be denied by default")
    parser.add_argument("--approval-delay", type=string_to_delta,
                        help="Set delay before client approve/deny")
    parser.add_argument("--approval-duration", type=string_to_delta,
                        help="Set duration of one client approval")
    parser.add_argument("-H", "--host", help="Set host for client")
    parser.add_argument("-s", "--secret",
                        type=argparse.FileType(mode="rb"),
                        help="Set password blob (file) for client")
    approve_deny = parser.add_mutually_exclusive_group()
    approve_deny.add_argument(
        "-A", "--approve", action="store_true",
        help="Approve any current client request")
    approve_deny.add_argument("-D", "--deny", action="store_true",
                              help="Deny any current client request")
    parser.add_argument("--debug", action="store_true",
                        help="Debug mode (show D-Bus commands)")
    parser.add_argument("--check", action="store_true",
                        help="Run self-test")
    parser.add_argument("client", nargs="*", help="Client name")
201
def string_to_delta(interval):
202
"""Parse a string and return a datetime.timedelta"""
205
return rfc3339_duration_to_delta(interval)
206
except ValueError as e:
207
log.warning("%s - Parsing as pre-1.6.1 interval instead",
209
return parse_pre_1_6_1_interval(interval)
90
212
def rfc3339_duration_to_delta(duration):
274
## Classes for commands.
276
# Abstract classes first
385
def check_option_syntax(parser, options):
386
"""Apply additional restrictions on options, not expressible in
389
def has_actions(options):
390
return any((options.enable,
392
options.bump_timeout,
393
options.start_checker,
394
options.stop_checker,
397
options.checker is not None,
398
options.timeout is not None,
399
options.extended_timeout is not None,
400
options.interval is not None,
401
options.approved_by_default is not None,
402
options.approval_delay is not None,
403
options.approval_duration is not None,
404
options.host is not None,
405
options.secret is not None,
409
if has_actions(options) and not (options.client or options.all):
410
parser.error("Options require clients names or --all.")
411
if options.verbose and has_actions(options):
412
parser.error("--verbose can only be used alone.")
413
if options.dump_json and (options.verbose
414
or has_actions(options)):
415
parser.error("--dump-json can only be used alone.")
416
if options.all and not has_actions(options):
417
parser.error("--all requires an action.")
418
if options.is_enabled and len(options.client) > 1:
419
parser.error("--is-enabled requires exactly one client")
421
options.remove = False
422
if has_actions(options) and not options.deny:
423
parser.error("--remove can only be combined with --deny")
424
options.remove = True
427
def get_mandos_dbus_object(bus):
    """Return the Mandos server object looked up on BUS.

    BUS must provide get_object(busname, path).  The lookup runs
    inside if_dbus_exception_log_with_exception_and_exit(), which is
    given the message "Could not connect to Mandos server: %s" to
    use if a D-Bus exception is raised.
    """
    log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
              dbus_busname, server_dbus_path)
    with if_dbus_exception_log_with_exception_and_exit(
            "Could not connect to Mandos server: %s"):
        mandos_dbus_object = bus.get_object(dbus_busname,
                                            server_dbus_path)
    return mandos_dbus_object
437
@contextlib.contextmanager
438
def if_dbus_exception_log_with_exception_and_exit(*args, **kwargs):
441
except dbus.exceptions.DBusException as e:
442
log.critical(*(args + (e,)), **kwargs)
446
def get_managed_objects(object_manager):
    """Return the result of OBJECT_MANAGER.GetManagedObjects().

    The call is made with the "dbus.proxies" logger silenced, and
    inside if_dbus_exception_log_with_exception_and_exit(), which is
    given the message "Failed to access Mandos server through
    D-Bus:\\n%s" to use if a D-Bus exception is raised.
    """
    log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
              server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
    with if_dbus_exception_log_with_exception_and_exit(
            "Failed to access Mandos server through D-Bus:\n%s"):
        with SilenceLogger("dbus.proxies"):
            managed_objects = object_manager.GetManagedObjects()
    return managed_objects
456
class SilenceLogger(object):
    """Simple context manager to silence a particular logger

    While the "with" block is active, a filter rejecting every
    record is attached to the logger named LOGGERNAME; the filter is
    removed again when the block is left, also on exceptions.
    """
    def __init__(self, loggername):
        self.logger = logging.getLogger(loggername)

    def __enter__(self):
        self.logger.addFilter(self.nullfilter)
        return self

    class NullFilter(logging.Filter):
        """Filter which rejects every log record"""
        def filter(self, record):
            return False

    # A single filter instance is shared by all SilenceLogger
    # objects; it carries no state.
    nullfilter = NullFilter()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.logger.removeFilter(self.nullfilter)
475
def commands_from_options(options):
    """Return a list of command objects for the parsed OPTIONS.

    One command instance is appended for every option which was
    given, in the fixed order used below.  If no command option was
    given at all, the list contains a single PrintTableCmd, which is
    verbose if OPTIONS.verbose is set.
    """
    commands = []

    if options.is_enabled:
        commands.append(IsEnabledCmd())

    if options.approve:
        commands.append(ApproveCmd())

    if options.deny:
        commands.append(DenyCmd())

    if options.remove:
        commands.append(RemoveCmd())

    if options.dump_json:
        commands.append(DumpJSONCmd())

    if options.enable:
        commands.append(EnableCmd())

    if options.disable:
        commands.append(DisableCmd())

    if options.bump_timeout:
        commands.append(BumpTimeoutCmd())

    if options.start_checker:
        commands.append(StartCheckerCmd())

    if options.stop_checker:
        commands.append(StopCheckerCmd())

    # None means that neither --approve-by-default nor
    # --deny-by-default was given.
    if options.approved_by_default is not None:
        if options.approved_by_default:
            commands.append(ApproveByDefaultCmd())
        else:
            commands.append(DenyByDefaultCmd())

    if options.checker is not None:
        commands.append(SetCheckerCmd(options.checker))

    if options.host is not None:
        commands.append(SetHostCmd(options.host))

    if options.secret is not None:
        commands.append(SetSecretCmd(options.secret))

    if options.timeout is not None:
        commands.append(SetTimeoutCmd(options.timeout))

    # Compare against None, like the other duration options; a zero
    # datetime.timedelta is false and would otherwise be ignored.
    if options.extended_timeout is not None:
        commands.append(
            SetExtendedTimeoutCmd(options.extended_timeout))

    if options.interval is not None:
        commands.append(SetIntervalCmd(options.interval))

    if options.approval_delay is not None:
        commands.append(SetApprovalDelayCmd(options.approval_delay))

    if options.approval_duration is not None:
        commands.append(
            SetApprovalDurationCmd(options.approval_duration))

    # If no command option has been given, show table of clients,
    # optionally verbosely
    if not commands:
        commands.append(PrintTableCmd(verbose=options.verbose))

    return commands
277
549
class Command(object):
278
550
"""Abstract class for commands"""
279
def run(self, mandos, clients):
551
def run(self, clients, bus=None, mandos=None):
280
552
"""Normal commands should implement run_on_one_client(), but
281
553
commands which want to operate on all clients at the same time
282
554
can override this run() method instead."""
283
555
self.mandos = mandos
284
for client, properties in clients.items():
556
for clientpath, properties in clients.items():
557
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
558
dbus_busname, str(clientpath))
559
client = bus.get_object(dbus_busname, clientpath)
285
560
self.run_on_one_client(client, properties)
287
class PrintCmd(Command):
288
"""Abstract class for commands printing client details"""
563
class IsEnabledCmd(Command):
564
def run(self, clients, bus=None, mandos=None):
565
client, properties = next(iter(clients.items()))
566
if self.is_enabled(client, properties):
569
def is_enabled(self, client, properties):
570
return properties["Enabled"]
573
class ApproveCmd(Command):
    """Command calling Approve(True) on each client over D-Bus"""
    def run_on_one_client(self, client, properties):
        """Log and send Approve(True) to CLIENT; PROPERTIES is
        unused."""
        log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
                  client.__dbus_object_path__, client_dbus_interface)
        client.Approve(dbus.Boolean(True),
                       dbus_interface=client_dbus_interface)
581
class DenyCmd(Command):
    """Command calling Approve(False) on each client over D-Bus"""
    def run_on_one_client(self, client, properties):
        """Log and send Approve(False) to CLIENT; PROPERTIES is
        unused."""
        log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
                  client.__dbus_object_path__, client_dbus_interface)
        client.Approve(dbus.Boolean(False),
                       dbus_interface=client_dbus_interface)
589
class RemoveCmd(Command):
    """Command asking the server to remove each client"""
    def run_on_one_client(self, client, properties):
        """Log and call RemoveClient() on self.mandos with the
        D-Bus object path of CLIENT; PROPERTIES is unused."""
        log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname,
                  server_dbus_path, server_dbus_interface,
                  str(client.__dbus_object_path__))
        self.mandos.RemoveClient(client.__dbus_object_path__)
597
class OutputCmd(Command):
598
"""Abstract class for commands outputting client details"""
289
599
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
290
600
"Created", "Interval", "Host", "KeyID",
291
601
"Fingerprint", "CheckerRunning", "LastEnabled",
417
710
**{key: self.string_from_client(client, key)
418
711
for key in self.keywords})
422
class DumpJSONCmd(PrintCmd):
423
def output(self, clients):
424
data = {client["Name"]:
425
{key: self.dbus_boolean_to_bool(client[key])
426
for key in self.all_keywords}
427
for client in clients.values()}
428
return json.dumps(data, indent=4, separators=(',', ': '))
430
def dbus_boolean_to_bool(value):
431
if isinstance(value, dbus.Boolean):
435
class IsEnabledCmd(Command):
714
def milliseconds_to_string(ms):
    """Format a duration in milliseconds as "[<days>T]HH:MM:SS".

    The "<days>T" prefix is only emitted when the day count of the
    duration is non-zero.  Fractions of a second are dropped, not
    rounded.
    """
    td = datetime.timedelta(milliseconds=ms)
    # td.seconds only holds the part of the duration below one day.
    hours, remainder = divmod(td.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    # Without the condition a "0T" prefix would always be printed.
    day_prefix = "{}T".format(td.days) if td.days else ""
    return "{}{:02}:{:02}:{:02}".format(day_prefix, hours, minutes,
                                        seconds)
724
class PropertyCmd(Command):
725
"""Abstract class for Actions for setting one client property"""
436
727
def run_on_one_client(self, client, properties):
437
if self.is_enabled(client, properties):
440
def is_enabled(self, client, properties):
441
log.debug("D-Bus: %s:%s:%s.Get(%r, %r)", busname,
728
"""Set the Client's D-Bus property"""
729
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
442
730
client.__dbus_object_path__,
443
dbus.PROPERTIES_IFACE, client_interface,
445
return bool(client.Get(client_interface, "Enabled",
446
dbus_interface=dbus.PROPERTIES_IFACE))
448
class RemoveCmd(Command):
449
def run_on_one_client(self, client, properties):
450
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
451
server_path, server_interface,
452
str(client.__dbus_object_path__))
453
self.mandos.RemoveClient(client.__dbus_object_path__)
455
class ApproveCmd(Command):
456
def run_on_one_client(self, client, properties):
457
log.debug("D-Bus: %s:%s.Approve(True)",
458
client.__dbus_object_path__, client_interface)
459
client.Approve(dbus.Boolean(True),
460
dbus_interface=client_interface)
462
class DenyCmd(Command):
463
def run_on_one_client(self, client, properties):
464
log.debug("D-Bus: %s:%s.Approve(False)",
465
client.__dbus_object_path__, client_interface)
466
client.Approve(dbus.Boolean(False),
467
dbus_interface=client_interface)
731
dbus.PROPERTIES_IFACE, client_dbus_interface,
732
self.propname, self.value_to_set
733
if not isinstance(self.value_to_set, dbus.Boolean)
734
else bool(self.value_to_set))
735
client.Set(client_dbus_interface, self.propname,
737
dbus_interface=dbus.PROPERTIES_IFACE)
741
raise NotImplementedError()
469
744
class EnableCmd(PropertyCmd):
    """Command setting the client property "Enabled" to true"""
    propname = "Enabled"
    value_to_set = dbus.Boolean(True)
473
749
class DisableCmd(PropertyCmd):
    """Command setting the client property "Enabled" to false"""
    propname = "Enabled"
    value_to_set = dbus.Boolean(False)
477
754
class BumpTimeoutCmd(PropertyCmd):
    """Command writing an empty string to the client property
    "LastCheckedOK" """
    # NOTE(review): presumably the server interprets the empty
    # string as "checked OK just now" -- confirm against the server.
    propname = "LastCheckedOK"
    value_to_set = ""
481
759
class StartCheckerCmd(PropertyCmd):
    """Command setting the client property "CheckerRunning" to
    true"""
    propname = "CheckerRunning"
    value_to_set = dbus.Boolean(True)
485
764
class StopCheckerCmd(PropertyCmd):
    """Command setting the client property "CheckerRunning" to
    false"""
    propname = "CheckerRunning"
    value_to_set = dbus.Boolean(False)
489
769
class ApproveByDefaultCmd(PropertyCmd):
    """Command setting the client property "ApprovedByDefault" to
    true"""
    propname = "ApprovedByDefault"
    value_to_set = dbus.Boolean(True)
493
774
class DenyByDefaultCmd(PropertyCmd):
    """Command setting the client property "ApprovedByDefault" to
    false"""
    propname = "ApprovedByDefault"
    value_to_set = dbus.Boolean(False)
497
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
779
class PropertyValueCmd(PropertyCmd):
    """Abstract class for PropertyCmd receiving a value as argument"""
    def __init__(self, value):
        # Subclasses may define value_to_set as a property to
        # convert VALUE when it is assigned here.
        self.value_to_set = value
785
class SetCheckerCmd(PropertyValueCmd):
498
786
propname = "Checker"
500
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
789
class SetHostCmd(PropertyValueCmd):
501
790
propname = "Host"
503
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
793
class SetSecretCmd(PropertyValueCmd):
504
794
propname = "Secret"
506
797
def value_to_set(self):
508
800
@value_to_set.setter
509
801
def value_to_set(self, value):
510
802
"""When setting, read data from supplied file object"""
511
803
self._vts = value.read()
514
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
807
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
808
"""Abstract class for PropertyValueCmd taking a value argument as
809
a datetime.timedelta() but should store it as milliseconds."""
812
def value_to_set(self):
816
def value_to_set(self, value):
817
"""When setting, convert value from a datetime.timedelta"""
818
self._vts = int(round(value.total_seconds() * 1000))
821
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
515
822
propname = "Timeout"
517
class SetExtendedTimeoutCmd(PropertyCmd,
518
MillisecondsValueArgumentMixIn):
825
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
519
826
propname = "ExtendedTimeout"
521
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
829
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
522
830
propname = "Interval"
524
class SetApprovalDelayCmd(PropertyCmd,
525
MillisecondsValueArgumentMixIn):
833
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
526
834
propname = "ApprovalDelay"
528
class SetApprovalDurationCmd(PropertyCmd,
529
MillisecondsValueArgumentMixIn):
837
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
530
838
propname = "ApprovalDuration"
532
def add_command_line_options(parser):
    """Register all command line options on PARSER.

    PARSER is an argparse.ArgumentParser (or compatible object); it
    is modified in place and nothing is returned.  Options which
    contradict each other (enable/disable, start/stop checker,
    approve/deny by default, approve/deny) are placed in mutually
    exclusive groups.  Duration options are converted with
    string_to_delta().
    """
    parser.add_argument("--version", action="version",
                        version="%(prog)s {}".format(version),
                        help="show version number and exit")
    parser.add_argument("-a", "--all", action="store_true",
                        help="Select all clients")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Print all fields")
    parser.add_argument("-j", "--dump-json", action="store_true",
                        help="Dump client data in JSON format")
    enable_disable = parser.add_mutually_exclusive_group()
    enable_disable.add_argument("-e", "--enable", action="store_true",
                                help="Enable client")
    # Must be a flag, like --enable; without action="store_true" it
    # would consume the following client name as its value.
    enable_disable.add_argument("-d", "--disable",
                                action="store_true",
                                help="disable client")
    parser.add_argument("-b", "--bump-timeout", action="store_true",
                        help="Bump timeout for client")
    start_stop_checker = parser.add_mutually_exclusive_group()
    start_stop_checker.add_argument("--start-checker",
                                    action="store_true",
                                    help="Start checker for client")
    start_stop_checker.add_argument("--stop-checker",
                                    action="store_true",
                                    help="Stop checker for client")
    parser.add_argument("-V", "--is-enabled", action="store_true",
                        help="Check if client is enabled")
    parser.add_argument("-r", "--remove", action="store_true",
                        help="Remove client")
    parser.add_argument("-c", "--checker",
                        help="Set checker command for client")
    parser.add_argument("-t", "--timeout", type=string_to_delta,
                        help="Set timeout for client")
    parser.add_argument("--extended-timeout", type=string_to_delta,
                        help="Set extended timeout for client")
    parser.add_argument("-i", "--interval", type=string_to_delta,
                        help="Set checker interval for client")
    approve_deny_default = parser.add_mutually_exclusive_group()
    # Both options share one destination; the default of None means
    # "neither option was given".
    approve_deny_default.add_argument(
        "--approve-by-default", action="store_true",
        default=None, dest="approved_by_default",
        help="Set client to be approved by default")
    approve_deny_default.add_argument(
        "--deny-by-default", action="store_false",
        dest="approved_by_default",
        help="Set client to be denied by default")
    parser.add_argument("--approval-delay", type=string_to_delta,
                        help="Set delay before client approve/deny")
    parser.add_argument("--approval-duration", type=string_to_delta,
                        help="Set duration of one client approval")
    parser.add_argument("-H", "--host", help="Set host for client")
    parser.add_argument("-s", "--secret",
                        type=argparse.FileType(mode="rb"),
                        help="Set password blob (file) for client")
    approve_deny = parser.add_mutually_exclusive_group()
    approve_deny.add_argument(
        "-A", "--approve", action="store_true",
        help="Approve any current client request")
    approve_deny.add_argument("-D", "--deny", action="store_true",
                              help="Deny any current client request")
    parser.add_argument("--debug", action="store_true",
                        help="Debug mode (show D-Bus commands)")
    parser.add_argument("--check", action="store_true",
                        help="Run self-test")
    parser.add_argument("client", nargs="*", help="Client name")
599
def commands_from_options(options):
    """Return a list of command objects for the parsed OPTIONS.

    One command instance is appended for every option which was
    given, in the fixed order used below.  If no command option was
    given at all, the list contains a single PrintTableCmd, which is
    verbose if OPTIONS.verbose is set.
    """
    commands = []

    if options.dump_json:
        commands.append(DumpJSONCmd())

    if options.enable:
        commands.append(EnableCmd())

    if options.disable:
        commands.append(DisableCmd())

    if options.bump_timeout:
        commands.append(BumpTimeoutCmd())

    if options.start_checker:
        commands.append(StartCheckerCmd())

    if options.stop_checker:
        commands.append(StopCheckerCmd())

    if options.is_enabled:
        commands.append(IsEnabledCmd())

    if options.checker is not None:
        commands.append(SetCheckerCmd(options.checker))

    if options.timeout is not None:
        commands.append(SetTimeoutCmd(options.timeout))

    # Compare against None, like the other duration options; a zero
    # datetime.timedelta is false and would otherwise be ignored.
    if options.extended_timeout is not None:
        commands.append(
            SetExtendedTimeoutCmd(options.extended_timeout))

    if options.interval is not None:
        commands.append(SetIntervalCmd(options.interval))

    # None means that neither --approve-by-default nor
    # --deny-by-default was given.
    if options.approved_by_default is not None:
        if options.approved_by_default:
            commands.append(ApproveByDefaultCmd())
        else:
            commands.append(DenyByDefaultCmd())

    if options.approval_delay is not None:
        commands.append(SetApprovalDelayCmd(options.approval_delay))

    if options.approval_duration is not None:
        commands.append(
            SetApprovalDurationCmd(options.approval_duration))

    if options.host is not None:
        commands.append(SetHostCmd(options.host))

    if options.secret is not None:
        commands.append(SetSecretCmd(options.secret))

    if options.approve:
        commands.append(ApproveCmd())

    if options.deny:
        commands.append(DenyCmd())

    if options.remove:
        commands.append(RemoveCmd())

    # If no command option has been given, show table of clients,
    # optionally verbosely
    if not commands:
        commands.append(PrintTableCmd(verbose=options.verbose))

    return commands
673
def check_option_syntax(parser, options):
674
"""Apply additional restrictions on options, not expressible in
677
def has_actions(options):
678
return any((options.enable,
680
options.bump_timeout,
681
options.start_checker,
682
options.stop_checker,
685
options.checker is not None,
686
options.timeout is not None,
687
options.extended_timeout is not None,
688
options.interval is not None,
689
options.approved_by_default is not None,
690
options.approval_delay is not None,
691
options.approval_duration is not None,
692
options.host is not None,
693
options.secret is not None,
697
if has_actions(options) and not (options.client or options.all):
698
parser.error("Options require clients names or --all.")
699
if options.verbose and has_actions(options):
700
parser.error("--verbose can only be used alone.")
701
if options.dump_json and (options.verbose
702
or has_actions(options)):
703
parser.error("--dump-json can only be used alone.")
704
if options.all and not has_actions(options):
705
parser.error("--all requires an action.")
706
if options.is_enabled and len(options.client) > 1:
707
parser.error("--is-enabled requires exactly one client")
709
options.remove = False
710
if has_actions(options) and not options.deny:
711
parser.error("--remove can only be combined with --deny")
712
options.remove = True
716
parser = argparse.ArgumentParser()
718
add_command_line_options(parser)
720
options = parser.parse_args()
722
check_option_syntax(parser, options)
724
clientnames = options.client
727
log.setLevel(logging.DEBUG)
730
bus = dbus.SystemBus()
731
log.debug("D-Bus: Connect to: (name=%r, path=%r)", busname,
733
mandos_dbus_objc = bus.get_object(busname, server_path)
734
except dbus.exceptions.DBusException:
735
log.critical("Could not connect to Mandos server")
738
mandos_serv = dbus.Interface(mandos_dbus_objc,
739
dbus_interface=server_interface)
740
mandos_serv_object_manager = dbus.Interface(
741
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
743
# Filter out log message from dbus module
744
dbus_logger = logging.getLogger("dbus.proxies")
745
class NullFilter(logging.Filter):
746
def filter(self, record):
748
dbus_filter = NullFilter()
750
dbus_logger.addFilter(dbus_filter)
751
log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", busname,
752
server_path, dbus.OBJECT_MANAGER_IFACE)
753
mandos_clients = {path: ifs_and_props[client_interface]
754
for path, ifs_and_props in
755
mandos_serv_object_manager
756
.GetManagedObjects().items()
757
if client_interface in ifs_and_props}
758
except dbus.exceptions.DBusException as e:
759
log.critical("Failed to access Mandos server through D-Bus:"
763
# restore dbus logger
764
dbus_logger.removeFilter(dbus_filter)
766
# Compile dict of (clients: properties) to process
770
clients = {(log.debug("D-Bus: Connect to: (name=%r, path=%r)",
771
busname, str(path)) and False) or
772
bus.get_object(busname, path): properties
773
for path, properties in mandos_clients.items()}
775
for name in clientnames:
776
for path, client in mandos_clients.items():
777
if client["Name"] == name:
778
log.debug("D-Bus: Connect to: (name=%r, path=%r)",
780
client_objc = bus.get_object(busname, path)
781
clients[client_objc] = client
784
log.critical("Client not found on server: %r", name)
787
# Run all commands on clients
788
commands = commands_from_options(options)
789
for command in commands:
790
command.run(mandos_serv, clients)
793
class Test_milliseconds_to_string(unittest.TestCase):
795
self.assertEqual(milliseconds_to_string(93785000),
797
def test_no_days(self):
798
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
799
def test_all_zero(self):
800
self.assertEqual(milliseconds_to_string(0), "00:00:00")
801
def test_no_fractional_seconds(self):
802
self.assertEqual(milliseconds_to_string(400), "00:00:00")
803
self.assertEqual(milliseconds_to_string(900), "00:00:00")
804
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
806
842
class Test_string_to_delta(unittest.TestCase):
807
843
def test_handles_basic_rfc3339(self):
808
844
self.assertEqual(string_to_delta("PT0S"),
836
873
self.assertEqual(value, datetime.timedelta(0, 7200))
876
class Test_check_option_syntax(unittest.TestCase):
878
self.parser = argparse.ArgumentParser()
879
add_command_line_options(self.parser)
881
def test_actions_requires_client_or_all(self):
882
for action, value in self.actions.items():
883
options = self.parser.parse_args()
884
setattr(options, action, value)
885
with self.assertParseError():
886
self.check_option_syntax(options)
888
# This mostly corresponds to the definition from has_actions() in
889
# check_option_syntax()
891
# The actual values set here are not that important, but we do
892
# at least stick to the correct types, even though they are
896
"bump_timeout": True,
897
"start_checker": True,
898
"stop_checker": True,
902
"timeout": datetime.timedelta(),
903
"extended_timeout": datetime.timedelta(),
904
"interval": datetime.timedelta(),
905
"approved_by_default": True,
906
"approval_delay": datetime.timedelta(),
907
"approval_duration": datetime.timedelta(),
909
"secret": io.BytesIO(b"x"),
914
@contextlib.contextmanager
915
def assertParseError(self):
916
with self.assertRaises(SystemExit) as e:
917
with self.temporarily_suppress_stderr():
919
# Exit code from argparse is guaranteed to be "2". Reference:
920
# https://docs.python.org/3/library
921
# /argparse.html#exiting-methods
922
self.assertEqual(e.exception.code, 2)
925
@contextlib.contextmanager
926
def temporarily_suppress_stderr():
927
null = os.open(os.path.devnull, os.O_RDWR)
928
stderrcopy = os.dup(sys.stderr.fileno())
929
os.dup2(null, sys.stderr.fileno())
935
os.dup2(stderrcopy, sys.stderr.fileno())
938
def check_option_syntax(self, options):
939
check_option_syntax(self.parser, options)
941
def test_actions_conflicts_with_verbose(self):
942
for action, value in self.actions.items():
943
options = self.parser.parse_args()
944
setattr(options, action, value)
945
options.verbose = True
946
with self.assertParseError():
947
self.check_option_syntax(options)
949
def test_dump_json_conflicts_with_verbose(self):
950
options = self.parser.parse_args()
951
options.dump_json = True
952
options.verbose = True
953
with self.assertParseError():
954
self.check_option_syntax(options)
956
def test_dump_json_conflicts_with_action(self):
957
for action, value in self.actions.items():
958
options = self.parser.parse_args()
959
setattr(options, action, value)
960
options.dump_json = True
961
with self.assertParseError():
962
self.check_option_syntax(options)
964
def test_all_can_not_be_alone(self):
965
options = self.parser.parse_args()
967
with self.assertParseError():
968
self.check_option_syntax(options)
970
def test_all_is_ok_with_any_action(self):
971
for action, value in self.actions.items():
972
options = self.parser.parse_args()
973
setattr(options, action, value)
975
self.check_option_syntax(options)
977
def test_is_enabled_fails_without_client(self):
978
options = self.parser.parse_args()
979
options.is_enabled = True
980
with self.assertParseError():
981
self.check_option_syntax(options)
983
def test_is_enabled_works_with_one_client(self):
984
options = self.parser.parse_args()
985
options.is_enabled = True
986
options.client = ["foo"]
987
self.check_option_syntax(options)
989
def test_is_enabled_fails_with_two_clients(self):
990
options = self.parser.parse_args()
991
options.is_enabled = True
992
options.client = ["foo", "barbar"]
993
with self.assertParseError():
994
self.check_option_syntax(options)
996
def test_remove_can_only_be_combined_with_action_deny(self):
997
for action, value in self.actions.items():
998
if action in {"remove", "deny"}:
1000
options = self.parser.parse_args()
1001
setattr(options, action, value)
1003
options.remove = True
1004
with self.assertParseError():
1005
self.check_option_syntax(options)
1008
class Test_get_mandos_dbus_object(unittest.TestCase):
    """Tests for get_mandos_dbus_object() using stand-in bus
    objects"""
    def test_calls_and_returns_get_object_on_bus(self):
        class MockBus(object):
            called = False

            def get_object(mockbus_self, busname, dbus_path):
                # Note that "self" is still the testcase instance,
                # this MockBus instance is in "mockbus_self".
                self.assertEqual(busname, dbus_busname)
                self.assertEqual(dbus_path, server_dbus_path)
                mockbus_self.called = True
                return mockbus_self

        mockbus = get_mandos_dbus_object(bus=MockBus())
        self.assertIsInstance(mockbus, MockBus)
        self.assertTrue(mockbus.called)

    def test_logs_and_exits_on_dbus_error(self):
        class MockBusFailing(object):
            def get_object(self, busname, dbus_path):
                raise dbus.exceptions.DBusException("Test")

        # assertLogs only exists in Python 3.4
        if hasattr(self, "assertLogs"):
            with self.assertLogs(log, logging.CRITICAL):
                with self.assertRaises(SystemExit) as e:
                    # Must be the failing bus; no other mock bus
                    # class is defined in this test method.
                    get_mandos_dbus_object(bus=MockBusFailing())
        else:
            critical_filter = self.CriticalFilter()
            log.addFilter(critical_filter)
            try:
                with self.assertRaises(SystemExit) as e:
                    get_mandos_dbus_object(bus=MockBusFailing())
            finally:
                log.removeFilter(critical_filter)
            self.assertTrue(critical_filter.found)
        # The exit status must signal failure, whether it is given
        # as an integer or as some other object.
        if isinstance(e.exception.code, int):
            self.assertNotEqual(e.exception.code, 0)
        else:
            self.assertIsNotNone(e.exception.code)

    class CriticalFilter(logging.Filter):
        """Don't show, but register, critical messages"""
        found = False

        def filter(self, record):
            is_critical = record.levelno >= logging.CRITICAL
            self.found = is_critical or self.found
            return not is_critical
1057
class Test_get_managed_objects(unittest.TestCase):
1058
def test_calls_and_returns_GetManagedObjects(self):
1059
managed_objects = {"/clients/foo": { "Name": "foo"}}
1060
class MockObjectManager(object):
1062
def GetManagedObjects():
1063
return managed_objects
1064
retval = get_managed_objects(MockObjectManager())
1065
self.assertDictEqual(managed_objects, retval)
1067
def test_logs_and_exits_on_dbus_error(self):
1068
class MockObjectManagerFailing(object):
1070
def GetManagedObjects():
1071
raise dbus.exceptions.DBusException("Test")
1073
if hasattr(self, "assertLogs"):
1074
with self.assertLogs(log, logging.CRITICAL):
1075
with self.assertRaises(SystemExit):
1076
get_managed_objects(MockObjectManagerFailing())
1078
critical_filter = self.CriticalFilter()
1079
log.addFilter(critical_filter)
1081
with self.assertRaises(SystemExit) as e:
1082
get_managed_objects(MockObjectManagerFailing())
1084
log.removeFilter(critical_filter)
1085
self.assertTrue(critical_filter.found)
1086
if isinstance(e.exception.code, int):
1087
self.assertNotEqual(e.exception.code, 0)
1089
self.assertIsNotNone(e.exception.code)
1091
class CriticalFilter(logging.Filter):
1092
"""Don't show, but register, critical messages"""
1094
def filter(self, record):
1095
is_critical = record.levelno >= logging.CRITICAL
1096
self.found = is_critical or self.found
1097
return not is_critical
1100
class Test_SilenceLogger(unittest.TestCase):
1101
loggername = "mandos-ctl.Test_SilenceLogger"
1102
log = logging.getLogger(loggername)
1103
log.propagate = False
1104
log.addHandler(logging.NullHandler())
1107
self.counting_filter = self.CountingFilter()
1109
class CountingFilter(logging.Filter):
1110
"Count number of records"
1112
def filter(self, record):
1116
def test_should_filter_records_only_when_active(self):
1118
with SilenceLogger(self.loggername):
1119
self.log.addFilter(self.counting_filter)
1120
self.log.info("Filtered log message 1")
1121
self.log.info("Non-filtered message 2")
1122
self.log.info("Non-filtered message 3")
1124
self.log.removeFilter(self.counting_filter)
1125
self.assertEqual(self.counting_filter.count, 2)
1128
class Test_commands_from_options(unittest.TestCase):
1130
self.parser = argparse.ArgumentParser()
1131
add_command_line_options(self.parser)
1133
def test_is_enabled(self):
1134
self.assert_command_from_args(["--is-enabled", "foo"],
1137
def assert_command_from_args(self, args, command_cls,
1139
"""Assert that parsing ARGS should result in an instance of
1140
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1141
options = self.parser.parse_args(args)
1142
check_option_syntax(self.parser, options)
1143
commands = commands_from_options(options)
1144
self.assertEqual(len(commands), 1)
1145
command = commands[0]
1146
self.assertIsInstance(command, command_cls)
1147
for key, value in cmd_attrs.items():
1148
self.assertEqual(getattr(command, key), value)
1150
def test_is_enabled_short(self):
1151
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1153
def test_approve(self):
1154
self.assert_command_from_args(["--approve", "foo"],
1157
def test_approve_short(self):
1158
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1160
def test_deny(self):
1161
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1163
def test_deny_short(self):
1164
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1166
def test_remove(self):
1167
self.assert_command_from_args(["--remove", "foo"],
1170
def test_deny_before_remove(self):
1171
options = self.parser.parse_args(["--deny", "--remove",
1173
check_option_syntax(self.parser, options)
1174
commands = commands_from_options(options)
1175
self.assertEqual(len(commands), 2)
1176
self.assertIsInstance(commands[0], DenyCmd)
1177
self.assertIsInstance(commands[1], RemoveCmd)
1179
def test_deny_before_remove_reversed(self):
1180
options = self.parser.parse_args(["--remove", "--deny",
1182
check_option_syntax(self.parser, options)
1183
commands = commands_from_options(options)
1184
self.assertEqual(len(commands), 2)
1185
self.assertIsInstance(commands[0], DenyCmd)
1186
self.assertIsInstance(commands[1], RemoveCmd)
1188
def test_remove_short(self):
1189
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1191
def test_dump_json(self):
1192
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1194
def test_enable(self):
1195
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1197
def test_enable_short(self):
1198
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1200
def test_disable(self):
1201
self.assert_command_from_args(["--disable", "foo"],
1204
def test_disable_short(self):
1205
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1207
def test_bump_timeout(self):
1208
self.assert_command_from_args(["--bump-timeout", "foo"],
1211
def test_bump_timeout_short(self):
1212
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1214
def test_start_checker(self):
1215
self.assert_command_from_args(["--start-checker", "foo"],
1218
def test_stop_checker(self):
1219
self.assert_command_from_args(["--stop-checker", "foo"],
1222
def test_approve_by_default(self):
1223
self.assert_command_from_args(["--approve-by-default", "foo"],
1224
ApproveByDefaultCmd)
1226
def test_deny_by_default(self):
1227
self.assert_command_from_args(["--deny-by-default", "foo"],
1230
def test_checker(self):
1231
self.assert_command_from_args(["--checker", ":", "foo"],
1232
SetCheckerCmd, value_to_set=":")
1234
def test_checker_empty(self):
1235
self.assert_command_from_args(["--checker", "", "foo"],
1236
SetCheckerCmd, value_to_set="")
1238
def test_checker_short(self):
1239
self.assert_command_from_args(["-c", ":", "foo"],
1240
SetCheckerCmd, value_to_set=":")
1242
def test_host(self):
1243
self.assert_command_from_args(["--host", "foo.example.org",
1245
value_to_set="foo.example.org")
1247
def test_host_short(self):
1248
self.assert_command_from_args(["-H", "foo.example.org",
1250
value_to_set="foo.example.org")
1252
def test_secret_devnull(self):
1253
self.assert_command_from_args(["--secret", os.path.devnull,
1254
"foo"], SetSecretCmd,
1257
def test_secret_tempfile(self):
1258
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1259
value = b"secret\0xyzzy\nbar"
1262
self.assert_command_from_args(["--secret", f.name,
1263
"foo"], SetSecretCmd,
1266
def test_secret_devnull_short(self):
1267
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1268
SetSecretCmd, value_to_set=b"")
1270
def test_secret_tempfile_short(self):
1271
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1272
value = b"secret\0xyzzy\nbar"
1275
self.assert_command_from_args(["-s", f.name, "foo"],
1279
def test_timeout(self):
1280
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1282
value_to_set=300000)
1284
def test_timeout_short(self):
1285
self.assert_command_from_args(["-t", "PT5M", "foo"],
1287
value_to_set=300000)
1289
def test_extended_timeout(self):
1290
self.assert_command_from_args(["--extended-timeout", "PT15M",
1292
SetExtendedTimeoutCmd,
1293
value_to_set=900000)
1295
def test_interval(self):
1296
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1298
value_to_set=120000)
1300
def test_interval_short(self):
1301
self.assert_command_from_args(["-i", "PT2M", "foo"],
1303
value_to_set=120000)
1305
def test_approval_delay(self):
1306
self.assert_command_from_args(["--approval-delay", "PT30S",
1307
"foo"], SetApprovalDelayCmd,
1310
def test_approval_duration(self):
1311
self.assert_command_from_args(["--approval-duration", "PT1S",
1312
"foo"], SetApprovalDurationCmd,
1315
def test_print_table(self):
1316
self.assert_command_from_args([], PrintTableCmd,
1319
def test_print_table_verbose(self):
1320
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1323
def test_print_table_verbose_short(self):
1324
self.assert_command_from_args(["-v"], PrintTableCmd,
839
1328
class TestCmd(unittest.TestCase):
840
1329
"""Abstract class for tests of command classes"""
841
1331
def setUp(self):
843
1333
class MockClient(object):
844
1334
def __init__(self, name, **attributes):
845
self.__dbus_object_path__ = "objpath_{}".format(name)
1335
self.__dbus_object_path__ = "/clients/{}".format(name)
846
1336
self.attributes = attributes
847
1337
self.attributes["Name"] = name
849
1339
def Set(self, interface, propname, value, dbus_interface):
850
testcase.assertEqual(interface, client_interface)
1340
testcase.assertEqual(interface, client_dbus_interface)
851
1341
testcase.assertEqual(dbus_interface,
852
1342
dbus.PROPERTIES_IFACE)
853
1343
self.attributes[propname] = value
854
1344
def Get(self, interface, propname, dbus_interface):
855
testcase.assertEqual(interface, client_interface)
1345
testcase.assertEqual(interface, client_dbus_interface)
856
1346
testcase.assertEqual(dbus_interface,
857
1347
dbus.PROPERTIES_IFACE)
858
1348
return self.attributes[propname]
859
1349
def Approve(self, approve, dbus_interface):
860
testcase.assertEqual(dbus_interface, client_interface)
1350
testcase.assertEqual(dbus_interface,
1351
client_dbus_interface)
861
1352
self.calls.append(("Approve", (approve,
862
1353
dbus_interface)))
863
1354
self.client = MockClient(
903
1394
ApprovedByDefault=dbus.Boolean(False),
904
1395
LastApprovalRequest="2019-01-03T00:00:00",
905
1396
ApprovalDelay=30000,
906
ApprovalDuration=1000,
1397
ApprovalDuration=93785000,
908
1399
ExtendedTimeout=900000,
909
1400
Expires="2019-02-05T00:00:00",
910
1401
LastCheckerStatus=-2)
911
1402
self.clients = collections.OrderedDict(
913
(self.client, self.client.attributes),
914
(self.other_client, self.other_client.attributes),
1404
("/clients/foo", self.client.attributes),
1405
("/clients/barbar", self.other_client.attributes),
916
self.one_client = {self.client: self.client.attributes}
918
class TestPrintTableCmd(TestCmd):
919
def test_normal(self):
920
output = PrintTableCmd().output(self.clients.values())
921
expected_output = """
922
Name Enabled Timeout Last Successful Check
923
foo Yes 00:05:00 2019-02-03T00:00:00
924
barbar Yes 00:05:00 2019-02-04T00:00:00
926
self.assertEqual(output, expected_output)
927
def test_verbose(self):
928
output = PrintTableCmd(verbose=True).output(
929
self.clients.values())
930
expected_output = """
931
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
932
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
933
barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04T00:00:00 No No 2019-01-03T00:00:00 00:00:30 00:00:01 : 00:15:00 2019-02-05T00:00:00 -2
935
self.assertEqual(output, expected_output)
936
def test_one_client(self):
937
output = PrintTableCmd().output(self.one_client.values())
938
expected_output = """
939
Name Enabled Timeout Last Successful Check
940
foo Yes 00:05:00 2019-02-03T00:00:00
942
self.assertEqual(output, expected_output)
1407
self.one_client = {"/clients/foo": self.client.attributes}
1413
def get_object(client_bus_name, path):
1414
self.assertEqual(client_bus_name, dbus_busname)
1416
# Note: "self" here is the TestCmd instance, not
1417
# the Bus instance, since this is a static method!
1418
"/clients/foo": self.client,
1419
"/clients/barbar": self.other_client,
1424
class TestIsEnabledCmd(TestCmd):
1425
def test_is_enabled(self):
1426
self.assertTrue(all(IsEnabledCmd().is_enabled(client,
1428
for client, properties
1429
in self.clients.items()))
1431
def test_is_enabled_run_exits_successfully(self):
1432
with self.assertRaises(SystemExit) as e:
1433
IsEnabledCmd().run(self.one_client)
1434
if e.exception.code is not None:
1435
self.assertEqual(e.exception.code, 0)
1437
self.assertIsNone(e.exception.code)
1439
def test_is_enabled_run_exits_with_failure(self):
1440
self.client.attributes["Enabled"] = dbus.Boolean(False)
1441
with self.assertRaises(SystemExit) as e:
1442
IsEnabledCmd().run(self.one_client)
1443
if isinstance(e.exception.code, int):
1444
self.assertNotEqual(e.exception.code, 0)
1446
self.assertIsNotNone(e.exception.code)
1449
class TestApproveCmd(TestCmd):
1450
def test_approve(self):
1451
ApproveCmd().run(self.clients, self.bus)
1452
for clientpath in self.clients:
1453
client = self.bus.get_object(dbus_busname, clientpath)
1454
self.assertIn(("Approve", (True, client_dbus_interface)),
1458
class TestDenyCmd(TestCmd):
1459
def test_deny(self):
1460
DenyCmd().run(self.clients, self.bus)
1461
for clientpath in self.clients:
1462
client = self.bus.get_object(dbus_busname, clientpath)
1463
self.assertIn(("Approve", (False, client_dbus_interface)),
1467
class TestRemoveCmd(TestCmd):
1468
def test_remove(self):
1469
class MockMandos(object):
1472
def RemoveClient(self, dbus_path):
1473
self.calls.append(("RemoveClient", (dbus_path,)))
1474
mandos = MockMandos()
1475
super(TestRemoveCmd, self).setUp()
1476
RemoveCmd().run(self.clients, self.bus, mandos)
1477
self.assertEqual(len(mandos.calls), 2)
1478
for clientpath in self.clients:
1479
self.assertIn(("RemoveClient", (clientpath,)),
944
1483
class TestDumpJSONCmd(TestCmd):
945
1484
def setUp(self):
996
1535
return super(TestDumpJSONCmd, self).setUp()
997
1537
def test_normal(self):
998
json_data = json.loads(DumpJSONCmd().output(self.clients))
1538
output = DumpJSONCmd().output(self.clients.values())
1539
json_data = json.loads(output)
999
1540
self.assertDictEqual(json_data, self.expected_json)
1000
1542
def test_one_client(self):
1001
clients = self.one_client
1002
json_data = json.loads(DumpJSONCmd().output(clients))
1543
output = DumpJSONCmd().output(self.one_client.values())
1544
json_data = json.loads(output)
1003
1545
expected_json = {"foo": self.expected_json["foo"]}
1004
1546
self.assertDictEqual(json_data, expected_json)
1006
class TestIsEnabledCmd(TestCmd):
1007
def test_is_enabled(self):
1008
self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
1009
for client, properties in self.clients.items()))
1010
def test_is_enabled_run_exits_successfully(self):
1011
with self.assertRaises(SystemExit) as e:
1012
IsEnabledCmd().run(None, self.one_client)
1013
if e.exception.code is not None:
1014
self.assertEqual(e.exception.code, 0)
1016
self.assertIsNone(e.exception.code)
1017
def test_is_enabled_run_exits_with_failure(self):
1018
self.client.attributes["Enabled"] = dbus.Boolean(False)
1019
with self.assertRaises(SystemExit) as e:
1020
IsEnabledCmd().run(None, self.one_client)
1021
if isinstance(e.exception.code, int):
1022
self.assertNotEqual(e.exception.code, 0)
1024
self.assertIsNotNone(e.exception.code)
1026
class TestRemoveCmd(TestCmd):
1027
def test_remove(self):
1028
class MockMandos(object):
1031
def RemoveClient(self, dbus_path):
1032
self.calls.append(("RemoveClient", (dbus_path,)))
1033
mandos = MockMandos()
1034
super(TestRemoveCmd, self).setUp()
1035
RemoveCmd().run(mandos, self.clients)
1036
self.assertEqual(len(mandos.calls), 2)
1037
for client in self.clients:
1038
self.assertIn(("RemoveClient",
1039
(client.__dbus_object_path__,)),
1042
class TestApproveCmd(TestCmd):
1043
def test_approve(self):
1044
ApproveCmd().run(None, self.clients)
1045
for client in self.clients:
1046
self.assertIn(("Approve", (True, client_interface)),
1049
class TestDenyCmd(TestCmd):
1050
def test_deny(self):
1051
DenyCmd().run(None, self.clients)
1052
for client in self.clients:
1053
self.assertIn(("Approve", (False, client_interface)),
1056
class TestEnableCmd(TestCmd):
1057
def test_enable(self):
1058
for client in self.clients:
1059
client.attributes["Enabled"] = False
1061
EnableCmd().run(None, self.clients)
1063
for client in self.clients:
1064
self.assertTrue(client.attributes["Enabled"])
1066
class TestDisableCmd(TestCmd):
1067
def test_disable(self):
1068
DisableCmd().run(None, self.clients)
1070
for client in self.clients:
1071
self.assertFalse(client.attributes["Enabled"])
1073
class Unique(object):
1074
"""Class for objects which exist only to be unique objects, since
1075
unittest.mock.sentinel only exists in Python 3.3"""
1549
class TestPrintTableCmd(TestCmd):
1550
def test_normal(self):
1551
output = PrintTableCmd().output(self.clients.values())
1552
expected_output = "\n".join((
1553
"Name Enabled Timeout Last Successful Check",
1554
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1555
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1557
self.assertEqual(output, expected_output)
1559
def test_verbose(self):
1560
output = PrintTableCmd(verbose=True).output(
1561
self.clients.values())
1576
"Last Successful Check ",
1577
"2019-02-03T00:00:00 ",
1578
"2019-02-04T00:00:00 ",
1581
"2019-01-02T00:00:00 ",
1582
"2019-01-03T00:00:00 ",
1594
("92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8"
1596
("0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e"
1600
"778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 ",
1601
"3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 ",
1603
"Check Is Running ",
1608
"2019-01-03T00:00:00 ",
1609
"2019-01-04T00:00:00 ",
1611
"Approval Is Pending ",
1615
"Approved By Default ",
1619
"Last Approval Request ",
1621
"2019-01-03T00:00:00 ",
1627
"Approval Duration ",
1632
"fping -q -- %(host)s ",
1635
"Extended Timeout ",
1640
"2019-02-04T00:00:00 ",
1641
"2019-02-05T00:00:00 ",
1643
"Last Checker Status",
1648
num_lines = max(len(rows) for rows in columns)
1649
expected_output = "\n".join("".join(rows[line]
1650
for rows in columns)
1651
for line in range(num_lines))
1652
self.assertEqual(output, expected_output)
1654
def test_one_client(self):
1655
output = PrintTableCmd().output(self.one_client.values())
1656
expected_output = "\n".join((
1657
"Name Enabled Timeout Last Successful Check",
1658
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1660
self.assertEqual(output, expected_output)
1077
1663
class TestPropertyCmd(TestCmd):
1078
1664
"""Abstract class for tests of PropertyCmd classes"""
1083
1669
self.values_to_set)
1084
1670
for value_to_set, value_to_get in zip(self.values_to_set,
1085
1671
values_to_get):
1086
for client in self.clients:
1672
for clientpath in self.clients:
1673
client = self.bus.get_object(dbus_busname, clientpath)
1087
1674
old_value = client.attributes[self.propname]
1088
self.assertNotIsInstance(old_value, Unique)
1089
client.attributes[self.propname] = Unique()
1675
self.assertNotIsInstance(old_value, self.Unique)
1676
client.attributes[self.propname] = self.Unique()
1090
1677
self.run_command(value_to_set, self.clients)
1091
for client in self.clients:
1678
for clientpath in self.clients:
1679
client = self.bus.get_object(dbus_busname, clientpath)
1092
1680
value = client.attributes[self.propname]
1093
self.assertNotIsInstance(value, Unique)
1681
self.assertNotIsInstance(value, self.Unique)
1094
1682
self.assertEqual(value, value_to_get)
1684
class Unique(object):
1685
"""Class for objects which exist only to be unique objects,
1686
since unittest.mock.sentinel only exists in Python 3.3"""
1095
1688
def run_command(self, value, clients):
1096
self.command().run(None, clients)
1689
self.command().run(clients, self.bus)
1692
class TestEnableCmd(TestPropertyCmd):
1694
propname = "Enabled"
1695
values_to_set = [dbus.Boolean(True)]
1698
class TestDisableCmd(TestPropertyCmd):
1699
command = DisableCmd
1700
propname = "Enabled"
1701
values_to_set = [dbus.Boolean(False)]
1098
1704
class TestBumpTimeoutCmd(TestPropertyCmd):
1099
1705
command = BumpTimeoutCmd
1100
1706
propname = "LastCheckedOK"
1101
1707
values_to_set = [""]
1103
1710
class TestStartCheckerCmd(TestPropertyCmd):
1104
1711
command = StartCheckerCmd
1105
1712
propname = "CheckerRunning"
1106
1713
values_to_set = [dbus.Boolean(True)]
1108
1716
class TestStopCheckerCmd(TestPropertyCmd):
1109
1717
command = StopCheckerCmd
1110
1718
propname = "CheckerRunning"
1111
1719
values_to_set = [dbus.Boolean(False)]
1113
1722
class TestApproveByDefaultCmd(TestPropertyCmd):
1114
1723
command = ApproveByDefaultCmd
1115
1724
propname = "ApprovedByDefault"
1116
1725
values_to_set = [dbus.Boolean(True)]
1118
1728
class TestDenyByDefaultCmd(TestPropertyCmd):
1119
1729
command = DenyByDefaultCmd
1120
1730
propname = "ApprovedByDefault"
1121
1731
values_to_set = [dbus.Boolean(False)]
1123
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1124
"""Abstract class for tests of PropertyCmd classes using the
1125
ValueArgumentMixIn"""
1734
class TestPropertyValueCmd(TestPropertyCmd):
1735
"""Abstract class for tests of PropertyValueCmd classes"""
1126
1737
def runTest(self):
1127
if type(self) is TestValueArgumentPropertyCmd:
1738
if type(self) is TestPropertyValueCmd:
1129
return super(TestValueArgumentPropertyCmd, self).runTest()
1740
return super(TestPropertyValueCmd, self).runTest()
1130
1742
def run_command(self, value, clients):
1131
self.command(value).run(None, clients)
1133
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1743
self.command(value).run(clients, self.bus)
1746
class TestSetCheckerCmd(TestPropertyValueCmd):
1134
1747
command = SetCheckerCmd
1135
1748
propname = "Checker"
1136
1749
values_to_set = ["", ":", "fping -q -- %s"]
1138
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1752
class TestSetHostCmd(TestPropertyValueCmd):
1139
1753
command = SetHostCmd
1140
1754
propname = "Host"
1141
1755
values_to_set = ["192.0.2.3", "foo.example.org"]
1143
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1758
class TestSetSecretCmd(TestPropertyValueCmd):
1144
1759
command = SetSecretCmd
1145
1760
propname = "Secret"
1146
1761
values_to_set = [io.BytesIO(b""),
1147
1762
io.BytesIO(b"secret\0xyzzy\nbar")]
1148
1763
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1150
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1766
class TestSetTimeoutCmd(TestPropertyValueCmd):
1151
1767
command = SetTimeoutCmd
1152
1768
propname = "Timeout"
1153
1769
values_to_set = [datetime.timedelta(),
1197
1817
datetime.timedelta(weeks=52)]
1198
1818
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1200
class Test_command_from_options(unittest.TestCase):
1202
self.parser = argparse.ArgumentParser()
1203
add_command_line_options(self.parser)
1204
def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1205
"""Assert that parsing ARGS should result in an instance of
1206
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1207
options = self.parser.parse_args(args)
1208
check_option_syntax(self.parser, options)
1209
commands = commands_from_options(options)
1210
self.assertEqual(len(commands), 1)
1211
command = commands[0]
1212
self.assertIsInstance(command, command_cls)
1213
for key, value in cmd_attrs.items():
1214
self.assertEqual(getattr(command, key), value)
1215
def test_print_table(self):
1216
self.assert_command_from_args([], PrintTableCmd,
1219
def test_print_table_verbose(self):
1220
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1223
def test_print_table_verbose_short(self):
1224
self.assert_command_from_args(["-v"], PrintTableCmd,
1227
def test_enable(self):
1228
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1230
def test_enable_short(self):
1231
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1233
def test_disable(self):
1234
self.assert_command_from_args(["--disable", "foo"],
1237
def test_disable_short(self):
1238
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1240
def test_bump_timeout(self):
1241
self.assert_command_from_args(["--bump-timeout", "foo"],
1244
def test_bump_timeout_short(self):
1245
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1247
def test_start_checker(self):
1248
self.assert_command_from_args(["--start-checker", "foo"],
1251
def test_stop_checker(self):
1252
self.assert_command_from_args(["--stop-checker", "foo"],
1255
def test_remove(self):
1256
self.assert_command_from_args(["--remove", "foo"],
1259
def test_remove_short(self):
1260
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1262
def test_checker(self):
1263
self.assert_command_from_args(["--checker", ":", "foo"],
1264
SetCheckerCmd, value_to_set=":")
1266
def test_checker_empty(self):
1267
self.assert_command_from_args(["--checker", "", "foo"],
1268
SetCheckerCmd, value_to_set="")
1270
def test_checker_short(self):
1271
self.assert_command_from_args(["-c", ":", "foo"],
1272
SetCheckerCmd, value_to_set=":")
1274
def test_timeout(self):
1275
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1277
value_to_set=300000)
1279
def test_timeout_short(self):
1280
self.assert_command_from_args(["-t", "PT5M", "foo"],
1282
value_to_set=300000)
1284
def test_extended_timeout(self):
1285
self.assert_command_from_args(["--extended-timeout", "PT15M",
1287
SetExtendedTimeoutCmd,
1288
value_to_set=900000)
1290
def test_interval(self):
1291
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1293
value_to_set=120000)
1295
def test_interval_short(self):
1296
self.assert_command_from_args(["-i", "PT2M", "foo"],
1298
value_to_set=120000)
1300
def test_approve_by_default(self):
1301
self.assert_command_from_args(["--approve-by-default", "foo"],
1302
ApproveByDefaultCmd)
1304
def test_deny_by_default(self):
1305
self.assert_command_from_args(["--deny-by-default", "foo"],
1308
def test_approval_delay(self):
1309
self.assert_command_from_args(["--approval-delay", "PT30S",
1310
"foo"], SetApprovalDelayCmd,
1313
def test_approval_duration(self):
1314
self.assert_command_from_args(["--approval-duration", "PT1S",
1315
"foo"], SetApprovalDurationCmd,
1318
def test_host(self):
1319
self.assert_command_from_args(["--host", "foo.example.org",
1321
value_to_set="foo.example.org")
1323
def test_host_short(self):
1324
self.assert_command_from_args(["-H", "foo.example.org",
1326
value_to_set="foo.example.org")
1328
def test_secret_devnull(self):
1329
self.assert_command_from_args(["--secret", os.path.devnull,
1330
"foo"], SetSecretCmd,
1333
def test_secret_tempfile(self):
1334
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1335
value = b"secret\0xyzzy\nbar"
1338
self.assert_command_from_args(["--secret", f.name,
1339
"foo"], SetSecretCmd,
1342
def test_secret_devnull_short(self):
1343
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1344
SetSecretCmd, value_to_set=b"")
1346
def test_secret_tempfile_short(self):
1347
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1348
value = b"secret\0xyzzy\nbar"
1351
self.assert_command_from_args(["-s", f.name, "foo"],
1355
def test_approve(self):
1356
self.assert_command_from_args(["--approve", "foo"],
1359
def test_approve_short(self):
1360
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1362
def test_deny(self):
1363
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1365
def test_deny_short(self):
1366
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1368
def test_dump_json(self):
1369
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1371
def test_is_enabled(self):
1372
self.assert_command_from_args(["--is-enabled", "foo"],
1375
def test_is_enabled_short(self):
1376
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1378
def test_deny_before_remove(self):
1379
options = self.parser.parse_args(["--deny", "--remove", "foo"])
1380
check_option_syntax(self.parser, options)
1381
commands = commands_from_options(options)
1382
self.assertEqual(len(commands), 2)
1383
self.assertIsInstance(commands[0], DenyCmd)
1384
self.assertIsInstance(commands[1], RemoveCmd)
1386
def test_deny_before_remove_reversed(self):
1387
options = self.parser.parse_args(["--remove", "--deny", "--all"])
1388
check_option_syntax(self.parser, options)
1389
commands = commands_from_options(options)
1390
self.assertEqual(len(commands), 2)
1391
self.assertIsInstance(commands[0], DenyCmd)
1392
self.assertIsInstance(commands[1], RemoveCmd)
1395
class Test_check_option_syntax(unittest.TestCase):
1396
# This mostly corresponds to the definition from has_actions() in
1397
# check_option_syntax()
1399
# The actual values set here are not that important, but we do
1400
# at least stick to the correct types, even though they are
1404
"bump_timeout": True,
1405
"start_checker": True,
1406
"stop_checker": True,
1410
"timeout": datetime.timedelta(),
1411
"extended_timeout": datetime.timedelta(),
1412
"interval": datetime.timedelta(),
1413
"approved_by_default": True,
1414
"approval_delay": datetime.timedelta(),
1415
"approval_duration": datetime.timedelta(),
1417
"secret": io.BytesIO(b"x"),
1423
self.parser = argparse.ArgumentParser()
1424
add_command_line_options(self.parser)
1426
@contextlib.contextmanager
1427
def assertParseError(self):
1428
with self.assertRaises(SystemExit) as e:
1429
with self.temporarily_suppress_stderr():
1431
# Exit code from argparse is guaranteed to be "2". Reference:
1432
# https://docs.python.org/3/library/argparse.html#exiting-methods
1433
self.assertEqual(e.exception.code, 2)
1436
@contextlib.contextmanager
1437
def temporarily_suppress_stderr():
1438
null = os.open(os.path.devnull, os.O_RDWR)
1439
stderrcopy = os.dup(sys.stderr.fileno())
1440
os.dup2(null, sys.stderr.fileno())
1446
os.dup2(stderrcopy, sys.stderr.fileno())
1447
os.close(stderrcopy)
1449
def check_option_syntax(self, options):
1450
check_option_syntax(self.parser, options)
1452
def test_actions_requires_client_or_all(self):
1453
for action, value in self.actions.items():
1454
options = self.parser.parse_args()
1455
setattr(options, action, value)
1456
with self.assertParseError():
1457
self.check_option_syntax(options)
1459
def test_actions_conflicts_with_verbose(self):
1460
for action, value in self.actions.items():
1461
options = self.parser.parse_args()
1462
setattr(options, action, value)
1463
options.verbose = True
1464
with self.assertParseError():
1465
self.check_option_syntax(options)
1467
def test_dump_json_conflicts_with_verbose(self):
1468
options = self.parser.parse_args()
1469
options.dump_json = True
1470
options.verbose = True
1471
with self.assertParseError():
1472
self.check_option_syntax(options)
1474
def test_dump_json_conflicts_with_action(self):
1475
for action, value in self.actions.items():
1476
options = self.parser.parse_args()
1477
setattr(options, action, value)
1478
options.dump_json = True
1479
with self.assertParseError():
1480
self.check_option_syntax(options)
1482
def test_all_can_not_be_alone(self):
1483
options = self.parser.parse_args()
1485
with self.assertParseError():
1486
self.check_option_syntax(options)
1488
def test_all_is_ok_with_any_action(self):
1489
for action, value in self.actions.items():
1490
options = self.parser.parse_args()
1491
setattr(options, action, value)
1493
self.check_option_syntax(options)
1495
def test_is_enabled_fails_without_client(self):
1496
options = self.parser.parse_args()
1497
options.is_enabled = True
1498
with self.assertParseError():
1499
self.check_option_syntax(options)
1501
def test_is_enabled_works_with_one_client(self):
1502
options = self.parser.parse_args()
1503
options.is_enabled = True
1504
options.client = ["foo"]
1505
self.check_option_syntax(options)
1507
def test_is_enabled_fails_with_two_clients(self):
1508
options = self.parser.parse_args()
1509
options.is_enabled = True
1510
options.client = ["foo", "barbar"]
1511
with self.assertParseError():
1512
self.check_option_syntax(options)
1514
def test_remove_can_only_be_combined_with_action_deny(self):
1515
for action, value in self.actions.items():
1516
if action in {"remove", "deny"}:
1518
options = self.parser.parse_args()
1519
setattr(options, action, value)
1521
options.remove = True
1522
with self.assertParseError():
1523
self.check_option_syntax(options)
1527
1822
def should_only_run_tests():