#!/usr/bin/env python
#
# Patchwork command line client
# Copyright (C) 2008 Nate Case
#
# This file is part of the Patchwork package.
#
# Patchwork is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Patchwork is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Patchwork; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

import os
import sys
try:
    import xmlrpclib
except ImportError:
    # Python 3 location of the same module.  NOTE: this only keeps the
    # module importable there; the command implementations below still rely
    # on Python 2 constructs such as unicode().
    import xmlrpc.client as xmlrpclib
import argparse
import string
import tempfile
import subprocess
import base64
try:
    import ConfigParser
except ImportError:
    # Python 3 location of the same module (see the note above).
    import configparser as ConfigParser
import shutil
import re

# Default Patchwork remote XML-RPC server URL.
# NOTE: main() only uses this as the initial value of the URL; the URL that
# is actually contacted is always read from the selected project's section
# of CONFIG_FILE.
DEFAULT_URL = "http://patchwork/xmlrpc/"
CONFIG_FILE = os.path.expanduser('~/.pwclientrc')

# Matches the %{field} placeholders accepted by the "list -f FORMAT" option.
_FORMAT_FIELD_RE = re.compile("%{([a-z0-9_]+)}")


def _utf8(value):
    """Return value converted to unicode and encoded as UTF-8 bytes."""
    return unicode(value).encode("utf-8")


class Filter:
    """Filter for selecting patches."""

    def __init__(self):
        # These fields refer to specific objects, so they are special
        # because we have to resolve them to IDs before passing the
        # filter to the server
        self.state = ""
        self.project = ""

        # The dictionary that gets passed to the server via XML-RPC
        self.d = {}

    def add(self, field, value):
        """Record a filter criterion.

        'state' and 'project' are stored as names and only turned into IDs
        by resolve_ids(); every other field goes straight into self.d.
        """
        if field == 'state':
            self.state = value
        elif field == 'project':
            self.project = value
        else:
            # OK to add directly
            self.d[field] = value

    def resolve_ids(self, rpc):
        """Resolve the State and Project names into IDs stored in self.d.

        A name that does not match anything on the server is reported on
        stderr and the corresponding filter is ignored.  Unset (empty)
        names are skipped without contacting the server.
        """
        if self.state:
            state_id = state_id_by_name(rpc, self.state)
            if state_id == 0:
                sys.stderr.write("Note: No State found matching %s*, "
                                 "ignoring filter\n" % self.state)
            else:
                self.d['state_id'] = state_id

        # self.project defaults to "" (not None), so test for emptiness the
        # same way as for the state above.
        if self.project:
            project_id = project_id_by_name(rpc, self.project)
            if project_id == 0:
                sys.stderr.write("Note: No Project found matching %s, "
                                 "ignoring filter\n" % self.project)
            else:
                self.d['project_id'] = project_id

    def __str__(self):
        """Return human-readable description of the filter."""
        return str(self.d)


class BasicHTTPAuthTransport(xmlrpclib.SafeTransport):
    """XML-RPC transport that adds an HTTP Basic Authorization header.

    The connection is made over HTTPS when use_https is true and over
    plain HTTP otherwise.
    """

    def __init__(self, username=None, password=None, use_https=False):
        self.username = username
        self.password = password
        self.use_https = use_https
        xmlrpclib.SafeTransport.__init__(self)

    def authenticated(self):
        """Return True when both a username and a password are set."""
        return self.username is not None and self.password is not None

    def send_host(self, connection, host):
        """Send the host headers, plus Authorization if credentials exist."""
        xmlrpclib.Transport.send_host(self, connection, host)
        if not self.authenticated():
            return
        credentials = '%s:%s' % (self.username, self.password)
        # b64encode() never inserts line breaks; encodestring() adds one
        # every 76 output characters, which would corrupt the header for
        # long credentials.
        auth = 'Basic ' + base64.b64encode(credentials)
        connection.putheader('Authorization', auth)

    def make_connection(self, host):
        """Create an HTTPS or HTTP connection depending on use_https."""
        if self.use_https:
            fn = xmlrpclib.SafeTransport.make_connection
        else:
            fn = xmlrpclib.Transport.make_connection
        return fn(self, host)


def project_id_by_name(rpc, linkname):
    """Given a project short name, look up the Project ID.

    Returns 0 when the name is empty or no project has exactly that
    linkname.
    """
    if len(linkname) == 0:
        return 0
    for project in rpc.project_list(linkname, 0):
        if project['linkname'] == linkname:
            return project['id']
    return 0


