morss/feeds.py

402 lines
9.3 KiB
Python

#!/usr/bin/env python
from lxml import etree
import re
NSMAP = {'atom': 'http://www.w3.org/2005/Atom',
'atom03': 'http://purl.org/atom/ns#',
'media': 'http://search.yahoo.com/mrss/',
'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
'slash': 'http://purl.org/rss/1.0/modules/slash/',
'dc': 'http://purl.org/dc/elements/1.1/',
'content': 'http://purl.org/rss/1.0/modules/content/',
'rssfake': 'http://purl.org/rss/1.0/'}
def load(url):
import urllib2
d = urllib2.urlopen(url).read()
return parse(d)
def tagNS(tag, nsmap=NSMAP):
match = re.search(r'^\{([^\}]+)\}(.*)$', tag)
if match:
match = match.groups()
for (key, url) in nsmap.iteritems():
if url == match[0]:
return "%s:%s" % (key, match[1].lower())
else:
match = re.search(r'^([^:]+):([^:]+)$', tag)
if match:
match = match.groups()
if match[0] in nsmap:
return "{%s}%s" % (nsmap[match[0]], match[1].lower())
return tag
def innerHTML(xml):
return (xml.text or '') + ''.join([etree.tostring(child) for child in xml.iterchildren()])
def cleanNode(xml):
[xml.remove(child) for child in xml.iterchildren()]
class FeedException(Exception):
pass
def parse(data):
# encoding
match = re.search('encoding=["\']?([0-9a-zA-Z-]+)', data[:100])
if match:
enc = match.groups()[0].lower()
data = data.decode(enc, 'ignore').encode(enc)
# parse
parser = etree.XMLParser(recover=True)
doc = etree.fromstring(data, parser)
# rss
match = doc.xpath("//atom03:feed|//atom:feed|//channel|//rdf:rdf|//rdf:RDF", namespaces=NSMAP)
if len(match):
mtable = { 'rdf:rdf': FeedParserRSS, 'channel': FeedParserRSS,
'atom03:feed': FeedParserAtom, 'atom:feed': FeedParserAtom }
match = match[0]
tag = tagNS(match.tag)
if tag in mtable:
return mtable[tag](doc, tag)
raise FeedException('unknow feed type')
class FeedBase(object):
"""
Base for xml-related classes, which provides simple wrappers around xpath
selection and item creation
"""
def xpath(self, path):
""" Test xpath rule on xml tree """
return self.root.xpath(path, namespaces=NSMAP)
def xget(self, path):
""" Returns the 1st xpath match """
match = self.xpath(path)
if len(match):
return match[0]
else:
return None
def xval(self, path):
""" Returns the .text of the 1st match """
match = self.xget(path)
if match is not None:
return match.text
else:
return ""
def xgetCreate(self, table):
""" Returns an element, and creates it when not present """
tag = table[self.tag]
match = self.xget(tag)
if match is not None:
return match
else:
element = etree.Element(tagNS(tag))
self.root.append(element)
return element
def tostring(self, **k):
""" Returns string using lxml. Arguments passed to tostring """
return etree.tostring(self.xml, pretty_print=True, **k)
class FeedDescriptor(object):
"""
Descriptor which gives off elements based on "self.getName" and
"self.setName" as getter/setters. Looks far better, and avoids duplicates
"""
def __init__(self, name):
self.name = name
def __get__(self, instance, owner):
getter = getattr(instance, 'get%s' % self.name.title())
return getter()
def __set__(self, instance, value):
setter = getattr(instance, 'set%s' % self.name.title())
return setter(value)
class FeedList(object):
"""
Class to map a list of xml elements against a list of matching objects,
while avoiding to recreate the same matching object over and over again. So
as to avoid extra confusion, list's elements are called "children" here, so
as not to use "items", which is already in use in RSS/Atom related code.
Comes with its very own descriptor.
"""
def __init__(self, getter, tag, childClass):
self.getter = getter
self.childClass = childClass
self.tag = tag
self._children = {} # id(xml) => FeedItem
def getChildren(self):
children = self.getter()
out = []
for child in children:
if id(child) in self._children:
out.append(self._children[id(child)])
else:
new = self.childClass(child, self.tag)
self._children[id(child)] = new
out.append(new)
return out
def __getitem__(self, key):
return self.getChildren()[key]
def __delitem__(self, key):
child = self.getter()[key]
if id(child) in self._children:
self._children[id(child)].remove()
del self._children[id(child)]
else:
child.getparent().remove(child)
def __len__(self):
return len(self.getter())
class FeedListDescriptor(object):
"""
Descriptor for FeedList
"""
def __init__(self, name):
self.name = name
self.items = {} # id(instance) => FeedList
def __get__(self, instance, owner):
key = id(instance)
if key in self.items:
return self.items[key]
else:
getter = getattr(instance, 'get%s' % self.name.title())
self.items[key] = FeedList(getter, instance.tag, eval(instance.itemClass))
return self.items[key]
class FeedParser(FeedBase):
itemClass = 'FeedItem'
mimetype = 'application/xml'
def __init__(self, xml, tag):
self.xml = xml
self.root = self.xml.xpath("//atom03:feed|//atom:feed|//channel|//rssfake:channel", namespaces=NSMAP)[0]
self.tag = tag
def getTitle(self):
return ""
def setTitle(self, value):
pass
def getDesc(self):
pass
def setDesc(self, value):
pass
def getItems(self):
return []
def setItems(self, value):
pass
title = FeedDescriptor('title')
description = desc = FeedDescriptor('desc')
items = FeedListDescriptor('items')
class FeedParserRSS(FeedParser):
"""
RSS Parser
"""
itemClass = 'FeedItemRSS'
mimetype = 'application/rss+xml'
def getTitle(self):
return self.xval('rssfake:title|title')
def setTitle(self, value):
table = { 'rdf:rdf': 'rssfake:title',
'channel': 'title'}
element = self.xgetCreate(table)
element.text = value
def getDesc(self):
return self.xval('rssfake:description|description')
def setDesc(self, value):
table = { 'rdf:rdf': 'rssfake:description',
'channel': 'description'}
element = self.xgetCreate(table)
element.text = value
def getItems(self):
return self.xpath('rssfake:item|item')
class FeedParserAtom(FeedParser):
"""
Atom Parser
"""
itemClass = 'FeedItemAtom'
mimetype = 'application/atom+xml'
def getTitle(self):
return self.xval('atom:title|atom03:title')
def setTitle(self, value):
table = { 'atom:feed': 'atom:title',
'atom03:feed': 'atom03:title'}
element = self.xgetCreate(table)
element.text = value
def getDesc(self):
return self.xval('atom:subtitle|atom03:subtitle')
def setDesc(self, value):
table = { 'atom:feed': 'atom:subtitle',
'atom03:feed': 'atom03:subtitle'}
element = self.xgetCreate(table)
element.text = value
def getItems(self):
return self.xpath('atom:entry|atom03:entry')
class FeedItem(FeedBase):
def __init__(self, xml, tag):
self.root = self.xml = xml
self.tag = tag
def getTitle(self):
return ""
def setTitle(self):
pass
def getDesc(self):
return ""
def setDesc(self, value):
pass
def getContent(self):
return ""
def setContent(self, value):
pass
title = FeedDescriptor('title')
link = FeedDescriptor('link')
description = desc = FeedDescriptor('desc')
content = FeedDescriptor('content')
def remove(self):
self.xml.getparent().remove(self.xml)
class FeedItemRSS(FeedItem):
def getTitle(self):
return self.xval('rssfake:title|title')
def setTitle(self, value):
table = { 'rdf:rdf': 'rssfake:title',
'channel': 'title'}
element = self.xgetCreate(table)
element.text = value
def getLink(self):
return self.xval('rssfake:link|link')
def setLink(self, value):
table = { 'rdf:rdf': 'rssfake:link',
'channel': 'link'}
element = self.xgetCreate(table)
element.text = value
def getDesc(self):
return self.xval('rssfake:description|description')
def setDesc(self, value):
table = { 'rdf:rdf': 'rssfake:description',
'channel': 'description'}
element = self.xgetCreate(table)
element.text = value
def getContent(self):
return self.xval('content:encoded')
def setContent(self, value):
table = { 'rdf:rdf': 'content:encoded',
'channel': 'content:encoded'}
element = self.xgetCreate(table)
element.text = value
class FeedItemAtom(FeedItem):
def getTitle(self):
return self.xval('atom:title|atom03:title')
def setTitle(self, value):
table = { 'atom:feed': 'atom:title',
'atom03:feed': 'atom03:title'}
element = self.xgetCreate(table)
element.text = value
def getLink(self):
return self.xget('atom:link|atom03:link').get('href', '')
def setLink(self, value):
table = { 'atom:feed': 'atom:link',
'atom03:feed': 'atom03:link'}
element = self.xgetCreate(table)
element.attrib['href'] = value
def getDesc(self):
# default "type" is "text"
element = self.xget('atom:summary|atom03:summary')
if element is not None:
return innerHTML(element)
else:
return ""
def setDesc(self, value):
table = { 'atom:feed': 'atom:summary',
'atom03:feed': 'atom03:summary'}
element = self.xgetCreate(table)
if element.attrib.get('type', '') == 'xhtml':
cleanNode(element)
element.attrib['type'] = 'html'
element.text = value
def getContent(self):
element = self.xget('atom:content|atom03:content')
if element is not None:
return innerHTML(element)
else:
return ""
def setContent(self, value):
table = { 'atom:feed': 'atom:content',
'atom03:feed': 'atom03:content'}
element = self.xgetCreate(table)
if element.attrib.get('type', '') == 'xhtml':
cleanNode(element)
element.attrib['type'] = 'html'
element.text = value