From 32aa96afa7af2839ce15bec1de6deba444423773 Mon Sep 17 00:00:00 2001
From: pictuga
Date: Mon, 6 Apr 2015 23:26:12 +0800
Subject: [PATCH] Cache HTTP content using a custom Handler

Much, much cleaner. Nothing comparable.
---
 README.md        |  11 +-
 morss/crawler.py | 212 ++++++++++++++++++++++++++++++++++----
 morss/feedify.py |  10 +-
 morss/morss.py   | 257 +++++++++++------------------------------------
 4 files changed, 264 insertions(+), 226 deletions(-)

diff --git a/README.md b/README.md
index 4090977..07498d2 100644
--- a/README.md
+++ b/README.md
@@ -119,7 +119,7 @@ Using cache and passing arguments:
 ```python
 >>> import morss
 >>> url = 'http://feeds.bbci.co.uk/news/rss.xml'
->>> cache = '/tmp/morss-cache' # cache folder, needs write permission
+>>> cache = '/tmp/morss-cache.db' # sqlite cache location
 >>> options = {'csv':True, 'md':True}
 >>> xml_string = morss.process(url, cache, options)
 >>> xml_string[:50]
@@ -130,16 +130,15 @@ Using cache and passing arguments:
 Doing it step-by-step:
 
 ```python
-import morss
+import morss, morss.crawler
 
 url = 'http://newspaper.example/feed.xml'
 options = morss.Options(csv=True, md=True) # arguments
-cache_path = '/tmp/morss-cache' # cache folder, needs write permission
+morss.crawler.sqlite_default = '/tmp/morss-cache.db' # sqlite cache location
 
-url, cache = morss.Init(url, cache_path, options) # properly create folders and objects
-rss = morss.Fetch(url, cache, options) # this only grabs the RSS feed
+rss = morss.Fetch(url, options) # this only grabs the RSS feed
 rss = morss.Before(rss, options) # applies first round of options
-rss = morss.Gather(rss, url, cache, options) # this fills the feed and cleans it up
+rss = morss.Gather(rss, url, options) # this fills the feed and cleans it up
 rss = morss.After(rss, options) # applies second round of options
 
 output = morss.Format(rss, options) # formats final feed
diff --git a/morss/crawler.py b/morss/crawler.py
index 3bbc7fd..fa394a4 100644
--- a/morss/crawler.py
+++ b/morss/crawler.py
@@ -1,20 +1,26 @@
+import sys
+
 import ssl
 import socket
 
 from gzip import GzipFile
-from io import BytesIO
+from io import BytesIO, StringIO
+import re
+import sqlite3
+import time
 
 try:
     from urllib2 import BaseHandler, addinfourl, parse_keqv_list, parse_http_list
+    import mimetools
 except ImportError:
     from urllib.request import BaseHandler, addinfourl, parse_keqv_list, parse_http_list
-
-import re
+    import email
 
 try:
     basestring
 except NameError:
-    basestring = str
+    basestring = unicode = str
+    buffer = memoryview
 
 
 MIMETYPE = {
@@ -186,27 +192,195 @@ class HTTPRefreshHandler(BaseHandler):
     https_response = http_response
 
 
-class EtagHandler(BaseHandler):
-    def __init__(self, cache="", etag=None, lastmodified=None):
-        self.cache = cache
-        self.etag = etag
-        self.lastmodified = lastmodified
+class NotInCache(IOError):
+    pass
+
+
+class BaseCacheHandler(BaseHandler):
+    " Cache based on etags/last-modified. Inherit from this to implement actual storage "
+
+    private_cache = False # False to behave like a CDN (or if you just don't care), True like a PC
+    handler_order = 499
+
+    def __init__(self, force_min=None):
+        self.force_min = force_min # force_min (seconds) to bypass http headers, -1 forever, 0 never, -2 do nothing if not in cache, -3 is like -2 but raises an error
+
+    def _load(self, url):
+        out = list(self.load(url))
+
+        if sys.version > '3':
+            out[2] = email.message_from_string(out[2] or unicode()) # headers
+        else:
+            out[2] = mimetools.Message(StringIO(out[2] or unicode()))
+
+        out[3] = out[3] or bytes() # data
+        out[4] = out[4] or 0 # timestamp
+
+        return out
+
+    def load(self, url):
+        " Return the basic vars (code, msg, headers, data, timestamp) "
+        return (None, None, None, None, None)
+
+    def _save(self, url, code, msg, headers, data, timestamp):
+        headers = unicode(headers)
+        self.save(url, code, msg, headers, data, timestamp)
+
+    def save(self, url, code, msg, headers, data, timestamp):
+        " Save values to disk "
+        pass
 
     def http_request(self, req):
-        if self.cache:
-            if self.etag:
-                req.add_unredirected_header('If-None-Match', self.etag)
-            if self.lastmodified:
-                req.add_unredirected_header('If-Modified-Since', self.lastmodified)
+        (code, msg, headers, data, timestamp) = self._load(req.get_full_url())
+
+        if 'etag' in headers:
+            req.add_unredirected_header('If-None-Match', headers['etag'])
+
+        if 'last-modified' in headers:
+            req.add_unredirected_header('If-Modified-Since', headers.get('last-modified'))
 
         return req
 
+    def http_open(self, req):
+        (code, msg, headers, data, timestamp) = self._load(req.get_full_url())
+
+        # some info needed to process everything
+        cache_control = parse_http_list(headers.get('cache-control', ()))
+        cache_control += parse_http_list(headers.get('pragma', ()))
+
+        cc_list = [x for x in cache_control if '=' not in x]
+        cc_values = parse_keqv_list([x for x in cache_control if '=' in x])
+
+        cache_age = time.time() - timestamp
+
+        # list in a simple way what to do when
+        if self.force_min in (-2, -3):
+            if code is not None:
+                # already in cache, perfect, use cache
+                pass
+
+            else:
+                # ok then...
+                if self.force_min == -2:
+                    headers['morss'] = 'from_cache'
+                    resp = addinfourl(BytesIO(), headers, req.get_full_url(), 409)
+                    resp.msg = 'Conflict'
+                    return resp
+
+                elif self.force_min == -3:
+                    raise NotInCache()
+
+        elif code is None:
+            # cache empty, refresh
+            return None
+
+        elif self.force_min == -1:
+            # force use cache
+            pass
+
+        elif self.force_min == 0:
+            # force refresh
+            return None
+
+        elif self.force_min is None and ('no-cache' in cc_list
+                                         or 'no-store' in cc_list
+                                         or ('private' in cc_list and not self.private_cache)):
+            # kindly follow web servers indications, refresh
+            return None
+
+        elif 'max-age' in cc_values and int(cc_values['max-age']) > cache_age:
+            # server says it's still fine (and we trust it; if not, use force_min=0), use cache
+            pass
+
+        elif self.force_min is not None and self.force_min > cache_age:
+            # still recent enough for us, use cache
+            pass
+
+        else:
+            # according to the www, we have to refresh when nothing is said
+            return None
+
+        # return the cache as a response
+        headers['morss'] = 'from_cache' # TODO delete the morss header from incoming pages, to avoid websites messing with us
+        resp = addinfourl(BytesIO(data), headers, req.get_full_url(), code)
+        resp.msg = msg
+
+        return resp
+
+    def http_response(self, req, resp):
+        # code for after-fetch, to know whether to save to hard-drive (if sticking to the http headers' will)
+
+        if resp.code == 304:
+            return resp
+
+        if ('cache-control' in resp.headers or 'pragma' in resp.headers) and self.force_min is None:
+            cache_control = parse_http_list(resp.headers.get('cache-control', ()))
+            cache_control += parse_http_list(resp.headers.get('pragma', ()))
+
+            cc_list = [x for x in cache_control if '=' not in x]
+
+            if 'no-cache' in cc_list or 'no-store' in cc_list or ('private' in cc_list and not self.private_cache):
+                # kindly follow web servers indications
+                return resp
+
+        if resp.headers.get('morss') == 'from_cache':
+            # it comes from cache, so no need to save it again
+            return resp
+
+        # save to disk
+        data = resp.read()
+        self._save(req.get_full_url(), resp.code, resp.msg, resp.headers, data, time.time())
+
+        fp = BytesIO(data)
+        old_resp = resp
+        resp = addinfourl(fp, old_resp.headers, old_resp.url, old_resp.code)
+        resp.msg = old_resp.msg
+
+        return resp
+
     def http_error_304(self, req, fp, code, msg, headers):
-        if self.etag:
-            headers.addheader('etag', self.etag)
-        if self.lastmodified:
-            headers.addheader('last-modified', self.lastmodified)
-        resp = addinfourl(BytesIO(self.cache), headers, req.get_full_url(), 200)
+        (code, msg, headers, data, timestamp) = self._load(req.get_full_url())
+
+        resp = addinfourl(BytesIO(data), headers, req.get_full_url(), code)
+        resp.msg = msg
+
         return resp
 
     https_request = http_request
+    https_open = http_open
+    https_response = http_response
+
+
+sqlite_default = ':memory:'
+
+
+class SQliteCacheHandler(BaseCacheHandler):
+    def __init__(self, force_min=-1, filename=None):
+        BaseCacheHandler.__init__(self, force_min)
+
+        self.con = sqlite3.connect(filename or sqlite_default, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
+        self.con.execute('create table if not exists data (url unicode PRIMARY KEY, code int, msg unicode, headers unicode, data bytes, timestamp int)')
+        self.con.commit()
+
+    def __del__(self):
+        self.con.close()
+
+    def load(self, url):
+        row = self.con.execute('select * from data where url=?', (url,)).fetchone()
+
+        if not row:
+            return (None, None, None, None, None)
+
+        return row[1:]
+
+    def save(self, url, code, msg, headers, data, timestamp):
+        data = buffer(data)
+
+        if self.con.execute('select code from data where url=?', (url,)).fetchone():
+            self.con.execute('update data set code=?, msg=?, headers=?, data=?, timestamp=? where url=?',
+                (code, msg, headers, data, timestamp, url))
+
+        else:
+            self.con.execute('insert into data values (?,?,?,?,?,?)', (url, code, msg, headers, data, timestamp))
+
+        self.con.commit()
diff --git a/morss/feedify.py b/morss/feedify.py
index 094fe46..36d230d 100644
--- a/morss/feedify.py
+++ b/morss/feedify.py
@@ -94,20 +94,20 @@ def format_string(string, getter, error=False):
     return out
 
 
-def pre_worker(url, cache):
+def pre_worker(url):
     if urlparse(url).netloc == 'itunes.apple.com':
         match = re.search('/id([0-9]+)(\?.*)?$', url)
         if match:
             iid = match.groups()[0]
             redirect = 'https://itunes.apple.com/lookup?id={id}'.format(id=iid)
-            cache.set('redirect', redirect)
+            return redirect
+
+    return None
 
 
 class Builder(object):
-    def __init__(self, link, data=None, cache=False):
+    def __init__(self, link, data=None):
         self.link = link
-        self.cache = cache
-
         self.data = data
 
         if self.data is None:
diff --git a/morss/morss.py b/morss/morss.py
index 0939a19..a8a08ac 100644
--- a/morss/morss.py
+++ b/morss/morss.py
@@ -140,112 +140,16 @@ def parseOptions(options):
     return out
 
 
-class Cache:
-    """ Light, error-prone caching system. """
-
-    def __init__(self, folder=None, key='cache', lifespan=10 * 24 * 3600):
-        self._key = key
-        self._dir = folder
-        self._lifespan = lifespan
-
-        self._cache = {}
-
-        if self._dir is None:
-            self._hash = "NO CACHE"
-            return
-
-        maxsize = os.statvfs('./').f_namemax - len(self._dir) - 1 - 4 # ".tmp"
-        self._hash = quote_plus(self._key)[:maxsize]
-
-        self._file = self._dir + '/' + self._hash
-        self._file_tmp = self._file + '.tmp'
-
-        try:
-            data = open(self._file).read()
-            if data:
-                self._cache = json.loads(data)
-        except IOError:
-            pass
-        except ValueError:
-            log('JSON cache parse fail')
-
-    def __del__(self):
-        self.save()
-
-    def __contains__(self, key):
-        return key in self._cache
-
-    def get(self, key):
-        if key in self._cache:
-            self._cache[key]['last'] = time.time()
-            return self._cache[key]['value']
-        else:
-            return None
-
-    def set(self, key, content):
-        if sys.version > '3' and isinstance(content, bytes):
-            content = content.decode('utf-8')
-
-        self._cache[key] = {'last': time.time(), 'value': content}
-
-    __getitem__ = get
-    __setitem__ = set
-
-    def save(self):
-        if len(self._cache) == 0 or self._dir is None:
-            return
-
-        if not os.path.exists(self._dir):
-            os.makedirs(self._dir)
-
-        for i in list(self._cache.keys()):
-            if time.time() - self._cache[i]['last'] > self._lifespan > -1:
-                del self._cache[i]
-
-        out = json.dumps(self._cache, indent=4)
-
-        try:
-            open(self._file_tmp, 'w+').write(out)
-            os.rename(self._file_tmp, self._file)
-        except IOError:
-            log('failed to write cache to tmp file')
-        except OSError:
-            log('failed to move cache to file')
-
-    def last(self, key):
-        if key not in self._cache:
-            return -1
-
-        return self._cache[key]['last']
-
-    def age(self, key):
-        if key not in self._cache:
-            return -1
-
-        return time.time() - self.last(key)
-
-    def new(self, *arg, **karg):
-        """ Returns a Cache object in the same directory """
-        if arg[0] != self._key:
-            return Cache(self._dir, *arg, **karg)
-        else:
-            return self
-
-
 default_handlers = [crawler.GZIPHandler(), crawler.UAHandler(DEFAULT_UA),
                     crawler.AutoRefererHandler(), crawler.HTTPEquivHandler(),
                     crawler.HTTPRefreshHandler(), crawler.EncodingFixHandler()]
 
-def accept_handler(*kargs):
+def custom_handler(accept, delay=DELAY):
     handlers = default_handlers[:]
-    handlers.append(crawler.ContentNegociationHandler(*kargs))
-    return handlers
+    handlers.append(crawler.ContentNegociationHandler(accept))
+    handlers.append(crawler.SQliteCacheHandler(delay))
 
-def etag_handler(accept, strict, cache, etag, lastmodified):
-    handlers = default_handlers[:]
-    handlers.append(crawler.ContentNegociationHandler(accept, strict))
-    handlers.append(crawler.EtagHandler(cache, etag, lastmodified))
-    return handlers
+    return build_opener(*handlers)
 
 
 def Fix(item, feedurl='/'):
@@ -315,7 +219,7 @@ def Fix(item, feedurl='/'):
     return item
 
 
-def Fill(item, cache, options, feedurl='/', fast=False):
+def Fill(item, options, feedurl='/', fast=False):
     """ Returns True when it has done its best """
 
     if not item.link:
@@ -364,58 +268,44 @@ def Fill(item, cache, options, feedurl='/', fast=False):
         log('no used link')
         return True
 
-    # check cache and previous errors
-    if link in cache:
-        content = cache.get(link)
-        match = re.search(r'^error-([a-z]{2,10})$', content)
-        if match:
-            if cache.age(link) < DELAY and not options.theforce:
-                log('cached error: %s' % match.groups()[0])
-                return True
-            else:
-                log('ignored old error: %s' % match.groups()[0])
-        else:
-            log('cached')
-            item.push_content(cache.get(link))
-            return True
+    # download
+    delay = -1
 
-    # super-fast mode
     if fast:
+        # super-fast mode
+        delay = -3
+
+    try:
+        con = custom_handler(('html', 'text/*'), delay).open(link, timeout=TIMEOUT)
+        data = con.read()
+
+    except crawler.NotInCache:
         log('skipped')
         return False
 
-    # download
-    try:
-        con = build_opener(*accept_handler(('html', 'text/*'), True)).open(link, timeout=TIMEOUT)
-        data = con.read()
     except (IOError, HTTPException) as e:
-        log('http error: %s' % e.message)
-        cache.set(link, 'error-http')
+        log('http error')
        return True
 
     contenttype = con.info().get('Content-Type', '').split(';')[0]
     if contenttype not in MIMETYPE['html'] and contenttype != 'text/plain':
         log('non-text page')
-        cache.set(link, 'error-type')
         return True
 
     out = breadability.readable.Article(data, url=con.url).readable
 
     if options.hungry or count_words(out) > max(count_content, count_desc):
         item.push_content(out)
-        cache.set(link, out)
+
     else:
         log('link not bigger enough')
-        cache.set(link, 'error-length')
         return True
 
     return True
 
 
-def Init(url, cache_path, options):
-    # url clean up
-    log(url)
-
+def Fetch(url, options):
+    # basic url clean-up
     if url is None:
         raise MorssException('No url provided')
 
@@ -428,91 +318,66 @@ def Init(url, cache_path, options):
     if isinstance(url, bytes):
         url = url.decode()
 
-    # cache
-    cache = Cache(cache_path, url)
-    log(cache._hash)
-
-    return (url, cache)
-
-
-def Fetch(url, cache, options):
     # do some useful facebook work
-    feedify.pre_worker(url, cache)
-
-    if 'redirect' in cache:
-        url = cache.get('redirect')
+    pre = feedify.pre_worker(url)
+    if pre:
+        url = pre
         log('url redirect')
         log(url)
 
     # fetch feed
-    if not options.theforce and 'xml' in cache and cache.age('xml') < DELAY and 'style' in cache:
-        log('xml cached')
-        xml = cache.get('xml')
-        style = cache.get('style')
-    else:
-        try:
-            opener = etag_handler(('xml', 'html'), False, cache.get(url), cache.get('etag'), cache.get('lastmodified'))
-            con = build_opener(*opener).open(url, timeout=TIMEOUT * 2)
-            xml = con.read()
-        except (HTTPError) as e:
-            raise MorssException('Error downloading feed (HTTP Error %s)' % e.code)
-        except (crawler.InvalidCertificateException) as e:
-            raise MorssException('Error downloading feed (Invalid SSL Certificate)')
-        except (IOError, HTTPException):
-            raise MorssException('Error downloading feed')
+    delay = DELAY
 
-        cache.set('xml', xml)
-        cache.set('etag', con.info().get('etag'))
-        cache.set('lastmodified', con.info().get('last-modified'))
+    if options.theforce:
+        delay = 0
 
-        contenttype = con.info().get('Content-Type', '').split(';')[0]
+    try:
+        con = custom_handler(('xml', 'html'), delay).open(url, timeout=TIMEOUT * 2)
+        xml = con.read()
 
-        if url.startswith('https://itunes.apple.com/lookup?id='):
-            style = 'itunes'
-        elif xml.startswith(b'
 max_time >= 0 or i + 1 > max_item >= 0:
             if not options.proxy:
-                if Fill(item, cache, options, url, True) is False:
+                if Fill(item, options, url, True) is False:
                     item.remove()
             return
 
         else:
             if not options.proxy:
-                Fill(item, cache, options, url)
+                Fill(item, options, url)
 
     queue = Queue()
@@ -567,7 +432,6 @@ def Gather(rss, url, cache, options):
         queue.put([i, item])
 
     queue.join()
-    cache.save()
 
     if options.ad:
         new = rss.items.append()
@@ -663,10 +527,10 @@ def process(url, cache=None, options=None):
         options = []
     options = Options(options)
 
-    url, cache = Init(url, cache, options)
-    rss = Fetch(url, cache, options)
+    if cache: crawler.sqlite_default = cache
+    rss = Fetch(url, options)
     rss = Before(rss, options)
-    rss = Gather(rss, url, cache, options)
+    rss = Gather(rss, url, options)
     rss = After(rss, options)
 
     return Format(rss, options)
@@ -728,10 +592,10 @@ def cgi_app(environ, start_response):
     else:
         headers['content-type'] = 'text/xml'
 
-    url, cache = Init(url, os.getcwd() + '/cache', options)
+    crawler.sqlite_default = os.getcwd() + '/morss-cache.db'
 
     # get the work done
-    rss = Fetch(url, cache, options)
+    rss = Fetch(url, options)
 
     if headers['content-type'] == 'text/xml':
         headers['content-type'] = rss.mimetype
@@ -739,7 +603,7 @@ def cgi_app(environ, start_response):
     start_response(headers['status'], list(headers.items()))
 
     rss = Before(rss, options)
-    rss = Gather(rss, url, cache, options)
+    rss = Gather(rss, url, options)
     rss = After(rss, options)
     out = Format(rss, options)
 
@@ -795,10 +659,11 @@ def cli_app():
     global DEBUG
     DEBUG = options.debug
 
-    url, cache = Init(url, os.path.expanduser('~/.cache/morss'), options)
-    rss = Fetch(url, cache, options)
+    crawler.sqlite_default = os.path.expanduser('~/.cache/morss-cache.db')
+
+    rss = Fetch(url, options)
     rss = Before(rss, options)
-    rss = Gather(rss, url, cache, options)
+    rss = Gather(rss, url, options)
     rss = After(rss, options)
     out = Format(rss, options)
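
Below is a minimal usage sketch of the handler this patch introduces, assuming the patch is applied. It builds a plain urllib opener around `SQliteCacheHandler`; the cache file path and the double fetch are only for illustration, and the feed URL is the one from the README.

```python
# Usage sketch (assumes this patch is applied): cache-aware fetching with a
# plain urllib opener. The cache path below is only an example.
try:
    from urllib.request import build_opener  # Python 3
except ImportError:
    from urllib2 import build_opener  # Python 2

from morss import crawler

# force_min=-1 tells the handler to always serve from cache once a URL is stored
opener = build_opener(crawler.SQliteCacheHandler(force_min=-1, filename='/tmp/morss-cache.db'))

con = opener.open('http://feeds.bbci.co.uk/news/rss.xml')  # first hit goes to the network and is saved
data = con.read()

con = opener.open('http://feeds.bbci.co.uk/news/rss.xml')  # second hit is answered from sqlite
print(con.info().get('morss'))  # the handler tags cached answers with 'from_cache'
```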
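The `BaseCacheHandler` docstring invites subclassing for other storage backends. A minimal sketch of that extension point follows, using an in-memory dict; the class name is made up and not provided by morss.

```python
# Hypothetical storage backend built on the BaseCacheHandler API from this
# patch: load() must return (code, msg, headers, data, timestamp) and save()
# receives the headers already serialised to a string by _save().
from morss import crawler


class MemoryCacheHandler(crawler.BaseCacheHandler):
    def __init__(self, force_min=None):
        crawler.BaseCacheHandler.__init__(self, force_min)
        self.db = {}

    def load(self, url):
        # (None, None, None, None, None) signals "not in cache" to _load()
        return self.db.get(url, (None, None, None, None, None))

    def save(self, url, code, msg, headers, data, timestamp):
        self.db[url] = (code, msg, headers, data, timestamp)
```

Such a handler can then be passed to `build_opener()` alongside the default handlers, exactly as `SQliteCacheHandler` is in `custom_handler()`.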
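Finally, a small sketch of the `force_min=-3` contract that `Fill()`'s fast path relies on: on a cache miss the opener raises `crawler.NotInCache` instead of touching the network. The URL is the placeholder one from the README, and the timeout value is arbitrary.

```python
# Sketch of the -3 ("fail if not cached") mode used by Fill(..., fast=True).
from morss import crawler
from morss.morss import custom_handler

opener = custom_handler(('html', 'text/*'), delay=-3)

try:
    con = opener.open('http://newspaper.example/feed.xml', timeout=4)
except crawler.NotInCache:
    # nothing stored for this URL yet, and -3 forbids a network fetch
    print('not in cache, skipping')
```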