def state_id_by_name(rpc, name):
    """Given a partial state name, look up the state ID.

    The match is a case-insensitive prefix match; the first matching state
    wins.  Returns 0 when the name is empty or nothing matches.
    """
    if len(name) == 0:
        return 0
    wanted = name.lower()
    for state in rpc.state_list(name, 0):
        if state['name'].lower().startswith(wanted):
            return state['id']
    return 0


def person_ids_by_name(rpc, name):
    """Given a partial name or email address, return a list of the person
    IDs that match."""
    if len(name) == 0:
        return []
    return [person['id'] for person in rpc.person_list(name, 0)]


def _patch_field_value(patch, fieldname):
    """Return the string substituted for one %{fieldname} placeholder."""
    if fieldname == "_msgid_":
        # naive way to strip < and > from message-id
        return str(patch["msgid"]).strip("<>")
    return str(patch[fieldname])


def list_patches(patches, format_str=None):
    """Dump a list of patches to stdout.

    With format_str, one line is printed per patch with every %{field}
    placeholder replaced by the patch's value for that field (%{_msgid_}
    gives the message-id without the surrounding angle brackets).  Without
    it, an ID / State / Name table is printed.
    """
    if format_str:
        for patch in patches:
            print(_FORMAT_FIELD_RE.sub(
                lambda match: _patch_field_value(patch, match.group(1)),
                format_str))
        return

    print("%-7s %-12s %s" % ("ID", "State", "Name"))
    print("%-7s %-12s %s" % ("--", "-----", "----"))
    for patch in patches:
        print("%-7d %-12s %s" %
              (patch['id'], patch['state'], patch['name']))


def action_list(rpc, filter, submitter_str, delegate_str, format_str=None):
    """List the patches selected by filter.

    When submitter_str is given, patches are listed separately for every
    person matching it; otherwise the same is done for delegate_str when
    that is given; otherwise a single list is printed.
    """
    filter.resolve_ids(rpc)

    if submitter_str is not None:
        person_ids = person_ids_by_name(rpc, submitter_str)
        if len(person_ids) == 0:
            sys.stderr.write("Note: Nobody found matching *%s*\n" %
                             submitter_str)
        else:
            for person_id in person_ids:
                person = rpc.person_get(person_id)
                print("Patches submitted by %s <%s>:" %
                      (_utf8(person['name']), _utf8(person['email'])))
                # The filter is shared; the ID is overwritten on each pass.
                filter.add("submitter_id", person_id)
                list_patches(rpc.patch_list(filter.d), format_str)
        return

    if delegate_str is not None:
        person_ids = person_ids_by_name(rpc, delegate_str)
        if len(person_ids) == 0:
            sys.stderr.write("Note: Nobody found matching *%s*\n" %
                             delegate_str)
        else:
            for person_id in person_ids:
                person = rpc.person_get(person_id)
                # Encoded like the submitter branch so that non-ASCII names
                # are handled the same way in both.
                print("Patches delegated to %s <%s>:" %
                      (_utf8(person['name']), _utf8(person['email'])))
                filter.add("delegate_id", person_id)
                list_patches(rpc.patch_list(filter.d), format_str)
        return

    list_patches(rpc.patch_list(filter.d), format_str)


def action_projects(rpc):
    """Print a table of all projects known to the server."""
    projects = rpc.project_list("", 0)
    print("%-5s %-24s %s" % ("ID", "Name", "Description"))
    print("%-5s %-24s %s" % ("--", "----", "-----------"))
    for project in projects:
        print("%-5d %-24s %s" %
              (project['id'], project['linkname'], project['name']))


def action_states(rpc):
    """Print a table of all patch states known to the server."""
    states = rpc.state_list("", 0)
    print("%-5s %s" % ("ID", "Name"))
    print("%-5s %s" % ("--", "----"))
    for state in states:
        print("%-5d %s" % (state['id'], state['name']))


def action_info(rpc, patch_id):
    """Print every field of the given patch, sorted by field name."""
    patch = rpc.patch_get(patch_id)
    s = "Information for patch id %d" % (patch_id)
    print(s)
    print('-' * len(s))
    for key, value in sorted(patch.items()):
        print("- %- 14s: %s" % (key, _utf8(value)))


