gajim-plural/scripts/gajim-remote.py

489 lines
13 KiB
Python
Executable File

#!/bin/sh
''':'
exec python -OOt "$0" ${1+"$@"}
' '''
## scripts/gajim-remote.py
##
## Gajim Team:
## - Yann Le Boulanger <asterix@lagaule.org>
## - Vincent Hanquez <tab@snarc.org>
## - Nikos Kouremenos <kourem@gmail.com>
## - Dimitur Kirov <dkirov@gmail.com>
##
## This file was initially written by Dimitur Kirov
##
## Copyright (C) 2003-2005 Gajim Team
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published
## by the Free Software Foundation; version 2 only.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
# gajim-remote help will show you the DBUS API of Gajim
import sys
import gtk
import gobject
import signal
signal.signal(signal.SIGINT, signal.SIG_DFL) # ^C exits the application
import i18n
_ = i18n._
i18n.init()
try:
import dbus
except:
send_error('Dbus is not supported.\n')
_version = getattr(dbus, 'version', (0, 20, 0))
if _version[1] >= 41:
import dbus.service
import dbus.glib
OBJ_PATH = '/org/gajim/dbus/RemoteObject'
INTERFACE = 'org.gajim.dbus.RemoteInterface'
SERVICE = 'org.gajim.dbus'
BASENAME = 'gajim-remote'
class GajimRemote:
def __init__(self):
self.argv_len = len(sys.argv)
# define commands dict. Prototype :
# {
# 'command': [comment, [list of arguments] ]
# }
#
# each argument is defined as a tuple:
# (argument name, help on argument, is mandatory)
#
self.commands = {
'help':[
_('show a help on specific command'),
[
#parameter, named "on_command". User gets help for the command, specified by this parameter
(_('on_command'),
_('show help on command'), False)
]
],
'toggle_roster_appearance' : [
_('Shows or hides the roster window'),
[]
],
'show_next_unread': [
_('Popup a window with the next unread message'),
[]
],
'list_contacts': [
_('Print a list of all contacts in the roster. \
Each contact appear on a separate line'),
[
(_('account'), _('show only contacts of the \
given account'), False)
]
],
'list_accounts': [
_('Print a list of registered accounts'),
[]
],
'change_status': [
_('Change the status of account or accounts'),
[
(_('status'), _('one of: offline, online, chat, away, \
xa, dnd, invisible '), True),
(_('message'), _('status message'), False),
(_('account'), _('change status of account "account". \
If not specified, try to change status of all accounts that \
have "sync with global status" option set'), False)
]
],
'open_chat': [
_('Show the chat dialog so that you can send message to a \
contact'),
[
('jid', _('jid of the contact that you want to chat \
with'),
True),
(_('account'), _('if specified, contact is taken from \
the contact list of this account'), False)
]
],
'send_message':[
_('Send new message to a contact in the roster. Both OpenPGP \
key and account are optional. If you want to set only \'account\', whitout \
\'pgp key\', just set \'pgp key\' to \'\'.'),
[
('jid', _('jid of the contact that will receive the \
message'), True),
(_('message'), _('message contents'), True),
(_('pgp key'), _('if specified, the message will be \
encrypted using this pulic key'), False),
(_('account'), _('if specified, the message will be \
sent using this account'), False),
]
],
'contact_info': [
_('Get detailed info on a contact'),
[
('jid', _('jid of the contact'), True)
]
]
}
if self.argv_len < 2 or \
sys.argv[1] not in self.commands.keys(): # no args or bad args
self.send_error(self.compose_help())
self.command = sys.argv[1]
if self.command == 'help':
if self.argv_len == 3:
print self.help_on_command(sys.argv[2])
else:
print self.compose_help()
sys.exit()
self.init_connection()
self.check_arguments()
if self.command == 'contact_info':
if self.argv_len < 3:
self.send_error(_('Missing argument "contact_jid"'))
try:
id = self.sbus.add_signal_receiver(self.show_vcard_info,
'VcardInfo', INTERFACE, SERVICE, OBJ_PATH)
except:
self.send_error(_('Service not available'))
res = self.call_remote_method()
self.print_result(res)
if self.command == 'contact_info':
gobject.timeout_add(10000, self.gtk_quit) # wait 10 sec for response
gtk.main()
def print_result(self, res):
''' Print retrieved result to the output '''
if res is not None:
if self.command in ['open_chat', 'send_message']:
if self.command == 'send_message':
self.argv_len -= 2
if res == False:
if self.argv_len < 4:
self.send_error(_('\'%s\' is not in your roster.\n\
Please specify account for sending the message.') % sys.argv[2])
else:
self.send_error(_('You have no active account'))
elif self.command == 'list_accounts':
if type(res) == list:
for account in res:
print account
elif self.command == 'list_contacts':
for single_res in res:
accounts = self.unrepr(single_res)
for account_dict in accounts:
print self.print_info(0, account_dict)
elif res:
print res
def init_connection(self):
''' create the onnection to the session dbus,
or exit if it is not possible '''
try:
self.sbus = dbus.SessionBus()
except:
self.send_error(_('Session bus is not available.'))
if _version[1] >= 30 and _version[1] <= 42:
self.object = self.sbus.get_object(SERVICE, OBJ_PATH)
self.interface = dbus.Interface(object, INTERFACE)
elif _version[1] < 30:
self.service = self.sbus.get_service(SERVICE)
self.interface = self.service.get_object(OBJ_PATH, INTERFACE)
else:
send_error(_('Unknow dbus version: %s') % _version)
# get the function asked
self.method = eval('self.interface.'+self.command)
def make_arguments_row(self, args):
''' return arguments list. Mandatory arguments are enclosed with:
'<', '>', optional arguments - with '[', ']' '''
str = ''
for argument in args:
str += ' '
if argument[2]:
str += '<'
else:
str += '['
str += argument[0]
if argument[2]:
str += '>'
else:
str += ']'
return str
def help_on_command(self, command):
''' return help message for a given command '''
if command in self.commands:
command_props = self.commands[command]
arguments_str = self.make_arguments_row(command_props[1])
str = _('Usage: %s %s %s \n\t') % (BASENAME, command,
arguments_str)
str += command_props[0] + '\n\nArguments:\n'
for argument in command_props[1]:
str += ' ' + argument[0] + ' - ' + argument[1] + '\n'
return str
self.send_error(_('%s not found') % command)
def compose_help(self):
''' print usage, and list available commands '''
str = _('Usage: %s command [arguments]\nCommand is one of:\n' ) % BASENAME
for command in self.commands.keys():
str += ' ' + command
for argument in self.commands[command][1]:
str += ' '
if argument[2]:
str += '<'
else:
str += '['
str += argument[0]
if argument[2]:
str += '>'
else:
str += ']'
str += '\n'
return str
def print_info(self, level, prop_dict):
''' return formated string from serialized vcard data '''
if prop_dict is None or type(prop_dict) \
not in [dict, list, tuple]:
return ''
ret_str = ''
if type(prop_dict) in [list, tuple]:
ret_str = ''
spacing = ' ' * level * 4
for val in prop_dict:
if val is None:
ret_str +='\t'
elif type(val) == unicode or type(val) == int or \
type(val) == str:
ret_str +='\t' + str(val)
elif type(val) == list or type(val) == tuple:
res = ''
for items in val:
res += self.print_info(level+1, items)
if res != '':
ret_str += '\t' + res
ret_str = '%s(%s)\n' % (spacing, ret_str[1:])
elif type(prop_dict) is dict:
for key in prop_dict.keys():
val = prop_dict[key]
spacing = ' ' * level * 4
if type(val) == unicode or type(val) == int or \
type(val) == str:
if val is not None:
val = val.strip()
ret_str += '%s%-10s: %s\n' % (spacing, key, val)
elif type(val) == list or type(val) == tuple:
res = ''
for items in val:
res += self.print_info(level+1, items)
if res != '':
ret_str += '%s%s: \n%s' % (spacing, key, res)
elif type(val) == dict:
res = self.print_info(level+1, val)
if res != '':
ret_str += '%s%s: \n%s' % (spacing, key, res)
else:
self.send_warning(_('Unknown type %s ') % type(val))
return ret_str
def unrepr(self, serialized_data):
''' works the same as eval, but only for structural values,
not functions! e.g. dicts, lists, strings, tuples '''
if not serialized_data:
return (None, '')
value = serialized_data.strip()
first_char = value[0]
is_unicode = False
is_int = False
if first_char == 'u':
is_unicode = True
value = value[1:]
first_char = value[0]
elif '0123456789.'.find(first_char) != -1:
is_int = True
_str = first_char
if first_char == '.':
is_float = True
else:
is_float = False
for i in range(len(value) - 1):
chr = value[i+1]
if chr == '.':
is_float = True
elif '0123456789'.find(chr) == -1:
break
_str += chr
if is_float:
return (float(_str), value[len(_str):])
else:
return (int(_str), value[len(_str):])
elif first_char == 'N':
if value[1:4] == 'one':
return (None, value[4:])
else:
return (None, '')
if first_char == "'" or first_char == '"': # handle strings and unicode
if len(value) < 2:
return ('',value[1:])
_str = ''
previous_slash = False
for i in range(len(value) - 1):
chr = value[i+1]
if previous_slash:
previous_slash = False
if chr == '\\':
_str += '\\'
elif chr == 'n':
_str += '\n'
elif chr == 't':
_str += '\t'
elif chr == 'r':
_str += '\r'
elif chr == 'b':
_str += '\b'
elif chr == '\'':
_str += '\''
elif chr == '\"':
_str += '\"'
elif chr == 'u' and is_unicode:
_str += '\\u'
elif chr == first_char:
break
elif chr == '\\':
previous_slash = True
else:
_str += chr
substr_len = len(_str)+2
if is_unicode and _str:
_str = _str.decode('unicode-escape').encode('utf-8')
return (_str, value[substr_len :])
elif first_char == '{': # dict
_dict = {}
while True:
if value[1] == '}':
break
key, next = self.unrepr(value[1:])
if type(key) != str and type(key) != unicode:
send_error('Wrong string: %s' % value)
next = next.strip()
if not next or next[0] != ':':
send_error('Wrong string: %s' % (value))
val, next = self.unrepr(next[1:])
_dict[key] = val
next = next.strip()
if not next:
break
if next[0] == ',':
value = next
elif next[0] == '}':
break
else:
break
return (_dict, next[1:])
elif first_char in ['[', '(']: # return list
_tuple = []
while True:
if value[1] == ']':
break
val, next = self.unrepr(value[1:])
next = next.strip()
if not next:
send_error('Wrong string: %s' % val)
_tuple.append(val)
next = next.strip()
if not next:
break
if next[0] == ',':
value = next
elif next[0] in [']', ')']:
break
return (_tuple, next[1:])
def show_vcard_info(self, *args, **keyword):
''' write user vcart in a formated output '''
props_dict = None
if _version[1] >= 30:
props_dict = self.unrepr(args[0])
else:
if args and len(args) >= 5:
props_dict = self.unrepr(args[4].get_args_list()[0])
if props_dict:
print self.print_info(0,props_dict[0])
# remove_signal_receiver is broken in lower versions (< 0.35),
# so we leave the leak - nothing can be done
if _version[1] >= 41:
self.sbus.remove_signal_receiver(show_vcard_info, 'VcardInfo',
INTERFACE, SERVICE, OBJ_PATH)
gtk.main_quit()
def check_arguments(self):
''' Make check if all necessary arguments are given '''
argv_len = self.argv_len - 2
args = self.commands[self.command][1]
if len(args) > argv_len:
if args[argv_len][2]:
self.send_error(_('Argument "%s" is not specified. \n\
Type "%s help %s" for more info') % (args[argv_len][0], BASENAME, self.command))
def gtk_quit(self):
if _version[1] >= 41:
self.sbus.remove_signal_receiver(show_vcard_info, 'VcardInfo',
INTERFACE, SERVICE, OBJ_PATH)
gtk.main_quit()
# FIXME - didn't find more clever way for the below method.
# method(sys.argv[2:]) doesn't work, cos sys.argv[2:] is a tuple
def call_remote_method(self):
''' calls self.method with arguments from sys.argv[2:] '''
try:
if self.argv_len == 2:
res = self.method()
elif self.argv_len == 3:
res = self.method(sys.argv[2])
elif self.argv_len == 4:
res = self.method(sys.argv[2], sys.argv[3])
elif self.argv_len == 5:
res = self.method(sys.argv[2], sys.argv[3], sys.argv[4])
elif argv_len == 6:
res = self.method(sys.argv[2], sys.argv[3], sys.argv[4],
sys.argv[5])
return res
except:
self.send_error(_('Service not available'))
return None
def send_error(self, error_message):
''' Writes error message to stderr and exits '''
sys.stderr.write(error_message + '\n')
sys.stderr.flush()
sys.exit(1)
if __name__ == '__main__':
GajimRemote()