From 647a2d830ea3ca41289850654cef506816730275 Mon Sep 17 00:00:00 2001 From: Andrea Dell'Amico Date: Fri, 1 Apr 2016 02:19:26 +0200 Subject: [PATCH] library/roles/ckan: Try new modifications to make the geonetwork authentication work. --- ckan/files/base.py | 68 +++-- ckan/files/ckanharvester.py | 574 ++++++++++++++++++++++++++++++++++++ ckan/files/csw.py | 313 ++++++++++++++++++++ ckan/tasks/main.yml | 10 + 4 files changed, 942 insertions(+), 23 deletions(-) create mode 100644 ckan/files/ckanharvester.py create mode 100644 ckan/files/csw.py diff --git a/ckan/files/base.py b/ckan/files/base.py index 5e59a2e..5fcf93a 100644 --- a/ckan/files/base.py +++ b/ckan/files/base.py @@ -2,10 +2,11 @@ import re import cgitb import warnings import urllib2 +import requests import sys import logging from string import Template -from urlparse import urlparse +import urlparse from datetime import datetime import uuid import hashlib @@ -15,8 +16,8 @@ import mimetypes from pylons import config from owslib import wms -import requests -from lxml import etree +import lxml +from xml.dom.minidom import Document from ckan import plugins as p from ckan import model @@ -765,17 +766,28 @@ class SpatialHarvester(HarvesterBase): DEPRECATED: Use _get_content_as_unicode instead ''' - parts = urlparse.urlparse(url) + parts = urlparse.urlsplit(url) if parts.username and parts.password: - auth_url = url.rsplit('/', 1)[0] - auth_url = auth_url + '/xml.user.login' - auth_url = urlparse.urlunparse(( - parts.scheme, - parts.netloc, - parts.path - )) - log.error('Authenticate agains Geonetwork. User is %s and password is %s', parts.username, parts.password) - auth_data = minidom.Document() + url_path = parts.path.rsplit('/', 1)[0] + url_path = url_path + '/xml.user.login' + if parts.port is None: + auth_url = urlparse.urlunsplit(( + parts.scheme, + parts.hostname, + url_path, + parts.query, + parts.fragment + )) + else: + auth_url = urlparse.urlunsplit(( + parts.scheme, + parts.hostname, + parts.port, + url_path, + parts.query + )) + log.error('Authenticate against Geonetwork. User is %s and password is %s', parts.username, parts.password) + auth_data = Document() root = auth_data.createElement('request') auth_data.appendChild(root) username_tag = auth_data.createElement('username') @@ -793,7 +805,6 @@ class SpatialHarvester(HarvesterBase): sess = requests.Session() req = sess.post(url=auth_url, data=xml_auth_data, headers=req_headers) opener = urllib2.build_opener() - opener.addheaders.append(('Set-Cookie', req.cookie)) url = url.replace(' ', '%20') if opener: @@ -817,17 +828,28 @@ class SpatialHarvester(HarvesterBase): [1] http://github.com/kennethreitz/requests/blob/63243b1e3b435c7736acf1e51c0f6fa6666d861d/requests/models.py#L811 ''' - parts = urlparse.urlparse(url) + parts = urlparse.urlsplit(url) if parts.username and parts.password: - auth_url = url.rsplit('/', 1)[0] - auth_url = auth_url + '/xml.user.login' - auth_url = urlparse.urlunparse(( - parts.scheme, - parts.netloc, - parts.path - )) + url_path = parts.path.rsplit('/', 1)[0] + url_path = url_path + '/xml.user.login' + if parts.port is None: + auth_url = urlparse.urlunsplit(( + parts.scheme, + parts.hostname, + url_path, + parts.query, + parts.fragment + )) + else: + auth_url = urlparse.urlunsplit(( + parts.scheme, + parts.hostname, + parts.port, + url_path, + parts.query + )) log.error('Authenticate against Geonetwork. User is %s and password is %s', parts.username, parts.password) - auth_data = minidom.Document() + auth_data = Document() root = auth_data.createElement('request') auth_data.appendChild(root) username_tag = auth_data.createElement('username') diff --git a/ckan/files/ckanharvester.py b/ckan/files/ckanharvester.py new file mode 100644 index 0000000..ff1f2fe --- /dev/null +++ b/ckan/files/ckanharvester.py @@ -0,0 +1,574 @@ +import urllib +import urllib2 +import urlparse +import httplib +import datetime +import socket + +from sqlalchemy import exists + +from ckan.lib.base import c +from ckan import model +from ckan.logic import ValidationError, NotFound, get_action +from ckan.lib.helpers import json +from ckan.lib.munge import munge_name +from ckan.plugins import toolkit + +from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError + +import logging +log = logging.getLogger(__name__) + +from base import HarvesterBase + + +class CKANHarvester(HarvesterBase): + ''' + A Harvester for CKAN instances + ''' + config = None + + api_version = 2 + action_api_version = 3 + + def _get_action_api_offset(self): + return '/api/%d/action' % self.action_api_version + + def _get_search_api_offset(self): + return '%s/package_search' % self._get_action_api_offset() + + def _get_content(self, url): + http_request = urllib2.Request(url=url) + + api_key = self.config.get('api_key') + if api_key: + http_request.add_header('Authorization', api_key) + + try: + http_response = urllib2.urlopen(http_request) + except urllib2.HTTPError, e: + if e.getcode() == 404: + raise ContentNotFoundError('HTTP error: %s' % e.code) + else: + raise ContentFetchError('HTTP error: %s' % e.code) + except urllib2.URLError, e: + raise ContentFetchError('URL error: %s' % e.reason) + except httplib.HTTPException, e: + raise ContentFetchError('HTTP Exception: %s' % e) + except socket.error, e: + raise ContentFetchError('HTTP socket error: %s' % e) + except Exception, e: + raise ContentFetchError('HTTP general exception: %s' % e) + return http_response.read() + + def _get_group(self, base_url, group_name): + url = base_url + self._get_action_api_offset() + '/group_show?id=' + \ + munge_name(group_name) + try: + content = self._get_content(url) + return json.loads(content) + except (ContentFetchError, ValueError): + log.debug('Could not fetch/decode remote group') + raise RemoteResourceError('Could not fetch/decode remote group') + + def _get_organization(self, base_url, org_name): + url = base_url + self._get_action_api_offset() + \ + '/organization_show?id=' + org_name + try: + content = self._get_content(url) + content_dict = json.loads(content) + return content_dict['result'] + except (ContentFetchError, ValueError, KeyError): + log.debug('Could not fetch/decode remote group') + raise RemoteResourceError( + 'Could not fetch/decode remote organization') + + def _set_config(self, config_str): + if config_str: + self.config = json.loads(config_str) + if 'api_version' in self.config: + self.api_version = int(self.config['api_version']) + + log.debug('Using config: %r', self.config) + else: + self.config = {} + + def info(self): + return { + 'name': 'ckan', + 'title': 'CKAN', + 'description': 'Harvests remote CKAN instances', + 'form_config_interface': 'Text' + } + + def validate_config(self, config): + if not config: + return config + + try: + config_obj = json.loads(config) + + if 'api_version' in config_obj: + try: + int(config_obj['api_version']) + except ValueError: + raise ValueError('api_version must be an integer') + + if 'default_tags' in config_obj: + if not isinstance(config_obj['default_tags'], list): + raise ValueError('default_tags must be a list') + + if 'default_groups' in config_obj: + if not isinstance(config_obj['default_groups'], list): + raise ValueError('default_groups must be a list') + + # Check if default groups exist + context = {'model': model, 'user': c.user} + for group_name in config_obj['default_groups']: + try: + group = get_action('group_show')( + context, {'id': group_name}) + except NotFound, e: + raise ValueError('Default group not found') + + if 'default_extras' in config_obj: + if not isinstance(config_obj['default_extras'], dict): + raise ValueError('default_extras must be a dictionary') + + if 'user' in config_obj: + # Check if user exists + context = {'model': model, 'user': c.user} + try: + user = get_action('user_show')( + context, {'id': config_obj.get('user')}) + except NotFound: + raise ValueError('User not found') + + for key in ('read_only', 'force_all'): + if key in config_obj: + if not isinstance(config_obj[key], bool): + raise ValueError('%s must be boolean' % key) + + except ValueError, e: + raise e + + return config + + def gather_stage(self, harvest_job): + log.debug('In CKANHarvester gather_stage (%s)', + harvest_job.source.url) + toolkit.requires_ckan_version(min_version='2.0') + get_all_packages = True + + self._set_config(harvest_job.source.config) + + # URL cleanup + parts = urlparse.urlsplit(harvest_job.source.url) + if parts.username and parts.password: + if parts.port is None: + cleaned_url = urlparse.urlunsplit(( + parts.scheme, + parts.hostname, + url_path, + parts.query, + parts.fragment + )) + else: + cleaned_url = urlparse.urlunsplit(( + parts.scheme, + parts.hostname, + parts.port, + url_path, + parts.query + )) + + # Get source URL + remote_ckan_base_url = cleaned_url.rstrip('/') + + # Filter in/out datasets from particular organizations + fq_terms = [] + org_filter_include = self.config.get('organizations_filter_include', []) + org_filter_exclude = self.config.get('organizations_filter_exclude', []) + if org_filter_include: + fq_terms.append(' OR '.join( + 'organization:%s' % org_name for org_name in org_filter_include)) + elif org_filter_exclude: + fq_terms.extend( + '-organization:%s' % org_name for org_name in org_filter_exclude) + + # Ideally we can request from the remote CKAN only those datasets + # modified since the last completely successful harvest. + last_error_free_job = self._last_error_free_job(harvest_job) + log.debug('Last error-free job: %r', last_error_free_job) + if (last_error_free_job and + not self.config.get('force_all', False)): + get_all_packages = False + + # Request only the datasets modified since + last_time = last_error_free_job.gather_started + # Note: SOLR works in UTC, and gather_started is also UTC, so + # this should work as long as local and remote clocks are + # relatively accurate. Going back a little earlier, just in case. + get_changes_since = \ + (last_time - datetime.timedelta(hours=1)).isoformat() + log.info('Searching for datasets modified since: %s UTC', + get_changes_since) + + fq_since_last_time = 'metadata_modified:[{since}Z TO *]' \ + .format(since=get_changes_since) + + try: + pkg_dicts = self._search_for_datasets( + remote_ckan_base_url, + fq_terms + [fq_since_last_time]) + except SearchError, e: + log.info('Searching for datasets changed since last time ' + 'gave an error: %s', e) + get_all_packages = True + + if not get_all_packages and not pkg_dicts: + log.info('No datasets have been updated on the remote ' + 'CKAN instance since the last harvest job %s', + last_time) + return None + + # Fall-back option - request all the datasets from the remote CKAN + if get_all_packages: + # Request all remote packages + try: + pkg_dicts = self._search_for_datasets(remote_ckan_base_url, + fq_terms) + except SearchError, e: + log.info('Searching for all datasets gave an error: %s', e) + self._save_gather_error( + 'Unable to search remote CKAN for datasets:%s url:%s' + 'terms:%s' % (e, remote_ckan_base_url, fq_terms), + harvest_job) + return None + if not pkg_dicts: + self._save_gather_error( + 'No datasets found at CKAN: %s' % remote_ckan_base_url, + harvest_job) + return None + + # Create harvest objects for each dataset + try: + package_ids = set() + object_ids = [] + for pkg_dict in pkg_dicts: + if pkg_dict['id'] in package_ids: + log.info('Discarding duplicate dataset %s - probably due ' + 'to datasets being changed at the same time as ' + 'when the harvester was paging through', + pkg_dict['id']) + continue + package_ids.add(pkg_dict['id']) + + log.debug('Creating HarvestObject for %s %s', + pkg_dict['name'], pkg_dict['id']) + obj = HarvestObject(guid=pkg_dict['id'], + job=harvest_job, + content=json.dumps(pkg_dict)) + obj.save() + object_ids.append(obj.id) + + return object_ids + except Exception, e: + self._save_gather_error('%r' % e.message, harvest_job) + + def _search_for_datasets(self, remote_ckan_base_url, fq_terms=None): + '''Does a dataset search on a remote CKAN and returns the results. + + Deals with paging to return all the results, not just the first page. + ''' + base_search_url = remote_ckan_base_url + self._get_search_api_offset() + params = {'rows': '100', 'start': '0'} + # There is the worry that datasets will be changed whilst we are paging + # through them. + # * In SOLR 4.7 there is a cursor, but not using that yet + # because few CKANs are running that version yet. + # * However we sort, then new names added or removed before the current + # page would cause existing names on the next page to be missed or + # double counted. + # * Another approach might be to sort by metadata_modified and always + # ask for changes since (and including) the date of the last item of + # the day before. However if the entire page is of the exact same + # time, then you end up in an infinite loop asking for the same page. + # * We choose a balanced approach of sorting by ID, which means + # datasets are only missed if some are removed, which is far less + # likely than any being added. If some are missed then it is assumed + # they will harvested the next time anyway. When datasets are added, + # we are at risk of seeing datasets twice in the paging, so we detect + # and remove any duplicates. + params['sort'] = 'id asc' + if fq_terms: + params['fq'] = ' '.join(fq_terms) + + pkg_dicts = [] + pkg_ids = set() + previous_content = None + while True: + url = base_search_url + '?' + urllib.urlencode(params) + log.debug('Searching for CKAN datasets: %s', url) + try: + content = self._get_content(url) + except ContentFetchError, e: + raise SearchError( + 'Error sending request to search remote ' + 'CKAN instance %s using URL %r. Error: %s' % + (remote_ckan_base_url, url, e)) + + if previous_content and content == previous_content: + raise SearchError('The paging doesn\'t seem to work. URL: %s' % + url) + try: + response_dict = json.loads(content) + except ValueError: + raise SearchError('Response from remote CKAN was not JSON: %r' + % content) + try: + pkg_dicts_page = response_dict.get('result', {}).get('results', + []) + except ValueError: + raise SearchError('Response JSON did not contain ' + 'result/results: %r' % response_dict) + + # Weed out any datasets found on previous pages (should datasets be + # changing while we page) + ids_in_page = set(p['id'] for p in pkg_dicts_page) + duplicate_ids = ids_in_page & pkg_ids + if duplicate_ids: + pkg_dicts_page = [p for p in pkg_dicts_page + if p['id'] not in duplicate_ids] + pkg_ids |= ids_in_page + + pkg_dicts.extend(pkg_dicts_page) + + if len(pkg_dicts_page) == 0: + break + + params['start'] = str(int(params['start']) + int(params['rows'])) + + return pkg_dicts + + @classmethod + def _last_error_free_job(cls, harvest_job): + # TODO weed out cancelled jobs somehow. + # look for jobs with no gather errors + jobs = \ + model.Session.query(HarvestJob) \ + .filter(HarvestJob.source == harvest_job.source) \ + .filter(HarvestJob.gather_started != None) \ + .filter(HarvestJob.status == 'Finished') \ + .filter(HarvestJob.id != harvest_job.id) \ + .filter( + ~exists().where( + HarvestGatherError.harvest_job_id == HarvestJob.id)) \ + .order_by(HarvestJob.gather_started.desc()) + # now check them until we find one with no fetch/import errors + # (looping rather than doing sql, in case there are lots of objects + # and lots of jobs) + for job in jobs: + for obj in job.objects: + if obj.current is False and \ + obj.report_status != 'not modified': + # unsuccessful, so go onto the next job + break + else: + return job + + def fetch_stage(self, harvest_object): + # Nothing to do here - we got the package dict in the search in the + # gather stage + return True + + def import_stage(self, harvest_object): + log.debug('In CKANHarvester import_stage') + + context = {'model': model, 'session': model.Session, + 'user': self._get_user_name()} + if not harvest_object: + log.error('No harvest object received') + return False + + if harvest_object.content is None: + self._save_object_error('Empty content for object %s' % + harvest_object.id, + harvest_object, 'Import') + return False + + self._set_config(harvest_object.job.source.config) + + try: + package_dict = json.loads(harvest_object.content) + + if package_dict.get('type') == 'harvest': + log.warn('Remote dataset is a harvest source, ignoring...') + return True + + # Set default tags if needed + default_tags = self.config.get('default_tags', []) + if default_tags: + if not 'tags' in package_dict: + package_dict['tags'] = [] + package_dict['tags'].extend( + [t for t in default_tags if t not in package_dict['tags']]) + + remote_groups = self.config.get('remote_groups', None) + if not remote_groups in ('only_local', 'create'): + # Ignore remote groups + package_dict.pop('groups', None) + else: + if not 'groups' in package_dict: + package_dict['groups'] = [] + + # check if remote groups exist locally, otherwise remove + validated_groups = [] + + for group_name in package_dict['groups']: + try: + data_dict = {'id': group_name} + group = get_action('group_show')(context, data_dict) + if self.api_version == 1: + validated_groups.append(group['name']) + else: + validated_groups.append(group['id']) + except NotFound, e: + log.info('Group %s is not available', group_name) + if remote_groups == 'create': + try: + group = self._get_group(harvest_object.source.url, group_name) + except RemoteResourceError: + log.error('Could not get remote group %s', group_name) + continue + + for key in ['packages', 'created', 'users', 'groups', 'tags', 'extras', 'display_name']: + group.pop(key, None) + + get_action('group_create')(context, group) + log.info('Group %s has been newly created', group_name) + if self.api_version == 1: + validated_groups.append(group['name']) + else: + validated_groups.append(group['id']) + + package_dict['groups'] = validated_groups + + + # Local harvest source organization + source_dataset = get_action('package_show')(context, {'id': harvest_object.source.id}) + local_org = source_dataset.get('owner_org') + + remote_orgs = self.config.get('remote_orgs', None) + + if not remote_orgs in ('only_local', 'create'): + # Assign dataset to the source organization + package_dict['owner_org'] = local_org + else: + if not 'owner_org' in package_dict: + package_dict['owner_org'] = None + + # check if remote org exist locally, otherwise remove + validated_org = None + remote_org = package_dict['owner_org'] + + if remote_org: + try: + data_dict = {'id': remote_org} + org = get_action('organization_show')(context, data_dict) + validated_org = org['id'] + except NotFound, e: + log.info('Organization %s is not available', remote_org) + if remote_orgs == 'create': + try: + try: + org = self._get_organization(harvest_object.source.url, remote_org) + except RemoteResourceError: + # fallback if remote CKAN exposes organizations as groups + # this especially targets older versions of CKAN + org = self._get_group(harvest_object.source.url, remote_org) + + for key in ['packages', 'created', 'users', 'groups', 'tags', 'extras', 'display_name', 'type']: + org.pop(key, None) + get_action('organization_create')(context, org) + log.info('Organization %s has been newly created', remote_org) + validated_org = org['id'] + except (RemoteResourceError, ValidationError): + log.error('Could not get remote org %s', remote_org) + + package_dict['owner_org'] = validated_org or local_org + + # Set default groups if needed + default_groups = self.config.get('default_groups', []) + if default_groups: + if not 'groups' in package_dict: + package_dict['groups'] = [] + package_dict['groups'].extend( + [g for g in default_groups + if g not in package_dict['groups']]) + + # Set default extras if needed + default_extras = self.config.get('default_extras', {}) + def get_extra(key, package_dict): + for extra in package_dict.get('extras', []): + if extra['key'] == key: + return extra + if default_extras: + override_extras = self.config.get('override_extras', False) + if not 'extras' in package_dict: + package_dict['extras'] = {} + for key, value in default_extras.iteritems(): + existing_extra = get_extra(key, package_dict) + if existing_extra and not override_extras: + continue # no need for the default + if existing_extra: + package_dict['extras'].remove(existing_extra) + # Look for replacement strings + if isinstance(value, basestring): + value = value.format( + harvest_source_id=harvest_object.job.source.id, + harvest_source_url= + harvest_object.job.source.url.strip('/'), + harvest_source_title= + harvest_object.job.source.title, + harvest_job_id=harvest_object.job.id, + harvest_object_id=harvest_object.id, + dataset_id=package_dict['id']) + + package_dict['extras'].append({'key': key, 'value': value}) + + for resource in package_dict.get('resources', []): + # Clear remote url_type for resources (eg datastore, upload) as + # we are only creating normal resources with links to the + # remote ones + resource.pop('url_type', None) + + # Clear revision_id as the revision won't exist on this CKAN + # and saving it will cause an IntegrityError with the foreign + # key. + resource.pop('revision_id', None) + + result = self._create_or_update_package( + package_dict, harvest_object, package_dict_form='package_show') + + return result + except ValidationError, e: + self._save_object_error('Invalid package with GUID %s: %r' % + (harvest_object.guid, e.error_dict), + harvest_object, 'Import') + except Exception, e: + self._save_object_error('%s' % e, harvest_object, 'Import') + + +class ContentFetchError(Exception): + pass + +class ContentNotFoundError(ContentFetchError): + pass + +class RemoteResourceError(Exception): + pass + + +class SearchError(Exception): + pass diff --git a/ckan/files/csw.py b/ckan/files/csw.py new file mode 100644 index 0000000..cf329d9 --- /dev/null +++ b/ckan/files/csw.py @@ -0,0 +1,313 @@ +import re +import urllib +import urllib2 +import requests +import urlparse +import lxml +from xml.dom.minidom import Document + +import logging + +from ckan import model + +from ckan.plugins.core import SingletonPlugin, implements + +from ckanext.harvest.interfaces import IHarvester +from ckanext.harvest.model import HarvestObject +from ckanext.harvest.model import HarvestObjectExtra as HOExtra + +from ckanext.spatial.lib.csw_client import CswService +from ckanext.spatial.harvesters.base import SpatialHarvester, text_traceback + + +class CSWHarvester(SpatialHarvester, SingletonPlugin): + ''' + A Harvester for CSW servers + ''' + implements(IHarvester) + + csw=None + + def info(self): + return { + 'name': 'csw', + 'title': 'CSW Server', + 'description': 'A server that implements OGC\'s Catalog Service for the Web (CSW) standard' + } + + + def get_original_url(self, harvest_object_id): + obj = model.Session.query(HarvestObject).\ + filter(HarvestObject.id==harvest_object_id).\ + first() + + parts = urlparse.urlparse(obj.source.url) + + params = { + 'SERVICE': 'CSW', + 'VERSION': '2.0.2', + 'REQUEST': 'GetRecordById', + 'OUTPUTSCHEMA': 'http://www.isotc211.org/2005/gmd', + 'OUTPUTFORMAT':'application/xml' , + 'ID': obj.guid + } + + if parts.username and parts.password: + if parts.port is None: + url = urlparse.urlunparse(( + parts.scheme, + parts.hostname, + parts.path, + None, + urllib.urlencode(params), + None + )) + else: + url = urlparse.urlunparse(( + parts.scheme, + parts.hostname, + parts.port, + parts.path, + urllib.urlencode(params), + None + )) + else: + url = urlparse.urlunparse(( + parts.scheme, + parts.netloc, + parts.path, + None, + urllib.urlencode(params), + None + )) + + return url + + def output_schema(self): + return 'gmd' + + def gather_stage(self, harvest_job): + log = logging.getLogger(__name__ + '.CSW.gather') + log.debug('CswHarvester gather_stage for job: %r', harvest_job) + # Get source URL + url = harvest_job.source.url + + parts = urlparse.urlsplit(url) + if parts.username and parts.password: + url_path = parts.path.rsplit('/', 1)[0] + url_path = url_path + '/xml.user.login' + if parts.port is None: + auth_url = urlparse.urlunsplit(( + parts.scheme, + parts.hostname, + url_path, + parts.query, + parts.fragment + )) + else: + auth_url = urlparse.urlunsplit(( + parts.scheme, + parts.hostname, + parts.port, + url_path, + parts.query + )) + log.debug('Authenticate against Geonetwork. User is %s and password is %s', parts.username, parts.password) + auth_data = Document() + root = auth_data.createElement('request') + auth_data.appendChild(root) + username_tag = auth_data.createElement('username') + user_data = auth_data.createTextNode(parts.username) + username_tag.appendChild(user_data) + root.appendChild(username_tag) + password_tag = auth_data.createElement('password') + password_data = auth_data.createTextNode(parts.password) + password_tag.appendChild(password_data) + root.appendChild(password_tag) + xml_auth_data = auth_data.toprettyxml(indent=" ") + + req_headers = {'Content-Type': 'application/xml'} + + sess = requests.Session() + log.debug('Gather stage. Authorization to the geoserver, URL is %s', auth_url) + req = sess.post(url=auth_url, data=xml_auth_data, headers=req_headers) + log.debug('Gather stage. Geoserver Authorization cookie is %s', req.cookies) + + if parts.username and parts.password: + if parts.port is None: + url = urlparse.urlunsplit(( + parts.scheme, + parts.hostname, + parts.path, + parts.query, + parts.fragment + )) + else: + url = urlparse.urlunsplit(( + parts.scheme, + parts.hostname, + parts.port, + parts.path, + parts.query + )) + + self._set_source_config(harvest_job.source.config) + + try: + log.debug('Gather stage. Contacting the geoserver, URL is %s', url) + res = sess.get(url) + log.debug('Gather stage. Geoserver contacted, result is %s', res) + self._setup_csw_client(sess.get(url)) + #self._setup_csw_client(url) + except Exception, e: + self._save_gather_error('Error contacting the CSW server: %s' % e, harvest_job) + return None + + query = model.Session.query(HarvestObject.guid, HarvestObject.package_id).\ + filter(HarvestObject.current==True).\ + filter(HarvestObject.harvest_source_id==harvest_job.source.id) + guid_to_package_id = {} + + for guid, package_id in query: + guid_to_package_id[guid] = package_id + + guids_in_db = set(guid_to_package_id.keys()) + + # extract cql filter if any + cql = self.source_config.get('cql') + + log.debug('Starting gathering for %s' % url) + guids_in_harvest = set() + try: + for identifier in self.csw.getidentifiers(page=10, outputschema=self.output_schema(), cql=cql): + try: + log.info('Got identifier %s from the CSW', identifier) + if identifier is None: + log.error('CSW returned identifier %r, skipping...' % identifier) + continue + + guids_in_harvest.add(identifier) + except Exception, e: + self._save_gather_error('Error for the identifier %s [%r]' % (identifier,e), harvest_job) + continue + + + except Exception, e: + log.error('Exception: %s' % text_traceback()) + self._save_gather_error('Error gathering the identifiers from the CSW server [%s]' % str(e), harvest_job) + return None + + new = guids_in_harvest - guids_in_db + delete = guids_in_db - guids_in_harvest + change = guids_in_db & guids_in_harvest + + ids = [] + for guid in new: + obj = HarvestObject(guid=guid, job=harvest_job, + extras=[HOExtra(key='status', value='new')]) + obj.save() + ids.append(obj.id) + for guid in change: + obj = HarvestObject(guid=guid, job=harvest_job, + package_id=guid_to_package_id[guid], + extras=[HOExtra(key='status', value='change')]) + obj.save() + ids.append(obj.id) + for guid in delete: + obj = HarvestObject(guid=guid, job=harvest_job, + package_id=guid_to_package_id[guid], + extras=[HOExtra(key='status', value='delete')]) + model.Session.query(HarvestObject).\ + filter_by(guid=guid).\ + update({'current': False}, False) + obj.save() + ids.append(obj.id) + + if len(ids) == 0: + self._save_gather_error('No records received from the CSW server', harvest_job) + return None + + return ids + + def fetch_stage(self,harvest_object): + + # Check harvest object status + status = self._get_object_extra(harvest_object, 'status') + + if status == 'delete': + # No need to fetch anything, just pass to the import stage + return True + + log = logging.getLogger(__name__ + '.CSW.fetch') + log.debug('CswHarvester fetch_stage for object: %s', harvest_object.id) + + url = harvest_object.source.url + parts = urlparse.urlparse(url) + if parts.username and parts.password: + if parts.port is None: + url = urlparse.urlunparse(( + parts.scheme, + parts.hostname, + parts.path, + None, + None, + None + )) + else: + url = urlparse.urlunparse(( + parts.scheme, + parts.hostname, + parts.port, + parts.path, + None, + None + )) + else: + url = urlparse.urlunparse(( + parts.scheme, + parts.netloc, + parts.path, + None, + None, + None + )) + + try: + self._setup_csw_client(url) + log.debug('Fetch stage. Contacting the geoserver, URL is %s', url) + except Exception, e: + self._save_object_error('Error contacting the CSW server: %s' % e, + harvest_object) + return False + + identifier = harvest_object.guid + try: + record = self.csw.getrecordbyid([identifier], outputschema=self.output_schema()) + except Exception, e: + self._save_object_error('Error getting the CSW record with GUID %s' % identifier, harvest_object) + return False + + if record is None: + self._save_object_error('Empty record for GUID %s' % identifier, + harvest_object) + return False + + try: + # Save the fetch contents in the HarvestObject + # Contents come from csw_client already declared and encoded as utf-8 + # Remove original XML declaration + content = re.sub('<\?xml(.*)\?>', '', record['xml']) + + harvest_object.content = content.strip() + harvest_object.save() + except Exception,e: + self._save_object_error('Error saving the harvest object for GUID %s [%r]' % \ + (identifier, e), harvest_object) + return False + + log.debug('XML content saved (len %s)', len(record['xml'])) + return True + + def _setup_csw_client(self, url): + self.csw = CswService(url) + diff --git a/ckan/tasks/main.yml b/ckan/tasks/main.yml index 2ccec2a..8aad47d 100644 --- a/ckan/tasks/main.yml +++ b/ckan/tasks/main.yml @@ -164,6 +164,16 @@ notify: Restart CKAN tags: [ 'ckan', 'ckan_pages', 'ckan_plugins', 'ckan_geo_auth' ] +- name: Overwrite the ckanharvester.py ckanext plugin file to remove the authentication data from URLs + copy: src=ckanharvester.py dest=/usr/lib/ckan/default/src/ckanext-harvest/ckanext/harvest/harvesters/ckanharvester.py owner={{ ckan_shell_user }} group={{ ckan_shell_user }} mode=0644 backup=yes + notify: Restart CKAN + tags: [ 'ckan', 'ckan_pages', 'ckan_plugins', 'ckan_ckanharvester' ] + +- name: Overwrite the csw.py ckanext-spatial plugin file to remove the authentication data from URLs + copy: src=csw.py dest=/usr/lib/ckan/default/src/ckanext-spatial/ckanext/spatial/harvesters/csw.py owner={{ ckan_shell_user }} group={{ ckan_shell_user }} mode=0644 backup=yes + notify: Restart CKAN + tags: [ 'ckan', 'ckan_pages', 'ckan_plugins', 'ckan_ckanext_spatial' ] + - name: Restart apache service: name=apache state=restarted enabled=yes when: ( ckan_install | changed )