# User:DRBot/source/mwclient.py
#
# From Wikimedia Commons, the free media repository
# Jump to: navigation, search
import urllib, urllib2, urlparse
import cookielib, random

from htmlentitydefs import name2codepoint 
from HTMLParser import HTMLParser

import time

# Client version string; Request appends it to the User-Agent header it sends.
__ver__ = '0.3.1u'

class MediawikiError(StandardError):
    pass
class Request(urllib2.Request):
    """urllib2.Request that always announces itself as MwClient.

    Takes the same arguments as urllib2.Request.
    """
    def __init__(self, url, data=None, headers=None,
        origin_req_host=None, unverifiable=False):
        # Build a fresh dict per call instead of sharing one mutable default
        # object between every Request ever created.
        if headers is None:
            headers = {}
        urllib2.Request.__init__(self, url, data, headers, origin_req_host, unverifiable)

        # Set after the base initialiser has stored `headers`, so this value
        # wins over any User-Agent the caller passed in.
        self.add_header('User-Agent', 'MwClient-' + __ver__)
        
class PostRequest(Request):
    """Request carrying a form-encoded (UTF-8) POST body."""
    def __init__(self, url, data = None):
        """Create the request for `url`; `data` is handed to add_data()."""
        Request.__init__(self, url)
        self.add_header('Content-type', 'application/x-www-form-urlencoded; charset=UTF-8')
        self.add_data(data)

    def add_data(self, data):
        """Attach a POST body to the request.

        data -- None (leave the request without a body), a string that is
                already an encoded body, or a mapping of field name to value
                that is percent-encoded here.

        Unicode strings are encoded as UTF-8, matching the charset declared
        in the Content-type header.
        """
        if data is None:
            return
        if isinstance(data, basestring):
            # A ready-made body; only text needs converting to bytes.
            if isinstance(data, unicode):
                data = data.encode('utf-8')
            return Request.add_data(self, data)
        raw = []
        for k, v in data.iteritems():
            raw.append(self._quote(k) + '=' + self._quote(v))
        return Request.add_data(self, '&'.join(raw))

    @staticmethod
    def _quote(value):
        """Percent-encode one form key or value."""
        # urllib.quote raises KeyError on non-ASCII unicode input, so text is
        # turned into UTF-8 bytes first.
        if isinstance(value, unicode):
            value = value.encode('utf-8')
        return urllib.quote(value)
        
def Open(url):
    """Fetch `url` anonymously (no session cookies) and return the response."""
    request = Request(url)
    return urllib2.urlopen(request)
    
def encode_multipart(fields, files):
    """Encode form fields and file uploads as a multipart/form-data body.

    fields -- iterable of (name, value) pairs.
    files  -- iterable of (name, filename, content) triples, where content
              is the raw data of the file.

    Text (unicode) names, values, filenames and contents are encoded as
    UTF-8; byte strings are used exactly as given.

    Returns a (boundary, body) tuple. `boundary` is the randomly generated
    separator to announce in the Content-Type header and `body` is the
    encoded request body as a byte string.
    """
    def to_bytes(value):
        # Only text needs encoding. Calling .encode() on a byte string makes
        # Python 2 decode it as ASCII first, which blows up on any non-ASCII
        # byte (UTF-8 titles, binary data).
        if isinstance(value, bytes):
            return value
        return value.encode('utf-8')

    alphabet = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
    boundary = '----%s----' % ''.join(random.choice(alphabet) for i in range(32))
    delimiter = b'--' + to_bytes(boundary)

    # Every part is kept as bytes: joining text with binary file content
    # would otherwise force an implicit ASCII decode of the file data.
    res = []
    for k, v in fields:
        res.append(delimiter)
        res.append(b'Content-Disposition: form-data; name="' + to_bytes(k) + b'"')
        res.append(b'')
        res.append(to_bytes(v))
    for k, v, f in files:
        res.append(delimiter)
        res.append(b'Content-Disposition: form-data; name="' + to_bytes(k)
                   + b'"; filename="' + to_bytes(v) + b'"')
        res.append(b'Content-Type: application/octet-stream')
        res.append(b'')
        res.append(to_bytes(f))
    # Closing delimiter, followed by a final CRLF.
    res.append(delimiter + b'--')
    res.append(b'')
    return boundary, b'\r\n'.join(res)
    
