forked from ISTI-ansible-roles/ansible-roles
library/roles/ckan/ckan: Move some files and tasks to the role inside d4science-ghn-cluster.
This commit is contained in:
parent 4358e9fc3a
commit 25045f9606

@@ -1,916 +0,0 @@
import re
import cgitb
import warnings
import urllib2
import requests
import sys
import logging
from string import Template
import urlparse
from datetime import datetime
import uuid
import hashlib
import dateutil.parser
import mimetypes

from pylons import config
from owslib import wms
from lxml import etree
from xml.dom.minidom import Document

from ckan import plugins as p
from ckan import model
from ckan.lib.helpers import json
from ckan import logic
from ckan.lib.navl.validators import not_empty
from ckan.lib.search.index import PackageSearchIndex

from ckanext.harvest.harvesters.base import HarvesterBase
from ckanext.harvest.model import HarvestObject

from ckanext.spatial.validation import Validators, all_validators
from ckanext.spatial.model import ISODocument
from ckanext.spatial.interfaces import ISpatialHarvester

log = logging.getLogger(__name__)

DEFAULT_VALIDATOR_PROFILES = ['iso19139']


def text_traceback():
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        res = 'the original traceback:'.join(
            cgitb.text(sys.exc_info()).split('the original traceback:')[1:]
        ).strip()
    return res


def guess_standard(content):
    lowered = content.lower()
    if '</gmd:MD_Metadata>'.lower() in lowered:
        return 'iso'
    if '</gmi:MI_Metadata>'.lower() in lowered:
        return 'iso'
    if '</metadata>'.lower() in lowered:
        return 'fgdc'
    return 'unknown'


def guess_resource_format(url, use_mimetypes=True):
    '''
    Given a URL try to guess the best format to assign to the resource

    The function looks for common patterns in popular geospatial services and
    file extensions, so it may not be 100% accurate. It just looks at the
    provided URL, it does not attempt to perform any remote check.

    If 'use_mimetypes' is True (default value), the mimetypes module will be
    used if no match was found before.

    Returns None if no format could be guessed.
    '''
    url = url.lower().strip()

    resource_types = {
        # OGC
        'wms': ('service=wms', 'geoserver/wms', 'mapserver/wmsserver', 'com.esri.wms.Esrimap', 'service/wms'),
        'wfs': ('service=wfs', 'geoserver/wfs', 'mapserver/wfsserver', 'com.esri.wfs.Esrimap'),
        'wcs': ('service=wcs', 'geoserver/wcs', 'imageserver/wcsserver', 'mapserver/wcsserver'),
        'sos': ('service=sos',),
        'csw': ('service=csw',),
        # ESRI
        'kml': ('mapserver/generatekml',),
        'arcims': ('com.esri.esrimap.esrimap',),
        'arcgis_rest': ('arcgis/rest/services',),
    }

    for resource_type, parts in resource_types.iteritems():
        if any(part in url for part in parts):
            return resource_type

    file_types = {
        'kml': ('kml',),
        'kmz': ('kmz',),
        'gml': ('gml',),
    }

    for file_type, extensions in file_types.iteritems():
        if any(url.endswith(extension) for extension in extensions):
            return file_type

    resource_format, encoding = mimetypes.guess_type(url)
    if resource_format:
        return resource_format

    return None
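
# Illustrative usage of guess_resource_format (an added sketch, not part of the
# original module); the URLs below are placeholders:
#
#   guess_resource_format('http://example.com/geoserver/wms?service=WMS')  # -> 'wms'
#   guess_resource_format('http://example.com/data/boundaries.kml')        # -> 'kml'
#   guess_resource_format('http://example.com/data/report.csv')            # -> 'text/csv' (via mimetypes)
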
class SpatialHarvester(HarvesterBase):

    _user_name = None

    _site_user = None

    source_config = {}

    force_import = False

    extent_template = Template('''
    {"type": "Polygon", "coordinates": [[[$xmin, $ymin], [$xmax, $ymin], [$xmax, $ymax], [$xmin, $ymax], [$xmin, $ymin]]]}
    ''')
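
    # Illustrative example (added sketch, not part of the original file): with a
    # bounding box of xmin=-10.0, ymin=35.0, xmax=5.0, ymax=44.0 the template above
    # renders the GeoJSON polygon
    #   {"type": "Polygon", "coordinates": [[[-10.0, 35.0], [5.0, 35.0], [5.0, 44.0], [-10.0, 44.0], [-10.0, 35.0]]]}
    # which get_package_dict() stores in the 'spatial' extra.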

    ## IHarvester

    def validate_config(self, source_config):
        if not source_config:
            return source_config

        try:
            source_config_obj = json.loads(source_config)

            if 'validator_profiles' in source_config_obj:
                if not isinstance(source_config_obj['validator_profiles'], list):
                    raise ValueError('validator_profiles must be a list')

                # Check if all profiles exist
                existing_profiles = [v.name for v in all_validators]
                unknown_profiles = set(source_config_obj['validator_profiles']) - set(existing_profiles)

                if len(unknown_profiles) > 0:
                    raise ValueError('Unknown validation profile(s): %s' % ','.join(unknown_profiles))

            if 'default_tags' in source_config_obj:
                if not isinstance(source_config_obj['default_tags'], list):
                    raise ValueError('default_tags must be a list')

            if 'default_extras' in source_config_obj:
                if not isinstance(source_config_obj['default_extras'], dict):
                    raise ValueError('default_extras must be a dictionary')

            for key in ('override_extras',):
                if key in source_config_obj:
                    if not isinstance(source_config_obj[key], bool):
                        raise ValueError('%s must be boolean' % key)

        except ValueError, e:
            raise e

        return source_config
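
    # Illustrative example (added sketch, not part of the original file): a harvest
    # source configuration that passes the validation above could look like
    #
    #   {
    #     "validator_profiles": ["iso19139"],
    #     "default_tags": ["geospatial"],
    #     "default_extras": {"harvested_from": "{harvest_source_url}"},
    #     "override_extras": false
    #   }
    #
    # (the tag and extra values are placeholders).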
    ##

    ## SpatialHarvester

def get_package_dict(self, iso_values, harvest_object):
|
|
||||||
'''
|
|
||||||
Constructs a package_dict suitable to be passed to package_create or
|
|
||||||
package_update. See documentation on
|
|
||||||
ckan.logic.action.create.package_create for more details
|
|
||||||
|
|
||||||
Extensions willing to modify the dict should do so implementing the
|
|
||||||
ISpatialHarvester interface
|
|
||||||
|
|
||||||
import ckan.plugins as p
|
|
||||||
from ckanext.spatial.interfaces import ISpatialHarvester
|
|
||||||
|
|
||||||
class MyHarvester(p.SingletonPlugin):
|
|
||||||
|
|
||||||
p.implements(ISpatialHarvester, inherit=True)
|
|
||||||
|
|
||||||
def get_package_dict(self, context, data_dict):
|
|
||||||
|
|
||||||
package_dict = data_dict['package_dict']
|
|
||||||
|
|
||||||
package_dict['extras'].append(
|
|
||||||
{'key': 'my-custom-extra', 'value': 'my-custom-value'}
|
|
||||||
)
|
|
||||||
|
|
||||||
return package_dict
|
|
||||||
|
|
||||||
If a dict is not returned by this function, the import stage will be cancelled.
|
|
||||||
|
|
||||||
:param iso_values: Dictionary with parsed values from the ISO 19139
|
|
||||||
XML document
|
|
||||||
:type iso_values: dict
|
|
||||||
:param harvest_object: HarvestObject domain object (with access to
|
|
||||||
job and source objects)
|
|
||||||
:type harvest_object: HarvestObject
|
|
||||||
|
|
||||||
:returns: A dataset dictionary (package_dict)
|
|
||||||
:rtype: dict
|
|
||||||
'''
|
|
||||||
|
|
||||||
tags = []
|
|
||||||
if 'tags' in iso_values:
|
|
||||||
for tag in iso_values['tags']:
|
|
||||||
tag = tag[:50] if len(tag) > 50 else tag
|
|
||||||
tags.append({'name': tag})
|
|
||||||
|
|
||||||
# Add default_tags from config
|
|
||||||
default_tags = self.source_config.get('default_tags',[])
|
|
||||||
if default_tags:
|
|
||||||
for tag in default_tags:
|
|
||||||
tags.append({'name': tag})
|
|
||||||
|
|
||||||
package_dict = {
|
|
||||||
'title': iso_values['title'],
|
|
||||||
'notes': iso_values['abstract'],
|
|
||||||
'tags': tags,
|
|
||||||
'resources': [],
|
|
||||||
}
|
|
||||||
|
|
||||||
# We need to get the owner organization (if any) from the harvest
|
|
||||||
# source dataset
|
|
||||||
source_dataset = model.Package.get(harvest_object.source.id)
|
|
||||||
if source_dataset.owner_org:
|
|
||||||
package_dict['owner_org'] = source_dataset.owner_org
|
|
||||||
|
|
||||||
# Package name
|
|
||||||
package = harvest_object.package
|
|
||||||
if package is None or package.title != iso_values['title']:
|
|
||||||
name = self._gen_new_name(iso_values['title'])
|
|
||||||
if not name:
|
|
||||||
name = self._gen_new_name(str(iso_values['guid']))
|
|
||||||
if not name:
|
|
||||||
raise Exception('Could not generate a unique name from the title or the GUID. Please choose a more unique title.')
|
|
||||||
package_dict['name'] = name
|
|
||||||
else:
|
|
||||||
package_dict['name'] = package.name
|
|
||||||
|
|
||||||
extras = {
|
|
||||||
'guid': harvest_object.guid,
|
|
||||||
'spatial_harvester': True,
|
|
||||||
}
|
|
||||||
|
|
||||||
# Just add some of the metadata as extras, not the whole lot
|
|
||||||
for name in [
|
|
||||||
# Essentials
|
|
||||||
'spatial-reference-system',
|
|
||||||
'guid',
|
|
||||||
# Usefuls
|
|
||||||
'dataset-reference-date',
|
|
||||||
'metadata-language', # Language
|
|
||||||
'metadata-date', # Released
|
|
||||||
'coupled-resource',
|
|
||||||
'contact-email',
|
|
||||||
'frequency-of-update',
|
|
||||||
'spatial-data-service-type',
|
|
||||||
]:
|
|
||||||
extras[name] = iso_values[name]
|
|
||||||
|
|
||||||
if len(iso_values.get('progress', [])):
|
|
||||||
extras['progress'] = iso_values['progress'][0]
|
|
||||||
else:
|
|
||||||
extras['progress'] = ''
|
|
||||||
|
|
||||||
if len(iso_values.get('resource-type', [])):
|
|
||||||
extras['resource-type'] = iso_values['resource-type'][0]
|
|
||||||
else:
|
|
||||||
extras['resource-type'] = ''
|
|
||||||
|
|
||||||
extras['licence'] = iso_values.get('use-constraints', '')
|
|
||||||
|
|
||||||
def _extract_first_license_url(licences):
|
|
||||||
for licence in licences:
|
|
||||||
o = urlparse.urlparse(licence)
|
|
||||||
if o.scheme and o.netloc:
|
|
||||||
return licence
|
|
||||||
return None
|
|
||||||
|
|
||||||
if len(extras['licence']):
|
|
||||||
license_url_extracted = _extract_first_license_url(extras['licence'])
|
|
||||||
if license_url_extracted:
|
|
||||||
extras['licence_url'] = license_url_extracted
|
|
||||||
|
|
||||||
|
|
||||||
# Metadata license ID check for package
|
|
||||||
use_constraints = iso_values.get('use-constraints')
|
|
||||||
if use_constraints:
|
|
||||||
|
|
||||||
context = {'model': model, 'session': model.Session, 'user': self._get_user_name()}
|
|
||||||
license_list = p.toolkit.get_action('license_list')(context, {})
|
|
||||||
|
|
||||||
for constraint in use_constraints:
|
|
||||||
package_license = None
|
|
||||||
|
|
||||||
for license in license_list:
|
|
||||||
if constraint.lower() == license.get('id') or constraint == license.get('url'):
|
|
||||||
package_license = license.get('id')
|
|
||||||
break
|
|
||||||
|
|
||||||
if package_license:
|
|
||||||
package_dict['license_id'] = package_license
|
|
||||||
break
|
|
||||||
|
|
||||||
|
|
||||||
extras['access_constraints'] = iso_values.get('limitations-on-public-access', '')
|
|
||||||
|
|
||||||
# Graphic preview
|
|
||||||
browse_graphic = iso_values.get('browse-graphic')
|
|
||||||
if browse_graphic:
|
|
||||||
browse_graphic = browse_graphic[0]
|
|
||||||
extras['graphic-preview-file'] = browse_graphic.get('file')
|
|
||||||
if browse_graphic.get('description'):
|
|
||||||
extras['graphic-preview-description'] = browse_graphic.get('description')
|
|
||||||
if browse_graphic.get('type'):
|
|
||||||
extras['graphic-preview-type'] = browse_graphic.get('type')
|
|
||||||
|
|
||||||
|
|
||||||
for key in ['temporal-extent-begin', 'temporal-extent-end']:
|
|
||||||
if len(iso_values[key]) > 0:
|
|
||||||
extras[key] = iso_values[key][0]
|
|
||||||
|
|
||||||
# Save responsible organization roles
|
|
||||||
if iso_values['responsible-organisation']:
|
|
||||||
parties = {}
|
|
||||||
for party in iso_values['responsible-organisation']:
|
|
||||||
if party['organisation-name'] in parties:
|
|
||||||
if not party['role'] in parties[party['organisation-name']]:
|
|
||||||
parties[party['organisation-name']].append(party['role'])
|
|
||||||
else:
|
|
||||||
parties[party['organisation-name']] = [party['role']]
|
|
||||||
extras['responsible-party'] = [{'name': k, 'roles': v} for k, v in parties.iteritems()]
|
|
||||||
|
|
||||||
if len(iso_values['bbox']) > 0:
|
|
||||||
bbox = iso_values['bbox'][0]
|
|
||||||
extras['bbox-east-long'] = bbox['east']
|
|
||||||
extras['bbox-north-lat'] = bbox['north']
|
|
||||||
extras['bbox-south-lat'] = bbox['south']
|
|
||||||
extras['bbox-west-long'] = bbox['west']
|
|
||||||
|
|
||||||
try:
|
|
||||||
xmin = float(bbox['west'])
|
|
||||||
xmax = float(bbox['east'])
|
|
||||||
ymin = float(bbox['south'])
|
|
||||||
ymax = float(bbox['north'])
|
|
||||||
except ValueError, e:
|
|
||||||
self._save_object_error('Error parsing bounding box value: {0}'.format(str(e)),
|
|
||||||
harvest_object, 'Import')
|
|
||||||
else:
|
|
||||||
# Construct a GeoJSON extent so ckanext-spatial can register the extent geometry
|
|
||||||
|
|
||||||
# Some publishers define the same two corners for the bbox (ie a point),
|
|
||||||
# that causes problems in the search if stored as polygon
|
|
||||||
if xmin == xmax or ymin == ymax:
|
|
||||||
extent_string = Template('{"type": "Point", "coordinates": [$x, $y]}').substitute(
|
|
||||||
x=xmin, y=ymin
|
|
||||||
)
|
|
||||||
self._save_object_error('Point extent defined instead of polygon',
|
|
||||||
harvest_object, 'Import')
|
|
||||||
else:
|
|
||||||
extent_string = self.extent_template.substitute(
|
|
||||||
xmin=xmin, ymin=ymin, xmax=xmax, ymax=ymax
|
|
||||||
)
|
|
||||||
|
|
||||||
extras['spatial'] = extent_string.strip()
|
|
||||||
else:
|
|
||||||
log.debug('No spatial extent defined for this object')
|
|
||||||
|
|
||||||
resource_locators = iso_values.get('resource-locator', []) +\
|
|
||||||
iso_values.get('resource-locator-identification', [])
|
|
||||||
|
|
||||||
if len(resource_locators):
|
|
||||||
for resource_locator in resource_locators:
|
|
||||||
url = resource_locator.get('url', '').strip()
|
|
||||||
if url:
|
|
||||||
resource = {}
|
|
||||||
resource['format'] = guess_resource_format(url)
|
|
||||||
if resource['format'] == 'wms' and config.get('ckanext.spatial.harvest.validate_wms', False):
|
|
||||||
# Check if the service is a view service
|
|
||||||
test_url = url.split('?')[0] if '?' in url else url
|
|
||||||
if self._is_wms(test_url):
|
|
||||||
resource['verified'] = True
|
|
||||||
resource['verified_date'] = datetime.now().isoformat()
|
|
||||||
|
|
||||||
resource.update(
|
|
||||||
{
|
|
||||||
'url': url,
|
|
||||||
'name': resource_locator.get('name') or p.toolkit._('Unnamed resource'),
|
|
||||||
'description': resource_locator.get('description') or '',
|
|
||||||
'resource_locator_protocol': resource_locator.get('protocol') or '',
|
|
||||||
'resource_locator_function': resource_locator.get('function') or '',
|
|
||||||
})
|
|
||||||
package_dict['resources'].append(resource)
|
|
||||||
|
|
||||||
|
|
||||||
# Add default_extras from config
|
|
||||||
default_extras = self.source_config.get('default_extras',{})
|
|
||||||
if default_extras:
|
|
||||||
override_extras = self.source_config.get('override_extras',False)
|
|
||||||
for key,value in default_extras.iteritems():
|
|
||||||
log.debug('Processing extra %s', key)
|
|
||||||
if not key in extras or override_extras:
|
|
||||||
# Look for replacement strings
|
|
||||||
if isinstance(value,basestring):
|
|
||||||
value = value.format(harvest_source_id=harvest_object.job.source.id,
|
|
||||||
harvest_source_url=harvest_object.job.source.url.strip('/'),
|
|
||||||
harvest_source_title=harvest_object.job.source.title,
|
|
||||||
harvest_job_id=harvest_object.job.id,
|
|
||||||
harvest_object_id=harvest_object.id)
|
|
||||||
extras[key] = value
|
|
||||||
|
|
||||||
extras_as_dict = []
|
|
||||||
for key, value in extras.iteritems():
|
|
||||||
if isinstance(value, (list, dict)):
|
|
||||||
extras_as_dict.append({'key': key, 'value': json.dumps(value)})
|
|
||||||
else:
|
|
||||||
extras_as_dict.append({'key': key, 'value': value})
|
|
||||||
|
|
||||||
package_dict['extras'] = extras_as_dict
|
|
||||||
|
|
||||||
return package_dict
|
|
||||||
|
|
||||||
    def transform_to_iso(self, original_document, original_format, harvest_object):
        '''
        DEPRECATED: Use the transform_to_iso method of the ISpatialHarvester
        interface
        '''
        self.__base_transform_to_iso_called = True
        return None

def import_stage(self, harvest_object):
|
|
||||||
context = {
|
|
||||||
'model': model,
|
|
||||||
'session': model.Session,
|
|
||||||
'user': self._get_user_name(),
|
|
||||||
}
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__ + '.import')
|
|
||||||
log.debug('Import stage for harvest object: %s', harvest_object.id)
|
|
||||||
|
|
||||||
if not harvest_object:
|
|
||||||
log.error('No harvest object received')
|
|
||||||
return False
|
|
||||||
|
|
||||||
self._set_source_config(harvest_object.source.config)
|
|
||||||
|
|
||||||
if self.force_import:
|
|
||||||
status = 'change'
|
|
||||||
else:
|
|
||||||
status = self._get_object_extra(harvest_object, 'status')
|
|
||||||
|
|
||||||
# Get the last harvested object (if any)
|
|
||||||
previous_object = model.Session.query(HarvestObject) \
|
|
||||||
.filter(HarvestObject.guid==harvest_object.guid) \
|
|
||||||
.filter(HarvestObject.current==True) \
|
|
||||||
.first()
|
|
||||||
|
|
||||||
if status == 'delete':
|
|
||||||
# Delete package
|
|
||||||
context.update({
|
|
||||||
'ignore_auth': True,
|
|
||||||
})
|
|
||||||
p.toolkit.get_action('package_delete')(context, {'id': harvest_object.package_id})
|
|
||||||
log.info('Deleted package {0} with guid {1}'.format(harvest_object.package_id, harvest_object.guid))
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
# Check if it is a non ISO document
|
|
||||||
original_document = self._get_object_extra(harvest_object, 'original_document')
|
|
||||||
original_format = self._get_object_extra(harvest_object, 'original_format')
|
|
||||||
if original_document and original_format:
|
|
||||||
#DEPRECATED use the ISpatialHarvester interface method
|
|
||||||
self.__base_transform_to_iso_called = False
|
|
||||||
content = self.transform_to_iso(original_document, original_format, harvest_object)
|
|
||||||
if not self.__base_transform_to_iso_called:
|
|
||||||
log.warn('Deprecation warning: calling transform_to_iso directly is deprecated. ' +
|
|
||||||
'Please use the ISpatialHarvester interface method instead.')
|
|
||||||
|
|
||||||
for harvester in p.PluginImplementations(ISpatialHarvester):
|
|
||||||
content = harvester.transform_to_iso(original_document, original_format, harvest_object)
|
|
||||||
|
|
||||||
if content:
|
|
||||||
harvest_object.content = content
|
|
||||||
else:
|
|
||||||
self._save_object_error('Transformation to ISO failed', harvest_object, 'Import')
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
if harvest_object.content is None:
|
|
||||||
self._save_object_error('Empty content for object {0}'.format(harvest_object.id), harvest_object, 'Import')
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Validate ISO document
|
|
||||||
is_valid, profile, errors = self._validate_document(harvest_object.content, harvest_object)
|
|
||||||
if not is_valid:
|
|
||||||
# If validation errors were found, import will stop unless
|
|
||||||
# configuration per source or per instance says otherwise
|
|
||||||
continue_import = p.toolkit.asbool(config.get('ckanext.spatial.harvest.continue_on_validation_errors', False)) or \
|
|
||||||
self.source_config.get('continue_on_validation_errors')
|
|
||||||
if not continue_import:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Parse ISO document
|
|
||||||
try:
|
|
||||||
|
|
||||||
iso_parser = ISODocument(harvest_object.content)
|
|
||||||
iso_values = iso_parser.read_values()
|
|
||||||
except Exception, e:
|
|
||||||
self._save_object_error('Error parsing ISO document for object {0}: {1}'.format(harvest_object.id, str(e)),
|
|
||||||
harvest_object, 'Import')
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Flag previous object as not current anymore
|
|
||||||
if previous_object and not self.force_import:
|
|
||||||
previous_object.current = False
|
|
||||||
previous_object.add()
|
|
||||||
|
|
||||||
# Update GUID with the one on the document
|
|
||||||
iso_guid = iso_values['guid']
|
|
||||||
if iso_guid and harvest_object.guid != iso_guid:
|
|
||||||
# First make sure there already aren't current objects
|
|
||||||
# with the same guid
|
|
||||||
existing_object = model.Session.query(HarvestObject.id) \
|
|
||||||
.filter(HarvestObject.guid==iso_guid) \
|
|
||||||
.filter(HarvestObject.current==True) \
|
|
||||||
.first()
|
|
||||||
if existing_object:
|
|
||||||
self._save_object_error('Object {0} already has this guid {1}'.format(existing_object.id, iso_guid),
|
|
||||||
harvest_object, 'Import')
|
|
||||||
return False
|
|
||||||
|
|
||||||
harvest_object.guid = iso_guid
|
|
||||||
harvest_object.add()
|
|
||||||
|
|
||||||
# Generate GUID if not present (i.e. it's a manual import)
|
|
||||||
if not harvest_object.guid:
|
|
||||||
m = hashlib.md5()
|
|
||||||
m.update(harvest_object.content.encode('utf8', 'ignore'))
|
|
||||||
harvest_object.guid = m.hexdigest()
|
|
||||||
harvest_object.add()
|
|
||||||
|
|
||||||
# Get document modified date
|
|
||||||
try:
|
|
||||||
metadata_modified_date = dateutil.parser.parse(iso_values['metadata-date'], ignoretz=True)
|
|
||||||
except ValueError:
|
|
||||||
self._save_object_error('Could not extract reference date for object {0} ({1})'
|
|
||||||
.format(harvest_object.id, iso_values['metadata-date']), harvest_object, 'Import')
|
|
||||||
return False
|
|
||||||
|
|
||||||
harvest_object.metadata_modified_date = metadata_modified_date
|
|
||||||
harvest_object.add()
|
|
||||||
|
|
||||||
|
|
||||||
# Build the package dict
|
|
||||||
package_dict = self.get_package_dict(iso_values, harvest_object)
|
|
||||||
for harvester in p.PluginImplementations(ISpatialHarvester):
|
|
||||||
package_dict = harvester.get_package_dict(context, {
|
|
||||||
'package_dict': package_dict,
|
|
||||||
'iso_values': iso_values,
|
|
||||||
'xml_tree': iso_parser.xml_tree,
|
|
||||||
'harvest_object': harvest_object,
|
|
||||||
})
|
|
||||||
if not package_dict:
|
|
||||||
log.error('No package dict returned, aborting import for object {0}'.format(harvest_object.id))
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Create / update the package
|
|
||||||
context.update({
|
|
||||||
'extras_as_string': True,
|
|
||||||
'api_version': '2',
|
|
||||||
'return_id_only': True})
|
|
||||||
|
|
||||||
if self._site_user and context['user'] == self._site_user['name']:
|
|
||||||
context['ignore_auth'] = True
|
|
||||||
|
|
||||||
|
|
||||||
# The default package schema does not like Upper case tags
|
|
||||||
tag_schema = logic.schema.default_tags_schema()
|
|
||||||
tag_schema['name'] = [not_empty, unicode]
|
|
||||||
|
|
||||||
# Flag this object as the current one
|
|
||||||
harvest_object.current = True
|
|
||||||
harvest_object.add()
|
|
||||||
|
|
||||||
if status == 'new':
|
|
||||||
package_schema = logic.schema.default_create_package_schema()
|
|
||||||
package_schema['tags'] = tag_schema
|
|
||||||
context['schema'] = package_schema
|
|
||||||
|
|
||||||
# We need to explicitly provide a package ID, otherwise ckanext-spatial
|
|
||||||
# won't be be able to link the extent to the package.
|
|
||||||
package_dict['id'] = unicode(uuid.uuid4())
|
|
||||||
package_schema['id'] = [unicode]
|
|
||||||
|
|
||||||
# Save reference to the package on the object
|
|
||||||
harvest_object.package_id = package_dict['id']
|
|
||||||
harvest_object.add()
|
|
||||||
# Defer constraints and flush so the dataset can be indexed with
|
|
||||||
# the harvest object id (on the after_show hook from the harvester
|
|
||||||
# plugin)
|
|
||||||
model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED')
|
|
||||||
model.Session.flush()
|
|
||||||
|
|
||||||
try:
|
|
||||||
package_id = p.toolkit.get_action('package_create')(context, package_dict)
|
|
||||||
log.info('Created new package %s with guid %s', package_id, harvest_object.guid)
|
|
||||||
except p.toolkit.ValidationError, e:
|
|
||||||
self._save_object_error('Validation Error: %s' % str(e.error_summary), harvest_object, 'Import')
|
|
||||||
return False
|
|
||||||
|
|
||||||
elif status == 'change':
|
|
||||||
|
|
||||||
# Check if the modified date is more recent
|
|
||||||
if not self.force_import and previous_object and harvest_object.metadata_modified_date <= previous_object.metadata_modified_date:
|
|
||||||
|
|
||||||
# Assign the previous job id to the new object to
|
|
||||||
# avoid losing history
|
|
||||||
harvest_object.harvest_job_id = previous_object.job.id
|
|
||||||
harvest_object.add()
|
|
||||||
|
|
||||||
# Delete the previous object to avoid cluttering the object table
|
|
||||||
previous_object.delete()
|
|
||||||
|
|
||||||
# Reindex the corresponding package to update the reference to the
|
|
||||||
# harvest object
|
|
||||||
if ((config.get('ckanext.spatial.harvest.reindex_unchanged', True) != 'False'
|
|
||||||
or self.source_config.get('reindex_unchanged') != 'False')
|
|
||||||
and harvest_object.package_id):
|
|
||||||
context.update({'validate': False, 'ignore_auth': True})
|
|
||||||
try:
|
|
||||||
package_dict = logic.get_action('package_show')(context,
|
|
||||||
{'id': harvest_object.package_id})
|
|
||||||
except p.toolkit.ObjectNotFound:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
for extra in package_dict.get('extras', []):
|
|
||||||
if extra['key'] == 'harvest_object_id':
|
|
||||||
extra['value'] = harvest_object.id
|
|
||||||
if package_dict:
|
|
||||||
package_index = PackageSearchIndex()
|
|
||||||
package_index.index_package(package_dict)
|
|
||||||
|
|
||||||
log.info('Document with GUID %s unchanged, skipping...' % (harvest_object.guid))
|
|
||||||
else:
|
|
||||||
package_schema = logic.schema.default_update_package_schema()
|
|
||||||
package_schema['tags'] = tag_schema
|
|
||||||
context['schema'] = package_schema
|
|
||||||
|
|
||||||
package_dict['id'] = harvest_object.package_id
|
|
||||||
try:
|
|
||||||
package_id = p.toolkit.get_action('package_update')(context, package_dict)
|
|
||||||
log.info('Updated package %s with guid %s', package_id, harvest_object.guid)
|
|
||||||
except p.toolkit.ValidationError, e:
|
|
||||||
self._save_object_error('Validation Error: %s' % str(e.error_summary), harvest_object, 'Import')
|
|
||||||
return False
|
|
||||||
|
|
||||||
model.Session.commit()
|
|
||||||
|
|
||||||
return True
|
|
||||||
    ##

    def _is_wms(self, url):
        '''
        Checks if the provided URL actually points to a Web Map Service.
        Uses owslib WMS reader to parse the response.
        '''
        try:
            capabilities_url = wms.WMSCapabilitiesReader().capabilities_url(url)
            res = urllib2.urlopen(capabilities_url, None, 10)
            xml = res.read()

            s = wms.WebMapService(url, xml=xml)
            return isinstance(s.contents, dict) and s.contents != {}
        except Exception, e:
            log.error('WMS check for %s failed with exception: %s' % (url, str(e)))
            return False

    def _get_object_extra(self, harvest_object, key):
        '''
        Helper function for retrieving the value from a harvest object extra,
        given the key
        '''
        for extra in harvest_object.extras:
            if extra.key == key:
                return extra.value
        return None

    def _set_source_config(self, config_str):
        '''
        Loads the source configuration JSON object into a dict for
        convenient access
        '''
        if config_str:
            self.source_config = json.loads(config_str)
            log.debug('Using config: %r', self.source_config)
        else:
            self.source_config = {}

    def _get_validator(self):
        '''
        Returns the validator object using the relevant profiles

        The profiles to be used are assigned in the following order:

        1. 'validator_profiles' property of the harvest source config object
        2. 'ckan.spatial.validator.profiles' configuration option in the ini file
        3. Default value as defined in DEFAULT_VALIDATOR_PROFILES
        '''
        if not hasattr(self, '_validator'):
            if hasattr(self, 'source_config') and self.source_config.get('validator_profiles', None):
                profiles = self.source_config.get('validator_profiles')
            elif config.get('ckan.spatial.validator.profiles', None):
                profiles = [
                    x.strip() for x in
                    config.get('ckan.spatial.validator.profiles').split(',')
                ]
            else:
                profiles = DEFAULT_VALIDATOR_PROFILES
            self._validator = Validators(profiles=profiles)

            # Add any custom validators from extensions
            for plugin_with_validators in p.PluginImplementations(ISpatialHarvester):
                custom_validators = plugin_with_validators.get_validators()
                for custom_validator in custom_validators:
                    if custom_validator not in all_validators:
                        self._validator.add_validator(custom_validator)

        return self._validator
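
    # Illustrative example (added sketch, not part of the original file): the second
    # lookup above corresponds to an ini setting such as
    #
    #   ckan.spatial.validator.profiles = iso19139,gemini2
    #
    # which this method splits into the list ['iso19139', 'gemini2']; the profile
    # names must match validators registered in all_validators.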
def _get_user_name(self):
|
|
||||||
'''
|
|
||||||
Returns the name of the user that will perform the harvesting actions
|
|
||||||
(deleting, updating and creating datasets)
|
|
||||||
|
|
||||||
By default this will be the internal site admin user. This is the
|
|
||||||
recommended setting, but if necessary it can be overridden with the
|
|
||||||
`ckanext.spatial.harvest.user_name` config option, eg to support the
|
|
||||||
old hardcoded 'harvest' user:
|
|
||||||
|
|
||||||
ckanext.spatial.harvest.user_name = harvest
|
|
||||||
|
|
||||||
'''
|
|
||||||
if self._user_name:
|
|
||||||
return self._user_name
|
|
||||||
|
|
||||||
context = {'model': model,
|
|
||||||
'ignore_auth': True,
|
|
||||||
'defer_commit': True, # See ckan/ckan#1714
|
|
||||||
}
|
|
||||||
self._site_user = p.toolkit.get_action('get_site_user')(context, {})
|
|
||||||
|
|
||||||
config_user_name = config.get('ckanext.spatial.harvest.user_name')
|
|
||||||
if config_user_name:
|
|
||||||
self._user_name = config_user_name
|
|
||||||
else:
|
|
||||||
self._user_name = self._site_user['name']
|
|
||||||
|
|
||||||
return self._user_name
|
|
||||||
|
|
||||||
def _get_content(self, url):
|
|
||||||
'''
|
|
||||||
DEPRECATED: Use _get_content_as_unicode instead
|
|
||||||
'''
|
|
||||||
|
|
||||||
parts = urlparse.urlsplit(url)
|
|
||||||
if parts.username and parts.password:
|
|
||||||
url_path = parts.path.rsplit('/', 1)[0]
|
|
||||||
url_path = url_path + '/xml.user.login'
|
|
||||||
if parts.port is None:
|
|
||||||
auth_url = urlparse.urlunsplit((
|
|
||||||
parts.scheme,
|
|
||||||
parts.hostname,
|
|
||||||
url_path,
|
|
||||||
parts.query,
|
|
||||||
parts.fragment
|
|
||||||
))
|
|
||||||
else:
|
|
||||||
auth_url = urlparse.urlunsplit((
|
|
||||||
parts.scheme,
|
|
||||||
parts.hostname,
|
|
||||||
parts.port,
|
|
||||||
url_path,
|
|
||||||
parts.query
|
|
||||||
))
|
|
||||||
log.error('Authenticate against Geonetwork. User is %s and password is %s', parts.username, parts.password)
|
|
||||||
auth_data = Document()
|
|
||||||
root = auth_data.createElement('request')
|
|
||||||
auth_data.appendChild(root)
|
|
||||||
username_tag = auth_data.createElement('username')
|
|
||||||
user_data = auth_data.createTextNode(parts.username)
|
|
||||||
username_tag.appendChild(user_data)
|
|
||||||
root.appendChild(username_tag)
|
|
||||||
password_tag = auth_data.createElement('password')
|
|
||||||
password_data = auth_data.createTextNode(parts.password)
|
|
||||||
password_tag.appendChild(password_data)
|
|
||||||
root.appendChild(password_tag)
|
|
||||||
xml_auth_data = auth_data.toprettyxml(indent=" ")
|
|
||||||
|
|
||||||
req_headers = {'Content-Type': 'application/xml'}
|
|
||||||
|
|
||||||
sess = requests.Session()
|
|
||||||
req = sess.post(url=auth_url, data=xml_auth_data, headers=req_headers)
|
|
||||||
opener = urllib2.build_opener()
|
|
||||||
|
|
||||||
url = url.replace(' ', '%20')
|
|
||||||
if opener:
|
|
||||||
http_response = opener.open(url)
|
|
||||||
else:
|
|
||||||
http_response = urllib2.urlopen(url)
|
|
||||||
return http_response.read()
|
|
||||||
|
|
||||||
def _get_content_as_unicode(self, url):
|
|
||||||
'''
|
|
||||||
Get remote content as unicode.
|
|
||||||
|
|
||||||
We let requests handle the conversion [1] , which will use the
|
|
||||||
content-type header first or chardet if the header is missing
|
|
||||||
(requests uses its own embedded chardet version).
|
|
||||||
|
|
||||||
As we will be storing and serving the contents as unicode, we actually
|
|
||||||
replace the original XML encoding declaration with an UTF-8 one.
|
|
||||||
|
|
||||||
|
|
||||||
[1] http://github.com/kennethreitz/requests/blob/63243b1e3b435c7736acf1e51c0f6fa6666d861d/requests/models.py#L811
|
|
||||||
|
|
||||||
'''
|
|
||||||
parts = urlparse.urlsplit(url)
|
|
||||||
if parts.username and parts.password:
|
|
||||||
url_path = parts.path.rsplit('/', 1)[0]
|
|
||||||
url_path = url_path + '/xml.user.login'
|
|
||||||
if parts.port is None:
|
|
||||||
auth_url = urlparse.urlunsplit((
|
|
||||||
parts.scheme,
|
|
||||||
parts.hostname,
|
|
||||||
url_path,
|
|
||||||
parts.query,
|
|
||||||
parts.fragment
|
|
||||||
))
|
|
||||||
else:
|
|
||||||
auth_url = urlparse.urlunsplit((
|
|
||||||
parts.scheme,
|
|
||||||
parts.hostname,
|
|
||||||
parts.port,
|
|
||||||
url_path,
|
|
||||||
parts.query
|
|
||||||
))
|
|
||||||
log.error('Authenticate against Geonetwork. User is %s and password is %s', parts.username, parts.password)
|
|
||||||
auth_data = Document()
|
|
||||||
root = auth_data.createElement('request')
|
|
||||||
auth_data.appendChild(root)
|
|
||||||
username_tag = auth_data.createElement('username')
|
|
||||||
user_data = auth_data.createTextNode(parts.username)
|
|
||||||
username_tag.appendChild(user_data)
|
|
||||||
root.appendChild(username_tag)
|
|
||||||
password_tag = auth_data.createElement('password')
|
|
||||||
password_data = auth_data.createTextNode(parts.password)
|
|
||||||
password_tag.appendChild(password_data)
|
|
||||||
root.appendChild(password_tag)
|
|
||||||
xml_auth_data = auth_data.toprettyxml(indent=" ")
|
|
||||||
|
|
||||||
req_headers = {'Content-Type': 'application/xml'}
|
|
||||||
|
|
||||||
geo_session = requests.Session()
|
|
||||||
geo_session.post(url=auth_url, data=xml_auth_data, headers=req_headers)
|
|
||||||
|
|
||||||
url = url.replace(' ', '%20')
|
|
||||||
if geo_session:
|
|
||||||
response = geo_session.get(url, timeout=10)
|
|
||||||
else:
|
|
||||||
response = requests.get(url, timeout=10)
|
|
||||||
|
|
||||||
content = response.text
|
|
||||||
|
|
||||||
# Remove original XML declaration
|
|
||||||
content = re.sub('<\?xml(.*)\?>', '', content)
|
|
||||||
|
|
||||||
# Get rid of the BOM and other rubbish at the beginning of the file
|
|
||||||
content = re.sub('.*?<', '<', content, 1)
|
|
||||||
content = content[content.index('<'):]
|
|
||||||
|
|
||||||
return content
|
|
||||||
|
|
||||||
    def _validate_document(self, document_string, harvest_object, validator=None):
        '''
        Validates an XML document with the default, or if present, the
        provided validators.

        It will create a HarvestObjectError for each validation error found,
        so they can be shown properly on the frontend.

        Returns a tuple, with a boolean showing whether the validation passed
        or not, the profile used and a list of errors (tuples with error
        message and error lines if present).
        '''
        if not validator:
            validator = self._get_validator()

        document_string = re.sub('<\?xml(.*)\?>', '', document_string)

        try:
            xml = etree.fromstring(document_string)
        except etree.XMLSyntaxError, e:
            self._save_object_error('Could not parse XML file: {0}'.format(str(e)), harvest_object, 'Import')
            return False, None, []

        valid, profile, errors = validator.is_valid(xml)
        if not valid:
            log.error('Validation errors found using profile {0} for object with GUID {1}'.format(profile, harvest_object.guid))
            for error in errors:
                self._save_object_error(error[0], harvest_object, 'Validation', line=error[1])

        return valid, profile, errors

@@ -1,574 +0,0 @@
import urllib
import urllib2
import urlparse
import httplib
import datetime
import socket

from sqlalchemy import exists

from ckan.lib.base import c
from ckan import model
from ckan.logic import ValidationError, NotFound, get_action
from ckan.lib.helpers import json
from ckan.lib.munge import munge_name
from ckan.plugins import toolkit

from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError

import logging
log = logging.getLogger(__name__)

from base import HarvesterBase


class CKANHarvester(HarvesterBase):
    '''
    A Harvester for CKAN instances
    '''
    config = None

    api_version = 2
    action_api_version = 3

    def _get_action_api_offset(self):
        return '/api/%d/action' % self.action_api_version

    def _get_search_api_offset(self):
        return '%s/package_search' % self._get_action_api_offset()
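
    # Illustrative note (added sketch, not part of the original file): with the
    # defaults above _get_search_api_offset() returns '/api/3/action/package_search',
    # so a search request built by _search_for_datasets() looks like
    #   http://remote-ckan.example.org/api/3/action/package_search?rows=100&start=0
    # ('remote-ckan.example.org' is a placeholder).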
def _get_content(self, url):
|
|
||||||
http_request = urllib2.Request(url=url)
|
|
||||||
|
|
||||||
api_key = self.config.get('api_key')
|
|
||||||
if api_key:
|
|
||||||
http_request.add_header('Authorization', api_key)
|
|
||||||
|
|
||||||
try:
|
|
||||||
http_response = urllib2.urlopen(http_request)
|
|
||||||
except urllib2.HTTPError, e:
|
|
||||||
if e.getcode() == 404:
|
|
||||||
raise ContentNotFoundError('HTTP error: %s' % e.code)
|
|
||||||
else:
|
|
||||||
raise ContentFetchError('HTTP error: %s' % e.code)
|
|
||||||
except urllib2.URLError, e:
|
|
||||||
raise ContentFetchError('URL error: %s' % e.reason)
|
|
||||||
except httplib.HTTPException, e:
|
|
||||||
raise ContentFetchError('HTTP Exception: %s' % e)
|
|
||||||
except socket.error, e:
|
|
||||||
raise ContentFetchError('HTTP socket error: %s' % e)
|
|
||||||
except Exception, e:
|
|
||||||
raise ContentFetchError('HTTP general exception: %s' % e)
|
|
||||||
return http_response.read()
|
|
||||||
|
|
||||||
def _get_group(self, base_url, group_name):
|
|
||||||
url = base_url + self._get_action_api_offset() + '/group_show?id=' + \
|
|
||||||
munge_name(group_name)
|
|
||||||
try:
|
|
||||||
content = self._get_content(url)
|
|
||||||
return json.loads(content)
|
|
||||||
except (ContentFetchError, ValueError):
|
|
||||||
log.debug('Could not fetch/decode remote group')
|
|
||||||
raise RemoteResourceError('Could not fetch/decode remote group')
|
|
||||||
|
|
||||||
def _get_organization(self, base_url, org_name):
|
|
||||||
url = base_url + self._get_action_api_offset() + \
|
|
||||||
'/organization_show?id=' + org_name
|
|
||||||
try:
|
|
||||||
content = self._get_content(url)
|
|
||||||
content_dict = json.loads(content)
|
|
||||||
return content_dict['result']
|
|
||||||
except (ContentFetchError, ValueError, KeyError):
|
|
||||||
log.debug('Could not fetch/decode remote group')
|
|
||||||
raise RemoteResourceError(
|
|
||||||
'Could not fetch/decode remote organization')
|
|
||||||
|
|
||||||
    def _set_config(self, config_str):
        if config_str:
            self.config = json.loads(config_str)
            if 'api_version' in self.config:
                self.api_version = int(self.config['api_version'])

            log.debug('Using config: %r', self.config)
        else:
            self.config = {}

    def info(self):
        return {
            'name': 'ckan',
            'title': 'CKAN',
            'description': 'Harvests remote CKAN instances',
            'form_config_interface': 'Text'
        }

    def validate_config(self, config):
        if not config:
            return config

        try:
            config_obj = json.loads(config)

            if 'api_version' in config_obj:
                try:
                    int(config_obj['api_version'])
                except ValueError:
                    raise ValueError('api_version must be an integer')

            if 'default_tags' in config_obj:
                if not isinstance(config_obj['default_tags'], list):
                    raise ValueError('default_tags must be a list')

            if 'default_groups' in config_obj:
                if not isinstance(config_obj['default_groups'], list):
                    raise ValueError('default_groups must be a list')

                # Check if default groups exist
                context = {'model': model, 'user': c.user}
                for group_name in config_obj['default_groups']:
                    try:
                        group = get_action('group_show')(
                            context, {'id': group_name})
                    except NotFound, e:
                        raise ValueError('Default group not found')

            if 'default_extras' in config_obj:
                if not isinstance(config_obj['default_extras'], dict):
                    raise ValueError('default_extras must be a dictionary')

            if 'user' in config_obj:
                # Check if user exists
                context = {'model': model, 'user': c.user}
                try:
                    user = get_action('user_show')(
                        context, {'id': config_obj.get('user')})
                except NotFound:
                    raise ValueError('User not found')

            for key in ('read_only', 'force_all'):
                if key in config_obj:
                    if not isinstance(config_obj[key], bool):
                        raise ValueError('%s must be boolean' % key)

        except ValueError, e:
            raise e

        return config
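
    # Illustrative example (added sketch, not part of the original file): a source
    # configuration accepted by the validation above could look like
    #
    #   {
    #     "api_version": 3,
    #     "default_tags": ["harvested"],
    #     "default_groups": ["geoscience"],
    #     "default_extras": {"source_url": "{harvest_source_url}/dataset/{dataset_id}"},
    #     "read_only": true,
    #     "force_all": false
    #   }
    #
    # ("geoscience" must be an existing local group; the other values are placeholders).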
def gather_stage(self, harvest_job):
|
|
||||||
log.debug('In CKANHarvester gather_stage (%s)',
|
|
||||||
harvest_job.source.url)
|
|
||||||
toolkit.requires_ckan_version(min_version='2.0')
|
|
||||||
get_all_packages = True
|
|
||||||
|
|
||||||
self._set_config(harvest_job.source.config)
|
|
||||||
|
|
||||||
        # URL cleanup: strip any credentials embedded in the source URL
        parts = urlparse.urlsplit(harvest_job.source.url)
        if parts.username and parts.password:
            url_path = parts.path
            if parts.port is None:
                netloc = parts.hostname
            else:
                netloc = '%s:%s' % (parts.hostname, parts.port)
            cleaned_url = urlparse.urlunsplit((
                parts.scheme,
                netloc,
                url_path,
                parts.query,
                parts.fragment
            ))
        else:
            cleaned_url = harvest_job.source.url

        # Get source URL
        remote_ckan_base_url = cleaned_url.rstrip('/')

# Filter in/out datasets from particular organizations
|
|
||||||
fq_terms = []
|
|
||||||
org_filter_include = self.config.get('organizations_filter_include', [])
|
|
||||||
org_filter_exclude = self.config.get('organizations_filter_exclude', [])
|
|
||||||
if org_filter_include:
|
|
||||||
fq_terms.append(' OR '.join(
|
|
||||||
'organization:%s' % org_name for org_name in org_filter_include))
|
|
||||||
elif org_filter_exclude:
|
|
||||||
fq_terms.extend(
|
|
||||||
'-organization:%s' % org_name for org_name in org_filter_exclude)
|
|
||||||
|
|
||||||
# Ideally we can request from the remote CKAN only those datasets
|
|
||||||
# modified since the last completely successful harvest.
|
|
||||||
last_error_free_job = self._last_error_free_job(harvest_job)
|
|
||||||
log.debug('Last error-free job: %r', last_error_free_job)
|
|
||||||
if (last_error_free_job and
|
|
||||||
not self.config.get('force_all', False)):
|
|
||||||
get_all_packages = False
|
|
||||||
|
|
||||||
# Request only the datasets modified since
|
|
||||||
last_time = last_error_free_job.gather_started
|
|
||||||
# Note: SOLR works in UTC, and gather_started is also UTC, so
|
|
||||||
# this should work as long as local and remote clocks are
|
|
||||||
# relatively accurate. Going back a little earlier, just in case.
|
|
||||||
get_changes_since = \
|
|
||||||
(last_time - datetime.timedelta(hours=1)).isoformat()
|
|
||||||
log.info('Searching for datasets modified since: %s UTC',
|
|
||||||
get_changes_since)
|
|
||||||
|
|
||||||
fq_since_last_time = 'metadata_modified:[{since}Z TO *]' \
|
|
||||||
.format(since=get_changes_since)
|
|
||||||
|
|
||||||
try:
|
|
||||||
pkg_dicts = self._search_for_datasets(
|
|
||||||
remote_ckan_base_url,
|
|
||||||
fq_terms + [fq_since_last_time])
|
|
||||||
except SearchError, e:
|
|
||||||
log.info('Searching for datasets changed since last time '
|
|
||||||
'gave an error: %s', e)
|
|
||||||
get_all_packages = True
|
|
||||||
|
|
||||||
if not get_all_packages and not pkg_dicts:
|
|
||||||
log.info('No datasets have been updated on the remote '
|
|
||||||
'CKAN instance since the last harvest job %s',
|
|
||||||
last_time)
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Fall-back option - request all the datasets from the remote CKAN
|
|
||||||
if get_all_packages:
|
|
||||||
# Request all remote packages
|
|
||||||
try:
|
|
||||||
pkg_dicts = self._search_for_datasets(remote_ckan_base_url,
|
|
||||||
fq_terms)
|
|
||||||
except SearchError, e:
|
|
||||||
log.info('Searching for all datasets gave an error: %s', e)
|
|
||||||
self._save_gather_error(
|
|
||||||
'Unable to search remote CKAN for datasets:%s url:%s'
|
|
||||||
'terms:%s' % (e, remote_ckan_base_url, fq_terms),
|
|
||||||
harvest_job)
|
|
||||||
return None
|
|
||||||
if not pkg_dicts:
|
|
||||||
self._save_gather_error(
|
|
||||||
'No datasets found at CKAN: %s' % remote_ckan_base_url,
|
|
||||||
harvest_job)
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Create harvest objects for each dataset
|
|
||||||
try:
|
|
||||||
package_ids = set()
|
|
||||||
object_ids = []
|
|
||||||
for pkg_dict in pkg_dicts:
|
|
||||||
if pkg_dict['id'] in package_ids:
|
|
||||||
log.info('Discarding duplicate dataset %s - probably due '
|
|
||||||
'to datasets being changed at the same time as '
|
|
||||||
'when the harvester was paging through',
|
|
||||||
pkg_dict['id'])
|
|
||||||
continue
|
|
||||||
package_ids.add(pkg_dict['id'])
|
|
||||||
|
|
||||||
log.debug('Creating HarvestObject for %s %s',
|
|
||||||
pkg_dict['name'], pkg_dict['id'])
|
|
||||||
obj = HarvestObject(guid=pkg_dict['id'],
|
|
||||||
job=harvest_job,
|
|
||||||
content=json.dumps(pkg_dict))
|
|
||||||
obj.save()
|
|
||||||
object_ids.append(obj.id)
|
|
||||||
|
|
||||||
return object_ids
|
|
||||||
except Exception, e:
|
|
||||||
self._save_gather_error('%r' % e.message, harvest_job)
|
|
||||||
|
|
||||||
def _search_for_datasets(self, remote_ckan_base_url, fq_terms=None):
|
|
||||||
'''Does a dataset search on a remote CKAN and returns the results.
|
|
||||||
|
|
||||||
Deals with paging to return all the results, not just the first page.
|
|
||||||
'''
|
|
||||||
base_search_url = remote_ckan_base_url + self._get_search_api_offset()
|
|
||||||
params = {'rows': '100', 'start': '0'}
|
|
||||||
# There is the worry that datasets will be changed whilst we are paging
|
|
||||||
# through them.
|
|
||||||
# * In SOLR 4.7 there is a cursor, but not using that yet
|
|
||||||
# because few CKANs are running that version yet.
|
|
||||||
# * However we sort, then new names added or removed before the current
|
|
||||||
# page would cause existing names on the next page to be missed or
|
|
||||||
# double counted.
|
|
||||||
# * Another approach might be to sort by metadata_modified and always
|
|
||||||
# ask for changes since (and including) the date of the last item of
|
|
||||||
# the day before. However if the entire page is of the exact same
|
|
||||||
# time, then you end up in an infinite loop asking for the same page.
|
|
||||||
# * We choose a balanced approach of sorting by ID, which means
|
|
||||||
# datasets are only missed if some are removed, which is far less
|
|
||||||
# likely than any being added. If some are missed then it is assumed
|
|
||||||
# they will harvested the next time anyway. When datasets are added,
|
|
||||||
# we are at risk of seeing datasets twice in the paging, so we detect
|
|
||||||
# and remove any duplicates.
|
|
||||||
params['sort'] = 'id asc'
|
|
||||||
if fq_terms:
|
|
||||||
params['fq'] = ' '.join(fq_terms)
|
|
||||||
|
|
||||||
pkg_dicts = []
|
|
||||||
pkg_ids = set()
|
|
||||||
previous_content = None
|
|
||||||
while True:
|
|
||||||
url = base_search_url + '?' + urllib.urlencode(params)
|
|
||||||
log.debug('Searching for CKAN datasets: %s', url)
|
|
||||||
try:
|
|
||||||
content = self._get_content(url)
|
|
||||||
except ContentFetchError, e:
|
|
||||||
raise SearchError(
|
|
||||||
'Error sending request to search remote '
|
|
||||||
'CKAN instance %s using URL %r. Error: %s' %
|
|
||||||
(remote_ckan_base_url, url, e))
|
|
||||||
|
|
||||||
if previous_content and content == previous_content:
|
|
||||||
raise SearchError('The paging doesn\'t seem to work. URL: %s' %
|
|
||||||
url)
|
|
||||||
try:
|
|
||||||
response_dict = json.loads(content)
|
|
||||||
except ValueError:
|
|
||||||
raise SearchError('Response from remote CKAN was not JSON: %r'
|
|
||||||
% content)
|
|
||||||
try:
|
|
||||||
pkg_dicts_page = response_dict.get('result', {}).get('results',
|
|
||||||
[])
|
|
||||||
except ValueError:
|
|
||||||
raise SearchError('Response JSON did not contain '
|
|
||||||
'result/results: %r' % response_dict)
|
|
||||||
|
|
||||||
# Weed out any datasets found on previous pages (should datasets be
|
|
||||||
# changing while we page)
|
|
||||||
ids_in_page = set(p['id'] for p in pkg_dicts_page)
|
|
||||||
duplicate_ids = ids_in_page & pkg_ids
|
|
||||||
if duplicate_ids:
|
|
||||||
pkg_dicts_page = [p for p in pkg_dicts_page
|
|
||||||
if p['id'] not in duplicate_ids]
|
|
||||||
pkg_ids |= ids_in_page
|
|
||||||
|
|
||||||
pkg_dicts.extend(pkg_dicts_page)
|
|
||||||
|
|
||||||
if len(pkg_dicts_page) == 0:
|
|
||||||
break
|
|
||||||
|
|
||||||
params['start'] = str(int(params['start']) + int(params['rows']))
|
|
||||||
|
|
||||||
return pkg_dicts
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def _last_error_free_job(cls, harvest_job):
|
|
||||||
# TODO weed out cancelled jobs somehow.
|
|
||||||
# look for jobs with no gather errors
|
|
||||||
jobs = \
|
|
||||||
model.Session.query(HarvestJob) \
|
|
||||||
.filter(HarvestJob.source == harvest_job.source) \
|
|
||||||
.filter(HarvestJob.gather_started != None) \
|
|
||||||
.filter(HarvestJob.status == 'Finished') \
|
|
||||||
.filter(HarvestJob.id != harvest_job.id) \
|
|
||||||
.filter(
|
|
||||||
~exists().where(
|
|
||||||
HarvestGatherError.harvest_job_id == HarvestJob.id)) \
|
|
||||||
.order_by(HarvestJob.gather_started.desc())
|
|
||||||
# now check them until we find one with no fetch/import errors
|
|
||||||
# (looping rather than doing sql, in case there are lots of objects
|
|
||||||
# and lots of jobs)
|
|
||||||
for job in jobs:
|
|
||||||
for obj in job.objects:
|
|
||||||
if obj.current is False and \
|
|
||||||
obj.report_status != 'not modified':
|
|
||||||
# unsuccessful, so go onto the next job
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
return job
|
|
||||||
|
|
||||||
    def fetch_stage(self, harvest_object):
        # Nothing to do here - we got the package dict in the search in the
        # gather stage
        return True

def import_stage(self, harvest_object):
|
|
||||||
log.debug('In CKANHarvester import_stage')
|
|
||||||
|
|
||||||
context = {'model': model, 'session': model.Session,
|
|
||||||
'user': self._get_user_name()}
|
|
||||||
if not harvest_object:
|
|
||||||
log.error('No harvest object received')
|
|
||||||
return False
|
|
||||||
|
|
||||||
if harvest_object.content is None:
|
|
||||||
self._save_object_error('Empty content for object %s' %
|
|
||||||
harvest_object.id,
|
|
||||||
harvest_object, 'Import')
|
|
||||||
return False
|
|
||||||
|
|
||||||
self._set_config(harvest_object.job.source.config)
|
|
||||||
|
|
||||||
try:
|
|
||||||
package_dict = json.loads(harvest_object.content)
|
|
||||||
|
|
||||||
if package_dict.get('type') == 'harvest':
|
|
||||||
log.warn('Remote dataset is a harvest source, ignoring...')
|
|
||||||
return True
|
|
||||||
|
|
||||||
# Set default tags if needed
|
|
||||||
default_tags = self.config.get('default_tags', [])
|
|
||||||
if default_tags:
|
|
||||||
if not 'tags' in package_dict:
|
|
||||||
package_dict['tags'] = []
|
|
||||||
package_dict['tags'].extend(
|
|
||||||
[t for t in default_tags if t not in package_dict['tags']])
|
|
||||||
|
|
||||||
remote_groups = self.config.get('remote_groups', None)
|
|
||||||
if not remote_groups in ('only_local', 'create'):
|
|
||||||
# Ignore remote groups
|
|
||||||
package_dict.pop('groups', None)
|
|
||||||
else:
|
|
||||||
if not 'groups' in package_dict:
|
|
||||||
package_dict['groups'] = []
|
|
||||||
|
|
||||||
# check if remote groups exist locally, otherwise remove
|
|
||||||
validated_groups = []
|
|
||||||
|
|
||||||
for group_name in package_dict['groups']:
|
|
||||||
try:
|
|
||||||
data_dict = {'id': group_name}
|
|
||||||
group = get_action('group_show')(context, data_dict)
|
|
||||||
if self.api_version == 1:
|
|
||||||
validated_groups.append(group['name'])
|
|
||||||
else:
|
|
||||||
validated_groups.append(group['id'])
|
|
||||||
except NotFound, e:
|
|
||||||
log.info('Group %s is not available', group_name)
|
|
||||||
if remote_groups == 'create':
|
|
||||||
try:
|
|
||||||
group = self._get_group(harvest_object.source.url, group_name)
|
|
||||||
except RemoteResourceError:
|
|
||||||
log.error('Could not get remote group %s', group_name)
|
|
||||||
continue
|
|
||||||
|
|
||||||
for key in ['packages', 'created', 'users', 'groups', 'tags', 'extras', 'display_name']:
|
|
||||||
group.pop(key, None)
|
|
||||||
|
|
||||||
get_action('group_create')(context, group)
|
|
||||||
log.info('Group %s has been newly created', group_name)
|
|
||||||
if self.api_version == 1:
|
|
||||||
validated_groups.append(group['name'])
|
|
||||||
else:
|
|
||||||
validated_groups.append(group['id'])
|
|
||||||
|
|
||||||
package_dict['groups'] = validated_groups
|
|
||||||
|
|
||||||
|
|
||||||
            # Local harvest source organization
            source_dataset = get_action('package_show')(context, {'id': harvest_object.source.id})
            local_org = source_dataset.get('owner_org')

            remote_orgs = self.config.get('remote_orgs', None)

            if not remote_orgs in ('only_local', 'create'):
                # Assign dataset to the source organization
                package_dict['owner_org'] = local_org
            else:
                if not 'owner_org' in package_dict:
                    package_dict['owner_org'] = None

                # check if remote org exist locally, otherwise remove
                validated_org = None
                remote_org = package_dict['owner_org']

                if remote_org:
                    try:
                        data_dict = {'id': remote_org}
                        org = get_action('organization_show')(context, data_dict)
                        validated_org = org['id']
                    except NotFound, e:
                        log.info('Organization %s is not available', remote_org)
                        if remote_orgs == 'create':
                            try:
                                try:
                                    org = self._get_organization(harvest_object.source.url, remote_org)
                                except RemoteResourceError:
                                    # fallback if remote CKAN exposes organizations as groups
                                    # this especially targets older versions of CKAN
                                    org = self._get_group(harvest_object.source.url, remote_org)

                                for key in ['packages', 'created', 'users', 'groups', 'tags', 'extras', 'display_name', 'type']:
                                    org.pop(key, None)
                                get_action('organization_create')(context, org)
                                log.info('Organization %s has been newly created', remote_org)
                                validated_org = org['id']
                            except (RemoteResourceError, ValidationError):
                                log.error('Could not get remote org %s', remote_org)

                package_dict['owner_org'] = validated_org or local_org

            # Set default groups if needed
            default_groups = self.config.get('default_groups', [])
            if default_groups:
                if not 'groups' in package_dict:
                    package_dict['groups'] = []
                package_dict['groups'].extend(
                    [g for g in default_groups
                     if g not in package_dict['groups']])

            # Set default extras if needed
            default_extras = self.config.get('default_extras', {})
            def get_extra(key, package_dict):
                for extra in package_dict.get('extras', []):
                    if extra['key'] == key:
                        return extra
            if default_extras:
                override_extras = self.config.get('override_extras', False)
                if not 'extras' in package_dict:
                    # extras is a list of {'key': ..., 'value': ...} dicts, so
                    # initialise it as a list (a dict would break the
                    # remove()/append() calls below)
                    package_dict['extras'] = []
                for key, value in default_extras.iteritems():
                    existing_extra = get_extra(key, package_dict)
                    if existing_extra and not override_extras:
                        continue  # no need for the default
                    if existing_extra:
                        package_dict['extras'].remove(existing_extra)
                    # Look for replacement strings
                    if isinstance(value, basestring):
                        value = value.format(
                            harvest_source_id=harvest_object.job.source.id,
                            harvest_source_url=harvest_object.job.source.url.strip('/'),
                            harvest_source_title=harvest_object.job.source.title,
                            harvest_job_id=harvest_object.job.id,
                            harvest_object_id=harvest_object.id,
                            dataset_id=package_dict['id'])

                    package_dict['extras'].append({'key': key, 'value': value})

            for resource in package_dict.get('resources', []):
                # Clear remote url_type for resources (eg datastore, upload) as
                # we are only creating normal resources with links to the
                # remote ones
                resource.pop('url_type', None)

                # Clear revision_id as the revision won't exist on this CKAN
                # and saving it will cause an IntegrityError with the foreign
                # key.
                resource.pop('revision_id', None)

            result = self._create_or_update_package(
                package_dict, harvest_object, package_dict_form='package_show')

            return result
        except ValidationError, e:
            self._save_object_error('Invalid package with GUID %s: %r' %
                                    (harvest_object.guid, e.error_dict),
                                    harvest_object, 'Import')
        except Exception, e:
            self._save_object_error('%s' % e, harvest_object, 'Import')

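For reference, all of the options read above (default_tags, remote_groups, remote_orgs, default_groups, default_extras, override_extras) come from the harvest source configuration. A hypothetical source config exercising them could look like the following; the tag, group and extra values are made up, and the only replacement keys supported in default_extras values are the ones passed to format() above (harvest_source_id, harvest_source_url, harvest_source_title, harvest_job_id, harvest_object_id, dataset_id):

{
    "default_tags": ["geospatial", "harvested"],
    "remote_groups": "only_local",
    "remote_orgs": "create",
    "default_groups": ["environment"],
    "override_extras": false,
    "default_extras": {
        "harvest_url": "{harvest_source_url}/dataset/{dataset_id}"
    }
}
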
class ContentFetchError(Exception):
    pass

class ContentNotFoundError(ContentFetchError):
    pass

class RemoteResourceError(Exception):
    pass


class SearchError(Exception):
    pass

@ -1,313 +0,0 @@
import re
import urllib
import urllib2
import requests
import urlparse
import lxml
from xml.dom.minidom import Document

import logging

from ckan import model

from ckan.plugins.core import SingletonPlugin, implements

from ckanext.harvest.interfaces import IHarvester
from ckanext.harvest.model import HarvestObject
from ckanext.harvest.model import HarvestObjectExtra as HOExtra

from ckanext.spatial.lib.csw_client import CswService
from ckanext.spatial.harvesters.base import SpatialHarvester, text_traceback


class CSWHarvester(SpatialHarvester, SingletonPlugin):
    '''
    A Harvester for CSW servers
    '''
    implements(IHarvester)

    csw = None

    def info(self):
        return {
            'name': 'csw',
            'title': 'CSW Server',
            'description': 'A server that implements OGC\'s Catalog Service for the Web (CSW) standard'
        }

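To use this harvester, both the harvest framework and this plugin have to be enabled in the CKAN configuration. A minimal sketch, assuming the stock ckanext-harvest and ckanext-spatial entry point names rather than anything specific to this fork:

# production.ini (sketch)
ckan.plugins = ... harvest csw_harvester
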
    def get_original_url(self, harvest_object_id):
        obj = model.Session.query(HarvestObject).\
                                  filter(HarvestObject.id==harvest_object_id).\
                                  first()

        parts = urlparse.urlparse(obj.source.url)

        params = {
            'SERVICE': 'CSW',
            'VERSION': '2.0.2',
            'REQUEST': 'GetRecordById',
            'OUTPUTSCHEMA': 'http://www.isotc211.org/2005/gmd',
            'OUTPUTFORMAT': 'application/xml',
            'ID': obj.guid
        }

        if parts.username and parts.password:
            # Drop the credentials from the URL: the netloc keeps only the
            # hostname, plus the port when one is given
            if parts.port is None:
                netloc = parts.hostname
            else:
                netloc = '%s:%s' % (parts.hostname, parts.port)
            url = urlparse.urlunparse((
                parts.scheme,
                netloc,
                parts.path,
                None,
                urllib.urlencode(params),
                None
            ))
        else:
            url = urlparse.urlunparse((
                parts.scheme,
                parts.netloc,
                parts.path,
                None,
                urllib.urlencode(params),
                None
            ))

        return url

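For illustration only (hypothetical endpoint, and urlencode does not guarantee parameter order), the URL returned for a record could look like:

http://geonetwork.example.org/srv/eng/csw?SERVICE=CSW&VERSION=2.0.2&REQUEST=GetRecordById&OUTPUTSCHEMA=http%3A%2F%2Fwww.isotc211.org%2F2005%2Fgmd&OUTPUTFORMAT=application%2Fxml&ID=<record-guid>
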
    def output_schema(self):
        return 'gmd'

    def gather_stage(self, harvest_job):
        log = logging.getLogger(__name__ + '.CSW.gather')
        log.debug('CswHarvester gather_stage for job: %r', harvest_job)
        # Get source URL
        url = harvest_job.source.url

        parts = urlparse.urlsplit(url)
        # Create the session up front so it is also available below when the
        # source URL carries no credentials
        sess = requests.Session()
        if parts.username and parts.password:
            url_path = parts.path.rsplit('/', 1)[0]
            url_path = url_path + '/xml.user.login'
            # Rebuild the login URL without the credentials; keep the port in
            # the netloc when one is given
            if parts.port is None:
                netloc = parts.hostname
            else:
                netloc = '%s:%s' % (parts.hostname, parts.port)
            auth_url = urlparse.urlunsplit((
                parts.scheme,
                netloc,
                url_path,
                parts.query,
                parts.fragment
            ))
            log.debug('Authenticate against Geonetwork. User is %s and password is %s', parts.username, parts.password)
            auth_data = Document()
            root = auth_data.createElement('request')
            auth_data.appendChild(root)
            username_tag = auth_data.createElement('username')
            user_data = auth_data.createTextNode(parts.username)
            username_tag.appendChild(user_data)
            root.appendChild(username_tag)
            password_tag = auth_data.createElement('password')
            password_data = auth_data.createTextNode(parts.password)
            password_tag.appendChild(password_data)
            root.appendChild(password_tag)
            xml_auth_data = auth_data.toprettyxml(indent="  ")

            req_headers = {'Content-Type': 'application/xml'}

            log.debug('Gather stage. Authorization to the geoserver, URL is %s', auth_url)
            req = sess.post(url=auth_url, data=xml_auth_data, headers=req_headers)
            log.debug('Gather stage. Geoserver Authorization cookie is %s', req.cookies)

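The login document posted to GeoNetwork's xml.user.login endpoint is therefore of the form below, with the values taken from the userinfo part of the source URL (the ones shown here are made up):

<request>
  <username>harvest_user</username>
  <password>secret</password>
</request>
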
        if parts.username and parts.password:
            # Strip the credentials from the harvesting URL as well
            if parts.port is None:
                netloc = parts.hostname
            else:
                netloc = '%s:%s' % (parts.hostname, parts.port)
            url = urlparse.urlunsplit((
                parts.scheme,
                netloc,
                parts.path,
                parts.query,
                parts.fragment
            ))

        self._set_source_config(harvest_job.source.config)

        try:
            log.debug('Gather stage. Contacting the geoserver, URL is %s', url)
            res = sess.get(url)
            log.debug('Gather stage. Geoserver contacted, result is %s', res)
            self._setup_csw_client(sess.get(url))
            #self._setup_csw_client(url)
        except Exception, e:
            self._save_gather_error('Error contacting the CSW server: %s' % e, harvest_job)
            return None

        query = model.Session.query(HarvestObject.guid, HarvestObject.package_id).\
                                    filter(HarvestObject.current==True).\
                                    filter(HarvestObject.harvest_source_id==harvest_job.source.id)
        guid_to_package_id = {}

        for guid, package_id in query:
            guid_to_package_id[guid] = package_id

        guids_in_db = set(guid_to_package_id.keys())

        # extract cql filter if any
        cql = self.source_config.get('cql')

        log.debug('Starting gathering for %s' % url)
        guids_in_harvest = set()
        try:
            for identifier in self.csw.getidentifiers(page=10, outputschema=self.output_schema(), cql=cql):
                try:
                    log.info('Got identifier %s from the CSW', identifier)
                    if identifier is None:
                        log.error('CSW returned identifier %r, skipping...' % identifier)
                        continue

                    guids_in_harvest.add(identifier)
                except Exception, e:
                    self._save_gather_error('Error for the identifier %s [%r]' % (identifier, e), harvest_job)
                    continue

        except Exception, e:
            log.error('Exception: %s' % text_traceback())
            self._save_gather_error('Error gathering the identifiers from the CSW server [%s]' % str(e), harvest_job)
            return None

        new = guids_in_harvest - guids_in_db
        delete = guids_in_db - guids_in_harvest
        change = guids_in_db & guids_in_harvest

        ids = []
        for guid in new:
            obj = HarvestObject(guid=guid, job=harvest_job,
                                extras=[HOExtra(key='status', value='new')])
            obj.save()
            ids.append(obj.id)
        for guid in change:
            obj = HarvestObject(guid=guid, job=harvest_job,
                                package_id=guid_to_package_id[guid],
                                extras=[HOExtra(key='status', value='change')])
            obj.save()
            ids.append(obj.id)
        for guid in delete:
            obj = HarvestObject(guid=guid, job=harvest_job,
                                package_id=guid_to_package_id[guid],
                                extras=[HOExtra(key='status', value='delete')])
            model.Session.query(HarvestObject).\
                  filter_by(guid=guid).\
                  update({'current': False}, False)
            obj.save()
            ids.append(obj.id)

        if len(ids) == 0:
            self._save_gather_error('No records received from the CSW server', harvest_job)
            return None

        return ids

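To make the bookkeeping above concrete, a small illustrative example of how GUIDs are classified (values are made up):

    guids_in_db = set(['a', 'b', 'c'])       # GUIDs already harvested locally
    guids_in_harvest = set(['b', 'c', 'd'])  # GUIDs returned by the CSW

    new = guids_in_harvest - guids_in_db     # set(['d'])      -> status 'new'
    delete = guids_in_db - guids_in_harvest  # set(['a'])      -> status 'delete'
    change = guids_in_db & guids_in_harvest  # set(['b', 'c']) -> status 'change'
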
    def fetch_stage(self, harvest_object):

        # Check harvest object status
        status = self._get_object_extra(harvest_object, 'status')

        if status == 'delete':
            # No need to fetch anything, just pass to the import stage
            return True

        log = logging.getLogger(__name__ + '.CSW.fetch')
        log.debug('CswHarvester fetch_stage for object: %s', harvest_object.id)

        url = harvest_object.source.url
        parts = urlparse.urlparse(url)
        if parts.username and parts.password:
            # Strip the credentials from the URL; keep the port in the netloc
            # when one is given
            if parts.port is None:
                netloc = parts.hostname
            else:
                netloc = '%s:%s' % (parts.hostname, parts.port)
            url = urlparse.urlunparse((
                parts.scheme,
                netloc,
                parts.path,
                None,
                None,
                None
            ))
        else:
            url = urlparse.urlunparse((
                parts.scheme,
                parts.netloc,
                parts.path,
                None,
                None,
                None
            ))

        try:
            self._setup_csw_client(url)
            log.debug('Fetch stage. Contacting the geoserver, URL is %s', url)
        except Exception, e:
            self._save_object_error('Error contacting the CSW server: %s' % e,
                                    harvest_object)
            return False

        identifier = harvest_object.guid
        try:
            record = self.csw.getrecordbyid([identifier], outputschema=self.output_schema())
        except Exception, e:
            self._save_object_error('Error getting the CSW record with GUID %s' % identifier, harvest_object)
            return False

        if record is None:
            self._save_object_error('Empty record for GUID %s' % identifier,
                                    harvest_object)
            return False

        try:
            # Save the fetch contents in the HarvestObject
            # Contents come from csw_client already declared and encoded as utf-8
            # Remove original XML declaration
            content = re.sub('<\?xml(.*)\?>', '', record['xml'])

            harvest_object.content = content.strip()
            harvest_object.save()
        except Exception, e:
            self._save_object_error('Error saving the harvest object for GUID %s [%r]' % \
                                    (identifier, e), harvest_object)
            return False

        log.debug('XML content saved (len %s)', len(record['xml']))
        return True

    def _setup_csw_client(self, url):
        self.csw = CswService(url)

@ -126,19 +126,3 @@
when: ( install_ldap_plugin | changed )
notify: Restart CKAN
tags: [ 'ckan', 'ckan_ldap', 'ckan_plugins' ]

# - name: Overwrite the base.py ckanext-spatial plugin file to enable authentication against the Geonetwork nodes
#   copy: src=base.py dest=/usr/lib/ckan/default/src/ckanext-spatial/ckanext/spatial/harvesters/base.py owner={{ ckan_shell_user }} group={{ ckan_shell_user }} mode=0644 backup=yes
#   notify: Restart CKAN
#   tags: [ 'ckan', 'ckan_pages', 'ckan_plugins', 'ckan_geo_auth' ]

# - name: Overwrite the ckanharvester.py ckanext plugin file to remove the authentication data from URLs
#   copy: src=ckanharvester.py dest=/usr/lib/ckan/default/src/ckanext-harvest/ckanext/harvest/harvesters/ckanharvester.py owner={{ ckan_shell_user }} group={{ ckan_shell_user }} mode=0644 backup=yes
#   notify: Restart CKAN
#   tags: [ 'ckan', 'ckan_pages', 'ckan_plugins', 'ckan_ckanharvester' ]

# - name: Overwrite the csw.py ckanext-spatial plugin file to remove the authentication data from URLs
#   copy: src=csw.py dest=/usr/lib/ckan/default/src/ckanext-spatial/ckanext/spatial/harvesters/csw.py owner={{ ckan_shell_user }} group={{ ckan_shell_user }} mode=0644 backup=yes
#   notify: Restart CKAN
#   tags: [ 'ckan', 'ckan_pages', 'ckan_plugins', 'ckan_ckanext_spatial' ]