diff --git a/ckan/ckan/files/base.py b/ckan/ckan/files/base.py
deleted file mode 100644
index 5fcf93a6..00000000
--- a/ckan/ckan/files/base.py
+++ /dev/null
@@ -1,916 +0,0 @@
-import re
-import cgitb
-import warnings
-import urllib2
-import requests
-import sys
-import logging
-from string import Template
-import urlparse
-from datetime import datetime
-import uuid
-import hashlib
-import dateutil.parser
-import mimetypes
-
-
-from pylons import config
-from owslib import wms
-from lxml import etree
-from xml.dom.minidom import Document
-
-from ckan import plugins as p
-from ckan import model
-from ckan.lib.helpers import json
-from ckan import logic
-from ckan.lib.navl.validators import not_empty
-from ckan.lib.search.index import PackageSearchIndex
-
-from ckanext.harvest.harvesters.base import HarvesterBase
-from ckanext.harvest.model import HarvestObject
-
-from ckanext.spatial.validation import Validators, all_validators
-from ckanext.spatial.model import ISODocument
-from ckanext.spatial.interfaces import ISpatialHarvester
-
-log = logging.getLogger(__name__)
-
-DEFAULT_VALIDATOR_PROFILES = ['iso19139']
-
-
-def text_traceback():
-    with warnings.catch_warnings():
-        warnings.simplefilter("ignore")
-        res = 'the original traceback:'.join(
-            cgitb.text(sys.exc_info()).split('the original traceback:')[1:]
-        ).strip()
-    return res
-
-
-def guess_standard(content):
-    lowered = content.lower()
-    if '</gmd:MD_Metadata>'.lower() in lowered:
-        return 'iso'
-    if '</gmi:MI_Metadata>'.lower() in lowered:
-        return 'iso'
-    if '</metadata>'.lower() in lowered:
-        return 'fgdc'
-    return 'unknown'
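For orientation, guess_standard only looks for the closing tag of the document's root element; a minimal sketch with illustrative snippets:

    print guess_standard('...</gmd:MD_Metadata>')  # 'iso'
    print guess_standard('...</metadata>')         # 'fgdc'
    print guess_standard('no xml at all')          # 'unknown'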
-
-
-def guess_resource_format(url, use_mimetypes=True):
-    '''
-    Given a URL try to guess the best format to assign to the resource
-
-    The function looks for common patterns in popular geospatial services and
-    file extensions, so it may not be 100% accurate. It just looks at the
-    provided URL, it does not attempt to perform any remote check.
-
-    If 'use_mimetypes' is True (the default), the mimetypes module will be
-    used if no match was found before.
-
-    Returns None if no format could be guessed.
-    '''
-    url = url.lower().strip()
-
-    resource_types = {
-        # OGC
-        'wms': ('service=wms', 'geoserver/wms', 'mapserver/wmsserver', 'com.esri.wms.Esrimap', 'service/wms'),
-        'wfs': ('service=wfs', 'geoserver/wfs', 'mapserver/wfsserver', 'com.esri.wfs.Esrimap'),
-        'wcs': ('service=wcs', 'geoserver/wcs', 'imageserver/wcsserver', 'mapserver/wcsserver'),
-        'sos': ('service=sos',),
-        'csw': ('service=csw',),
-        # ESRI
-        'kml': ('mapserver/generatekml',),
-        'arcims': ('com.esri.esrimap.esrimap',),
-        'arcgis_rest': ('arcgis/rest/services',),
-    }
-
-    for resource_type, parts in resource_types.iteritems():
-        if any(part in url for part in parts):
-            return resource_type
-
-    file_types = {
-        'kml': ('kml',),
-        'kmz': ('kmz',),
-        'gml': ('gml',),
-    }
-
-    for file_type, extensions in file_types.iteritems():
-        if any(url.endswith(extension) for extension in extensions):
-            return file_type
-
-    if use_mimetypes:
-        resource_format, encoding = mimetypes.guess_type(url)
-        if resource_format:
-            return resource_format
-
-    return None
-
-
-class SpatialHarvester(HarvesterBase):
-
-    _user_name = None
-
-    _site_user = None
-
-    source_config = {}
-
-    force_import = False
-
-    extent_template = Template('''
-    {"type": "Polygon", "coordinates": [[[$xmin, $ymin], [$xmax, $ymin], [$xmax, $ymax], [$xmin, $ymax], [$xmin, $ymin]]]}
-    ''')
-
-    ## IHarvester
-
-    def validate_config(self, source_config):
-        if not source_config:
-            return source_config
-
-        try:
-            source_config_obj = json.loads(source_config)
-
-            if 'validator_profiles' in source_config_obj:
-                if not isinstance(source_config_obj['validator_profiles'], list):
-                    raise ValueError('validator_profiles must be a list')
-
-                # Check if all profiles exist
-                existing_profiles = [v.name for v in all_validators]
-                unknown_profiles = set(source_config_obj['validator_profiles']) - set(existing_profiles)
-
-                if len(unknown_profiles) > 0:
-                    raise ValueError('Unknown validation profile(s): %s' % ','.join(unknown_profiles))
-
-            if 'default_tags' in source_config_obj:
-                if not isinstance(source_config_obj['default_tags'], list):
-                    raise ValueError('default_tags must be a list')
-
-            if 'default_extras' in source_config_obj:
-                if not isinstance(source_config_obj['default_extras'], dict):
-                    raise ValueError('default_extras must be a dictionary')
-
-            for key in ('override_extras',):
-                if key in source_config_obj:
-                    if not isinstance(source_config_obj[key], bool):
-                        raise ValueError('%s must be boolean' % key)
-
-        except ValueError, e:
-            raise e
-
-        return source_config
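A source configuration that passes the validation above could look like this sketch (the values are illustrative; 'iso19139' is the default profile defined at the top of the file):

    source_config = '''{
        "validator_profiles": ["iso19139"],
        "default_tags": ["geospatial"],
        "default_extras": {"harvested_from": "{harvest_source_url}"},
        "override_extras": false
    }'''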
-
-    ##
-
-    ## SpatialHarvester
-
-    def get_package_dict(self, iso_values, harvest_object):
-        '''
-        Constructs a package_dict suitable to be passed to package_create or
-        package_update. See documentation on
-        ckan.logic.action.create.package_create for more details
-
-        Extensions willing to modify the dict should do so implementing the
-        ISpatialHarvester interface
-
-            import ckan.plugins as p
-            from ckanext.spatial.interfaces import ISpatialHarvester
-
-            class MyHarvester(p.SingletonPlugin):
-
-                p.implements(ISpatialHarvester, inherit=True)
-
-                def get_package_dict(self, context, data_dict):
-
-                    package_dict = data_dict['package_dict']
-
-                    package_dict['extras'].append(
-                        {'key': 'my-custom-extra', 'value': 'my-custom-value'}
-                    )
-
-                    return package_dict
-
-        If a dict is not returned by this function, the import stage will be
-        cancelled.
-
-        :param iso_values: Dictionary with parsed values from the ISO 19139
-            XML document
-        :type iso_values: dict
-        :param harvest_object: HarvestObject domain object (with access to
-            job and source objects)
-        :type harvest_object: HarvestObject
-
-        :returns: A dataset dictionary (package_dict)
-        :rtype: dict
-        '''
-
-        tags = []
-        if 'tags' in iso_values:
-            for tag in iso_values['tags']:
-                tag = tag[:50] if len(tag) > 50 else tag
-                tags.append({'name': tag})
-
-        # Add default_tags from config
-        default_tags = self.source_config.get('default_tags', [])
-        if default_tags:
-            for tag in default_tags:
-                tags.append({'name': tag})
-
-        package_dict = {
-            'title': iso_values['title'],
-            'notes': iso_values['abstract'],
-            'tags': tags,
-            'resources': [],
-        }
-
-        # We need to get the owner organization (if any) from the harvest
-        # source dataset
-        source_dataset = model.Package.get(harvest_object.source.id)
-        if source_dataset.owner_org:
-            package_dict['owner_org'] = source_dataset.owner_org
-
-        # Package name
-        package = harvest_object.package
-        if package is None or package.title != iso_values['title']:
-            name = self._gen_new_name(iso_values['title'])
-            if not name:
-                name = self._gen_new_name(str(iso_values['guid']))
-            if not name:
-                raise Exception('Could not generate a unique name from the title or the GUID. Please choose a more unique title.')
-            package_dict['name'] = name
-        else:
-            package_dict['name'] = package.name
-
-        extras = {
-            'guid': harvest_object.guid,
-            'spatial_harvester': True,
-        }
-
-        # Just add some of the metadata as extras, not the whole lot
-        for name in [
-            # Essentials
-            'spatial-reference-system',
-            'guid',
-            # Usefuls
-            'dataset-reference-date',
-            'metadata-language',  # Language
-            'metadata-date',  # Released
-            'coupled-resource',
-            'contact-email',
-            'frequency-of-update',
-            'spatial-data-service-type',
-        ]:
-            extras[name] = iso_values[name]
-
-        if len(iso_values.get('progress', [])):
-            extras['progress'] = iso_values['progress'][0]
-        else:
-            extras['progress'] = ''
-
-        if len(iso_values.get('resource-type', [])):
-            extras['resource-type'] = iso_values['resource-type'][0]
-        else:
-            extras['resource-type'] = ''
-
-        extras['licence'] = iso_values.get('use-constraints', '')
-
-        def _extract_first_license_url(licences):
-            for licence in licences:
-                o = urlparse.urlparse(licence)
-                if o.scheme and o.netloc:
-                    return licence
-            return None
-
-        if len(extras['licence']):
-            license_url_extracted = _extract_first_license_url(extras['licence'])
-            if license_url_extracted:
-                extras['licence_url'] = license_url_extracted
-
-        # Metadata license ID check for package
-        use_constraints = iso_values.get('use-constraints')
-        if use_constraints:
-
-            context = {'model': model, 'session': model.Session, 'user': self._get_user_name()}
-            license_list = p.toolkit.get_action('license_list')(context, {})
-
-            for constraint in use_constraints:
-                package_license = None
-
-                for license in license_list:
-                    if constraint.lower() == license.get('id') or constraint == license.get('url'):
-                        package_license = license.get('id')
-                        break
-
-                if package_license:
-                    package_dict['license_id'] = package_license
-                    break
-
-        extras['access_constraints'] = iso_values.get('limitations-on-public-access', '')
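The nested _extract_first_license_url helper simply returns the first entry that parses as an absolute URL; illustrative values:

    licences = ['Free-text use constraint',
                'http://creativecommons.org/licenses/by/4.0/']
    # Only the second entry has both a scheme and a netloc, so it is the
    # one stored as extras['licence_url'].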
-
-        # Graphic preview
-        browse_graphic = iso_values.get('browse-graphic')
-        if browse_graphic:
-            browse_graphic = browse_graphic[0]
-            extras['graphic-preview-file'] = browse_graphic.get('file')
-            if browse_graphic.get('description'):
-                extras['graphic-preview-description'] = browse_graphic.get('description')
-            if browse_graphic.get('type'):
-                extras['graphic-preview-type'] = browse_graphic.get('type')
-
-        for key in ['temporal-extent-begin', 'temporal-extent-end']:
-            if len(iso_values[key]) > 0:
-                extras[key] = iso_values[key][0]
-
-        # Save responsible organization roles
-        if iso_values['responsible-organisation']:
-            parties = {}
-            for party in iso_values['responsible-organisation']:
-                if party['organisation-name'] in parties:
-                    if not party['role'] in parties[party['organisation-name']]:
-                        parties[party['organisation-name']].append(party['role'])
-                else:
-                    parties[party['organisation-name']] = [party['role']]
-            extras['responsible-party'] = [{'name': k, 'roles': v} for k, v in parties.iteritems()]
-
-        if len(iso_values['bbox']) > 0:
-            bbox = iso_values['bbox'][0]
-            extras['bbox-east-long'] = bbox['east']
-            extras['bbox-north-lat'] = bbox['north']
-            extras['bbox-south-lat'] = bbox['south']
-            extras['bbox-west-long'] = bbox['west']
-
-            try:
-                xmin = float(bbox['west'])
-                xmax = float(bbox['east'])
-                ymin = float(bbox['south'])
-                ymax = float(bbox['north'])
-            except ValueError, e:
-                self._save_object_error('Error parsing bounding box value: {0}'.format(str(e)),
-                                        harvest_object, 'Import')
-            else:
-                # Construct a GeoJSON extent so ckanext-spatial can register the extent geometry
-
-                # Some publishers define the same two corners for the bbox (i.e. a point),
-                # which causes problems in the search if stored as a polygon
-                if xmin == xmax or ymin == ymax:
-                    extent_string = Template('{"type": "Point", "coordinates": [$x, $y]}').substitute(
-                        x=xmin, y=ymin
-                    )
-                    self._save_object_error('Point extent defined instead of polygon',
-                                            harvest_object, 'Import')
-                else:
-                    extent_string = self.extent_template.substitute(
-                        xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax
-                    )
-
-                extras['spatial'] = extent_string.strip()
-        else:
-            log.debug('No spatial extent defined for this object')
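For an illustrative bbox of west=-10, south=49, east=2, north=61 the template substitution above produces (whitespace trimmed):

    {"type": "Polygon", "coordinates": [[[-10.0, 49.0], [2.0, 49.0], [2.0, 61.0], [-10.0, 61.0], [-10.0, 49.0]]]}

while a degenerate bbox whose corners coincide is stored as a point instead:

    {"type": "Point", "coordinates": [-10.0, 49.0]}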
-
-        resource_locators = iso_values.get('resource-locator', []) +\
-            iso_values.get('resource-locator-identification', [])
-
-        if len(resource_locators):
-            for resource_locator in resource_locators:
-                url = resource_locator.get('url', '').strip()
-                if url:
-                    resource = {}
-                    resource['format'] = guess_resource_format(url)
-                    if resource['format'] == 'wms' and config.get('ckanext.spatial.harvest.validate_wms', False):
-                        # Check if the service is a view service
-                        test_url = url.split('?')[0] if '?' in url else url
-                        if self._is_wms(test_url):
-                            resource['verified'] = True
-                            resource['verified_date'] = datetime.now().isoformat()
-
-                    resource.update(
-                        {
-                            'url': url,
-                            'name': resource_locator.get('name') or p.toolkit._('Unnamed resource'),
-                            'description': resource_locator.get('description') or '',
-                            'resource_locator_protocol': resource_locator.get('protocol') or '',
-                            'resource_locator_function': resource_locator.get('function') or '',
-                        })
-                    package_dict['resources'].append(resource)
-
-        # Add default_extras from config
-        default_extras = self.source_config.get('default_extras', {})
-        if default_extras:
-            override_extras = self.source_config.get('override_extras', False)
-            for key, value in default_extras.iteritems():
-                log.debug('Processing extra %s', key)
-                if not key in extras or override_extras:
-                    # Look for replacement strings
-                    if isinstance(value, basestring):
-                        value = value.format(harvest_source_id=harvest_object.job.source.id,
-                                             harvest_source_url=harvest_object.job.source.url.strip('/'),
-                                             harvest_source_title=harvest_object.job.source.title,
-                                             harvest_job_id=harvest_object.job.id,
-                                             harvest_object_id=harvest_object.id)
-                    extras[key] = value
-
-        extras_as_dict = []
-        for key, value in extras.iteritems():
-            if isinstance(value, (list, dict)):
-                extras_as_dict.append({'key': key, 'value': json.dumps(value)})
-            else:
-                extras_as_dict.append({'key': key, 'value': value})
-
-        package_dict['extras'] = extras_as_dict
-
-        return package_dict
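The replacement strings recognised in default_extras values are exactly the keyword arguments passed to value.format() above; a sketch with illustrative values:

    value = 'Object {harvest_object_id} from {harvest_source_url}'
    print value.format(harvest_source_url='http://example.com/csw',
                       harvest_object_id='1234-abcd')
    # Object 1234-abcd from http://example.com/csw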
-
-    def transform_to_iso(self, original_document, original_format, harvest_object):
-        '''
-        DEPRECATED: Use the transform_to_iso method of the ISpatialHarvester
-        interface
-        '''
-        self.__base_transform_to_iso_called = True
-        return None
-
-    def import_stage(self, harvest_object):
-        context = {
-            'model': model,
-            'session': model.Session,
-            'user': self._get_user_name(),
-        }
-
-        log = logging.getLogger(__name__ + '.import')
-        log.debug('Import stage for harvest object: %s', harvest_object.id)
-
-        if not harvest_object:
-            log.error('No harvest object received')
-            return False
-
-        self._set_source_config(harvest_object.source.config)
-
-        if self.force_import:
-            status = 'change'
-        else:
-            status = self._get_object_extra(harvest_object, 'status')
-
-        # Get the last harvested object (if any)
-        previous_object = model.Session.query(HarvestObject) \
-            .filter(HarvestObject.guid==harvest_object.guid) \
-            .filter(HarvestObject.current==True) \
-            .first()
-
-        if status == 'delete':
-            # Delete package
-            context.update({
-                'ignore_auth': True,
-            })
-            p.toolkit.get_action('package_delete')(context, {'id': harvest_object.package_id})
-            log.info('Deleted package {0} with guid {1}'.format(harvest_object.package_id, harvest_object.guid))
-
-            return True
-
-        # Check if it is a non-ISO document
-        original_document = self._get_object_extra(harvest_object, 'original_document')
-        original_format = self._get_object_extra(harvest_object, 'original_format')
-        if original_document and original_format:
-            # DEPRECATED: use the ISpatialHarvester interface method instead
-            self.__base_transform_to_iso_called = False
-            content = self.transform_to_iso(original_document, original_format, harvest_object)
-            if not self.__base_transform_to_iso_called:
-                log.warn('Deprecation warning: calling transform_to_iso directly is deprecated. ' +
-                         'Please use the ISpatialHarvester interface method instead.')
-
-            for harvester in p.PluginImplementations(ISpatialHarvester):
-                content = harvester.transform_to_iso(original_document, original_format, harvest_object)
-
-            if content:
-                harvest_object.content = content
-            else:
-                self._save_object_error('Transformation to ISO failed', harvest_object, 'Import')
-                return False
-        else:
-            if harvest_object.content is None:
-                self._save_object_error('Empty content for object {0}'.format(harvest_object.id), harvest_object, 'Import')
-                return False
-
-            # Validate ISO document
-            is_valid, profile, errors = self._validate_document(harvest_object.content, harvest_object)
-            if not is_valid:
-                # If validation errors were found, import will stop unless
-                # configuration per source or per instance says otherwise
-                continue_import = p.toolkit.asbool(config.get('ckanext.spatial.harvest.continue_on_validation_errors', False)) or \
-                    self.source_config.get('continue_on_validation_errors')
-                if not continue_import:
-                    return False
-
-        # Parse ISO document
-        try:
-            iso_parser = ISODocument(harvest_object.content)
-            iso_values = iso_parser.read_values()
-        except Exception, e:
-            self._save_object_error('Error parsing ISO document for object {0}: {1}'.format(harvest_object.id, str(e)),
-                                    harvest_object, 'Import')
-            return False
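This is the only place iso_values comes from; the same two calls in isolation (xml_string stands for an ISO 19139 document read elsewhere):

    iso_parser = ISODocument(xml_string)
    iso_values = iso_parser.read_values()
    print iso_values['title'], iso_values['guid']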
-
-        # Flag previous object as not current anymore
-        if previous_object and not self.force_import:
-            previous_object.current = False
-            previous_object.add()
-
-        # Update GUID with the one on the document
-        iso_guid = iso_values['guid']
-        if iso_guid and harvest_object.guid != iso_guid:
-            # First make sure there already aren't current objects
-            # with the same guid
-            existing_object = model.Session.query(HarvestObject.id) \
-                .filter(HarvestObject.guid==iso_guid) \
-                .filter(HarvestObject.current==True) \
-                .first()
-            if existing_object:
-                self._save_object_error('Object {0} already has this guid {1}'.format(existing_object.id, iso_guid),
-                                        harvest_object, 'Import')
-                return False
-
-            harvest_object.guid = iso_guid
-            harvest_object.add()
-
-        # Generate GUID if not present (i.e. it's a manual import)
-        if not harvest_object.guid:
-            m = hashlib.md5()
-            m.update(harvest_object.content.encode('utf8', 'ignore'))
-            harvest_object.guid = m.hexdigest()
-            harvest_object.add()
-
-        # Get document modified date
-        try:
-            metadata_modified_date = dateutil.parser.parse(iso_values['metadata-date'], ignoretz=True)
-        except ValueError:
-            self._save_object_error('Could not extract reference date for object {0} ({1})'
-                                    .format(harvest_object.id, iso_values['metadata-date']), harvest_object, 'Import')
-            return False
-
-        harvest_object.metadata_modified_date = metadata_modified_date
-        harvest_object.add()
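dateutil is forgiving about the date formats that appear in metadata-date; for example:

    import dateutil.parser
    print dateutil.parser.parse('2014-03-21', ignoretz=True)
    # 2014-03-21 00:00:00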
-
-        # Build the package dict
-        package_dict = self.get_package_dict(iso_values, harvest_object)
-        for harvester in p.PluginImplementations(ISpatialHarvester):
-            package_dict = harvester.get_package_dict(context, {
-                'package_dict': package_dict,
-                'iso_values': iso_values,
-                'xml_tree': iso_parser.xml_tree,
-                'harvest_object': harvest_object,
-            })
-        if not package_dict:
-            log.error('No package dict returned, aborting import for object {0}'.format(harvest_object.id))
-            return False
-
-        # Create / update the package
-        context.update({
-            'extras_as_string': True,
-            'api_version': '2',
-            'return_id_only': True})
-
-        if self._site_user and context['user'] == self._site_user['name']:
-            context['ignore_auth'] = True
-
-        # The default package schema does not like upper case tags
-        tag_schema = logic.schema.default_tags_schema()
-        tag_schema['name'] = [not_empty, unicode]
-
-        # Flag this object as the current one
-        harvest_object.current = True
-        harvest_object.add()
-
-        if status == 'new':
-            package_schema = logic.schema.default_create_package_schema()
-            package_schema['tags'] = tag_schema
-            context['schema'] = package_schema
-
-            # We need to explicitly provide a package ID, otherwise
-            # ckanext-spatial won't be able to link the extent to the package.
-            package_dict['id'] = unicode(uuid.uuid4())
-            package_schema['id'] = [unicode]
-
-            # Save reference to the package on the object
-            harvest_object.package_id = package_dict['id']
-            harvest_object.add()
-            # Defer constraints and flush so the dataset can be indexed with
-            # the harvest object id (on the after_show hook from the harvester
-            # plugin)
-            model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED')
-            model.Session.flush()
-
-            try:
-                package_id = p.toolkit.get_action('package_create')(context, package_dict)
-                log.info('Created new package %s with guid %s', package_id, harvest_object.guid)
-            except p.toolkit.ValidationError, e:
-                self._save_object_error('Validation Error: %s' % str(e.error_summary), harvest_object, 'Import')
-                return False
-
-        elif status == 'change':
-
-            # Check if the modified date is more recent
-            if not self.force_import and previous_object and harvest_object.metadata_modified_date <= previous_object.metadata_modified_date:
-
-                # Assign the previous job id to the new object to
-                # avoid losing history
-                harvest_object.harvest_job_id = previous_object.job.id
-                harvest_object.add()
-
-                # Delete the previous object to avoid cluttering the object table
-                previous_object.delete()
-
-                # Reindex the corresponding package to update the reference to the
-                # harvest object
-                if ((config.get('ckanext.spatial.harvest.reindex_unchanged', True) != 'False'
-                        or self.source_config.get('reindex_unchanged') != 'False')
-                        and harvest_object.package_id):
-                    context.update({'validate': False, 'ignore_auth': True})
-                    try:
-                        package_dict = logic.get_action('package_show')(context,
-                            {'id': harvest_object.package_id})
-                    except p.toolkit.ObjectNotFound:
-                        pass
-                    else:
-                        for extra in package_dict.get('extras', []):
-                            if extra['key'] == 'harvest_object_id':
-                                extra['value'] = harvest_object.id
-                        if package_dict:
-                            package_index = PackageSearchIndex()
-                            package_index.index_package(package_dict)
-
-                log.info('Document with GUID %s unchanged, skipping...' % (harvest_object.guid))
-            else:
-                package_schema = logic.schema.default_update_package_schema()
-                package_schema['tags'] = tag_schema
-                context['schema'] = package_schema
-
-                package_dict['id'] = harvest_object.package_id
-                try:
-                    package_id = p.toolkit.get_action('package_update')(context, package_dict)
-                    log.info('Updated package %s with guid %s', package_id, harvest_object.guid)
-                except p.toolkit.ValidationError, e:
-                    self._save_object_error('Validation Error: %s' % str(e.error_summary), harvest_object, 'Import')
-                    return False
-
-        model.Session.commit()
-
-        return True
-
-    ##
-
-    def _is_wms(self, url):
-        '''
-        Checks if the provided URL actually points to a Web Map Service.
-        Uses owslib's WMS reader to parse the response.
-        '''
-        try:
-            capabilities_url = wms.WMSCapabilitiesReader().capabilities_url(url)
-            res = urllib2.urlopen(capabilities_url, None, 10)
-            xml = res.read()
-
-            s = wms.WebMapService(url, xml=xml)
-            return isinstance(s.contents, dict) and s.contents != {}
-        except Exception, e:
-            log.error('WMS check for %s failed with exception: %s' % (url, str(e)))
-            return False
-
-    def _get_object_extra(self, harvest_object, key):
-        '''
-        Helper function for retrieving the value from a harvest object extra,
-        given the key
-        '''
-        for extra in harvest_object.extras:
-            if extra.key == key:
-                return extra.value
-        return None
-
-    def _set_source_config(self, config_str):
-        '''
-        Loads the source configuration JSON object into a dict for
-        convenient access
-        '''
-        if config_str:
-            self.source_config = json.loads(config_str)
-            log.debug('Using config: %r', self.source_config)
-        else:
-            self.source_config = {}
-
-    def _get_validator(self):
-        '''
-        Returns the validator object using the relevant profiles
-
-        The profiles to be used are assigned in the following order:
-
-        1. 'validator_profiles' property of the harvest source config object
-        2. 'ckan.spatial.validator.profiles' configuration option in the ini file
-        3. Default value as defined in DEFAULT_VALIDATOR_PROFILES
-        '''
-        if not hasattr(self, '_validator'):
-            if hasattr(self, 'source_config') and self.source_config.get('validator_profiles', None):
-                profiles = self.source_config.get('validator_profiles')
-            elif config.get('ckan.spatial.validator.profiles', None):
-                profiles = [
-                    x.strip() for x in
-                    config.get('ckan.spatial.validator.profiles').split(',')
-                ]
-            else:
-                profiles = DEFAULT_VALIDATOR_PROFILES
-            self._validator = Validators(profiles=profiles)
-
-            # Add any custom validators from extensions
-            for plugin_with_validators in p.PluginImplementations(ISpatialHarvester):
-                custom_validators = plugin_with_validators.get_validators()
-                for custom_validator in custom_validators:
-                    if custom_validator not in all_validators:
-                        self._validator.add_validator(custom_validator)
-
-        return self._validator
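Option 2 above is a plain ini setting, e.g. (profile names must match entries in ckanext.spatial.validation.all_validators; this one is the default):

    ckan.spatial.validator.profiles = iso19139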
-
-    def _get_user_name(self):
-        '''
-        Returns the name of the user that will perform the harvesting actions
-        (deleting, updating and creating datasets)
-
-        By default this will be the internal site admin user. This is the
-        recommended setting, but if necessary it can be overridden with the
-        `ckanext.spatial.harvest.user_name` config option, eg to support the
-        old hardcoded 'harvest' user:
-
-            ckanext.spatial.harvest.user_name = harvest
-
-        '''
-        if self._user_name:
-            return self._user_name
-
-        context = {'model': model,
-                   'ignore_auth': True,
-                   'defer_commit': True,  # See ckan/ckan#1714
-                   }
-        self._site_user = p.toolkit.get_action('get_site_user')(context, {})
-
-        config_user_name = config.get('ckanext.spatial.harvest.user_name')
-        if config_user_name:
-            self._user_name = config_user_name
-        else:
-            self._user_name = self._site_user['name']
-
-        return self._user_name
-
-    def _get_content(self, url):
-        '''
-        DEPRECATED: Use _get_content_as_unicode instead
-        '''
-        parts = urlparse.urlsplit(url)
-        if parts.username and parts.password:
-            url_path = parts.path.rsplit('/', 1)[0]
-            url_path = url_path + '/xml.user.login'
-            if parts.port is None:
-                auth_url = urlparse.urlunsplit((
-                    parts.scheme,
-                    parts.hostname,
-                    url_path,
-                    parts.query,
-                    parts.fragment
-                ))
-            else:
-                auth_url = urlparse.urlunsplit((
-                    parts.scheme,
-                    '%s:%s' % (parts.hostname, parts.port),
-                    url_path,
-                    parts.query,
-                    parts.fragment
-                ))
-            log.error('Authenticate against Geonetwork. User is %s and password is %s', parts.username, parts.password)
-            auth_data = Document()
-            root = auth_data.createElement('request')
-            auth_data.appendChild(root)
-            username_tag = auth_data.createElement('username')
-            user_data = auth_data.createTextNode(parts.username)
-            username_tag.appendChild(user_data)
-            root.appendChild(username_tag)
-            password_tag = auth_data.createElement('password')
-            password_data = auth_data.createTextNode(parts.password)
-            password_tag.appendChild(password_data)
-            root.appendChild(password_tag)
-            xml_auth_data = auth_data.toprettyxml(indent="  ")
-
-            req_headers = {'Content-Type': 'application/xml'}
-
-            sess = requests.Session()
-            req = sess.post(url=auth_url, data=xml_auth_data, headers=req_headers)
-
-        opener = urllib2.build_opener()
-
-        url = url.replace(' ', '%20')
-        if opener:
-            http_response = opener.open(url)
-        else:
-            http_response = urllib2.urlopen(url)
-        return http_response.read()
-
-    def _get_content_as_unicode(self, url):
-        '''
-        Get remote content as unicode.
-
-        We let requests handle the conversion [1], which will use the
-        content-type header first or chardet if the header is missing
-        (requests uses its own embedded chardet version).
-
-        As we will be storing and serving the contents as unicode, we actually
-        replace the original XML encoding declaration with an UTF-8 one.
-
-        [1] http://github.com/kennethreitz/requests/blob/63243b1e3b435c7736acf1e51c0f6fa6666d861d/requests/models.py#L811
-        '''
-        geo_session = None
-
-        parts = urlparse.urlsplit(url)
-        if parts.username and parts.password:
-            url_path = parts.path.rsplit('/', 1)[0]
-            url_path = url_path + '/xml.user.login'
-            if parts.port is None:
-                auth_url = urlparse.urlunsplit((
-                    parts.scheme,
-                    parts.hostname,
-                    url_path,
-                    parts.query,
-                    parts.fragment
-                ))
-            else:
-                auth_url = urlparse.urlunsplit((
-                    parts.scheme,
-                    '%s:%s' % (parts.hostname, parts.port),
-                    url_path,
-                    parts.query,
-                    parts.fragment
-                ))
-            log.error('Authenticate against Geonetwork. User is %s and password is %s', parts.username, parts.password)
-            auth_data = Document()
-            root = auth_data.createElement('request')
-            auth_data.appendChild(root)
-            username_tag = auth_data.createElement('username')
-            user_data = auth_data.createTextNode(parts.username)
-            username_tag.appendChild(user_data)
-            root.appendChild(username_tag)
-            password_tag = auth_data.createElement('password')
-            password_data = auth_data.createTextNode(parts.password)
-            password_tag.appendChild(password_data)
-            root.appendChild(password_tag)
-            xml_auth_data = auth_data.toprettyxml(indent="  ")
-
-            req_headers = {'Content-Type': 'application/xml'}
-
-            geo_session = requests.Session()
-            geo_session.post(url=auth_url, data=xml_auth_data, headers=req_headers)
-
-        url = url.replace(' ', '%20')
-        if geo_session:
-            response = geo_session.get(url, timeout=10)
-        else:
-            response = requests.get(url, timeout=10)
-
-        content = response.text
-
-        # Remove original XML declaration
-        content = re.sub('<\?xml(.*)\?>', '', content)
-
-        # Get rid of the BOM and other rubbish at the beginning of the file
-        content = re.sub('.*?<', '<', content, 1)
-        content = content[content.index('<'):]
-
-        return content
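Illustrative behaviour of the clean-up steps above on a response that starts with a BOM and a non-UTF-8 declaration:

    import re
    content = u'\ufeff<?xml version="1.0" encoding="ISO-8859-1"?><gmd:MD_Metadata/>'
    content = re.sub('<\?xml(.*)\?>', '', content)
    content = re.sub('.*?<', '<', content, 1)
    content = content[content.index('<'):]
    print content  # <gmd:MD_Metadata/>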
-
-    def _validate_document(self, document_string, harvest_object, validator=None):
-        '''
-        Validates an XML document with the default, or if present, the
-        provided validators.
-
-        It will create a HarvestObjectError for each validation error found,
-        so they can be shown properly on the frontend.
-
-        Returns a tuple, with a boolean showing whether the validation passed
-        or not, the profile used and a list of errors (tuples with error
-        message and error lines if present).
-        '''
-        if not validator:
-            validator = self._get_validator()
-
-        document_string = re.sub('<\?xml(.*)\?>', '', document_string)
-
-        try:
-            xml = etree.fromstring(document_string)
-        except etree.XMLSyntaxError, e:
-            self._save_object_error('Could not parse XML file: {0}'.format(str(e)), harvest_object, 'Import')
-            return False, None, []
-
-        valid, profile, errors = validator.is_valid(xml)
-        if not valid:
-            log.error('Validation errors found using profile {0} for object with GUID {1}'.format(profile, harvest_object.guid))
-            for error in errors:
-                self._save_object_error(error[0], harvest_object, 'Validation', line=error[1])
-
-        return valid, profile, errors
diff --git a/ckan/ckan/files/ckanharvester.py b/ckan/ckan/files/ckanharvester.py
deleted file mode 100644
index ff1f2fee..00000000
--- a/ckan/ckan/files/ckanharvester.py
+++ /dev/null
@@ -1,574 +0,0 @@
-import urllib
-import urllib2
-import urlparse
-import httplib
-import datetime
-import socket
-
-from sqlalchemy import exists
-
-from ckan.lib.base import c
-from ckan import model
-from ckan.logic import ValidationError, NotFound, get_action
-from ckan.lib.helpers import json
-from ckan.lib.munge import munge_name
-from ckan.plugins import toolkit
-
-from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError
-
-import logging
-log = logging.getLogger(__name__)
-
-from base import HarvesterBase
-
-
-class CKANHarvester(HarvesterBase):
-    '''
-    A Harvester for CKAN instances
-    '''
-    config = None
-
-    api_version = 2
-    action_api_version = 3
-
-    def _get_action_api_offset(self):
-        return '/api/%d/action' % self.action_api_version
-
-    def _get_search_api_offset(self):
-        return '%s/package_search' % self._get_action_api_offset()
-
-    def _get_content(self, url):
-        http_request = urllib2.Request(url=url)
-
-        api_key = self.config.get('api_key')
-        if api_key:
-            http_request.add_header('Authorization', api_key)
-
-        try:
-            http_response = urllib2.urlopen(http_request)
-        except urllib2.HTTPError, e:
-            if e.getcode() == 404:
-                raise ContentNotFoundError('HTTP error: %s' % e.code)
-            else:
-                raise ContentFetchError('HTTP error: %s' % e.code)
-        except urllib2.URLError, e:
-            raise ContentFetchError('URL error: %s' % e.reason)
-        except httplib.HTTPException, e:
-            raise ContentFetchError('HTTP Exception: %s' % e)
-        except socket.error, e:
-            raise ContentFetchError('HTTP socket error: %s' % e)
-        except Exception, e:
-            raise ContentFetchError('HTTP general exception: %s' % e)
-        return http_response.read()
-
-    def _get_group(self, base_url, group_name):
-        url = base_url + self._get_action_api_offset() + '/group_show?id=' + \
-            munge_name(group_name)
-        try:
-            content = self._get_content(url)
-            return json.loads(content)
-        except (ContentFetchError, ValueError):
-            log.debug('Could not fetch/decode remote group')
-            raise RemoteResourceError('Could not fetch/decode remote group')
-
-    def _get_organization(self, base_url, org_name):
-        url = base_url + self._get_action_api_offset() + \
-            '/organization_show?id=' + org_name
-        try:
-            content = self._get_content(url)
-            content_dict = json.loads(content)
-            return content_dict['result']
-        except (ContentFetchError, ValueError, KeyError):
-            log.debug('Could not fetch/decode remote organization')
-            raise RemoteResourceError(
-                'Could not fetch/decode remote organization')
-
-    def _set_config(self, config_str):
-        if config_str:
-            self.config = json.loads(config_str)
-            if 'api_version' in self.config:
-                self.api_version = int(self.config['api_version'])
-
-            log.debug('Using config: %r', self.config)
-        else:
-            self.config = {}
-
-    def info(self):
-        return {
-            'name': 'ckan',
-            'title': 'CKAN',
-            'description': 'Harvests remote CKAN instances',
-            'form_config_interface': 'Text'
-        }
-
-    def validate_config(self, config):
-        if not config:
-            return config
-
-        try:
-            config_obj = json.loads(config)
-
-            if 'api_version' in config_obj:
-                try:
-                    int(config_obj['api_version'])
-                except ValueError:
-                    raise ValueError('api_version must be an integer')
-
-            if 'default_tags' in config_obj:
-                if not isinstance(config_obj['default_tags'], list):
-                    raise ValueError('default_tags must be a list')
-
-            if 'default_groups' in config_obj:
-                if not isinstance(config_obj['default_groups'], list):
-                    raise ValueError('default_groups must be a list')
-
-                # Check if default groups exist
-                context = {'model': model, 'user': c.user}
-                for group_name in config_obj['default_groups']:
-                    try:
-                        group = get_action('group_show')(
-                            context, {'id': group_name})
-                    except NotFound, e:
-                        raise ValueError('Default group not found')
-
-            if 'default_extras' in config_obj:
-                if not isinstance(config_obj['default_extras'], dict):
-                    raise ValueError('default_extras must be a dictionary')
-
-            if 'user' in config_obj:
-                # Check if user exists
-                context = {'model': model, 'user': c.user}
-                try:
-                    user = get_action('user_show')(
-                        context, {'id': config_obj.get('user')})
-                except NotFound:
-                    raise ValueError('User not found')
-
-            for key in ('read_only', 'force_all'):
-                if key in config_obj:
-                    if not isinstance(config_obj[key], bool):
-                        raise ValueError('%s must be boolean' % key)
-
-        except ValueError, e:
-            raise e
-
-        return config
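A configuration accepted by this validator might look like the following sketch (the group name is illustrative and must exist on the local CKAN):

    config = '''{
        "api_version": 3,
        "default_tags": ["harvested"],
        "default_groups": ["remote-data"],
        "read_only": true,
        "force_all": false
    }'''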
-
-    def gather_stage(self, harvest_job):
-        log.debug('In CKANHarvester gather_stage (%s)',
-                  harvest_job.source.url)
-        toolkit.requires_ckan_version(min_version='2.0')
-        get_all_packages = True
-
-        self._set_config(harvest_job.source.config)
-
-        # URL cleanup: strip any credentials embedded in the source URL
-        parts = urlparse.urlsplit(harvest_job.source.url)
-        if parts.username and parts.password:
-            if parts.port is None:
-                cleaned_url = urlparse.urlunsplit((
-                    parts.scheme,
-                    parts.hostname,
-                    parts.path,
-                    parts.query,
-                    parts.fragment
-                ))
-            else:
-                cleaned_url = urlparse.urlunsplit((
-                    parts.scheme,
-                    '%s:%s' % (parts.hostname, parts.port),
-                    parts.path,
-                    parts.query,
-                    parts.fragment
-                ))
-        else:
-            cleaned_url = harvest_job.source.url
-
-        # Get source URL
-        remote_ckan_base_url = cleaned_url.rstrip('/')
-
-        # Filter in/out datasets from particular organizations
-        fq_terms = []
-        org_filter_include = self.config.get('organizations_filter_include', [])
-        org_filter_exclude = self.config.get('organizations_filter_exclude', [])
-        if org_filter_include:
-            fq_terms.append(' OR '.join(
-                'organization:%s' % org_name for org_name in org_filter_include))
-        elif org_filter_exclude:
-            fq_terms.extend(
-                '-organization:%s' % org_name for org_name in org_filter_exclude)
-
-        # Ideally we can request from the remote CKAN only those datasets
-        # modified since the last completely successful harvest.
-        last_error_free_job = self._last_error_free_job(harvest_job)
-        log.debug('Last error-free job: %r', last_error_free_job)
-        if (last_error_free_job and
-                not self.config.get('force_all', False)):
-            get_all_packages = False
-
-            # Request only the datasets modified since
-            last_time = last_error_free_job.gather_started
-            # Note: SOLR works in UTC, and gather_started is also UTC, so
-            # this should work as long as local and remote clocks are
-            # relatively accurate. Going back a little earlier, just in case.
-            get_changes_since = \
-                (last_time - datetime.timedelta(hours=1)).isoformat()
-            log.info('Searching for datasets modified since: %s UTC',
-                     get_changes_since)
-
-            fq_since_last_time = 'metadata_modified:[{since}Z TO *]' \
-                .format(since=get_changes_since)
-
-            try:
-                pkg_dicts = self._search_for_datasets(
-                    remote_ckan_base_url,
-                    fq_terms + [fq_since_last_time])
-            except SearchError, e:
-                log.info('Searching for datasets changed since last time '
-                         'gave an error: %s', e)
-                get_all_packages = True
-
-            if not get_all_packages and not pkg_dicts:
-                log.info('No datasets have been updated on the remote '
-                         'CKAN instance since the last harvest job %s',
-                         last_time)
-                return None
-
-        # Fall-back option - request all the datasets from the remote CKAN
-        if get_all_packages:
-            # Request all remote packages
-            try:
-                pkg_dicts = self._search_for_datasets(remote_ckan_base_url,
-                                                      fq_terms)
-            except SearchError, e:
-                log.info('Searching for all datasets gave an error: %s', e)
-                self._save_gather_error(
-                    'Unable to search remote CKAN for datasets: %s url: %s '
-                    'terms: %s' % (e, remote_ckan_base_url, fq_terms),
-                    harvest_job)
-                return None
-            if not pkg_dicts:
-                self._save_gather_error(
-                    'No datasets found at CKAN: %s' % remote_ckan_base_url,
-                    harvest_job)
-                return None
-
-        # Create harvest objects for each dataset
-        try:
-            package_ids = set()
-            object_ids = []
-            for pkg_dict in pkg_dicts:
-                if pkg_dict['id'] in package_ids:
-                    log.info('Discarding duplicate dataset %s - probably due '
-                             'to datasets being changed at the same time as '
-                             'when the harvester was paging through',
-                             pkg_dict['id'])
-                    continue
-                package_ids.add(pkg_dict['id'])
-
-                log.debug('Creating HarvestObject for %s %s',
-                          pkg_dict['name'], pkg_dict['id'])
-                obj = HarvestObject(guid=pkg_dict['id'],
-                                    job=harvest_job,
-                                    content=json.dumps(pkg_dict))
-                obj.save()
-                object_ids.append(obj.id)
-
-            return object_ids
-        except Exception, e:
-            self._save_gather_error('%r' % e.message, harvest_job)
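Illustrative examples of the Solr fq terms built above:

    # organizations_filter_include -> 'organization:org-a OR organization:org-b'
    # organizations_filter_exclude -> '-organization:org-c'
    # incremental harvest          -> 'metadata_modified:[2014-03-21T10:00:00Z TO *]'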
-
-    def _search_for_datasets(self, remote_ckan_base_url, fq_terms=None):
-        '''Does a dataset search on a remote CKAN and returns the results.
-
-        Deals with paging to return all the results, not just the first page.
-        '''
-        base_search_url = remote_ckan_base_url + self._get_search_api_offset()
-        params = {'rows': '100', 'start': '0'}
-        # There is the worry that datasets will be changed whilst we are paging
-        # through them.
-        # * In SOLR 4.7 there is a cursor, but we are not using that yet
-        #   because few CKANs are running that version.
-        # * However we sort, names added or removed before the current page
-        #   would cause existing names on the next page to be missed or
-        #   double counted.
-        # * Another approach might be to sort by metadata_modified and always
-        #   ask for changes since (and including) the date of the last item of
-        #   the day before. However if the entire page is of the exact same
-        #   time, then you end up in an infinite loop asking for the same page.
-        # * We choose a balanced approach of sorting by ID, which means
-        #   datasets are only missed if some are removed, which is far less
-        #   likely than any being added. If some are missed then it is assumed
-        #   they will be harvested the next time anyway. When datasets are
-        #   added, we are at risk of seeing datasets twice in the paging, so
-        #   we detect and remove any duplicates.
-        params['sort'] = 'id asc'
-        if fq_terms:
-            params['fq'] = ' '.join(fq_terms)
-
-        pkg_dicts = []
-        pkg_ids = set()
-        previous_content = None
-        while True:
-            url = base_search_url + '?' + urllib.urlencode(params)
-            log.debug('Searching for CKAN datasets: %s', url)
-            try:
-                content = self._get_content(url)
-            except ContentFetchError, e:
-                raise SearchError(
-                    'Error sending request to search remote '
-                    'CKAN instance %s using URL %r. Error: %s' %
-                    (remote_ckan_base_url, url, e))
-
-            if previous_content and content == previous_content:
-                raise SearchError('The paging doesn\'t seem to work. URL: %s' %
-                                  url)
-            previous_content = content
-            try:
-                response_dict = json.loads(content)
-            except ValueError:
-                raise SearchError('Response from remote CKAN was not JSON: %r'
-                                  % content)
-            try:
-                pkg_dicts_page = response_dict.get('result', {}).get('results',
-                                                                     [])
-            except ValueError:
-                raise SearchError('Response JSON did not contain '
-                                  'result/results: %r' % response_dict)
-
-            # Weed out any datasets found on previous pages (should datasets be
-            # changing while we page)
-            ids_in_page = set(p['id'] for p in pkg_dicts_page)
-            duplicate_ids = ids_in_page & pkg_ids
-            if duplicate_ids:
-                pkg_dicts_page = [p for p in pkg_dicts_page
-                                  if p['id'] not in duplicate_ids]
-            pkg_ids |= ids_in_page
-
-            pkg_dicts.extend(pkg_dicts_page)
-
-            if len(pkg_dicts_page) == 0:
-                break
-
-            params['start'] = str(int(params['start']) + int(params['rows']))
-
-        return pkg_dicts
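Paging simply advances start by rows until an empty page comes back; with the defaults above the successive requests use start=0, 100, 200, and so on:

    params['start'] = str(int(params['start']) + int(params['rows']))  # '0' -> '100'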
-
-    @classmethod
-    def _last_error_free_job(cls, harvest_job):
-        # TODO weed out cancelled jobs somehow.
-        # look for jobs with no gather errors
-        jobs = \
-            model.Session.query(HarvestJob) \
-                 .filter(HarvestJob.source == harvest_job.source) \
-                 .filter(HarvestJob.gather_started != None) \
-                 .filter(HarvestJob.status == 'Finished') \
-                 .filter(HarvestJob.id != harvest_job.id) \
-                 .filter(
-                     ~exists().where(
-                         HarvestGatherError.harvest_job_id == HarvestJob.id)) \
-                 .order_by(HarvestJob.gather_started.desc())
-        # now check them until we find one with no fetch/import errors
-        # (looping rather than doing sql, in case there are lots of objects
-        # and lots of jobs)
-        for job in jobs:
-            for obj in job.objects:
-                if obj.current is False and \
-                        obj.report_status != 'not modified':
-                    # unsuccessful, so go onto the next job
-                    break
-            else:
-                return job
-
-    def fetch_stage(self, harvest_object):
-        # Nothing to do here - we got the package dict in the search in the
-        # gather stage
-        return True
-
-    def import_stage(self, harvest_object):
-        log.debug('In CKANHarvester import_stage')
-
-        context = {'model': model, 'session': model.Session,
-                   'user': self._get_user_name()}
-        if not harvest_object:
-            log.error('No harvest object received')
-            return False
-
-        if harvest_object.content is None:
-            self._save_object_error('Empty content for object %s' %
-                                    harvest_object.id,
-                                    harvest_object, 'Import')
-            return False
-
-        self._set_config(harvest_object.job.source.config)
-
-        try:
-            package_dict = json.loads(harvest_object.content)
-
-            if package_dict.get('type') == 'harvest':
-                log.warn('Remote dataset is a harvest source, ignoring...')
-                return True
-
-            # Set default tags if needed
-            default_tags = self.config.get('default_tags', [])
-            if default_tags:
-                if not 'tags' in package_dict:
-                    package_dict['tags'] = []
-                package_dict['tags'].extend(
-                    [t for t in default_tags if t not in package_dict['tags']])
-
-            remote_groups = self.config.get('remote_groups', None)
-            if not remote_groups in ('only_local', 'create'):
-                # Ignore remote groups
-                package_dict.pop('groups', None)
-            else:
-                if not 'groups' in package_dict:
-                    package_dict['groups'] = []
-
-                # check if remote groups exist locally, otherwise remove
-                validated_groups = []
-
-                for group_name in package_dict['groups']:
-                    try:
-                        data_dict = {'id': group_name}
-                        group = get_action('group_show')(context, data_dict)
-                        if self.api_version == 1:
-                            validated_groups.append(group['name'])
-                        else:
-                            validated_groups.append(group['id'])
-                    except NotFound, e:
-                        log.info('Group %s is not available', group_name)
-                        if remote_groups == 'create':
-                            try:
-                                group = self._get_group(harvest_object.source.url, group_name)
-                            except RemoteResourceError:
-                                log.error('Could not get remote group %s', group_name)
-                                continue
-
-                            for key in ['packages', 'created', 'users', 'groups', 'tags', 'extras', 'display_name']:
-                                group.pop(key, None)
-
-                            get_action('group_create')(context, group)
-                            log.info('Group %s has been newly created', group_name)
-                            if self.api_version == 1:
-                                validated_groups.append(group['name'])
-                            else:
-                                validated_groups.append(group['id'])
-
-                package_dict['groups'] = validated_groups
-
-            # Local harvest source organization
-            source_dataset = get_action('package_show')(context, {'id': harvest_object.source.id})
-            local_org = source_dataset.get('owner_org')
-
-            remote_orgs = self.config.get('remote_orgs', None)
-
-            if not remote_orgs in ('only_local', 'create'):
-                # Assign dataset to the source organization
-                package_dict['owner_org'] = local_org
-            else:
-                if not 'owner_org' in package_dict:
-                    package_dict['owner_org'] = None
-
-                # check if remote org exists locally, otherwise remove
-                validated_org = None
-                remote_org = package_dict['owner_org']
-
-                if remote_org:
-                    try:
-                        data_dict = {'id': remote_org}
-                        org = get_action('organization_show')(context, data_dict)
-                        validated_org = org['id']
-                    except NotFound, e:
-                        log.info('Organization %s is not available', remote_org)
-                        if remote_orgs == 'create':
-                            try:
-                                try:
-                                    org = self._get_organization(harvest_object.source.url, remote_org)
-                                except RemoteResourceError:
-                                    # fallback if remote CKAN exposes organizations as groups
-                                    # this especially targets older versions of CKAN
-                                    org = self._get_group(harvest_object.source.url, remote_org)
-
-                                for key in ['packages', 'created', 'users', 'groups', 'tags', 'extras', 'display_name', 'type']:
-                                    org.pop(key, None)
-                                get_action('organization_create')(context, org)
-                                log.info('Organization %s has been newly created', remote_org)
-                                validated_org = org['id']
-                            except (RemoteResourceError, ValidationError):
-                                log.error('Could not get remote org %s', remote_org)
-
-                package_dict['owner_org'] = validated_org or local_org
-
-            # Set default groups if needed
-            default_groups = self.config.get('default_groups', [])
-            if default_groups:
-                if not 'groups' in package_dict:
-                    package_dict['groups'] = []
-                package_dict['groups'].extend(
-                    [g for g in default_groups
-                     if g not in package_dict['groups']])
-
-            # Set default extras if needed
-            default_extras = self.config.get('default_extras', {})
-
-            def get_extra(key, package_dict):
-                for extra in package_dict.get('extras', []):
-                    if extra['key'] == key:
-                        return extra
-
-            if default_extras:
-                override_extras = self.config.get('override_extras', False)
-                if not 'extras' in package_dict:
-                    package_dict['extras'] = []
-                for key, value in default_extras.iteritems():
-                    existing_extra = get_extra(key, package_dict)
-                    if existing_extra and not override_extras:
-                        continue  # no need for the default
-                    if existing_extra:
-                        package_dict['extras'].remove(existing_extra)
-                    # Look for replacement strings
-                    if isinstance(value, basestring):
-                        value = value.format(
-                            harvest_source_id=harvest_object.job.source.id,
-                            harvest_source_url=
-                            harvest_object.job.source.url.strip('/'),
-                            harvest_source_title=
-                            harvest_object.job.source.title,
-                            harvest_job_id=harvest_object.job.id,
-                            harvest_object_id=harvest_object.id,
-                            dataset_id=package_dict['id'])
-
-                    package_dict['extras'].append({'key': key, 'value': value})
-
-            for resource in package_dict.get('resources', []):
-                # Clear remote url_type for resources (eg datastore, upload) as
-                # we are only creating normal resources with links to the
-                # remote ones
-                resource.pop('url_type', None)
-
-                # Clear revision_id as the revision won't exist on this CKAN
-                # and saving it will cause an IntegrityError with the foreign
-                # key.
-                resource.pop('revision_id', None)
-
-            result = self._create_or_update_package(
-                package_dict, harvest_object, package_dict_form='package_show')
-
-            return result
-        except ValidationError, e:
-            self._save_object_error('Invalid package with GUID %s: %r' %
-                                    (harvest_object.guid, e.error_dict),
-                                    harvest_object, 'Import')
-        except Exception, e:
-            self._save_object_error('%s' % e, harvest_object, 'Import')
-
-
-class ContentFetchError(Exception):
-    pass
-
-class ContentNotFoundError(ContentFetchError):
-    pass
-
-class RemoteResourceError(Exception):
-    pass
-
-
-class SearchError(Exception):
-    pass
diff --git a/ckan/ckan/files/csw.py b/ckan/ckan/files/csw.py
deleted file mode 100644
index cf329d9e..00000000
--- a/ckan/ckan/files/csw.py
+++ /dev/null
@@ -1,313 +0,0 @@
-import re
-import urllib
-import urllib2
-import requests
-import urlparse
-import lxml
-from xml.dom.minidom import Document
-
-import logging
-
-from ckan import model
-
-from ckan.plugins.core import SingletonPlugin, implements
-
-from ckanext.harvest.interfaces import IHarvester
-from ckanext.harvest.model import HarvestObject
-from ckanext.harvest.model import HarvestObjectExtra as HOExtra
-
-from ckanext.spatial.lib.csw_client import CswService
-from ckanext.spatial.harvesters.base import SpatialHarvester, text_traceback
-
-
-class CSWHarvester(SpatialHarvester, SingletonPlugin):
-    '''
-    A Harvester for CSW servers
-    '''
-    implements(IHarvester)
-
-    csw = None
-
-    def info(self):
-        return {
-            'name': 'csw',
-            'title': 'CSW Server',
-            'description': 'A server that implements OGC\'s Catalog Service for the Web (CSW) standard'
-        }
-
-    def get_original_url(self, harvest_object_id):
-        obj = model.Session.query(HarvestObject).\
-            filter(HarvestObject.id==harvest_object_id).\
-            first()
-
-        parts = urlparse.urlparse(obj.source.url)
-
-        params = {
-            'SERVICE': 'CSW',
-            'VERSION': '2.0.2',
-            'REQUEST': 'GetRecordById',
-            'OUTPUTSCHEMA': 'http://www.isotc211.org/2005/gmd',
-            'OUTPUTFORMAT': 'application/xml',
-            'ID': obj.guid
-        }
-
-        if parts.username and parts.password:
-            # Strip the credentials from the URL
-            if parts.port is None:
-                netloc = parts.hostname
-            else:
-                netloc = '%s:%s' % (parts.hostname, parts.port)
-            url = urlparse.urlunparse((
-                parts.scheme,
-                netloc,
-                parts.path,
-                None,
-                urllib.urlencode(params),
-                None
-            ))
-        else:
-            url = urlparse.urlunparse((
-                parts.scheme,
-                parts.netloc,
-                parts.path,
-                None,
-                urllib.urlencode(params),
-                None
-            ))
-
-        return url
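A GetRecordById URL built above looks roughly like this (host and GUID are illustrative; parameter order depends on dict ordering):

    http://example.com/csw?SERVICE=CSW&VERSION=2.0.2&REQUEST=GetRecordById&OUTPUTSCHEMA=http%3A%2F%2Fwww.isotc211.org%2F2005%2Fgmd&OUTPUTFORMAT=application%2Fxml&ID=1234-abcd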
-
-    def output_schema(self):
-        return 'gmd'
-
-    def gather_stage(self, harvest_job):
-        log = logging.getLogger(__name__ + '.CSW.gather')
-        log.debug('CswHarvester gather_stage for job: %r', harvest_job)
-        # Get source URL
-        url = harvest_job.source.url
-
-        sess = requests.Session()
-
-        parts = urlparse.urlsplit(url)
-        if parts.username and parts.password:
-            url_path = parts.path.rsplit('/', 1)[0]
-            url_path = url_path + '/xml.user.login'
-            if parts.port is None:
-                auth_url = urlparse.urlunsplit((
-                    parts.scheme,
-                    parts.hostname,
-                    url_path,
-                    parts.query,
-                    parts.fragment
-                ))
-            else:
-                auth_url = urlparse.urlunsplit((
-                    parts.scheme,
-                    '%s:%s' % (parts.hostname, parts.port),
-                    url_path,
-                    parts.query,
-                    parts.fragment
-                ))
-            log.debug('Authenticate against Geonetwork. User is %s and password is %s', parts.username, parts.password)
-            auth_data = Document()
-            root = auth_data.createElement('request')
-            auth_data.appendChild(root)
-            username_tag = auth_data.createElement('username')
-            user_data = auth_data.createTextNode(parts.username)
-            username_tag.appendChild(user_data)
-            root.appendChild(username_tag)
-            password_tag = auth_data.createElement('password')
-            password_data = auth_data.createTextNode(parts.password)
-            password_tag.appendChild(password_data)
-            root.appendChild(password_tag)
-            xml_auth_data = auth_data.toprettyxml(indent="  ")
-
-            req_headers = {'Content-Type': 'application/xml'}
-
-            log.debug('Gather stage. Authorization to the geoserver, URL is %s', auth_url)
-            req = sess.post(url=auth_url, data=xml_auth_data, headers=req_headers)
-            log.debug('Gather stage. Geoserver Authorization cookie is %s', req.cookies)
-
-        if parts.username and parts.password:
-            # Rebuild the source URL without the credentials
-            if parts.port is None:
-                url = urlparse.urlunsplit((
-                    parts.scheme,
-                    parts.hostname,
-                    parts.path,
-                    parts.query,
-                    parts.fragment
-                ))
-            else:
-                url = urlparse.urlunsplit((
-                    parts.scheme,
-                    '%s:%s' % (parts.hostname, parts.port),
-                    parts.path,
-                    parts.query,
-                    parts.fragment
-                ))
-
-        self._set_source_config(harvest_job.source.config)
-
-        try:
-            log.debug('Gather stage. Contacting the geoserver, URL is %s', url)
-            res = sess.get(url)
-            log.debug('Gather stage. Geoserver contacted, result is %s', res)
-            self._setup_csw_client(sess.get(url))
-            #self._setup_csw_client(url)
-        except Exception, e:
-            self._save_gather_error('Error contacting the CSW server: %s' % e, harvest_job)
-            return None
-
-        query = model.Session.query(HarvestObject.guid, HarvestObject.package_id).\
-            filter(HarvestObject.current==True).\
-            filter(HarvestObject.harvest_source_id==harvest_job.source.id)
-        guid_to_package_id = {}
-
-        for guid, package_id in query:
-            guid_to_package_id[guid] = package_id
-
-        guids_in_db = set(guid_to_package_id.keys())
-
-        # extract cql filter if any
-        cql = self.source_config.get('cql')
-
-        log.debug('Starting gathering for %s' % url)
-        guids_in_harvest = set()
-        try:
-            for identifier in self.csw.getidentifiers(page=10, outputschema=self.output_schema(), cql=cql):
-                try:
-                    log.info('Got identifier %s from the CSW', identifier)
-                    if identifier is None:
-                        log.error('CSW returned identifier %r, skipping...' % identifier)
-                        continue
-
-                    guids_in_harvest.add(identifier)
-                except Exception, e:
-                    self._save_gather_error('Error for the identifier %s [%r]' % (identifier, e), harvest_job)
-                    continue
-
-        except Exception, e:
-            log.error('Exception: %s' % text_traceback())
-            self._save_gather_error('Error gathering the identifiers from the CSW server [%s]' % str(e), harvest_job)
-            return None
-
-        new = guids_in_harvest - guids_in_db
-        delete = guids_in_db - guids_in_harvest
-        change = guids_in_db & guids_in_harvest
-
-        ids = []
-        for guid in new:
-            obj = HarvestObject(guid=guid, job=harvest_job,
-                                extras=[HOExtra(key='status', value='new')])
-            obj.save()
-            ids.append(obj.id)
-        for guid in change:
-            obj = HarvestObject(guid=guid, job=harvest_job,
-                                package_id=guid_to_package_id[guid],
-                                extras=[HOExtra(key='status', value='change')])
-            obj.save()
-            ids.append(obj.id)
-        for guid in delete:
-            obj = HarvestObject(guid=guid, job=harvest_job,
-                                package_id=guid_to_package_id[guid],
-                                extras=[HOExtra(key='status', value='delete')])
-            model.Session.query(HarvestObject).\
-                filter_by(guid=guid).\
-                update({'current': False}, False)
-            obj.save()
-            ids.append(obj.id)
-
-        if len(ids) == 0:
-            self._save_gather_error('No records received from the CSW server', harvest_job)
-            return None
-
-        return ids
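The three set operations drive the per-object status extras; a small illustrative example:

    guids_in_db = set(['a', 'b'])
    guids_in_harvest = set(['b', 'c'])
    print guids_in_harvest - guids_in_db  # set(['c']) -> status 'new'
    print guids_in_db - guids_in_harvest  # set(['a']) -> status 'delete'
    print guids_in_db & guids_in_harvest  # set(['b']) -> status 'change'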
-
-    def fetch_stage(self, harvest_object):
-
-        # Check harvest object status
-        status = self._get_object_extra(harvest_object, 'status')
-
-        if status == 'delete':
-            # No need to fetch anything, just pass to the import stage
-            return True
-
-        log = logging.getLogger(__name__ + '.CSW.fetch')
-        log.debug('CswHarvester fetch_stage for object: %s', harvest_object.id)
-
-        url = harvest_object.source.url
-        parts = urlparse.urlparse(url)
-        if parts.username and parts.password:
-            # Strip the credentials from the URL
-            if parts.port is None:
-                netloc = parts.hostname
-            else:
-                netloc = '%s:%s' % (parts.hostname, parts.port)
-            url = urlparse.urlunparse((
-                parts.scheme,
-                netloc,
-                parts.path,
-                None,
-                None,
-                None
-            ))
-        else:
-            url = urlparse.urlunparse((
-                parts.scheme,
-                parts.netloc,
-                parts.path,
-                None,
-                None,
-                None
-            ))
-
-        try:
-            self._setup_csw_client(url)
-            log.debug('Fetch stage. Contacting the geoserver, URL is %s', url)
-        except Exception, e:
-            self._save_object_error('Error contacting the CSW server: %s' % e,
-                                    harvest_object)
-            return False
-
-        identifier = harvest_object.guid
-        try:
-            record = self.csw.getrecordbyid([identifier], outputschema=self.output_schema())
-        except Exception, e:
-            self._save_object_error('Error getting the CSW record with GUID %s' % identifier, harvest_object)
-            return False
-
-        if record is None:
-            self._save_object_error('Empty record for GUID %s' % identifier,
-                                    harvest_object)
-            return False
-
-        try:
-            # Save the fetched contents in the HarvestObject
-            # Contents come from csw_client already declared and encoded as utf-8
-            # Remove the original XML declaration
-            content = re.sub('<\?xml(.*)\?>', '', record['xml'])
-
-            harvest_object.content = content.strip()
-            harvest_object.save()
-        except Exception, e:
-            self._save_object_error('Error saving the harvest object for GUID %s [%r]' %
-                                    (identifier, e), harvest_object)
-            return False
-
-        log.debug('XML content saved (len %s)', len(record['xml']))
-        return True
-
-    def _setup_csw_client(self, url):
-        self.csw = CswService(url)
diff --git a/ckan/ckan/tasks/ckan-plugins.yml b/ckan/ckan/tasks/ckan-plugins.yml
index 1a5cf9a3..315d65c4 100644
--- a/ckan/ckan/tasks/ckan-plugins.yml
+++ b/ckan/ckan/tasks/ckan-plugins.yml
@@ -126,19 +126,3 @@
   when: ( install_ldap_plugin | changed )
   notify: Restart CKAN
   tags: [ 'ckan', 'ckan_ldap', 'ckan_plugins' ]
-
-# - name: Overwrite the base.py ckanext-spatial plugin file to enable authentication against the Geonetwork nodes
-#   copy: src=base.py dest=/usr/lib/ckan/default/src/ckanext-spatial/ckanext/spatial/harvesters/base.py owner={{ ckan_shell_user }} group={{ ckan_shell_user }} mode=0644 backup=yes
-#   notify: Restart CKAN
-#   tags: [ 'ckan', 'ckan_pages', 'ckan_plugins', 'ckan_geo_auth' ]
-
-# - name: Overwrite the ckanharvester.py ckanext plugin file to remove the authentication data from URLs
-#   copy: src=ckanharvester.py dest=/usr/lib/ckan/default/src/ckanext-harvest/ckanext/harvest/harvesters/ckanharvester.py owner={{ ckan_shell_user }} group={{ ckan_shell_user }} mode=0644 backup=yes
-#   notify: Restart CKAN
-#   tags: [ 'ckan', 'ckan_pages', 'ckan_plugins', 'ckan_ckanharvester' ]
-
-# - name: Overwrite the csw.py ckanext-spatial plugin file to remove the authentication data from URLs
-#   copy: src=csw.py dest=/usr/lib/ckan/default/src/ckanext-spatial/ckanext/spatial/harvesters/csw.py owner={{ ckan_shell_user }} group={{ ckan_shell_user }} mode=0644 backup=yes
-#   notify: Restart CKAN
-#   tags: [ 'ckan', 'ckan_pages', 'ckan_plugins', 'ckan_ckanext_spatial' ]
-