ansible-roles/ckan/files/csw.py

314 lines
11 KiB
Python
Raw Normal View History

import re
import urllib
import urllib2
import requests
import urlparse
import lxml
from xml.dom.minidom import Document
import logging
from ckan import model
from ckan.plugins.core import SingletonPlugin, implements
from ckanext.harvest.interfaces import IHarvester
from ckanext.harvest.model import HarvestObject
from ckanext.harvest.model import HarvestObjectExtra as HOExtra
from ckanext.spatial.lib.csw_client import CswService
from ckanext.spatial.harvesters.base import SpatialHarvester, text_traceback
class CSWHarvester(SpatialHarvester, SingletonPlugin):
    '''
    A Harvester for CSW servers

    The source URL may embed credentials (``scheme://user:password@host/...``).
    When both a username and a password are present they are used to log in
    to the ``xml.user.login`` service located next to the CSW endpoint, and
    they are stripped from every URL that is subsequently built or logged.
    '''
    implements(IHarvester)

    # CswService client; created by _setup_csw_client().
    csw = None

    def info(self):
        '''
        Return the dict (name, title, description) describing this
        harvester type.
        '''
        return {
            'name': 'csw',
            'title': 'CSW Server',
            'description': 'A server that implements OGC\'s Catalog Service for the Web (CSW) standard'
        }

    def _netloc_without_credentials(self, parts):
        '''
        Return the network location of a parsed URL with any embedded
        ``user:password@`` prefix removed.

        :param parts: result of ``urlparse.urlparse`` or ``urlparse.urlsplit``
        :returns: ``host`` or ``host:port`` when both username and password
                  are present, otherwise ``parts.netloc`` unchanged
        '''
        if not (parts.username and parts.password):
            return parts.netloc
        if parts.port is None:
            return parts.hostname
        # The port belongs to the netloc component; it must not be passed
        # to urlunparse/urlunsplit as a separate tuple element.
        return '%s:%s' % (parts.hostname, parts.port)

    def get_original_url(self, harvest_object_id):
        '''
        Build the GetRecordById URL that returns the original ISO (gmd)
        document of the given harvest object.

        :param harvest_object_id: id of the HarvestObject to look up
        :returns: the request URL, without credentials
        '''
        obj = model.Session.query(HarvestObject).\
            filter(HarvestObject.id == harvest_object_id).\
            first()

        parts = urlparse.urlparse(obj.source.url)

        params = {
            'SERVICE': 'CSW',
            'VERSION': '2.0.2',
            'REQUEST': 'GetRecordById',
            'OUTPUTSCHEMA': 'http://www.isotc211.org/2005/gmd',
            'OUTPUTFORMAT': 'application/xml',
            'ID': obj.guid
        }

        url = urlparse.urlunparse((
            parts.scheme,
            self._netloc_without_credentials(parts),
            parts.path,
            None,
            urllib.urlencode(params),
            None
        ))

        return url

    def output_schema(self):
        '''
        Return the output schema name passed to the CSW client.
        '''
        return 'gmd'

    def _geonetwork_login(self, session, parts, log):
        '''
        POST the credentials embedded in the source URL to the
        ``xml.user.login`` service next to the CSW endpoint, using
        ``session`` so that any cookie set by the server is kept on it.

        :param session: ``requests.Session`` used for the POST
        :param parts: result of ``urlparse.urlsplit`` on the source URL
        :param log: logger to report progress to
        '''
        login_path = parts.path.rsplit('/', 1)[0] + '/xml.user.login'
        auth_url = urlparse.urlunsplit((
            parts.scheme,
            self._netloc_without_credentials(parts),
            login_path,
            parts.query,
            parts.fragment
        ))

        # Never write the password to the log.
        log.debug('Authenticate against Geonetwork. User is %s', parts.username)

        # <request><username>..</username><password>..</password></request>
        auth_data = Document()
        root = auth_data.createElement('request')
        auth_data.appendChild(root)
        for tag_name, value in (('username', parts.username),
                                ('password', parts.password)):
            tag = auth_data.createElement(tag_name)
            tag.appendChild(auth_data.createTextNode(value))
            root.appendChild(tag)
        # toxml() rather than toprettyxml(): pretty-printing can add
        # whitespace around text nodes, which would alter the credentials.
        xml_auth_data = auth_data.toxml()

        req_headers = {'Content-Type': 'application/xml'}
        log.debug('Gather stage. Authorization to the geoserver, URL is %s', auth_url)
        req = session.post(url=auth_url, data=xml_auth_data, headers=req_headers)
        # Log the status only; the cookie itself is a secret.
        log.debug('Gather stage. Geoserver authorization response status is %s', req.status_code)

    def gather_stage(self, harvest_job):
        '''
        Collect the identifiers published by the CSW server and create one
        HarvestObject per record flagged as ``new``, ``change`` or
        ``delete`` compared to the current objects of this source.

        :param harvest_job: the HarvestJob being run
        :returns: list of HarvestObject ids, or None when an error was
                  saved or no records were received
        '''
        log = logging.getLogger(__name__ + '.CSW.gather')
        log.debug('CswHarvester gather_stage for job: %r', harvest_job)

        # Get source URL
        url = harvest_job.source.url
        parts = urlparse.urlsplit(url)
        has_credentials = bool(parts.username and parts.password)

        if has_credentials:
            # Work with a credential-free URL from here on.
            url = urlparse.urlunsplit((
                parts.scheme,
                self._netloc_without_credentials(parts),
                parts.path,
                parts.query,
                parts.fragment
            ))

        self._set_source_config(harvest_job.source.config)

        # The session must exist whether or not the source has credentials.
        sess = requests.Session()
        try:
            if has_credentials:
                self._geonetwork_login(sess, parts, log)
            log.debug('Gather stage. Contacting the geoserver, URL is %s', url)
            res = sess.get(url)
            log.debug('Gather stage. Geoserver contacted, result is %s', res)
            # NOTE(review): the CSW client is created from the URL and does
            # not reuse ``sess``; confirm whether the server needs the
            # login cookie on the CSW requests themselves.
            self._setup_csw_client(url)
        except Exception as e:
            self._save_gather_error('Error contacting the CSW server: %s' % e, harvest_job)
            return None

        query = model.Session.query(HarvestObject.guid, HarvestObject.package_id).\
            filter(HarvestObject.current == True).\
            filter(HarvestObject.harvest_source_id == harvest_job.source.id)
        guid_to_package_id = {}
        for guid, package_id in query:
            guid_to_package_id[guid] = package_id

        guids_in_db = set(guid_to_package_id)

        # extract cql filter if any
        cql = self.source_config.get('cql')

        log.debug('Starting gathering for %s' % url)
        guids_in_harvest = set()
        try:
            for identifier in self.csw.getidentifiers(page=10, outputschema=self.output_schema(), cql=cql):
                try:
                    log.info('Got identifier %s from the CSW', identifier)
                    if identifier is None:
                        log.error('CSW returned identifier %r, skipping...' % identifier)
                        continue

                    guids_in_harvest.add(identifier)
                except Exception as e:
                    self._save_gather_error('Error for the identifier %s [%r]' % (identifier, e), harvest_job)
                    continue

        except Exception as e:
            log.error('Exception: %s' % text_traceback())
            self._save_gather_error('Error gathering the identifiers from the CSW server [%s]' % str(e), harvest_job)
            return None

        new = guids_in_harvest - guids_in_db
        delete = guids_in_db - guids_in_harvest
        change = guids_in_db & guids_in_harvest

        ids = []
        for guid in new:
            obj = HarvestObject(guid=guid, job=harvest_job,
                                extras=[HOExtra(key='status', value='new')])
            obj.save()
            ids.append(obj.id)
        for guid in change:
            obj = HarvestObject(guid=guid, job=harvest_job,
                                package_id=guid_to_package_id[guid],
                                extras=[HOExtra(key='status', value='change')])
            obj.save()
            ids.append(obj.id)
        for guid in delete:
            obj = HarvestObject(guid=guid, job=harvest_job,
                                package_id=guid_to_package_id[guid],
                                extras=[HOExtra(key='status', value='delete')])
            # Existing objects with this guid are no longer current.
            model.Session.query(HarvestObject).\
                filter_by(guid=guid).\
                update({'current': False}, False)
            obj.save()
            ids.append(obj.id)

        if not ids:
            self._save_gather_error('No records received from the CSW server', harvest_job)
            return None

        return ids

    def fetch_stage(self, harvest_object):
        '''
        Fetch the record for ``harvest_object.guid`` from the CSW server and
        store its XML (without the XML declaration) in the harvest object.

        :param harvest_object: the HarvestObject to fill in
        :returns: True on success, or when the object is flagged ``delete``;
                  False when an object error was saved
        '''
        # Check harvest object status
        status = self._get_object_extra(harvest_object, 'status')

        if status == 'delete':
            # No need to fetch anything, just pass to the import stage
            return True

        log = logging.getLogger(__name__ + '.CSW.fetch')
        log.debug('CswHarvester fetch_stage for object: %s', harvest_object.id)

        # Endpoint only: scheme, host[:port] and path; no credentials,
        # params, query or fragment.
        parts = urlparse.urlparse(harvest_object.source.url)
        url = urlparse.urlunparse((
            parts.scheme,
            self._netloc_without_credentials(parts),
            parts.path,
            None,
            None,
            None
        ))

        try:
            log.debug('Fetch stage. Contacting the geoserver, URL is %s', url)
            self._setup_csw_client(url)
        except Exception as e:
            self._save_object_error('Error contacting the CSW server: %s' % e,
                                    harvest_object)
            return False

        identifier = harvest_object.guid
        try:
            record = self.csw.getrecordbyid([identifier], outputschema=self.output_schema())
        except Exception:
            self._save_object_error('Error getting the CSW record with GUID %s' % identifier, harvest_object)
            return False

        if record is None:
            self._save_object_error('Empty record for GUID %s' % identifier,
                                    harvest_object)
            return False

        try:
            # Save the fetch contents in the HarvestObject
            # Contents come from csw_client already declared and encoded as utf-8
            # Remove original XML declaration (non-greedy, so the match
            # stops at the first '?>' instead of the last one on the line)
            content = re.sub(r'<\?xml(.*?)\?>', '', record['xml'])

            harvest_object.content = content.strip()
            harvest_object.save()
        except Exception as e:
            self._save_object_error('Error saving the harvest object for GUID %s [%r]' %
                                    (identifier, e), harvest_object)
            return False

        log.debug('XML content saved (len %s)', len(record['xml']))
        return True

    def _setup_csw_client(self, url):
        '''
        Create the CswService client for ``url`` and keep it on ``self.csw``.
        '''
        self.csw = CswService(url)