#!/usr/bin/python '''\ Twitter OAuth template application Copyright (c) 2011 jc.unternet.net licensed under GPL Do-nothing webapp which uses Twitter OAuth for login. Uses cookies if available, only over https (or when testing on localnet), due to possibility of hijacking cookies over unsecured connection.''' import sys, os, urllib, urllib2, time, httplib import hmac, hashlib, random, re, base64, cgi, cgitb import json, syslog, netrc, urlparse, pwd, Cookie import struct, socket, pwd if not sys.stdin.isatty(): cgitb.enable if not hasattr(json, 'loads'): json.loads = json.reads README = 'put consumer key and secret as login and password in .netrc file' SCRIPT_NAME = urlparse.urlsplit(os.getenv('REQUEST_URI') or '/').path APIHOST = 'api.twitter.com' SERVER_PORT = os.getenv('SERVER_PORT') or '' SCHEME = 'http' if SERVER_PORT in ['80', ''] else 'https' REMOTE_ADDR = os.getenv('REMOTE_ADDR') or '0.0.0.0' SSL_SESSION_ID = os.getenv('SSL_SESSION_ID') or '' HTTP_COOKIE = os.getenv('HTTP_COOKIE') or '' COOKIE = Cookie.SimpleCookie() COOKIE.load(HTTP_COOKIE) BASEURL = '%s://%s/' % (SCHEME, os.getenv('SERVER_NAME') or 'localhost') SCRIPT_URL = urlparse.urljoin(BASEURL, SCRIPT_NAME) DAY = 24 * 60 * 60 # a day in seconds TEMP_PATH = '/tmp/twitter_oauth' IMAGE_URLS = {'http': 'profile_image_url', 'https': 'profile_image_url_https'} IMAGE_URL = IMAGE_URLS[SCHEME] STATUS = [] LONGEVITY = DAY # session expiration if not os.getenv('HOME'): # needed for netrc module to work # also needed to store member data, not safe in webspace os.environ['HOME'] = pwd.getpwuid(os.geteuid()).pw_dir try: NETRC = netrc.netrc() CONSUMER_KEY, ignored, CONSUMER_SECRET = NETRC.authenticators(APIHOST) except: CONSUMER_KEY, NETRC, CONSUMER_SECRET = 'anonymous', None, 'anonymous' log('netrc file not set up properly, see README') OAUTH_CALLBACK = os.getenv('OAUTH_CALLBACK') or SCRIPT_URL PARAMETERS = { 'oauth_consumer_key': os.getenv('OAUTH_CONSUMER_KEY') or CONSUMER_KEY, 'oauth_signature_method': 'HMAC-SHA1', 'oauth_signature': '', 'oauth_timestamp': os.getenv('OAUTH_TIMESTAMP') or '', 'oauth_nonce': os.getenv('OAUTH_NONCE') or '', 'oauth_version': '1.0', 'oauth_callback': OAUTH_CALLBACK, 'oauth_token': os.getenv('OAUTH_TOKEN') or '', 'oauth_verifier': os.getenv('OAUTH_VERIFIER') or '', } SECRETS = { 'oauth_consumer_secret': os.getenv('OAUTH_CONSUMER_SECRET') or CONSUMER_SECRET, 'oauth_token_secret': os.getenv('OAUTH_TOKEN_SECRET') or '', } DEBUGLEVEL = int(os.getenv('DEBUGLEVEL') or 0) REQUEST_METHOD = (os.getenv('REQUEST_METHOD') or 'GET').upper() INPUT = cgi.FieldStorage() STYLE = """ """ HEADER = """
' + html + '
' def display_login_denied(): 'user indicated to Twitter that ey wants to cancel' denied = tmpfile(INPUT['denied']) try: os.unlink(denied) except: log('could not unlink "%s"' % denied) status('Twitter login was denied') show_page() def save_keys(tokens): storage = open('/tmp/%s.%s' % (os.getenv('REMOTE_ADDR'), tokens['oauth_token']), 'w') chgrp(storage.name, 'www-data') print >>storage, tokens['oauth_token_secret'] storage.close() def tmpfile(token): return '/tmp/%s.%s' % (os.getenv('REMOTE_ADDR'), token) def load_keys(token): PARAMETERS['oauth_token'] = token try: storage = open(tmpfile(token)) SECRETS['oauth_token_secret'] = storage.readline().strip() except Exception, error: log('could not find token %s: %s' % (token, repr(error))) return False storage.close() try: os.unlink(storage.name) except Exception, error: log('could not remove %s: %s' % (storage.name, repr(error))) return True def get_credentials(): url = 'https://api.twitter.com/1/account/verify_credentials.json' update_parameters(force = False) secret = SECRETS['oauth_consumer_secret'] token_secret = SECRETS['oauth_token_secret'] PARAMETERS['oauth_callback'] = '' # zero this out before signature PARAMETERS['oauth_verifier'] = '' PARAMETERS['oauth_signature'] = sign('&'.join((secret, token_secret)), '&'.join(map(urlencode, ('GET', url, parameters('signing'))))) request = urllib2.Request(url) request.add_header('Authorization', 'OAuth ' + parameters('header')) log(request.get_header('Authorization')) opener = urllib2.build_opener(urllib2.HTTPSHandler(debuglevel = DEBUGLEVEL)) request = opener.open(request) reply = request.read() request.close() return json.loads(reply) def get_access_token(): url = 'https://api.twitter.com/oauth/access_token' update_parameters(force = False) secret = SECRETS['oauth_consumer_secret'] token_secret = SECRETS['oauth_token_secret'] PARAMETERS['oauth_callback'] = '' # zero this out before signature PARAMETERS['oauth_signature'] = sign('&'.join((secret, token_secret)), '&'.join(map(urlencode, ('POST', url, parameters('signing'))))) if secret == 'L8qq9PZyRg6ieKGEKhZolGC0vJWLw8iEJ88DRdyOg': # testing return {'oauth_signature': PARAMETERS['oauth_signature']} request = urllib2.Request(url) request.add_header('Authorization', 'OAuth ' + parameters('header')) opener = urllib2.build_opener(urllib2.HTTPSHandler(debuglevel = DEBUGLEVEL)) request = opener.open(request, urllib.urlencode({'oauth_verifier': PARAMETERS['oauth_verifier']})) reply = request.read() request.close() return dict(cgi.parse_qsl(reply)) def authenticate(oauth_token): url = 'https://api.twitter.com/oauth/authenticate' url += '?%s=%s' % ('oauth_token', oauth_token) return 'location: %s\r\n\r\n' % url def get_request_token(): 'ask Twitter for a request token' url = 'https://api.twitter.com/oauth/request_token' update_parameters(force = False) secret = SECRETS['oauth_consumer_secret'] token_secret = '' # we don't have a token secret yet PARAMETERS['oauth_signature'] = sign('&'.join((secret, token_secret)), '&'.join(map(urlencode, ('POST', url, parameters('signing'))))) if secret == 'L8qq9PZyRg6ieKGEKhZolGC0vJWLw8iEJ88DRdyOg': # testing return {'oauth_signature': PARAMETERS['oauth_signature']} request = urllib2.Request(url) request.add_header('Authorization', 'OAuth ' + parameters('header')) opener = urllib2.build_opener(urllib2.HTTPSHandler(debuglevel = DEBUGLEVEL)) request = opener.open(request, urllib.urlencode({})) reply = request.read() request.close() return dict(cgi.parse_qsl(reply)) def byte_encode(match): 'for use with re.sub' return '%%%02X' % ord(match.group()) def urlencode(string): "unreserved = ALPHA, DIGIT, '-', '.', '_', '~'" return re.sub(re.compile('[^0-9A-Za-z._~-]'), byte_encode, string.encode('utf8')) def urldecode(string): 'undo urlencode' return urllib.unquote(string).decode('utf8') def log(message): syslog.syslog(syslog.LOG_DEBUG, message) if sys.stdin.isatty() and DEBUGLEVEL: print >>sys.stderr, message def sign(secret, text): log('signature base string: "%s", secret: %s' % ( repr(text), repr(secret))) digest = hmac.new(secret, text, hashlib.sha1).digest() return base64.encodestring(digest).rstrip() def base64string(hexstring): recoded = base64.encodestring(hexstring.decode('hex')).rstrip() log('recoded: %s' % recoded) return recoded def parameters(format): if format == 'header': formatted = ', '.join(['%s="%s"' % (key, urlencode(value.encode('utf8'))) for key, value in PARAMETERS.items() if value and key not in ['oauth_verifier']]) elif format == 'signing': formatted = '&'.join(sorted(['%s=%s' % (key, urlencode(value.encode('utf8'))) for key, value in (PARAMETERS.items()) if value and key not in ['oauth_signature']])) #print >>sys.stderr, format, formatted return formatted def hmac_sha1_test(): 'from tools.ietf.org/html/rfc2202' assert sign('\x0b' * 20, 'Hi There') == base64string( 'b617318655057264e28bc0b6fb378c8ef146be00') assert sign('Jefe', 'what do ya want for nothing?') == base64string( 'effcdf6ae5eb2fa2d27416d5f184df9c259a7c79') assert sign('\xaa' * 20, '\xdd' * 50) == base64string( '125d7342b9ac11cd91a39af48aa17b4f63f175d3') # last test from http://oauth.net/core/1.0/#rfc.section.9.1.1, app. A.5.2 assert sign('kd94hf93k423kf44&pfkkdhi9sl3r4s00', 'GET&http%3A%2F%2Fphotos.example.net%2Fphotos&file%3Dvacation.jpg%26' + \ 'oauth_consumer_key%3Ddpf43f3p2l4k3l03%26oauth_nonce%3D' + \ 'kllo9940pd9333jh%26oauth_signature_method%3DHMAC-SHA1%26' + \ 'oauth_timestamp%3D1191242096%26oauth_token%3Dnnch734d00sl2jdk%26' + \ 'oauth_version%3D1.0%26size%3Doriginal') == 'tR3+Ty81lMeYAr/Fid0kMTYa/WM=' return True def save_user_info(data): id = data['id_str'] if data.has_key('id_str') else data['user_id'] member_file = open_store(id) if member_file.mode != 'w': try: member_data = json.loads(member_file.read()) except: member_data = {} member_file.seek(0, 0) else: member_data = {} for key in data: if isinstance(data[key], (str, unicode)): if member_data.has_key(key) and not data[key] in member_data[key]: member_data[key].append(data[key]) else: member_data[key] = [data[key]] member_file.write(json.dumps(member_data)) member_file.truncate() member_file.close() return member_data def open_store(filename = 'this_is_a_test'): permissions = 0750 # owner all, group read+execute, everybody else none members = os.path.join(os.getenv('HOME') or '/var/tmp', 'members') if not os.path.exists(members): os.mkdir(members, permissions) parts = [] log('filename: %s' % filename) for index in range(1, len(filename) + 1): parts.append(filename[:index]) new = os.path.join(members, *parts) if not os.path.exists(new): os.mkdir(new, permissions) parts.append(filename) new = os.path.join(members, *parts) if os.path.exists(new): store = open(new, 'r+') else: store = open(new, 'w') chgrp(store.name, 'www-data') return store def get_cookie(): 'check if cookie was offered and if we have it cached' if 'session' in COOKIE: if os.path.exists(temp(COOKIE['session'].value)): status('Found a session from cookie') return read(temp(COOKIE['session'].value)).strip() else: return status('Cookie %s offered, but no session was cached' % HTTP_COOKIE) else: return status('No cookie was offered') def get_ssl_id(): 'check if we have a SSL_SESSION_ID and if a session was cached' if SSL_SESSION_ID: if os.path.exists(temp(SSL_SESSION_ID)): status('Found a session from SSL_SESSION_ID') return read(temp(SSL_SESSION_ID)) else: return status('SSL_SESSION_ID %s present but no session was cached' % SSL_SESSION_ID) else: return status('No SSL_SESSION_ID found') def get_session(userid = None): 'check if session exists and return contents of key if so' if not secure_connection(): return status('No sessions possible without SSL (unsafe)') session_key = get_cookie() or get_ssl_id() if session_key: userid, starttime, remote_addr = session_key.split(':') if time.time() - int(starttime) > LONGEVITY: userid = status('Session expired') if remote_addr != REMOTE_ADDR: userid = status('IP address changed: %s to %s' % (remote_addr, REMOTE_ADDR)) if not userid: status('No valid session found') status('''To post messages, ''') return status('''Alternatively, you can obtain a client certificate from CACert.org and sign in with your cert''') return userid def status(message, returned_status = None): 'set status and return status' STATUS.append(message) return returned_status def read(filename, returned = str): 'get file contents and close it properly' input = open(filename) data = getattr(input, ['read', 'readlines'][returned == list])() input.close() return data def readlines(filename): 'return file contents as list, closing file properly' return read(filename, returned = list) def age(filename, file_age = None): if not os.path.exists(filename): file_age = sys.maxint else: mtime = os.stat(filename).st_mtime file_age = time.time() - mtime return file_age def chgrp(filename, groupname): gid = pwd.getpwnam(groupname).pw_gid uid = os.stat(filename).st_uid os.chown(filename, uid, gid) def seed(): 'return a unique unguessable string for generating session keys' seed_path = temp('seed.dat') if age(seed_path) > DAY: seedfile = open(seed_path, 'w') chgrp(seedfile.name, 'www-data') seed = nonce() seedfile.write(seed) seedfile.close() else: seedfile = open(seed_path) seed = seedfile.read() seedfile.close() return seed def temp(filename, temp_path = TEMP_PATH): if not os.path.exists(temp_path): os.mkdir(temp_path) os.chmod(temp_path, 0770) chgrp(temp_path, 'www-data') return os.path.join(temp_path, filename) def make_session(userid): 'create a session key to use in a cookie' syslog.syslog(syslog.LOG_DEBUG, 'userid=%s' % userid) base_string = ':'.join([userid, timestamp(), REMOTE_ADDR]) session_key = sign(seed(), base_string).decode('base64').encode('hex') output = open(temp(session_key), 'w') chgrp(output.name, 'www-data') print >>output, base_string output.close() if SSL_SESSION_ID: # backup method in case cookies not enabled os.symlink(temp(session_key), temp(SSL_SESSION_ID)) return session_key def is_localnet(ipaddr = REMOTE_ADDR): address = struct.unpack('>L', socket.inet_aton(ipaddr))[0] localnet = address / (1 << 24) == 127 if not localnet: return status('Not logged in on localnet, must use SSL to login') else: return status('Bypassing SSL requirement, logged in on localnet', True) def shuffle(some_list): 'just so it does not return None as random.shuffle does' random.shuffle(some_list) return some_list def secure_connection(): 'determine if we are using SSL (TLS) or not' if SCHEME == 'https': return status('Using secure connection', True) else: return status('Insecure connection -- use SSL' % ( make_secure(SCRIPT_URL))) def certlogin(): 'called from certlogin subdirectory' os.chdir('..') return show_page() if __name__ == '__main__': command = os.path.splitext(os.path.basename(sys.argv[0]))[0] if command == 'index': command = 'twitter_oauth' # case of index.cgi log('args: %s' % sys.argv[1:]) print eval(command)(*sys.argv[1:])