#!/usr/bin/python '''\ Twitter OAuth template application Copyright (c) 2011 jc.unternet.net licensed under GPL Do-nothing webapp which uses Twitter OAuth for login. Uses cookies if available, only over https (or when testing on localnet), due to possibility of hijacking cookies over unsecured connection.''' import sys, os, urllib, urllib2, time, httplib import hmac, hashlib, random, re, base64, cgi, cgitb import json, syslog, netrc, urlparse, pwd, Cookie import struct, socket, pwd if not sys.stdin.isatty(): cgitb.enable if not hasattr(json, 'loads'): json.loads = json.reads README = 'put consumer key and secret as login and password in .netrc file' SCRIPT_NAME = urlparse.urlsplit(os.getenv('REQUEST_URI') or '/').path APIHOST = 'api.twitter.com' SERVER_PORT = os.getenv('SERVER_PORT') or '' SCHEME = 'http' if SERVER_PORT in ['80', ''] else 'https' REMOTE_ADDR = os.getenv('REMOTE_ADDR') or '0.0.0.0' SSL_SESSION_ID = os.getenv('SSL_SESSION_ID') or '' HTTP_COOKIE = os.getenv('HTTP_COOKIE') or '' COOKIE = Cookie.SimpleCookie() COOKIE.load(HTTP_COOKIE) BASEURL = '%s://%s/' % (SCHEME, os.getenv('SERVER_NAME') or 'localhost') SCRIPT_URL = urlparse.urljoin(BASEURL, SCRIPT_NAME) DAY = 24 * 60 * 60 # a day in seconds TEMP_PATH = '/tmp/twitter_oauth' IMAGE_URLS = {'http': 'profile_image_url', 'https': 'profile_image_url_https'} IMAGE_URL = IMAGE_URLS[SCHEME] STATUS = [] LONGEVITY = DAY # session expiration if not os.getenv('HOME'): # needed for netrc module to work # also needed to store member data, not safe in webspace os.environ['HOME'] = pwd.getpwuid(os.geteuid()).pw_dir try: NETRC = netrc.netrc() CONSUMER_KEY, ignored, CONSUMER_SECRET = NETRC.authenticators(APIHOST) except: CONSUMER_KEY, NETRC, CONSUMER_SECRET = 'anonymous', None, 'anonymous' log('netrc file not set up properly, see README') OAUTH_CALLBACK = os.getenv('OAUTH_CALLBACK') or SCRIPT_URL PARAMETERS = { 'oauth_consumer_key': os.getenv('OAUTH_CONSUMER_KEY') or CONSUMER_KEY, 'oauth_signature_method': 'HMAC-SHA1', 'oauth_signature': '', 'oauth_timestamp': os.getenv('OAUTH_TIMESTAMP') or '', 'oauth_nonce': os.getenv('OAUTH_NONCE') or '', 'oauth_version': '1.0', 'oauth_callback': OAUTH_CALLBACK, 'oauth_token': os.getenv('OAUTH_TOKEN') or '', 'oauth_verifier': os.getenv('OAUTH_VERIFIER') or '', } SECRETS = { 'oauth_consumer_secret': os.getenv('OAUTH_CONSUMER_SECRET') or CONSUMER_SECRET, 'oauth_token_secret': os.getenv('OAUTH_TOKEN_SECRET') or '', } DEBUGLEVEL = int(os.getenv('DEBUGLEVEL') or 0) REQUEST_METHOD = (os.getenv('REQUEST_METHOD') or 'GET').upper() INPUT = cgi.FieldStorage() STYLE = """ """ HEADER = """ %s %s
""" % ('Twitter OAuth Test Page', STYLE) HEADLINE = """
Left-hand column is for client-server status messages; the center column is the timeline of posts from around the world. In both cases, the most recent activity is at the top of the column. Advertisements will appear on the right occasionally in order to pay for website fees and provide a minimal income for the website owner.
""" FOOTER = """
""" if PARAMETERS['oauth_consumer_key'] == SECRETS['oauth_consumer_secret']: print >>sys.stderr, 'WARNING: see README to set up .netrc file' sys.exit(1) def nonce(): return '%x' % random.getrandbits(64) def timestamp(): return '%d' % int(time.time()) def update_parameters(force = True): 'generate new nonce and timestamp' if force or not PARAMETERS['oauth_timestamp']: PARAMETERS['oauth_timestamp'] = timestamp() if force or not PARAMETERS['oauth_nonce']: PARAMETERS['oauth_nonce'] = nonce() def make_secure(url): 'make an insecure URL into a secure (SSL) URL' taken_apart = list(urlparse.urlsplit(url)) taken_apart[0] = 'https' return urlparse.urlunsplit(taken_apart) def twitter_oauth(): 'fetch a query token, authorize, get access token and fetch some data' update_parameters(force = False) log('PARAMETERS: %s' % repr(PARAMETERS)) if INPUT.has_key('oauth_verifier'): # from callback return logged_in() elif INPUT.has_key('denied'): return display_login_denied() elif INPUT.has_key('login'): 'initial call from browser to CGI script, redirect to twitter' request_token = get_request_token() log('request_token: %s' % request_token) assert request_token['oauth_callback_confirmed'] == 'true' save_keys(request_token) return authenticate(request_token['oauth_token']) else: return display_home_page() def display_home_page(): 'first visit to page' userid = get_session() if userid: userdata = save_user_info(data = {'user_id': userid}) return display_logged_in(userdata) return show_page() def show_status(): first = True print '
' print '
' for status_line in reversed(STATUS): print div(status_line, 'class="%s"' % ("boxed" + [" underneath", ""][first])) first = False print '
' print '
' def show_contents(): print '
' print '
' print '
' print div('Contents go here', 'class="boxed"') print '
' print '
' print '
' def show_ads(): print '
' print '
' for line in shuffle(readlines('ads.html')): print line print '
' print '
' def send_headers(*headers): for header in headers: sys.stdout.write('%s\r\n' % (header or '')) def logged_in(): if not load_keys(INPUT.getvalue('oauth_token')): log('oauth_token "%s" != "%s"' % (PARAMETERS['oauth_token'], INPUT.getvalue('oauth_token'))) return display_home_page() PARAMETERS['oauth_verifier'] = INPUT.getvalue('oauth_verifier', '') update_parameters() login_data = get_access_token() if login_data['oauth_token'] == PARAMETERS['oauth_token']: log('oauth_token did not update after request for access token!') return display_home_page() else: PARAMETERS['oauth_token'] = login_data['oauth_token'] SECRETS['oauth_token_secret'] = login_data['oauth_token_secret'] update_parameters() save_user_info(login_data) userdata = save_user_info(get_credentials()) return display_logged_in(userdata) def make_cookie(userid): if not 'session' in COOKIE: COOKIE['session'] = make_session(userid) return COOKIE.output() def display_logged_in(userdata): 'setup session and show what we want to show to member' if is_localnet() or secure_connection(): send_headers(make_cookie(userdata['user_id'][0])) status('''
%s is logged in to %s
''' % ( userdata[IMAGE_URL][-1], userdata['screen_name'][-1], SCRIPT_URL)) return show_page() def show_page(): send_headers('Content-type: text/html', None) print HEADER print HEADLINE show_contents() # on left show_ads() # on right show_status() # center print FOOTER return '' def div(html, attributes=''): 'wrap html in div tags' return ('
' % attributes) + html + '
' def p(html): 'wrap html in paragraph tags' return '

' + html + '

' def display_login_denied(): 'user indicated to Twitter that ey wants to cancel' denied = tmpfile(INPUT['denied']) try: os.unlink(denied) except: log('could not unlink "%s"' % denied) status('Twitter login was denied') show_page() def save_keys(tokens): storage = open('/tmp/%s.%s' % (os.getenv('REMOTE_ADDR'), tokens['oauth_token']), 'w') chgrp(storage.name, 'www-data') print >>storage, tokens['oauth_token_secret'] storage.close() def tmpfile(token): return '/tmp/%s.%s' % (os.getenv('REMOTE_ADDR'), token) def load_keys(token): PARAMETERS['oauth_token'] = token try: storage = open(tmpfile(token)) SECRETS['oauth_token_secret'] = storage.readline().strip() except Exception, error: log('could not find token %s: %s' % (token, repr(error))) return False storage.close() try: os.unlink(storage.name) except Exception, error: log('could not remove %s: %s' % (storage.name, repr(error))) return True def get_credentials(): url = 'https://api.twitter.com/1/account/verify_credentials.json' update_parameters(force = False) secret = SECRETS['oauth_consumer_secret'] token_secret = SECRETS['oauth_token_secret'] PARAMETERS['oauth_callback'] = '' # zero this out before signature PARAMETERS['oauth_verifier'] = '' PARAMETERS['oauth_signature'] = sign('&'.join((secret, token_secret)), '&'.join(map(urlencode, ('GET', url, parameters('signing'))))) request = urllib2.Request(url) request.add_header('Authorization', 'OAuth ' + parameters('header')) log(request.get_header('Authorization')) opener = urllib2.build_opener(urllib2.HTTPSHandler(debuglevel = DEBUGLEVEL)) request = opener.open(request) reply = request.read() request.close() return json.loads(reply) def get_access_token(): url = 'https://api.twitter.com/oauth/access_token' update_parameters(force = False) secret = SECRETS['oauth_consumer_secret'] token_secret = SECRETS['oauth_token_secret'] PARAMETERS['oauth_callback'] = '' # zero this out before signature PARAMETERS['oauth_signature'] = sign('&'.join((secret, token_secret)), '&'.join(map(urlencode, ('POST', url, parameters('signing'))))) if secret == 'L8qq9PZyRg6ieKGEKhZolGC0vJWLw8iEJ88DRdyOg': # testing return {'oauth_signature': PARAMETERS['oauth_signature']} request = urllib2.Request(url) request.add_header('Authorization', 'OAuth ' + parameters('header')) opener = urllib2.build_opener(urllib2.HTTPSHandler(debuglevel = DEBUGLEVEL)) request = opener.open(request, urllib.urlencode({'oauth_verifier': PARAMETERS['oauth_verifier']})) reply = request.read() request.close() return dict(cgi.parse_qsl(reply)) def authenticate(oauth_token): url = 'https://api.twitter.com/oauth/authenticate' url += '?%s=%s' % ('oauth_token', oauth_token) return 'location: %s\r\n\r\n' % url def get_request_token(): 'ask Twitter for a request token' url = 'https://api.twitter.com/oauth/request_token' update_parameters(force = False) secret = SECRETS['oauth_consumer_secret'] token_secret = '' # we don't have a token secret yet PARAMETERS['oauth_signature'] = sign('&'.join((secret, token_secret)), '&'.join(map(urlencode, ('POST', url, parameters('signing'))))) if secret == 'L8qq9PZyRg6ieKGEKhZolGC0vJWLw8iEJ88DRdyOg': # testing return {'oauth_signature': PARAMETERS['oauth_signature']} request = urllib2.Request(url) request.add_header('Authorization', 'OAuth ' + parameters('header')) opener = urllib2.build_opener(urllib2.HTTPSHandler(debuglevel = DEBUGLEVEL)) request = opener.open(request, urllib.urlencode({})) reply = request.read() request.close() return dict(cgi.parse_qsl(reply)) def byte_encode(match): 'for use with re.sub' return '%%%02X' % ord(match.group()) def urlencode(string): "unreserved = ALPHA, DIGIT, '-', '.', '_', '~'" return re.sub(re.compile('[^0-9A-Za-z._~-]'), byte_encode, string.encode('utf8')) def urldecode(string): 'undo urlencode' return urllib.unquote(string).decode('utf8') def log(message): syslog.syslog(syslog.LOG_DEBUG, message) if sys.stdin.isatty() and DEBUGLEVEL: print >>sys.stderr, message def sign(secret, text): log('signature base string: "%s", secret: %s' % ( repr(text), repr(secret))) digest = hmac.new(secret, text, hashlib.sha1).digest() return base64.encodestring(digest).rstrip() def base64string(hexstring): recoded = base64.encodestring(hexstring.decode('hex')).rstrip() log('recoded: %s' % recoded) return recoded def parameters(format): if format == 'header': formatted = ', '.join(['%s="%s"' % (key, urlencode(value.encode('utf8'))) for key, value in PARAMETERS.items() if value and key not in ['oauth_verifier']]) elif format == 'signing': formatted = '&'.join(sorted(['%s=%s' % (key, urlencode(value.encode('utf8'))) for key, value in (PARAMETERS.items()) if value and key not in ['oauth_signature']])) #print >>sys.stderr, format, formatted return formatted def hmac_sha1_test(): 'from tools.ietf.org/html/rfc2202' assert sign('\x0b' * 20, 'Hi There') == base64string( 'b617318655057264e28bc0b6fb378c8ef146be00') assert sign('Jefe', 'what do ya want for nothing?') == base64string( 'effcdf6ae5eb2fa2d27416d5f184df9c259a7c79') assert sign('\xaa' * 20, '\xdd' * 50) == base64string( '125d7342b9ac11cd91a39af48aa17b4f63f175d3') # last test from http://oauth.net/core/1.0/#rfc.section.9.1.1, app. A.5.2 assert sign('kd94hf93k423kf44&pfkkdhi9sl3r4s00', 'GET&http%3A%2F%2Fphotos.example.net%2Fphotos&file%3Dvacation.jpg%26' + \ 'oauth_consumer_key%3Ddpf43f3p2l4k3l03%26oauth_nonce%3D' + \ 'kllo9940pd9333jh%26oauth_signature_method%3DHMAC-SHA1%26' + \ 'oauth_timestamp%3D1191242096%26oauth_token%3Dnnch734d00sl2jdk%26' + \ 'oauth_version%3D1.0%26size%3Doriginal') == 'tR3+Ty81lMeYAr/Fid0kMTYa/WM=' return True def save_user_info(data): id = data['id_str'] if data.has_key('id_str') else data['user_id'] member_file = open_store(id) if member_file.mode != 'w': try: member_data = json.loads(member_file.read()) except: member_data = {} member_file.seek(0, 0) else: member_data = {} for key in data: if isinstance(data[key], (str, unicode)): if member_data.has_key(key) and not data[key] in member_data[key]: member_data[key].append(data[key]) else: member_data[key] = [data[key]] member_file.write(json.dumps(member_data)) member_file.truncate() member_file.close() return member_data def open_store(filename = 'this_is_a_test'): permissions = 0750 # owner all, group read+execute, everybody else none members = os.path.join(os.getenv('HOME') or '/var/tmp', 'members') if not os.path.exists(members): os.mkdir(members, permissions) parts = [] log('filename: %s' % filename) for index in range(1, len(filename) + 1): parts.append(filename[:index]) new = os.path.join(members, *parts) if not os.path.exists(new): os.mkdir(new, permissions) parts.append(filename) new = os.path.join(members, *parts) if os.path.exists(new): store = open(new, 'r+') else: store = open(new, 'w') chgrp(store.name, 'www-data') return store def get_cookie(): 'check if cookie was offered and if we have it cached' if 'session' in COOKIE: if os.path.exists(temp(COOKIE['session'].value)): status('Found a session from cookie') return read(temp(COOKIE['session'].value)).strip() else: return status('Cookie %s offered, but no session was cached' % HTTP_COOKIE) else: return status('No cookie was offered') def get_ssl_id(): 'check if we have a SSL_SESSION_ID and if a session was cached' if SSL_SESSION_ID: if os.path.exists(temp(SSL_SESSION_ID)): status('Found a session from SSL_SESSION_ID') return read(temp(SSL_SESSION_ID)) else: return status('SSL_SESSION_ID %s present but no session was cached' % SSL_SESSION_ID) else: return status('No SSL_SESSION_ID found') def get_session(userid = None): 'check if session exists and return contents of key if so' if not secure_connection(): return status('No sessions possible without SSL (unsafe)') session_key = get_cookie() or get_ssl_id() if session_key: userid, starttime, remote_addr = session_key.split(':') if time.time() - int(starttime) > LONGEVITY: userid = status('Session expired') if remote_addr != REMOTE_ADDR: userid = status('IP address changed: %s to %s' % (remote_addr, REMOTE_ADDR)) if not userid: status('No valid session found') status('''To post messages, Sign in with Twitter''') return status('''Alternatively, you can obtain a client certificate from CACert.org and sign in with your cert''') return userid def status(message, returned_status = None): 'set status and return status' STATUS.append(message) return returned_status def read(filename, returned = str): 'get file contents and close it properly' input = open(filename) data = getattr(input, ['read', 'readlines'][returned == list])() input.close() return data def readlines(filename): 'return file contents as list, closing file properly' return read(filename, returned = list) def age(filename, file_age = None): if not os.path.exists(filename): file_age = sys.maxint else: mtime = os.stat(filename).st_mtime file_age = time.time() - mtime return file_age def chgrp(filename, groupname): gid = pwd.getpwnam(groupname).pw_gid uid = os.stat(filename).st_uid os.chown(filename, uid, gid) def seed(): 'return a unique unguessable string for generating session keys' seed_path = temp('seed.dat') if age(seed_path) > DAY: seedfile = open(seed_path, 'w') chgrp(seedfile.name, 'www-data') seed = nonce() seedfile.write(seed) seedfile.close() else: seedfile = open(seed_path) seed = seedfile.read() seedfile.close() return seed def temp(filename, temp_path = TEMP_PATH): if not os.path.exists(temp_path): os.mkdir(temp_path) os.chmod(temp_path, 0770) chgrp(temp_path, 'www-data') return os.path.join(temp_path, filename) def make_session(userid): 'create a session key to use in a cookie' syslog.syslog(syslog.LOG_DEBUG, 'userid=%s' % userid) base_string = ':'.join([userid, timestamp(), REMOTE_ADDR]) session_key = sign(seed(), base_string).decode('base64').encode('hex') output = open(temp(session_key), 'w') chgrp(output.name, 'www-data') print >>output, base_string output.close() if SSL_SESSION_ID: # backup method in case cookies not enabled os.symlink(temp(session_key), temp(SSL_SESSION_ID)) return session_key def is_localnet(ipaddr = REMOTE_ADDR): address = struct.unpack('>L', socket.inet_aton(ipaddr))[0] localnet = address / (1 << 24) == 127 if not localnet: return status('Not logged in on localnet, must use SSL to login') else: return status('Bypassing SSL requirement, logged in on localnet', True) def shuffle(some_list): 'just so it does not return None as random.shuffle does' random.shuffle(some_list) return some_list def secure_connection(): 'determine if we are using SSL (TLS) or not' if SCHEME == 'https': return status('Using secure connection', True) else: return status('Insecure connection -- use SSL' % ( make_secure(SCRIPT_URL))) def certlogin(): 'called from certlogin subdirectory' os.chdir('..') return show_page() if __name__ == '__main__': command = os.path.splitext(os.path.basename(sys.argv[0]))[0] if command == 'index': command = 'twitter_oauth' # case of index.cgi log('args: %s' % sys.argv[1:]) print eval(command)(*sys.argv[1:])