class Session(object):
    def __init__(self, baseuri = None, wikicode = None, username = None, password = None):
        self.base = baseuri
        self.wikicode = wikicode
        self.username = username
        self.password = password
        
        self.cookie = cookielib.CookieJar()
        self.login()
    setoptions = __init__
    
    def login(self):
        req = Request(self.base + '?title=Special:Userlogin')
        self.cookie.extract_cookies(urllib2.urlopen(req), req)
        
        req = PostRequest(self.base + '?title=Special:Userlogin&action=submitlogin&type=login')
        req.add_data({ \
            'wpName'        : self.username,
            'wpPassword'    : self.password,
            'wpRemember'    : '1',
            'wpLoginattempt': 'Log in',
            })
        self.cookie.add_cookie_header(req)
        res = urllib2.urlopen(req)
        self.cookie.extract_cookies(res, req)
        
        if not self.checklogin():
            raise MediawikiError, 'Login failed!'
            
        
    def checklogin(self):
        return sum((c.name == self.wikicode + 'UserID' for c in self.cookie))
        
    def open(self, title, raw = False):
        if raw:
            req = Request(title)
        else:
            req = Request(self.base + '?title=' + title)
        self.cookie.add_cookie_header(req)
        try:
            res = urllib2.urlopen(req)
        except urllib2.HTTPError, e:
            if e.code in (500, 502, 503, 504):
                time.sleep(10)
                res = urllib2.urlopen(req)
            else:
                raise
        self.cookie.extract_cookies(res, req)
        if not self.checklogin():
            self.login()
            return self.open(title)
        return res
        
    def post_raw(self, action, data):
        req = PostRequest('://'.join(urlparse.urlparse(self.base)[:2]) + action, data)
        self.cookie.add_cookie_header(req)
        res = urllib2.urlopen(req)
        self.cookie.extract_cookies(res, req)
        if not self.checklogin():
            self.login()
            return self.post_raw(action, data)
        return res
        
    def upload(self, fo, filename, description, license = '', ignore = False):
        post = {}
        post['wpDestFile'] = filename
        post['wpUploadDescription'] = description
        post['wpLicense'] = license
        if ignore: post['wpIgnoreWarning'] = 'true'
        post['wpUpload'] = 'Upload file'
        post['wpSourceType'] = 'file'
        boundary, data = encode_multipart(post.iteritems(), (('wpUploadFile', filename, fo.read()),))
        
        req = PostRequest(self.base + '?title=Special:Upload', data)
        req.add_header('Content-Type', 'multipart/form-data; boundary=' + boundary)
        self.cookie.add_cookie_header(req)
        res = urllib2.urlopen(req)
        self.cookie.extract_cookies(res, req)
        if not self.checklogin():
            self.login()
            fo.seek(0)
            return self.upload(fo, filename, description, license, ignore)
        return res
    
class Page(HTMLParser):
    def __init__(self, url = None, session = None, section = ''):
        HTMLParser.__init__(self)
        self.in_form = False
        self.in_text = False
        self.data = {}
        self.textdata = []
        self.session = session
        self.section = section
        
        if session and url:
            u = session.open(urllib.quote(url.encode('utf-8')) + '&action=edit&section=' + section)
        else:
            u = Open(url)
        if url:
            self.raw = u.read().decode('utf-8', 'ignore')
            self.feed(self.raw)
        
    def handle_starttag(self, tag, attrs):
        if tag == 'form' and (u'id', u'editform') in attrs:
            attrs = dict(attrs)
            self.in_form = True
            self.action = attrs['action']
            
        if tag == 'input' and self.in_form and (u'type', u'submit') not in attrs:
            attrs = dict(attrs)
            if u'name' in attrs: self.data[attrs[u'name']] = attrs.get(u'value', u'')
            
        self.in_text = self.in_form and tag == 'textarea'
            
    def handle_endtag(self, tag):
        if self.in_form and tag == 'form': self.in_form = False
        self.in_text = self.in_text and tag == 'textarea'
            
    def handle_data(self, data):
        if self.in_text: self.textdata.append(data)
    def handle_entityref(self, name):
        if name in name2codepoint: 
            self.handle_data(unichr(name2codepoint[name]))
        else:
            self.handle_data(u'&%s;' % name)
    def handle_charref(name):
        try:
            self.handle_data(unichr(int(name)))
        except ValueError:
            self.handle_data(u'&#$s;' % name)
    
    def __str__(self):
        return u''.join(self.textdata)
            
    def edit(self, data, summary = u''):
        self.data['wpTextbox1'] = data
        self.data['wpSummary'] = summary
        self.data['wpSave'] = 'Save page'
        e = self.action.encode('utf-8') + '&section=' + self.section, '&'.join((urllib.quote(k.encode('utf-8')) + '=' + urllib.quote(v.encode('utf-8')) \
            for k, v in self.data.iteritems()))# if v))
        if self.session:
            return self.session.post_raw(*e), e
        else:
            return e
    
def log(data):
    """Logging hook that deliberately does nothing with `data`."""
    return None