import re
import urllib
import urllib2
import requests
import urlparse
import lxml
from xml.dom.minidom import Document

import logging

from ckan import model

from ckan.plugins.core import SingletonPlugin, implements

from ckanext.harvest.interfaces import IHarvester
from ckanext.harvest.model import HarvestObject
from ckanext.harvest.model import HarvestObjectExtra as HOExtra

from ckanext.spatial.lib.csw_client import CswService
from ckanext.spatial.harvesters.base import SpatialHarvester, text_traceback


def _has_credentials(parts):
    '''
    Tell whether a parsed URL carries both a user name and a password.
    '''
    return bool(parts.username and parts.password)


def _netloc_without_credentials(parts):
    '''
    Return the network location of a parsed URL without ``user:password@``.

    ``parts`` is the result of ``urlparse.urlparse`` or ``urlparse.urlsplit``.
    When the URL has an explicit port it is kept as ``host:port``; the port
    must be part of the netloc, it is not a tuple element of its own for
    ``urlunparse``/``urlunsplit``.
    '''
    host = parts.hostname or ''
    if ':' in host:
        # A colon in the bare host means an IPv6 literal whose brackets were
        # removed by ``hostname``; restore them so the port stays unambiguous.
        host = '[%s]' % host
    if parts.port is None:
        return host
    return '%s:%s' % (host, parts.port)


def _public_netloc(parts):
    '''
    Return the netloc to use in outgoing URLs.

    Credentials are removed only when both user name and password are
    present; otherwise the netloc is returned untouched.
    '''
    if _has_credentials(parts):
        return _netloc_without_credentials(parts)
    return parts.netloc


def _build_login_request(username, password):
    '''
    Return the XML document posted to the ``xml.user.login`` service:
    a ``request`` element holding ``username`` and ``password`` children.
    '''
    auth_data = Document()
    root = auth_data.createElement('request')
    auth_data.appendChild(root)
    for tag_name, value in (('username', username), ('password', password)):
        tag = auth_data.createElement(tag_name)
        tag.appendChild(auth_data.createTextNode(value))
        root.appendChild(tag)
    return auth_data.toprettyxml(indent=" ")