def action_get(rpc, patch_id):
    """Download a patch's mbox and save it in the current directory.

    The file is named after the patch's 'filename' field; a numeric suffix
    is appended when a file of that name already exists.  Exits with
    status 1 when the patch cannot be fetched or the file cannot be
    written.
    """
    patch = rpc.patch_get(patch_id)
    s = rpc.patch_get_mbox(patch_id)

    if patch == {} or len(s) == 0:
        sys.stderr.write("Unable to get patch %d\n" % patch_id)
        sys.exit(1)

    # Never overwrite an existing file: try NAME, NAME.0, NAME.1, ...
    base_fname = fname = os.path.basename(patch['filename'])
    i = 0
    while os.path.exists(fname):
        fname = "%s.%d" % (base_fname, i)
        i += 1

    try:
        f = open(fname, "w")
    except (IOError, OSError):
        sys.stderr.write("Unable to open %s for writing\n" % fname)
        sys.exit(1)

    try:
        # "with" guarantees the file is closed even when the write fails.
        with f:
            f.write(_utf8(s))
    except (IOError, OSError, UnicodeError):
        sys.stderr.write("Failed to write to %s\n" % fname)
        sys.exit(1)

    print("Saved patch to %s" % fname)


def action_apply(rpc, patch_id, apply_cmd=None):
    """Fetch a patch and pipe its mbox into apply_cmd.

    apply_cmd is an argument list for subprocess.Popen; it defaults to
    ['patch', '-p1'].  Exits with status 1 when the patch is unknown or
    has no content.
    """
    patch = rpc.patch_get(patch_id)
    if patch == {}:
        sys.stderr.write("Error getting information on patch ID %d\n" %
                         patch_id)
        sys.exit(1)

    if apply_cmd is None:
        print("Applying patch #%d to current directory" % patch_id)
        apply_cmd = ['patch', '-p1']
    else:
        print("Applying patch #%d using %s" %
              (patch_id, repr(' '.join(apply_cmd))))

    print("Description: %s" % patch['name'])
    s = rpc.patch_get_mbox(patch_id)
    if len(s) > 0:
        proc = subprocess.Popen(apply_cmd, stdin=subprocess.PIPE)
        proc.communicate(_utf8(s))
    else:
        sys.stderr.write("Error: No patch content found\n")
        sys.exit(1)


def action_update_patch(rpc, patch_id, state=None, commit=None):
    """Set the state and/or commit reference of a patch on the server.

    Exits with status 1 when the patch or the state cannot be found.  A
    failed update is reported on stderr.
    """
    patch = rpc.patch_get(patch_id)
    if patch == {}:
        sys.stderr.write("Error getting information on patch ID %d\n" %
                         patch_id)
        sys.exit(1)

    params = {}

    if state:
        state_id = state_id_by_name(rpc, state)
        if state_id == 0:
            sys.stderr.write("Error: No State found matching %s*\n" % state)
            sys.exit(1)
        params['state'] = state_id

    if commit:
        params['commit_ref'] = commit

    success = False
    try:
        success = rpc.patch_set(patch_id, params)
    except xmlrpclib.Fault as fault:
        sys.stderr.write("Error updating patch: %s\n" % fault.faultString)

    if not success:
        sys.stderr.write("Patch not updated\n")


def patch_id_from_hash(rpc, project, hash):
    """Return the integer ID of the patch with the given hash.

    Exits with status 1 when no patch matches or the server returns an ID
    that is not an integer.
    """
    try:
        patch = rpc.patch_get_by_project_hash(project, hash)
    except xmlrpclib.Fault:
        # the server may not have the newer patch_get_by_project_hash
        # function, so fall back to hash-only.
        patch = rpc.patch_get_by_hash(hash)

    if patch == {}:
        sys.stderr.write("No patch has the hash provided\n")
        sys.exit(1)

    # be super paranoid
    try:
        return int(patch['id'])
    except (TypeError, ValueError):
        sys.stderr.write("Invalid patch ID obtained from server\n")
        sys.exit(1)


# Subcommands that need a username and password from the configuration.
auth_actions = ['update']


# unfortunately we currently have to revert to this ugly hack..
class _RecursiveHelpAction(argparse._HelpAction):
    """Help action that also prints the help text of the subcommands."""

    def __call__(self, parser, namespace, values, option_string=None):
        parser.print_help()
        print("")

        subparsers_actions = [
            action for action in parser._actions
            if isinstance(action, argparse._SubParsersAction)
        ]
        hash_n_id_actions = set(['hash', 'id', 'help'])
        for subparsers_action in subparsers_actions:
            for choice, subparser in subparsers_action.choices.items():
                # gross but the whole thing is..
                # Skip subcommands with no options at all, and those whose
                # only options are the shared hash / id / help trio.
                action_count = len(subparser._actions)
                dests = set([a.dest for a in subparser._actions])
                if action_count == 0 or (action_count == 3 and
                                         dests == hash_n_id_actions):
                    continue
                print("command '{}'".format(choice))
                print(subparser.format_help())

        parser.exit()


