=== added file 'mandos-monitor' --- mandos-monitor 1970-01-01 00:00:00 +0000 +++ mandos-monitor 2009-11-05 19:16:46 +0000 @@ -0,0 +1,390 @@ +#!/usr/bin/python +# -*- mode: python; coding: utf-8 -*- + +from __future__ import division, absolute_import, with_statement + +import sys +import signal + +import urwid.curses_display +import urwid + +from dbus.mainloop.glib import DBusGMainLoop +import gobject + +import dbus + +import UserList + +# Some useful constants +domain = 'se.bsnet.fukt' +server_interface = domain + '.Mandos' +client_interface = domain + '.Mandos.Client' +version = "1.0.14" + +# Always run in monochrome mode +urwid.curses_display.curses.has_colors = lambda : False + +# Urwid doesn't support blinking, but we want it. Since we have no +# use for underline on its own, we make underline also always blink. +urwid.curses_display.curses.A_UNDERLINE |= ( + urwid.curses_display.curses.A_BLINK) + +class MandosClientPropertyCache(object): + """This wraps a Mandos Client D-Bus proxy object, caches the + properties and calls a hook function when any of them are + changed. + """ + def __init__(self, proxy_object=None, properties=None, *args, + **kwargs): + # Type conversion mapping + self.type_map = { + dbus.ObjectPath: unicode, + dbus.ByteArray: str, + dbus.Signature: unicode, + dbus.Byte: chr, + dbus.Int16: int, + dbus.UInt16: int, + dbus.Int32: int, + dbus.UInt32: int, + dbus.Int64: int, + dbus.UInt64: int, + dbus.Dictionary: dict, + dbus.Array: list, + dbus.String: unicode, + dbus.Boolean: bool, + dbus.Double: float, + dbus.Struct: tuple, + } + self.proxy = proxy_object # Mandos Client proxy object + + if properties is None: + self.properties = dict() + else: + self.properties = dict(self.convert_property(prop, val) + for prop, val in + properties.iteritems()) + self.proxy.connect_to_signal("PropertyChanged", + self.property_changed, + client_interface, + byte_arrays=True) + + if properties is None: + self.properties.update( + self.convert_property(prop, val) + for prop, val in + self.proxy.GetAll(client_interface, + dbus_interface = + dbus.PROPERTIES_IFACE).iteritems()) + super(MandosClientPropertyCache, self).__init__( + proxy_object=proxy_object, + properties=properties, *args, **kwargs) + + def convert_property(self, property, value): + """This converts the arguments from a D-Bus signal, which are + D-Bus types, into normal Python types, using a conversion + function from "self.type_map". + """ + property_name = unicode(property) # Always a dbus.String + if isinstance(value, dbus.UTF8String): + # Should not happen, but prepare for it anyway + value = dbus.String(str(value).decode("utf-8")) + try: + convfunc = self.type_map[type(value)] + except KeyError: + # Unknown type, return unmodified + return property_name, value + return property_name, convfunc(value) + def property_changed(self, property=None, value=None): + """This is called whenever we get a PropertyChanged signal + It updates the changed property in the "properties" dict. + """ + # Convert name and value + property_name, cvalue = self.convert_property(property, value) + # Update properties dict with new value + self.properties[property_name] = cvalue + + +class MandosClientWidget(urwid.FlowWidget, MandosClientPropertyCache): + """A Mandos Client which is visible on the screen. + """ + + def __init__(self, server_proxy_object=None, update_hook=None, + delete_hook=None, *args, **kwargs): + # Called on update + self.update_hook = update_hook + # Called on delete + self.delete_hook = delete_hook + # Mandos Server proxy object + self.server_proxy_object = server_proxy_object + + # The widget shown normally + self._text_widget = urwid.Text("") + # The widget shown when we have focus + self._focus_text_widget = urwid.Text("") + super(MandosClientWidget, self).__init__( + update_hook=update_hook, delete_hook=delete_hook, + *args, **kwargs) + self.update() + self.opened = False + + def selectable(self): + """Make this a "selectable" widget. + This overrides the method from urwid.FlowWidget.""" + return True + + def rows(self, (maxcol,), focus=False): + """How many rows this widget will occupy might depend on + whether we have focus or not. + This overrides the method from urwid.FlowWidget""" + return self.current_widget(focus).rows((maxcol,), focus=focus) + + def current_widget(self, focus=False): + if focus or self.opened: + return self._focus_widget + return self._widget + + def update(self): + "Called when what is visible on the screen should be updated." + # How to add standout mode to a style + with_standout = { u"normal": u"standout", + u"bold": u"bold-standout", + u"underline-blink": + u"underline-blink-standout", + u"bold-underline-blink": + u"bold-underline-blink-standout", + } + + # Rebuild focus and non-focus widgets using current properties + self._text = (u'name="%(name)s", enabled=%(enabled)s' + % self.properties) + if not urwid.supports_unicode(): + self._text = self._text.encode("ascii", "replace") + textlist = [(u"normal", u"BLÄRGH: "), (u"bold", self._text)] + self._text_widget.set_text(textlist) + self._focus_text_widget.set_text([(with_standout[text[0]], + text[1]) + if isinstance(text, tuple) + else text + for text in textlist]) + self._widget = self._text_widget + self._focus_widget = urwid.AttrWrap(self._focus_text_widget, + "standout") + # Run update hook, if any + if self.update_hook is not None: + self.update_hook() + + def delete(self): + if self.delete_hook is not None: + self.delete_hook(self) + + def render(self, (maxcol,), focus=False): + """Render differently if we have focus. + This overrides the method from urwid.FlowWidget""" + return self.current_widget(focus).render((maxcol,), + focus=focus) + + def keypress(self, (maxcol,), key): + """Handle keys. + This overrides the method from urwid.FlowWidget""" + if key == u"e" or key == u"+": + self.proxy.Enable() + elif key == u"d" or key == u"-": + self.proxy.Disable() + elif key == u"r" or key == u"_": + self.server_proxy_object.RemoveClient(self.proxy + .object_path) + elif key == u"s": + self.proxy.StartChecker() + elif key == u"c": + self.proxy.StopChecker() + elif key == u"S": + self.proxy.CheckedOK() + # xxx +# elif key == u"p" or key == "=": +# self.proxy.pause() +# elif key == u"u" or key == ":": +# self.proxy.unpause() +# elif key == u"RET": +# self.open() + else: + return key + + def property_changed(self, property=None, value=None, + *args, **kwargs): + """Call self.update() if old value is not new value. + This overrides the method from MandosClientPropertyCache""" + property_name = unicode(property) + old_value = self.properties.get(property_name) + super(MandosClientWidget, self).property_changed( + property=property, value=value, *args, **kwargs) + if self.properties.get(property_name) != old_value: + self.update() + + +class UserInterface(object): + """This is the entire user interface - the whole screen + with boxes, lists of client widgets, etc. + """ + def __init__(self): + DBusGMainLoop(set_as_default=True ) + + self.screen = urwid.curses_display.Screen() + + self.screen.register_palette(( + (u"normal", + u"default", u"default", None), + (u"bold", + u"default", u"default", u"bold"), + (u"underline-blink", + u"default", u"default", u"underline"), + (u"standout", + u"default", u"default", u"standout"), + (u"bold-underline-blink", + u"default", u"default", (u"bold", u"underline")), + (u"bold-standout", + u"default", u"default", (u"bold", u"standout")), + (u"underline-blink-standout", + u"default", u"default", (u"underline", u"standout")), + (u"bold-underline-blink-standout", + u"default", u"default", (u"bold", u"underline", + u"standout")), + )) + + self.screen.start() + + self.size = self.screen.get_cols_rows() + + self.clients = urwid.SimpleListWalker([]) + self.clients_dict = {} + self.topwidget = urwid.LineBox(urwid.ListBox(self.clients)) + #self.topwidget = urwid.ListBox(clients) + + self.busname = domain + '.Mandos' + self.main_loop = gobject.MainLoop() + self.bus = dbus.SystemBus() + mandos_dbus_objc = self.bus.get_object( + self.busname, u"/", follow_name_owner_changes=True) + self.mandos_serv = dbus.Interface(mandos_dbus_objc, + dbus_interface + = server_interface) + try: + mandos_clients = (self.mandos_serv + .GetAllClientsWithProperties()) + except dbus.exceptions.DBusException: + mandos_clients = dbus.Dictionary() + + (self.mandos_serv + .connect_to_signal("ClientRemoved", + self.find_and_remove_client, + dbus_interface=server_interface, + byte_arrays=True)) + (self.mandos_serv + .connect_to_signal("ClientAdded", + self.add_new_client, + dbus_interface=server_interface, + byte_arrays=True)) + for path, client in (mandos_clients.iteritems()): + client_proxy_object = self.bus.get_object(self.busname, + path) + self.add_client(MandosClientWidget(server_proxy_object + =self.mandos_serv, + proxy_object + =client_proxy_object, + properties=client, + update_hook + =self.refresh, + delete_hook + =self.remove_client), + path=path) + + def find_and_remove_client(self, path, name): + """Find an client from its object path and remove it. + + This is connected to the ClientRemoved signal from the + Mandos server object.""" + try: + client = self.clients_dict[path] + except KeyError: + # not found? + return + self.remove_client(client, path) + + def add_new_client(self, path, properties): + client_proxy_object = self.bus.get_object(self.busname, path) + self.add_client(MandosClientWidget(server_proxy_object + =self.mandos_serv, + proxy_object + =client_proxy_object, + properties=properties, + update_hook + =self.refresh, + delete_hook + =self.remove_client), + path=path) + + def add_client(self, client, path=None): + self.clients.append(client) + if path is None: + path = client.proxy.object_path + self.clients_dict[path] = client + self.clients.sort(None, lambda c: c.properties[u"name"]) + self.refresh() + + def remove_client(self, client, path=None): + self.clients.remove(client) + if path is None: + path = client.proxy.object_path + del self.clients_dict[path] + self.refresh() + + def refresh(self): + """Redraw the screen""" + canvas = self.topwidget.render(self.size, focus=True) + self.screen.draw_screen(self.size, canvas) + + def run(self): + """Start the main loop and exit when it's done.""" + self.refresh() + self._input_callback_tag = (gobject.io_add_watch + (sys.stdin.fileno(), + gobject.IO_IN, + self.process_input)) + self.main_loop.run() + # Main loop has finished, we should close everything now + gobject.source_remove(self._input_callback_tag) + self.screen.stop() + + def stop(self): + self.main_loop.quit() + + def process_input(self, source, condition): + keys = self.screen.get_input() + translations = { u"j": u"down", + u"k": u"up", + } + for key in keys: + try: + key = translations[key] + except KeyError: # :-) + pass + + if key == u"q" or key == u"Q": + self.stop() + break + elif key == u"window resize": + self.size = self.screen.get_cols_rows() + self.refresh() + elif key == " ": + self.refresh() + elif self.topwidget.selectable(): + self.topwidget.keypress(self.size, key) + self.refresh() + return True + +ui = UserInterface() +try: + ui.run() +except: + ui.screen.stop() + raise