class CSWHarvester(SpatialHarvester, SingletonPlugin):
    '''
    A Harvester for CSW servers
    '''
    implements(IHarvester)

    # CswService client, created by _setup_csw_client()
    csw = None

    def info(self):
        '''
        Return the name, title and description identifying this harvester.
        '''
        return {
            'name': 'csw',
            'title': 'CSW Server',
            'description': 'A server that implements OGC\'s Catalog Service for the Web (CSW) standard'
        }

    def get_original_url(self, harvest_object_id):
        '''
        Return the CSW ``GetRecordById`` URL for a harvest object.

        The URL is built from the harvest source URL (credentials removed
        when both user name and password are present) and the object's guid.
        '''
        obj = model.Session.query(HarvestObject).\
            filter(HarvestObject.id == harvest_object_id).\
            first()

        parts = urlparse.urlparse(obj.source.url)

        params = {
            'SERVICE': 'CSW',
            'VERSION': '2.0.2',
            'REQUEST': 'GetRecordById',
            'OUTPUTSCHEMA': 'http://www.isotc211.org/2005/gmd',
            'OUTPUTFORMAT': 'application/xml',
            'ID': obj.guid
        }

        # urlunparse takes (scheme, netloc, path, params, query, fragment)
        return urlparse.urlunparse((
            parts.scheme,
            _public_netloc(parts),
            parts.path,
            None,
            urllib.urlencode(params),
            None
        ))

    def output_schema(self):
        '''
        Return the output schema name requested from the CSW server.
        '''
        return 'gmd'

    def gather_stage(self, harvest_job):
        '''
        Collect the record identifiers published by the CSW server.

        Creates one HarvestObject per identifier, flagged ``new``, ``change``
        or ``delete`` by comparing the server's identifiers with the current
        harvest objects of the source.

        Returns the list of created HarvestObject ids, or None when the
        server cannot be contacted, the identifiers cannot be read, or no
        object was created (a gather error is saved in each case).
        '''
        log = logging.getLogger(__name__ + '.CSW.gather')
        log.debug('CswHarvester gather_stage for job: %r', harvest_job)

        # Get source URL
        url = harvest_job.source.url
        parts = urlparse.urlsplit(url)
        needs_login = _has_credentials(parts)

        auth_url = None
        if needs_login:
            netloc = _netloc_without_credentials(parts)
            # The login service sits next to the CSW endpoint: replace the
            # last path segment with 'xml.user.login'.
            login_path = parts.path.rsplit('/', 1)[0] + '/xml.user.login'
            # urlunsplit takes (scheme, netloc, path, query, fragment)
            auth_url = urlparse.urlunsplit((
                parts.scheme, netloc, login_path, parts.query, parts.fragment
            ))
            # From here on use the source URL without the credentials
            url = urlparse.urlunsplit((
                parts.scheme, netloc, parts.path, parts.query, parts.fragment
            ))

        self._set_source_config(harvest_job.source.config)

        sess = requests.Session()
        try:
            if needs_login:
                # Never log the password itself
                log.debug('Authenticate against Geonetwork. User is %s',
                          parts.username)
                log.debug('Gather stage. Authorization to the geoserver, URL is %s',
                          auth_url)
                req = sess.post(
                    url=auth_url,
                    data=_build_login_request(parts.username, parts.password),
                    headers={'Content-Type': 'application/xml'})
                # Cookie values are session secrets: log their names only
                log.debug('Gather stage. Geoserver Authorization cookies received: %s',
                          req.cookies.keys())

            log.debug('Gather stage. Contacting the geoserver, URL is %s', url)
            res = sess.get(url)
            log.debug('Gather stage. Geoserver contacted, result is %s', res)

            # NOTE(review): the CSW client is built from the URL only, so the
            # cookies held by `sess` are not handed to it - confirm whether
            # the server requires the authenticated session for CSW requests.
            self._setup_csw_client(url)
        except Exception as e:
            self._save_gather_error('Error contacting the CSW server: %s' % e, harvest_job)
            return None
        finally:
            sess.close()

        query = model.Session.query(HarvestObject.guid, HarvestObject.package_id).\
            filter(HarvestObject.current == True).\
            filter(HarvestObject.harvest_source_id == harvest_job.source.id)  # noqa: E712
        guid_to_package_id = {}

        for guid, package_id in query:
            guid_to_package_id[guid] = package_id

        guids_in_db = set(guid_to_package_id)

        # extract cql filter if any
        cql = self.source_config.get('cql')

        log.debug('Starting gathering for %s' % url)
        guids_in_harvest = set()
        try:
            for identifier in self.csw.getidentifiers(page=10, outputschema=self.output_schema(), cql=cql):
                try:
                    log.info('Got identifier %s from the CSW', identifier)
                    if identifier is None:
                        log.error('CSW returned identifier %r, skipping...' % identifier)
                        continue

                    guids_in_harvest.add(identifier)
                except Exception as e:
                    self._save_gather_error('Error for the identifier %s [%r]' % (identifier, e), harvest_job)
                    continue

        except Exception as e:
            log.error('Exception: %s' % text_traceback())
            self._save_gather_error('Error gathering the identifiers from the CSW server [%s]' % str(e), harvest_job)
            return None

        new = guids_in_harvest - guids_in_db
        delete = guids_in_db - guids_in_harvest
        change = guids_in_db & guids_in_harvest

        ids = []
        for guid in new:
            obj = HarvestObject(guid=guid, job=harvest_job,
                                extras=[HOExtra(key='status', value='new')])
            obj.save()
            ids.append(obj.id)
        for guid in change:
            obj = HarvestObject(guid=guid, job=harvest_job,
                                package_id=guid_to_package_id[guid],
                                extras=[HOExtra(key='status', value='change')])
            obj.save()
            ids.append(obj.id)
        for guid in delete:
            obj = HarvestObject(guid=guid, job=harvest_job,
                                package_id=guid_to_package_id[guid],
                                extras=[HOExtra(key='status', value='delete')])
            # Objects with this guid are no longer current once it is
            # scheduled for deletion
            model.Session.query(HarvestObject).\
                filter_by(guid=guid).\
                update({'current': False}, False)
            obj.save()
            ids.append(obj.id)

        if not ids:
            self._save_gather_error('No records received from the CSW server', harvest_job)
            return None

        return ids

    def fetch_stage(self, harvest_object):
        '''
        Fetch the CSW record of a harvest object and store its XML.

        The record is requested by the object's guid; its XML, without the
        XML declaration, is saved as the object's content.

        Returns True on success or when the object is flagged ``delete``
        (nothing to fetch), False when an object error was saved.
        '''
        # Check harvest object status
        status = self._get_object_extra(harvest_object, 'status')

        if status == 'delete':
            # No need to fetch anything, just pass to the import stage
            return True

        log = logging.getLogger(__name__ + '.CSW.fetch')
        log.debug('CswHarvester fetch_stage for object: %s', harvest_object.id)

        parts = urlparse.urlparse(harvest_object.source.url)
        # Endpoint only: credentials, params, query and fragment are dropped
        url = urlparse.urlunparse((
            parts.scheme,
            _public_netloc(parts),
            parts.path,
            None, None, None
        ))

        try:
            self._setup_csw_client(url)
            log.debug('Fetch stage. Contacting the geoserver, URL is %s', url)
        except Exception as e:
            self._save_object_error('Error contacting the CSW server: %s' % e,
                                    harvest_object)
            return False

        identifier = harvest_object.guid
        try:
            record = self.csw.getrecordbyid([identifier], outputschema=self.output_schema())
        except Exception as e:
            self._save_object_error('Error getting the CSW record with GUID %s' % identifier, harvest_object)
            return False

        if record is None:
            self._save_object_error('Empty record for GUID %s' % identifier,
                                    harvest_object)
            return False

        try:
            # Save the fetch contents in the HarvestObject
            # Contents come from csw_client already declared and encoded as utf-8
            # Remove original XML declaration
            content = re.sub(r'<\?xml(.*)\?>', '', record['xml'])

            harvest_object.content = content.strip()
            harvest_object.save()
        except Exception as e:
            self._save_object_error('Error saving the harvest object for GUID %s [%r]' %
                                    (identifier, e), harvest_object)
            return False

        log.debug('XML content saved (len %s)', len(record['xml']))
        return True

    def _setup_csw_client(self, url):
        '''
        Create the CswService client for ``url`` and keep it in ``self.csw``.
        '''
        self.csw = CswService(url)