def _build_parsers():
    """Build the command line parser.

    Returns a tuple (action_parser, subcommand_parsers), where
    subcommand_parsers maps each 'subcmd' value to the sub-parser that
    should be used to print usage help for it.
    """
    hash_parser = argparse.ArgumentParser(add_help=False)
    hash_parser.add_argument(
        '-h', metavar='HASH', dest='hash', action='store',
        help='''Lookup by patch hash'''
    )
    hash_parser.add_argument(
        'id', metavar='ID', nargs='*', action='store', type=int,
        help='Patch ID',
    )

    filter_parser = argparse.ArgumentParser(add_help=False)
    filter_parser.add_argument(
        '-s', metavar='STATE',
        help='''Filter by patch state (e.g., 'New', 'Accepted', etc.)'''
    )
    filter_parser.add_argument(
        '-p', metavar='PROJECT',
        help='''Filter by project name (see 'projects' for list)'''
    )
    filter_parser.add_argument(
        '-w', metavar='WHO',
        help='''Filter by submitter (name, e-mail substring search)'''
    )
    filter_parser.add_argument(
        '-d', metavar='WHO',
        help='''Filter by delegate (name, e-mail substring search)'''
    )
    filter_parser.add_argument(
        '-n', metavar='MAX#',
        type=int,
        help='''Restrict number of results'''
    )
    filter_parser.add_argument(
        '-m', metavar='MESSAGEID',
        help='''Filter by Message-Id'''
    )
    filter_parser.add_argument(
        '-f', metavar='FORMAT',
        help='''Print output in the given format. You can use tags matching '''
             '''fields, e.g. %%{id}, %%{state}, or %%{msgid}.'''
    )
    filter_parser.add_argument(
        'patch_name', metavar='STR', nargs='?',
        help='substring to search for patches by name',
    )

    # -h is taken by the hash option, so help is only offered as --help.
    help_parser = argparse.ArgumentParser(add_help=False)
    help_parser.add_argument(
        '--help', action='help', help=argparse.SUPPRESS,
    )

    action_parser = argparse.ArgumentParser(
        prog='pwclient',
        add_help=False,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''(apply | get | info | view | update) (-h HASH | ID [ID ...])''',
    )
    action_parser.add_argument(
        '--help',
        action=_RecursiveHelpAction,
        help='''Print this help text'''
    )

    subparsers = action_parser.add_subparsers(
        title='Commands',
        metavar=''
    )

    apply_parser = subparsers.add_parser(
        'apply', parents=[hash_parser, help_parser],
        add_help=False,
        help='''Apply a patch (in the current dir, using -p1)'''
    )
    apply_parser.set_defaults(subcmd='apply')

    git_am_parser = subparsers.add_parser(
        'git-am', parents=[hash_parser, help_parser],
        add_help=False,
        help='''Apply a patch to current git branch using "git am".'''
    )
    git_am_parser.set_defaults(subcmd='git_am')
    git_am_parser.add_argument(
        '-s', '--signoff',
        action='store_true',
        help='''pass --signoff to git-am'''
    )

    get_parser = subparsers.add_parser(
        'get', parents=[hash_parser, help_parser],
        add_help=False,
        help='''Download a patch and save it locally'''
    )
    get_parser.set_defaults(subcmd='get')

    info_parser = subparsers.add_parser(
        'info', parents=[hash_parser, help_parser],
        add_help=False,
        help='''Display patchwork info about a given patch ID'''
    )
    info_parser.set_defaults(subcmd='info')

    projects_parser = subparsers.add_parser(
        'projects',
        add_help=False,
        help='''List all projects'''
    )
    projects_parser.set_defaults(subcmd='projects')

    states_parser = subparsers.add_parser(
        'states',
        add_help=False,
        help='''Show list of potential patch states'''
    )
    states_parser.set_defaults(subcmd='states')

    view_parser = subparsers.add_parser(
        'view', parents=[hash_parser, help_parser],
        add_help=False,
        help='''View a patch'''
    )
    view_parser.set_defaults(subcmd='view')

    update_parser = subparsers.add_parser(
        'update', parents=[hash_parser, help_parser],
        add_help=False,
        help='''Update patch''',
        epilog='''Using a COMMIT-REF allows for only one ID to be specified''',
    )
    update_parser.add_argument(
        '-c', metavar='COMMIT-REF',
        help='''commit reference hash'''
    )
    update_parser.add_argument(
        '-s', metavar='STATE',
        required=True,
        help='''Set patch state (e.g., 'Accepted', 'Superseded' etc.)'''
    )
    update_parser.set_defaults(subcmd='update')

    list_parser = subparsers.add_parser(
        "list",
        add_help=False,
        parents=[filter_parser, help_parser],
        help='''List patches, using the optional filters specified '''
             '''below and an optional substring to search for patches '''
             '''by name'''
    )
    list_parser.set_defaults(subcmd='list')

    search_parser = subparsers.add_parser(
        "search",
        add_help=False,
        parents=[filter_parser, help_parser],
        help='''Alias for "list"'''
    )
    # Poor man's argparse aliases:
    # We register the "search" parser but effectively use "list" for the
    # help-text.
    search_parser.set_defaults(subcmd='list')

    subcommand_parsers = {
        'apply': apply_parser,
        'git_am': git_am_parser,
        'get': get_parser,
        'info': info_parser,
        'projects': projects_parser,
        'states': states_parser,
        'view': view_parser,
        'update': update_parser,
        'list': list_parser,
    }
    return action_parser, subcommand_parsers


def _migrate_old_config(config):
    """Rewrite CONFIG_FILE from the old [base]/[auth] layout to the new one.

    The old file is first copied to CONFIG_FILE + '.orig'; the path of that
    backup is returned.
    """
    old_project = config.get('base', 'project')

    new_config = ConfigParser.ConfigParser()
    new_config.add_section('options')
    new_config.set('options', 'default', old_project)
    new_config.add_section(old_project)

    new_config.set(old_project, 'url', config.get('base', 'url'))
    if config.has_option('auth', 'username'):
        new_config.set(old_project, 'username',
                       config.get('auth', 'username'))
    if config.has_option('auth', 'password'):
        new_config.set(old_project, 'password',
                       config.get('auth', 'password'))

    old_config_file = CONFIG_FILE + '.orig'
    shutil.copy2(CONFIG_FILE, old_config_file)

    with open(CONFIG_FILE, 'wb') as fd:
        new_config.write(fd)

    return old_config_file


def _require_patch_ids(parser, patch_ids):
    """Return patch_ids, or print an error plus usage and exit if empty."""
    if patch_ids is None or len(patch_ids) < 1:
        sys.stderr.write("Error: Missing Argument! " +
                         "Either [-h HASH] or [ID [ID ...]] are required\n")
        if parser:
            parser.print_help()
        sys.exit(1)
    return patch_ids


def main():
    """Parse the command line, load ~/.pwclientrc and run the subcommand."""
    action_parser, subcommand_parsers = _build_parsers()

    if len(sys.argv) < 2:
        action_parser.print_help()
        sys.exit(0)

    args = dict(vars(action_parser.parse_args()))
    action = args.get('subcmd')
    # Parser whose usage text is shown when this subcommand is misused.
    usage_parser = subcommand_parsers.get(action, action_parser)

    if args.get('hash') and args.get('id'):
        # mimic mutual exclusive group
        sys.stderr.write("Error: [-h HASH] and [ID [ID ...]] " +
                         "are mutually exlusive\n")
        usage_parser.print_help()
        sys.exit(1)

    # set defaults
    filt = Filter()
    commit_str = None
    url = DEFAULT_URL

    state_str = args.get('s')
    project_str = args.get('p')
    submitter_str = args.get('w')
    delegate_str = args.get('d')
    format_str = args.get('f')
    hash_str = args.get('hash')
    patch_ids = args.get('id')
    msgid_str = args.get('m')

    if args.get('c'):
        # update multiple IDs with a single commit-hash does not make sense
        if action == 'update' and patch_ids and len(patch_ids) > 1:
            sys.stderr.write(
                "Declining update with COMMIT-REF on multiple IDs\n"
            )
            subcommand_parsers['update'].print_help()
            sys.exit(1)
        commit_str = args.get('c')

    # argparse has already validated -n as an integer (type=int).
    if args.get('n') is not None:
        filt.add("max_count", args.get('n'))

    do_signoff = args.get('signoff')

    # grab settings from config files
    config = ConfigParser.ConfigParser()
    config.read([CONFIG_FILE])

    if not config.has_section('options'):
        if not config.has_section('base'):
            # Neither the new nor the old layout: the file is missing,
            # unreadable or empty, so there is nothing to migrate.
            sys.stderr.write(
                "No usable configuration found in ~/.pwclientrc\n")
            sys.exit(1)

        sys.stderr.write('~/.pwclientrc is in the old format. Migrating it...')
        old_config_file = _migrate_old_config(config)
        sys.stderr.write(' Done.\n')
        sys.stderr.write('Your old ~/.pwclientrc was saved to %s\n' %
                         old_config_file)
        sys.stderr.write('and was converted to the new format. You may want to\n')
        sys.stderr.write('inspect it before continuing.\n')
        sys.exit(1)

    if not project_str:
        try:
            project_str = config.get('options', 'default')
        except ConfigParser.Error:
            sys.stderr.write("No default project configured in ~/.pwclientrc\n")
            action_parser.print_help()
            sys.exit(1)

    if not config.has_section(project_str):
        sys.stderr.write("No section for project %s\n" % project_str)
        sys.exit(1)
    if not config.has_option(project_str, 'url'):
        sys.stderr.write("No URL for project %s\n" % project_str)
        sys.exit(1)

    # The command line wins; otherwise the [options] setting, then the
    # per-project setting, may enable sign-off.
    if not do_signoff and config.has_option('options', 'signoff'):
        do_signoff = config.getboolean('options', 'signoff')
    if not do_signoff and config.has_option(project_str, 'signoff'):
        do_signoff = config.getboolean(project_str, 'signoff')

    url = config.get(project_str, 'url')

    transport = None
    if action in auth_actions:
        if config.has_option(project_str, 'username') and \
                config.has_option(project_str, 'password'):
            use_https = url.startswith('https')
            transport = BasicHTTPAuthTransport(
                config.get(project_str, 'username'),
                config.get(project_str, 'password'),
                use_https)
        else:
            sys.stderr.write(("The %s action requires authentication, "
                              "but no username or password\nis configured\n")
                             % action)
            sys.exit(1)

    if project_str:
        filt.add("project", project_str)

    if state_str:
        filt.add("state", state_str)

    if msgid_str:
        filt.add("msgid", msgid_str)

    try:
        rpc = xmlrpclib.Server(url, transport=transport)
    except Exception:
        # Top-level boundary: report any setup failure as a plain message,
        # without swallowing SystemExit / KeyboardInterrupt.
        sys.stderr.write("Unable to connect to %s\n" % url)
        sys.exit(1)

    # It should be safe to assume hash_str is not zero, but who knows..
    if hash_str is not None:
        patch_ids = [patch_id_from_hash(rpc, project_str, hash_str)]

    if action == 'list' or action == 'search':
        if args.get('patch_name') is not None:
            filt.add("name__icontains", args.get('patch_name'))
        action_list(rpc, filt, submitter_str, delegate_str, format_str)

    elif action.startswith('project'):
        action_projects(rpc)

    elif action.startswith('state'):
        action_states(rpc)

    elif action == 'view':
        for patch_id in _require_patch_ids(usage_parser, patch_ids):
            s = rpc.patch_get_mbox(patch_id)
            if len(s) > 0:
                print(_utf8(s))

    elif action == 'info':
        for patch_id in _require_patch_ids(usage_parser, patch_ids):
            action_info(rpc, patch_id)

    elif action == 'get':
        for patch_id in _require_patch_ids(usage_parser, patch_ids):
            action_get(rpc, patch_id)

    elif action == 'apply':
        for patch_id in _require_patch_ids(usage_parser, patch_ids):
            action_apply(rpc, patch_id)

    elif action == 'git_am':
        cmd = ['git', 'am']
        if do_signoff:
            cmd.append('-s')
        for patch_id in _require_patch_ids(usage_parser, patch_ids):
            action_apply(rpc, patch_id, cmd)

    elif action == 'update':
        for patch_id in _require_patch_ids(usage_parser, patch_ids):
            action_update_patch(rpc, patch_id, state=state_str,
                                commit=commit_str)

    else:
        sys.stderr.write("Unknown action '%s'\n" % action)
        action_parser.print_help()
        sys.exit(1)


if __name__ == "__main__":
    main()