230
def print_clients(clients, keywords):
231
def valuetostring(value, keyword):
232
if type(value) is dbus.Boolean:
233
return "Yes" if value else "No"
234
if keyword in ("Timeout", "Interval", "ApprovalDelay",
235
"ApprovalDuration", "ExtendedTimeout"):
236
return milliseconds_to_string(value)
239
# Create format string to print table rows
240
format_string = " ".join("{{{key}:{width}}}".format(
241
width = max(len(tablewords[key]),
242
max(len(valuetostring(client[key], key))
243
for client in clients)),
247
print(format_string.format(**tablewords))
248
for client in clients:
249
print(format_string.format(**{
250
key: valuetostring(client[key], key)
251
for key in keywords }))
271
## Classes for commands.
273
# Abstract classes first
274
class Command(object):
    """Abstract class for commands"""

    def run(self, mandos, clients):
        """Normal commands should implement run_on_one_client(), but
        commands which want to operate on all clients at the same time
        can override this run() method instead.

        mandos  -- server object supplied by the caller; it is stored
                   as self.mandos because some commands (RemoveCmd)
                   read it from the instance in run_on_one_client().
        clients -- iterable of client objects; each one is passed to
                   run_on_one_client() in iteration order.
        """
        # Keep the server reachable from run_on_one_client(), which
        # only receives the client as an argument.
        self.mandos = mandos
        for client in clients:
            self.run_on_one_client(client)
284
class PrintCmd(Command):
    """Abstract class for commands printing client details"""
    # Names of all client properties a printing command can show.
    all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
                    "Created", "Interval", "Host", "KeyID",
                    "Fingerprint", "CheckerRunning", "LastEnabled",
                    "ApprovalPending", "ApprovedByDefault",
                    "LastApprovalRequest", "ApprovalDelay",
                    "ApprovalDuration", "Checker", "ExtendedTimeout",
                    "Expires", "LastCheckerStatus")

    def run(self, mandos, clients):
        """Print the result of self.output(clients).

        output() is not defined in this class; subclasses are
        expected to provide it.  The mandos argument is not used
        here.
        """
        print(self.output(clients))
296
class PropertyCmd(Command):
    """Abstract class for Actions for setting one client property"""

    def run_on_one_client(self, client):
        """Set the Client's D-Bus property

        The property name is read from self.property and the new
        value from self.value_to_set; both must be provided by the
        subclass (or a mixin).
        """
        client.Set(client_interface, self.property, self.value_to_set,
                   dbus_interface=dbus.PROPERTIES_IFACE)
303
class ValueArgumentMixIn(object):
    """Mixin class for commands taking a value as argument"""

    def __init__(self, value):
        """Store value as self.value_to_set, unchanged."""
        self.value_to_set = value
308
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
309
"""Mixin class for commands taking a value argument as
312
def value_to_set(self):
315
def value_to_set(self, value):
316
"""When setting, convert value to a datetime.timedelta"""
317
self._vts = string_to_delta(value).total_seconds() * 1000
319
# Actual (non-abstract) command classes
321
class PrintTableCmd(PrintCmd):
322
def __init__(self, verbose=False):
323
self.verbose = verbose
325
def output(self, clients):
327
keywords = self.all_keywords
329
keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
330
return str(self.TableOfClients(clients.values(), keywords))
332
class TableOfClients(object):
335
"Enabled": "Enabled",
336
"Timeout": "Timeout",
337
"LastCheckedOK": "Last Successful Check",
338
"LastApprovalRequest": "Last Approval Request",
339
"Created": "Created",
340
"Interval": "Interval",
342
"Fingerprint": "Fingerprint",
344
"CheckerRunning": "Check Is Running",
345
"LastEnabled": "Last Enabled",
346
"ApprovalPending": "Approval Is Pending",
347
"ApprovedByDefault": "Approved By Default",
348
"ApprovalDelay": "Approval Delay",
349
"ApprovalDuration": "Approval Duration",
350
"Checker": "Checker",
351
"ExtendedTimeout": "Extended Timeout",
352
"Expires": "Expires",
353
"LastCheckerStatus": "Last Checker Status",
356
def __init__(self, clients, keywords, tableheaders=None):
357
self.clients = clients
358
self.keywords = keywords
359
if tableheaders is not None:
360
self.tableheaders = tableheaders
363
return "\n".join(self.rows())
365
if sys.version_info.major == 2:
366
__unicode__ = __str__
368
return str(self).encode(locale.getpreferredencoding())
371
format_string = self.row_formatting_string()
372
rows = [self.header_line(format_string)]
373
rows.extend(self.client_line(client, format_string)
374
for client in self.clients)
377
def row_formatting_string(self):
378
"Format string used to format table rows"
379
return " ".join("{{{key}:{width}}}".format(
380
width=max(len(self.tableheaders[key]),
381
*(len(self.string_from_client(client, key))
382
for client in self.clients)),
384
for key in self.keywords)
386
def string_from_client(self, client, key):
387
return self.valuetostring(client[key], key)
390
def valuetostring(value, keyword):
391
if isinstance(value, dbus.Boolean):
392
return "Yes" if value else "No"
393
if keyword in ("Timeout", "Interval", "ApprovalDelay",
394
"ApprovalDuration", "ExtendedTimeout"):
395
return milliseconds_to_string(value)
398
def header_line(self, format_string):
399
return format_string.format(**self.tableheaders)
401
def client_line(self, client, format_string):
402
return format_string.format(
403
**{key: self.string_from_client(client, key)
404
for key in self.keywords})
408
class DumpJSONCmd(PrintCmd):
409
def output(self, clients):
410
data = {client["Name"]:
411
{key: self.dbus_boolean_to_bool(client[key])
412
for key in self.all_keywords}
413
for client in clients.values()}
414
return json.dumps(data, indent=4, separators=(',', ': '))
416
def dbus_boolean_to_bool(value):
417
if isinstance(value, dbus.Boolean):
421
class IsEnabledCmd(Command):
422
def run_on_one_client(self, client):
423
if self.is_enabled(client):
426
def is_enabled(self, client):
427
return client.Get(client_interface, "Enabled",
428
dbus_interface=dbus.PROPERTIES_IFACE)
430
class RemoveCmd(Command):
    """Command asking the server to remove each given client"""

    def run_on_one_client(self, client):
        """Call RemoveClient() on self.mandos with the client's
        D-Bus object path.

        NOTE(review): relies on self.mandos having been set on the
        instance before this is called -- confirm where that happens.
        """
        self.mandos.RemoveClient(client.__dbus_object_path__)
434
class ApproveCmd(Command):
    """Command calling Approve(True) on each client"""

    def run_on_one_client(self, client):
        """Call the client's Approve method with a true D-Bus
        boolean, using client_interface as the D-Bus interface."""
        client.Approve(dbus.Boolean(True),
                       dbus_interface=client_interface)
439
class DenyCmd(Command):
    """Command calling Approve(False) on each client"""

    def run_on_one_client(self, client):
        """Call the client's Approve method with a false D-Bus
        boolean, using client_interface as the D-Bus interface."""
        client.Approve(dbus.Boolean(False),
                       dbus_interface=client_interface)
444
class EnableCmd(PropertyCmd):
446
value_to_set = dbus.Boolean(True)
448
class DisableCmd(PropertyCmd):
450
value_to_set = dbus.Boolean(False)
452
class BumpTimeoutCmd(PropertyCmd):
453
property = "LastCheckedOK"
456
class StartCheckerCmd(PropertyCmd):
    """Set the "CheckerRunning" property to true on each client"""
    property = "CheckerRunning"
    value_to_set = dbus.Boolean(True)
460
class StopCheckerCmd(PropertyCmd):
    """Set the "CheckerRunning" property to false on each client"""
    property = "CheckerRunning"
    value_to_set = dbus.Boolean(False)
464
class ApproveByDefaultCmd(PropertyCmd):
    """Set the "ApprovedByDefault" property to true on each client"""
    property = "ApprovedByDefault"
    value_to_set = dbus.Boolean(True)
468
class DenyByDefaultCmd(PropertyCmd):
    """Set the "ApprovedByDefault" property to false on each client"""
    property = "ApprovedByDefault"
    value_to_set = dbus.Boolean(False)
472
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
475
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
478
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
481
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
484
class SetExtendedTimeoutCmd(PropertyCmd,
                            MillisecondsValueArgumentMixIn):
    """Set the "ExtendedTimeout" property on each client; the value
    argument is handled by MillisecondsValueArgumentMixIn"""
    property = "ExtendedTimeout"
488
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
    """Set the "Interval" property on each client; the value
    argument is handled by MillisecondsValueArgumentMixIn"""
    property = "Interval"
491
class SetApprovalDelayCmd(PropertyCmd,
                          MillisecondsValueArgumentMixIn):
    """Set the "ApprovalDelay" property on each client; the value
    argument is handled by MillisecondsValueArgumentMixIn"""
    property = "ApprovalDelay"
495
class SetApprovalDurationCmd(PropertyCmd,
                             MillisecondsValueArgumentMixIn):
    """Set the "ApprovalDuration" property on each client; the value
    argument is handled by MillisecondsValueArgumentMixIn"""
    property = "ApprovalDuration"
254
499
def has_actions(options):
255
500
return any((options.enable,
319
574
parser.add_argument("-s", "--secret",
320
575
type=argparse.FileType(mode="rb"),
321
576
help="Set password blob (file) for client")
322
parser.add_argument("-A", "--approve", action="store_true",
323
help="Approve any current client request")
324
parser.add_argument("-D", "--deny", action="store_true",
325
help="Deny any current client request")
577
approve_deny = parser.add_mutually_exclusive_group()
578
approve_deny.add_argument(
579
"-A", "--approve", action="store_true",
580
help="Approve any current client request")
581
approve_deny.add_argument("-D", "--deny", action="store_true",
582
help="Deny any current client request")
326
583
parser.add_argument("--check", action="store_true",
327
584
help="Run self-test")
328
585
parser.add_argument("client", nargs="*", help="Client name")
329
options = parser.parse_args()
586
options = parser.parse_args(args=args)
331
588
if has_actions(options) and not (options.client or options.all):
332
589
parser.error("Options require clients names or --all.")
333
590
if options.verbose and has_actions(options):
334
591
parser.error("--verbose can only be used alone.")
335
if options.dump_json and (options.verbose or has_actions(options)):
592
if options.dump_json and (options.verbose
593
or has_actions(options)):
336
594
parser.error("--dump-json can only be used alone.")
337
595
if options.all and not has_actions(options):
338
596
parser.error("--all requires an action.")
341
fail_count, test_count = doctest.testmod()
342
sys.exit(os.EX_OK if fail_count == 0 else 1)
597
if options.is_enabled and len(options.client) > 1:
598
parser.error("--is-enabled requires exactly one client")
602
if options.dump_json:
603
commands.append(DumpJSONCmd())
606
commands.append(EnableCmd())
609
commands.append(DisableCmd())
611
if options.bump_timeout:
612
commands.append(BumpTimeoutCmd(options.bump_timeout))
614
if options.start_checker:
615
commands.append(StartCheckerCmd())
617
if options.stop_checker:
618
commands.append(StopCheckerCmd())
620
if options.is_enabled:
621
commands.append(IsEnabledCmd())
624
commands.append(RemoveCmd())
626
if options.checker is not None:
627
commands.append(SetCheckerCmd())
629
if options.timeout is not None:
630
commands.append(SetTimeoutCmd(options.timeout))
632
if options.extended_timeout:
634
SetExtendedTimeoutCmd(options.extended_timeout))
636
if options.interval is not None:
637
command.append(SetIntervalCmd(options.interval))
639
if options.approved_by_default is not None:
640
if options.approved_by_default:
641
command.append(ApproveByDefaultCmd())
643
command.append(DenyByDefaultCmd())
645
if options.approval_delay is not None:
646
command.append(SetApprovalDelayCmd(options.approval_delay))
648
if options.approval_duration is not None:
650
SetApprovalDurationCmd(options.approval_duration))
652
if options.host is not None:
653
command.append(SetHostCmd(options.host))
655
if options.secret is not None:
656
command.append(SetSecretCmd(options.secret))
659
commands.append(ApproveCmd())
662
commands.append(DenyCmd())
664
# If no command option has been given, show table of clients,
665
# optionally verbosely
667
commands.append(PrintTableCmd(verbose=options.verbose))
669
return commands, options.client
673
commands, clientnames = commands_and_clients_from_options()
345
676
bus = dbus.SystemBus()
346
677
mandos_dbus_objc = bus.get_object(busname, server_path)
347
678
except dbus.exceptions.DBusException:
348
print("Could not connect to Mandos server", file=sys.stderr)
679
log.critical("Could not connect to Mandos server")
351
682
mandos_serv = dbus.Interface(mandos_dbus_objc,
352
dbus_interface = server_interface)
683
dbus_interface=server_interface)
353
684
mandos_serv_object_manager = dbus.Interface(
354
mandos_dbus_objc, dbus_interface = dbus.OBJECT_MANAGER_IFACE)
356
#block stderr since dbus library prints to stderr
357
null = os.open(os.path.devnull, os.O_RDWR)
358
stderrcopy = os.dup(sys.stderr.fileno())
359
os.dup2(null, sys.stderr.fileno())
685
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
687
# Filter out log message from dbus module
688
dbus_logger = logging.getLogger("dbus.proxies")
689
class NullFilter(logging.Filter):
690
def filter(self, record):
692
dbus_filter = NullFilter()
693
dbus_logger.addFilter(dbus_filter)
363
mandos_clients = { path: ifs_and_props[client_interface]
364
for path, ifs_and_props in
365
mandos_serv_object_manager
366
.GetManagedObjects().items()
367
if client_interface in ifs_and_props }
696
mandos_clients = {path: ifs_and_props[client_interface]
697
for path, ifs_and_props in
698
mandos_serv_object_manager
699
.GetManagedObjects().items()
700
if client_interface in ifs_and_props}
370
os.dup2(stderrcopy, sys.stderr.fileno())
702
# restore dbus logger
703
dbus_logger.removeFilter(dbus_filter)
372
704
except dbus.exceptions.DBusException as e:
373
print("Access denied: Accessing mandos server through D-Bus: {}"
374
.format(e), file=sys.stderr)
705
log.critical("Failed to access Mandos server through D-Bus:"
377
709
# Compile dict of (clients: properties) to process
380
if options.all or not options.client:
381
clients = { bus.get_object(busname, path): properties
382
for path, properties in mandos_clients.items() }
713
clients = {bus.get_object(busname, path): properties
714
for path, properties in mandos_clients.items()}
384
for name in options.client:
716
for name in clientnames:
385
717
for path, client in mandos_clients.items():
386
718
if client["Name"] == name:
387
719
client_objc = bus.get_object(busname, path)
388
720
clients[client_objc] = client
391
print("Client not found on server: {!r}"
392
.format(name), file=sys.stderr)
723
log.critical("Client not found on server: %r", name)
395
if not has_actions(options) and clients:
396
if options.verbose or options.dump_json:
397
keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
398
"Created", "Interval", "Host", "Fingerprint",
399
"CheckerRunning", "LastEnabled",
400
"ApprovalPending", "ApprovedByDefault",
401
"LastApprovalRequest", "ApprovalDelay",
402
"ApprovalDuration", "Checker",
405
keywords = defaultkeywords
407
if options.dump_json:
408
json.dump({client["Name"]: {key: client[key]
409
for key in keywords }
410
for client in clients.values() },
411
fp = sys.stdout, indent = 4,
412
separators = (',', ': '))
415
print_clients(clients.values(), keywords)
417
# Process each client in the list by all selected options
418
for client in clients:
420
def set_client_prop(prop, value):
421
"""Set a Client D-Bus property"""
422
client.Set(client_interface, prop, value,
423
dbus_interface=dbus.PROPERTIES_IFACE)
425
def set_client_prop_ms(prop, value):
426
"""Set a Client D-Bus property, converted
427
from a string to milliseconds."""
428
set_client_prop(prop,
429
string_to_delta(value).total_seconds()
433
mandos_serv.RemoveClient(client.__dbus_object_path__)
435
set_client_prop("Enabled", dbus.Boolean(True))
437
set_client_prop("Enabled", dbus.Boolean(False))
438
if options.bump_timeout:
439
set_client_prop("LastCheckedOK", "")
440
if options.start_checker:
441
set_client_prop("CheckerRunning", dbus.Boolean(True))
442
if options.stop_checker:
443
set_client_prop("CheckerRunning", dbus.Boolean(False))
444
if options.is_enabled:
445
sys.exit(0 if client.Get(client_interface,
448
dbus.PROPERTIES_IFACE)
450
if options.checker is not None:
451
set_client_prop("Checker", options.checker)
452
if options.host is not None:
453
set_client_prop("Host", options.host)
454
if options.interval is not None:
455
set_client_prop_ms("Interval", options.interval)
456
if options.approval_delay is not None:
457
set_client_prop_ms("ApprovalDelay",
458
options.approval_delay)
459
if options.approval_duration is not None:
460
set_client_prop_ms("ApprovalDuration",
461
options.approval_duration)
462
if options.timeout is not None:
463
set_client_prop_ms("Timeout", options.timeout)
464
if options.extended_timeout is not None:
465
set_client_prop_ms("ExtendedTimeout",
466
options.extended_timeout)
467
if options.secret is not None:
468
set_client_prop("Secret",
469
dbus.ByteArray(options.secret.read()))
470
if options.approved_by_default is not None:
471
set_client_prop("ApprovedByDefault",
473
.approved_by_default))
475
client.Approve(dbus.Boolean(True),
476
dbus_interface=client_interface)
478
client.Approve(dbus.Boolean(False),
479
dbus_interface=client_interface)
726
# Run all commands on clients
727
for command in commands:
728
command.run(mandos_serv, clients)
731
class Test_milliseconds_to_string(unittest.TestCase):
733
self.assertEqual(milliseconds_to_string(93785000),
735
def test_no_days(self):
736
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
737
def test_all_zero(self):
738
self.assertEqual(milliseconds_to_string(0), "00:00:00")
739
def test_no_fractional_seconds(self):
740
self.assertEqual(milliseconds_to_string(400), "00:00:00")
741
self.assertEqual(milliseconds_to_string(900), "00:00:00")
742
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
744
class Test_string_to_delta(unittest.TestCase):
745
def test_handles_basic_rfc3339(self):
746
self.assertEqual(string_to_delta("PT2H"),
747
datetime.timedelta(0, 7200))
748
def test_falls_back_to_pre_1_6_1_with_warning(self):
749
# assertLogs only exists in Python 3.4
750
if hasattr(self, "assertLogs"):
751
with self.assertLogs(log, logging.WARNING):
752
value = string_to_delta("2h")
754
class WarningFilter(logging.Filter):
755
"""Don't show, but record the presence of, warnings"""
756
def filter(self, record):
757
is_warning = record.levelno >= logging.WARNING
758
self.found = is_warning or getattr(self, "found",
760
return not is_warning
761
warning_filter = WarningFilter()
762
log.addFilter(warning_filter)
764
value = string_to_delta("2h")
766
log.removeFilter(warning_filter)
767
self.assertTrue(getattr(warning_filter, "found", False))
768
self.assertEqual(value, datetime.timedelta(0, 7200))
771
class TestCmd(unittest.TestCase):
772
"""Abstract class for tests of command classes"""
775
class MockClient(object):
776
def __init__(self, name, **attributes):
777
self.__dbus_object_path__ = "objpath_{}".format(name)
778
self.attributes = attributes
779
self.attributes["Name"] = name
780
def Set(interface, property, value,
781
properties_interface):
782
testcase.assertEqual(interface, client_interface)
783
testcase.assertEqual(properties_interface,
784
dbus.PROPERTIES_IFACE)
785
self.attributes[property] = value
786
def Get(interface, property, properties_interface):
787
testcase.assertEqual(interface, client_interface)
788
testcase.assertEqual(properties_interface,
789
dbus.PROPERTIES_IFACE)
790
return self.attributes[property]
791
def __getitem__(self, key):
792
return self.attributes[key]
793
self.clients = collections.OrderedDict([
797
KeyID=("92ed150794387c03ce684574b1139a65"
798
"94a34f895daaaf09fd8ea90a27cddb12"),
800
Host="foo.example.org",
801
Enabled=dbus.Boolean(True),
803
LastCheckedOK="2019-02-03T00:00:00",
804
Created="2019-01-02T00:00:00",
806
Fingerprint=("778827225BA7DE539C5A"
807
"7CFA59CFF7CDBD9A5920"),
808
CheckerRunning=dbus.Boolean(False),
809
LastEnabled="2019-01-03T00:00:00",
810
ApprovalPending=dbus.Boolean(False),
811
ApprovedByDefault=dbus.Boolean(True),
812
LastApprovalRequest="",
814
ApprovalDuration=1000,
815
Checker="fping -q -- %(host)s",
816
ExtendedTimeout=900000,
817
Expires="2019-02-04T00:00:00",
818
LastCheckerStatus=0)),
822
KeyID=("0558568eedd67d622f5c83b35a115f79"
823
"6ab612cff5ad227247e46c2b020f441c"),
826
Enabled=dbus.Boolean(True),
828
LastCheckedOK="2019-02-04T00:00:00",
829
Created="2019-01-03T00:00:00",
831
Fingerprint=("3E393AEAEFB84C7E89E2"
832
"F547B3A107558FCA3A27"),
833
CheckerRunning=dbus.Boolean(True),
834
LastEnabled="2019-01-04T00:00:00",
835
ApprovalPending=dbus.Boolean(False),
836
ApprovedByDefault=dbus.Boolean(False),
837
LastApprovalRequest="2019-01-03T00:00:00",
839
ApprovalDuration=1000,
841
ExtendedTimeout=900000,
842
Expires="2019-02-05T00:00:00",
843
LastCheckerStatus=-2)),
846
class TestPrintTableCmd(TestCmd):
847
def test_normal(self):
848
output = PrintTableCmd().output(self.clients)
849
expected_output = """
850
Name Enabled Timeout Last Successful Check
851
foo Yes 00:05:00 2019-02-03T00:00:00
852
barbar Yes 00:05:00 2019-02-04T00:00:00
854
self.assertEqual(output, expected_output)
855
def test_verbose(self):
856
output = PrintTableCmd(verbose=True).output(self.clients)
857
expected_output = """
858
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
859
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
860
barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04T00:00:00 No No 2019-01-03T00:00:00 00:00:30 00:00:01 : 00:15:00 2019-02-05T00:00:00 -2
862
self.assertEqual(output, expected_output)
863
def test_one_client(self):
864
output = PrintTableCmd().output({"foo": self.clients["foo"]})
865
expected_output = """
866
Name Enabled Timeout Last Successful Check
867
foo Yes 00:05:00 2019-02-03T00:00:00
869
self.assertEqual(output, expected_output)
871
class TestDumpJSONCmd(TestCmd):
873
self.expected_json = {
876
"KeyID": ("92ed150794387c03ce684574b1139a65"
877
"94a34f895daaaf09fd8ea90a27cddb12"),
878
"Host": "foo.example.org",
881
"LastCheckedOK": "2019-02-03T00:00:00",
882
"Created": "2019-01-02T00:00:00",
884
"Fingerprint": ("778827225BA7DE539C5A"
885
"7CFA59CFF7CDBD9A5920"),
886
"CheckerRunning": False,
887
"LastEnabled": "2019-01-03T00:00:00",
888
"ApprovalPending": False,
889
"ApprovedByDefault": True,
890
"LastApprovalRequest": "",
892
"ApprovalDuration": 1000,
893
"Checker": "fping -q -- %(host)s",
894
"ExtendedTimeout": 900000,
895
"Expires": "2019-02-04T00:00:00",
896
"LastCheckerStatus": 0,
900
"KeyID": ("0558568eedd67d622f5c83b35a115f79"
901
"6ab612cff5ad227247e46c2b020f441c"),
905
"LastCheckedOK": "2019-02-04T00:00:00",
906
"Created": "2019-01-03T00:00:00",
908
"Fingerprint": ("3E393AEAEFB84C7E89E2"
909
"F547B3A107558FCA3A27"),
910
"CheckerRunning": True,
911
"LastEnabled": "2019-01-04T00:00:00",
912
"ApprovalPending": False,
913
"ApprovedByDefault": False,
914
"LastApprovalRequest": "2019-01-03T00:00:00",
915
"ApprovalDelay": 30000,
916
"ApprovalDuration": 1000,
918
"ExtendedTimeout": 900000,
919
"Expires": "2019-02-05T00:00:00",
920
"LastCheckerStatus": -2,
923
return super(TestDumpJSONCmd, self).setUp()
924
def test_normal(self):
925
json_data = json.loads(DumpJSONCmd().output(self.clients))
926
self.assertDictEqual(json_data, self.expected_json)
927
def test_one_client(self):
928
clients = {"foo": self.clients["foo"]}
929
json_data = json.loads(DumpJSONCmd().output(clients))
930
expected_json = {"foo": self.expected_json["foo"]}
931
self.assertDictEqual(json_data, expected_json)
934
def should_only_run_tests():
935
parser = argparse.ArgumentParser(add_help=False)
936
parser.add_argument("--check", action='store_true')
937
args, unknown_args = parser.parse_known_args()
938
run_tests = args.check
940
# Remove --check argument from sys.argv
941
sys.argv[1:] = unknown_args
944
# Add all tests from doctest strings
945
def load_tests(loader, tests, none):
947
tests.addTests(doctest.DocTestSuite())
482
950
if __name__ == "__main__":
951
if should_only_run_tests():
952
# Call using ./tdd-python-script --check [--verbose]