library/roles/ckan: Try new modifications to make the geonetwork authentication work.
This commit is contained in:
parent
d97d521bdc
commit
647a2d830e
|
@ -2,10 +2,11 @@ import re
|
|||
import cgitb
|
||||
import warnings
|
||||
import urllib2
|
||||
import requests
|
||||
import sys
|
||||
import logging
|
||||
from string import Template
|
||||
from urlparse import urlparse
|
||||
import urlparse
|
||||
from datetime import datetime
|
||||
import uuid
|
||||
import hashlib
|
||||
|
@ -15,8 +16,8 @@ import mimetypes
|
|||
|
||||
from pylons import config
|
||||
from owslib import wms
|
||||
import requests
|
||||
from lxml import etree
|
||||
import lxml
|
||||
from xml.dom.minidom import Document
|
||||
|
||||
from ckan import plugins as p
|
||||
from ckan import model
|
||||
|
@ -765,17 +766,28 @@ class SpatialHarvester(HarvesterBase):
|
|||
DEPRECATED: Use _get_content_as_unicode instead
|
||||
'''
|
||||
|
||||
parts = urlparse.urlparse(url)
|
||||
parts = urlparse.urlsplit(url)
|
||||
if parts.username and parts.password:
|
||||
auth_url = url.rsplit('/', 1)[0]
|
||||
auth_url = auth_url + '/xml.user.login'
|
||||
auth_url = urlparse.urlunparse((
|
||||
url_path = parts.path.rsplit('/', 1)[0]
|
||||
url_path = url_path + '/xml.user.login'
|
||||
if parts.port is None:
|
||||
auth_url = urlparse.urlunsplit((
|
||||
parts.scheme,
|
||||
parts.netloc,
|
||||
parts.path
|
||||
parts.hostname,
|
||||
url_path,
|
||||
parts.query,
|
||||
parts.fragment
|
||||
))
|
||||
log.error('Authenticate agains Geonetwork. User is %s and password is %s', parts.username, parts.password)
|
||||
auth_data = minidom.Document()
|
||||
else:
|
||||
auth_url = urlparse.urlunsplit((
|
||||
parts.scheme,
|
||||
parts.hostname,
|
||||
parts.port,
|
||||
url_path,
|
||||
parts.query
|
||||
))
|
||||
log.error('Authenticate against Geonetwork. User is %s and password is %s', parts.username, parts.password)
|
||||
auth_data = Document()
|
||||
root = auth_data.createElement('request')
|
||||
auth_data.appendChild(root)
|
||||
username_tag = auth_data.createElement('username')
|
||||
|
@ -793,7 +805,6 @@ class SpatialHarvester(HarvesterBase):
|
|||
sess = requests.Session()
|
||||
req = sess.post(url=auth_url, data=xml_auth_data, headers=req_headers)
|
||||
opener = urllib2.build_opener()
|
||||
opener.addheaders.append(('Set-Cookie', req.cookie))
|
||||
|
||||
url = url.replace(' ', '%20')
|
||||
if opener:
|
||||
|
@ -817,17 +828,28 @@ class SpatialHarvester(HarvesterBase):
|
|||
[1] http://github.com/kennethreitz/requests/blob/63243b1e3b435c7736acf1e51c0f6fa6666d861d/requests/models.py#L811
|
||||
|
||||
'''
|
||||
parts = urlparse.urlparse(url)
|
||||
parts = urlparse.urlsplit(url)
|
||||
if parts.username and parts.password:
|
||||
auth_url = url.rsplit('/', 1)[0]
|
||||
auth_url = auth_url + '/xml.user.login'
|
||||
auth_url = urlparse.urlunparse((
|
||||
url_path = parts.path.rsplit('/', 1)[0]
|
||||
url_path = url_path + '/xml.user.login'
|
||||
if parts.port is None:
|
||||
auth_url = urlparse.urlunsplit((
|
||||
parts.scheme,
|
||||
parts.netloc,
|
||||
parts.path
|
||||
parts.hostname,
|
||||
url_path,
|
||||
parts.query,
|
||||
parts.fragment
|
||||
))
|
||||
else:
|
||||
auth_url = urlparse.urlunsplit((
|
||||
parts.scheme,
|
||||
parts.hostname,
|
||||
parts.port,
|
||||
url_path,
|
||||
parts.query
|
||||
))
|
||||
log.error('Authenticate against Geonetwork. User is %s and password is %s', parts.username, parts.password)
|
||||
auth_data = minidom.Document()
|
||||
auth_data = Document()
|
||||
root = auth_data.createElement('request')
|
||||
auth_data.appendChild(root)
|
||||
username_tag = auth_data.createElement('username')
|
||||
|
|
|
@ -0,0 +1,574 @@
|
|||
import urllib
|
||||
import urllib2
|
||||
import urlparse
|
||||
import httplib
|
||||
import datetime
|
||||
import socket
|
||||
|
||||
from sqlalchemy import exists
|
||||
|
||||
from ckan.lib.base import c
|
||||
from ckan import model
|
||||
from ckan.logic import ValidationError, NotFound, get_action
|
||||
from ckan.lib.helpers import json
|
||||
from ckan.lib.munge import munge_name
|
||||
from ckan.plugins import toolkit
|
||||
|
||||
from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError
|
||||
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
from base import HarvesterBase
|
||||
|
||||
|
||||
class CKANHarvester(HarvesterBase):
|
||||
'''
|
||||
A Harvester for CKAN instances
|
||||
'''
|
||||
config = None
|
||||
|
||||
api_version = 2
|
||||
action_api_version = 3
|
||||
|
||||
def _get_action_api_offset(self):
|
||||
return '/api/%d/action' % self.action_api_version
|
||||
|
||||
def _get_search_api_offset(self):
|
||||
return '%s/package_search' % self._get_action_api_offset()
|
||||
|
||||
def _get_content(self, url):
|
||||
http_request = urllib2.Request(url=url)
|
||||
|
||||
api_key = self.config.get('api_key')
|
||||
if api_key:
|
||||
http_request.add_header('Authorization', api_key)
|
||||
|
||||
try:
|
||||
http_response = urllib2.urlopen(http_request)
|
||||
except urllib2.HTTPError, e:
|
||||
if e.getcode() == 404:
|
||||
raise ContentNotFoundError('HTTP error: %s' % e.code)
|
||||
else:
|
||||
raise ContentFetchError('HTTP error: %s' % e.code)
|
||||
except urllib2.URLError, e:
|
||||
raise ContentFetchError('URL error: %s' % e.reason)
|
||||
except httplib.HTTPException, e:
|
||||
raise ContentFetchError('HTTP Exception: %s' % e)
|
||||
except socket.error, e:
|
||||
raise ContentFetchError('HTTP socket error: %s' % e)
|
||||
except Exception, e:
|
||||
raise ContentFetchError('HTTP general exception: %s' % e)
|
||||
return http_response.read()
|
||||
|
||||
def _get_group(self, base_url, group_name):
|
||||
url = base_url + self._get_action_api_offset() + '/group_show?id=' + \
|
||||
munge_name(group_name)
|
||||
try:
|
||||
content = self._get_content(url)
|
||||
return json.loads(content)
|
||||
except (ContentFetchError, ValueError):
|
||||
log.debug('Could not fetch/decode remote group')
|
||||
raise RemoteResourceError('Could not fetch/decode remote group')
|
||||
|
||||
def _get_organization(self, base_url, org_name):
|
||||
url = base_url + self._get_action_api_offset() + \
|
||||
'/organization_show?id=' + org_name
|
||||
try:
|
||||
content = self._get_content(url)
|
||||
content_dict = json.loads(content)
|
||||
return content_dict['result']
|
||||
except (ContentFetchError, ValueError, KeyError):
|
||||
log.debug('Could not fetch/decode remote group')
|
||||
raise RemoteResourceError(
|
||||
'Could not fetch/decode remote organization')
|
||||
|
||||
def _set_config(self, config_str):
|
||||
if config_str:
|
||||
self.config = json.loads(config_str)
|
||||
if 'api_version' in self.config:
|
||||
self.api_version = int(self.config['api_version'])
|
||||
|
||||
log.debug('Using config: %r', self.config)
|
||||
else:
|
||||
self.config = {}
|
||||
|
||||
def info(self):
|
||||
return {
|
||||
'name': 'ckan',
|
||||
'title': 'CKAN',
|
||||
'description': 'Harvests remote CKAN instances',
|
||||
'form_config_interface': 'Text'
|
||||
}
|
||||
|
||||
def validate_config(self, config):
|
||||
if not config:
|
||||
return config
|
||||
|
||||
try:
|
||||
config_obj = json.loads(config)
|
||||
|
||||
if 'api_version' in config_obj:
|
||||
try:
|
||||
int(config_obj['api_version'])
|
||||
except ValueError:
|
||||
raise ValueError('api_version must be an integer')
|
||||
|
||||
if 'default_tags' in config_obj:
|
||||
if not isinstance(config_obj['default_tags'], list):
|
||||
raise ValueError('default_tags must be a list')
|
||||
|
||||
if 'default_groups' in config_obj:
|
||||
if not isinstance(config_obj['default_groups'], list):
|
||||
raise ValueError('default_groups must be a list')
|
||||
|
||||
# Check if default groups exist
|
||||
context = {'model': model, 'user': c.user}
|
||||
for group_name in config_obj['default_groups']:
|
||||
try:
|
||||
group = get_action('group_show')(
|
||||
context, {'id': group_name})
|
||||
except NotFound, e:
|
||||
raise ValueError('Default group not found')
|
||||
|
||||
if 'default_extras' in config_obj:
|
||||
if not isinstance(config_obj['default_extras'], dict):
|
||||
raise ValueError('default_extras must be a dictionary')
|
||||
|
||||
if 'user' in config_obj:
|
||||
# Check if user exists
|
||||
context = {'model': model, 'user': c.user}
|
||||
try:
|
||||
user = get_action('user_show')(
|
||||
context, {'id': config_obj.get('user')})
|
||||
except NotFound:
|
||||
raise ValueError('User not found')
|
||||
|
||||
for key in ('read_only', 'force_all'):
|
||||
if key in config_obj:
|
||||
if not isinstance(config_obj[key], bool):
|
||||
raise ValueError('%s must be boolean' % key)
|
||||
|
||||
except ValueError, e:
|
||||
raise e
|
||||
|
||||
return config
|
||||
|
||||
def gather_stage(self, harvest_job):
|
||||
log.debug('In CKANHarvester gather_stage (%s)',
|
||||
harvest_job.source.url)
|
||||
toolkit.requires_ckan_version(min_version='2.0')
|
||||
get_all_packages = True
|
||||
|
||||
self._set_config(harvest_job.source.config)
|
||||
|
||||
# URL cleanup
|
||||
parts = urlparse.urlsplit(harvest_job.source.url)
|
||||
if parts.username and parts.password:
|
||||
if parts.port is None:
|
||||
cleaned_url = urlparse.urlunsplit((
|
||||
parts.scheme,
|
||||
parts.hostname,
|
||||
url_path,
|
||||
parts.query,
|
||||
parts.fragment
|
||||
))
|
||||
else:
|
||||
cleaned_url = urlparse.urlunsplit((
|
||||
parts.scheme,
|
||||
parts.hostname,
|
||||
parts.port,
|
||||
url_path,
|
||||
parts.query
|
||||
))
|
||||
|
||||
# Get source URL
|
||||
remote_ckan_base_url = cleaned_url.rstrip('/')
|
||||
|
||||
# Filter in/out datasets from particular organizations
|
||||
fq_terms = []
|
||||
org_filter_include = self.config.get('organizations_filter_include', [])
|
||||
org_filter_exclude = self.config.get('organizations_filter_exclude', [])
|
||||
if org_filter_include:
|
||||
fq_terms.append(' OR '.join(
|
||||
'organization:%s' % org_name for org_name in org_filter_include))
|
||||
elif org_filter_exclude:
|
||||
fq_terms.extend(
|
||||
'-organization:%s' % org_name for org_name in org_filter_exclude)
|
||||
|
||||
# Ideally we can request from the remote CKAN only those datasets
|
||||
# modified since the last completely successful harvest.
|
||||
last_error_free_job = self._last_error_free_job(harvest_job)
|
||||
log.debug('Last error-free job: %r', last_error_free_job)
|
||||
if (last_error_free_job and
|
||||
not self.config.get('force_all', False)):
|
||||
get_all_packages = False
|
||||
|
||||
# Request only the datasets modified since
|
||||
last_time = last_error_free_job.gather_started
|
||||
# Note: SOLR works in UTC, and gather_started is also UTC, so
|
||||
# this should work as long as local and remote clocks are
|
||||
# relatively accurate. Going back a little earlier, just in case.
|
||||
get_changes_since = \
|
||||
(last_time - datetime.timedelta(hours=1)).isoformat()
|
||||
log.info('Searching for datasets modified since: %s UTC',
|
||||
get_changes_since)
|
||||
|
||||
fq_since_last_time = 'metadata_modified:[{since}Z TO *]' \
|
||||
.format(since=get_changes_since)
|
||||
|
||||
try:
|
||||
pkg_dicts = self._search_for_datasets(
|
||||
remote_ckan_base_url,
|
||||
fq_terms + [fq_since_last_time])
|
||||
except SearchError, e:
|
||||
log.info('Searching for datasets changed since last time '
|
||||
'gave an error: %s', e)
|
||||
get_all_packages = True
|
||||
|
||||
if not get_all_packages and not pkg_dicts:
|
||||
log.info('No datasets have been updated on the remote '
|
||||
'CKAN instance since the last harvest job %s',
|
||||
last_time)
|
||||
return None
|
||||
|
||||
# Fall-back option - request all the datasets from the remote CKAN
|
||||
if get_all_packages:
|
||||
# Request all remote packages
|
||||
try:
|
||||
pkg_dicts = self._search_for_datasets(remote_ckan_base_url,
|
||||
fq_terms)
|
||||
except SearchError, e:
|
||||
log.info('Searching for all datasets gave an error: %s', e)
|
||||
self._save_gather_error(
|
||||
'Unable to search remote CKAN for datasets:%s url:%s'
|
||||
'terms:%s' % (e, remote_ckan_base_url, fq_terms),
|
||||
harvest_job)
|
||||
return None
|
||||
if not pkg_dicts:
|
||||
self._save_gather_error(
|
||||
'No datasets found at CKAN: %s' % remote_ckan_base_url,
|
||||
harvest_job)
|
||||
return None
|
||||
|
||||
# Create harvest objects for each dataset
|
||||
try:
|
||||
package_ids = set()
|
||||
object_ids = []
|
||||
for pkg_dict in pkg_dicts:
|
||||
if pkg_dict['id'] in package_ids:
|
||||
log.info('Discarding duplicate dataset %s - probably due '
|
||||
'to datasets being changed at the same time as '
|
||||
'when the harvester was paging through',
|
||||
pkg_dict['id'])
|
||||
continue
|
||||
package_ids.add(pkg_dict['id'])
|
||||
|
||||
log.debug('Creating HarvestObject for %s %s',
|
||||
pkg_dict['name'], pkg_dict['id'])
|
||||
obj = HarvestObject(guid=pkg_dict['id'],
|
||||
job=harvest_job,
|
||||
content=json.dumps(pkg_dict))
|
||||
obj.save()
|
||||
object_ids.append(obj.id)
|
||||
|
||||
return object_ids
|
||||
except Exception, e:
|
||||
self._save_gather_error('%r' % e.message, harvest_job)
|
||||
|
||||
def _search_for_datasets(self, remote_ckan_base_url, fq_terms=None):
|
||||
'''Does a dataset search on a remote CKAN and returns the results.
|
||||
|
||||
Deals with paging to return all the results, not just the first page.
|
||||
'''
|
||||
base_search_url = remote_ckan_base_url + self._get_search_api_offset()
|
||||
params = {'rows': '100', 'start': '0'}
|
||||
# There is the worry that datasets will be changed whilst we are paging
|
||||
# through them.
|
||||
# * In SOLR 4.7 there is a cursor, but not using that yet
|
||||
# because few CKANs are running that version yet.
|
||||
# * However we sort, then new names added or removed before the current
|
||||
# page would cause existing names on the next page to be missed or
|
||||
# double counted.
|
||||
# * Another approach might be to sort by metadata_modified and always
|
||||
# ask for changes since (and including) the date of the last item of
|
||||
# the day before. However if the entire page is of the exact same
|
||||
# time, then you end up in an infinite loop asking for the same page.
|
||||
# * We choose a balanced approach of sorting by ID, which means
|
||||
# datasets are only missed if some are removed, which is far less
|
||||
# likely than any being added. If some are missed then it is assumed
|
||||
# they will harvested the next time anyway. When datasets are added,
|
||||
# we are at risk of seeing datasets twice in the paging, so we detect
|
||||
# and remove any duplicates.
|
||||
params['sort'] = 'id asc'
|
||||
if fq_terms:
|
||||
params['fq'] = ' '.join(fq_terms)
|
||||
|
||||
pkg_dicts = []
|
||||
pkg_ids = set()
|
||||
previous_content = None
|
||||
while True:
|
||||
url = base_search_url + '?' + urllib.urlencode(params)
|
||||
log.debug('Searching for CKAN datasets: %s', url)
|
||||
try:
|
||||
content = self._get_content(url)
|
||||
except ContentFetchError, e:
|
||||
raise SearchError(
|
||||
'Error sending request to search remote '
|
||||
'CKAN instance %s using URL %r. Error: %s' %
|
||||
(remote_ckan_base_url, url, e))
|
||||
|
||||
if previous_content and content == previous_content:
|
||||
raise SearchError('The paging doesn\'t seem to work. URL: %s' %
|
||||
url)
|
||||
try:
|
||||
response_dict = json.loads(content)
|
||||
except ValueError:
|
||||
raise SearchError('Response from remote CKAN was not JSON: %r'
|
||||
% content)
|
||||
try:
|
||||
pkg_dicts_page = response_dict.get('result', {}).get('results',
|
||||
[])
|
||||
except ValueError:
|
||||
raise SearchError('Response JSON did not contain '
|
||||
'result/results: %r' % response_dict)
|
||||
|
||||
# Weed out any datasets found on previous pages (should datasets be
|
||||
# changing while we page)
|
||||
ids_in_page = set(p['id'] for p in pkg_dicts_page)
|
||||
duplicate_ids = ids_in_page & pkg_ids
|
||||
if duplicate_ids:
|
||||
pkg_dicts_page = [p for p in pkg_dicts_page
|
||||
if p['id'] not in duplicate_ids]
|
||||
pkg_ids |= ids_in_page
|
||||
|
||||
pkg_dicts.extend(pkg_dicts_page)
|
||||
|
||||
if len(pkg_dicts_page) == 0:
|
||||
break
|
||||
|
||||
params['start'] = str(int(params['start']) + int(params['rows']))
|
||||
|
||||
return pkg_dicts
|
||||
|
||||
@classmethod
|
||||
def _last_error_free_job(cls, harvest_job):
|
||||
# TODO weed out cancelled jobs somehow.
|
||||
# look for jobs with no gather errors
|
||||
jobs = \
|
||||
model.Session.query(HarvestJob) \
|
||||
.filter(HarvestJob.source == harvest_job.source) \
|
||||
.filter(HarvestJob.gather_started != None) \
|
||||
.filter(HarvestJob.status == 'Finished') \
|
||||
.filter(HarvestJob.id != harvest_job.id) \
|
||||
.filter(
|
||||
~exists().where(
|
||||
HarvestGatherError.harvest_job_id == HarvestJob.id)) \
|
||||
.order_by(HarvestJob.gather_started.desc())
|
||||
# now check them until we find one with no fetch/import errors
|
||||
# (looping rather than doing sql, in case there are lots of objects
|
||||
# and lots of jobs)
|
||||
for job in jobs:
|
||||
for obj in job.objects:
|
||||
if obj.current is False and \
|
||||
obj.report_status != 'not modified':
|
||||
# unsuccessful, so go onto the next job
|
||||
break
|
||||
else:
|
||||
return job
|
||||
|
||||
def fetch_stage(self, harvest_object):
|
||||
# Nothing to do here - we got the package dict in the search in the
|
||||
# gather stage
|
||||
return True
|
||||
|
||||
def import_stage(self, harvest_object):
|
||||
log.debug('In CKANHarvester import_stage')
|
||||
|
||||
context = {'model': model, 'session': model.Session,
|
||||
'user': self._get_user_name()}
|
||||
if not harvest_object:
|
||||
log.error('No harvest object received')
|
||||
return False
|
||||
|
||||
if harvest_object.content is None:
|
||||
self._save_object_error('Empty content for object %s' %
|
||||
harvest_object.id,
|
||||
harvest_object, 'Import')
|
||||
return False
|
||||
|
||||
self._set_config(harvest_object.job.source.config)
|
||||
|
||||
try:
|
||||
package_dict = json.loads(harvest_object.content)
|
||||
|
||||
if package_dict.get('type') == 'harvest':
|
||||
log.warn('Remote dataset is a harvest source, ignoring...')
|
||||
return True
|
||||
|
||||
# Set default tags if needed
|
||||
default_tags = self.config.get('default_tags', [])
|
||||
if default_tags:
|
||||
if not 'tags' in package_dict:
|
||||
package_dict['tags'] = []
|
||||
package_dict['tags'].extend(
|
||||
[t for t in default_tags if t not in package_dict['tags']])
|
||||
|
||||
remote_groups = self.config.get('remote_groups', None)
|
||||
if not remote_groups in ('only_local', 'create'):
|
||||
# Ignore remote groups
|
||||
package_dict.pop('groups', None)
|
||||
else:
|
||||
if not 'groups' in package_dict:
|
||||
package_dict['groups'] = []
|
||||
|
||||
# check if remote groups exist locally, otherwise remove
|
||||
validated_groups = []
|
||||
|
||||
for group_name in package_dict['groups']:
|
||||
try:
|
||||
data_dict = {'id': group_name}
|
||||
group = get_action('group_show')(context, data_dict)
|
||||
if self.api_version == 1:
|
||||
validated_groups.append(group['name'])
|
||||
else:
|
||||
validated_groups.append(group['id'])
|
||||
except NotFound, e:
|
||||
log.info('Group %s is not available', group_name)
|
||||
if remote_groups == 'create':
|
||||
try:
|
||||
group = self._get_group(harvest_object.source.url, group_name)
|
||||
except RemoteResourceError:
|
||||
log.error('Could not get remote group %s', group_name)
|
||||
continue
|
||||
|
||||
for key in ['packages', 'created', 'users', 'groups', 'tags', 'extras', 'display_name']:
|
||||
group.pop(key, None)
|
||||
|
||||
get_action('group_create')(context, group)
|
||||
log.info('Group %s has been newly created', group_name)
|
||||
if self.api_version == 1:
|
||||
validated_groups.append(group['name'])
|
||||
else:
|
||||
validated_groups.append(group['id'])
|
||||
|
||||
package_dict['groups'] = validated_groups
|
||||
|
||||
|
||||
# Local harvest source organization
|
||||
source_dataset = get_action('package_show')(context, {'id': harvest_object.source.id})
|
||||
local_org = source_dataset.get('owner_org')
|
||||
|
||||
remote_orgs = self.config.get('remote_orgs', None)
|
||||
|
||||
if not remote_orgs in ('only_local', 'create'):
|
||||
# Assign dataset to the source organization
|
||||
package_dict['owner_org'] = local_org
|
||||
else:
|
||||
if not 'owner_org' in package_dict:
|
||||
package_dict['owner_org'] = None
|
||||
|
||||
# check if remote org exist locally, otherwise remove
|
||||
validated_org = None
|
||||
remote_org = package_dict['owner_org']
|
||||
|
||||
if remote_org:
|
||||
try:
|
||||
data_dict = {'id': remote_org}
|
||||
org = get_action('organization_show')(context, data_dict)
|
||||
validated_org = org['id']
|
||||
except NotFound, e:
|
||||
log.info('Organization %s is not available', remote_org)
|
||||
if remote_orgs == 'create':
|
||||
try:
|
||||
try:
|
||||
org = self._get_organization(harvest_object.source.url, remote_org)
|
||||
except RemoteResourceError:
|
||||
# fallback if remote CKAN exposes organizations as groups
|
||||
# this especially targets older versions of CKAN
|
||||
org = self._get_group(harvest_object.source.url, remote_org)
|
||||
|
||||
for key in ['packages', 'created', 'users', 'groups', 'tags', 'extras', 'display_name', 'type']:
|
||||
org.pop(key, None)
|
||||
get_action('organization_create')(context, org)
|
||||
log.info('Organization %s has been newly created', remote_org)
|
||||
validated_org = org['id']
|
||||
except (RemoteResourceError, ValidationError):
|
||||
log.error('Could not get remote org %s', remote_org)
|
||||
|
||||
package_dict['owner_org'] = validated_org or local_org
|
||||
|
||||
# Set default groups if needed
|
||||
default_groups = self.config.get('default_groups', [])
|
||||
if default_groups:
|
||||
if not 'groups' in package_dict:
|
||||
package_dict['groups'] = []
|
||||
package_dict['groups'].extend(
|
||||
[g for g in default_groups
|
||||
if g not in package_dict['groups']])
|
||||
|
||||
# Set default extras if needed
|
||||
default_extras = self.config.get('default_extras', {})
|
||||
def get_extra(key, package_dict):
|
||||
for extra in package_dict.get('extras', []):
|
||||
if extra['key'] == key:
|
||||
return extra
|
||||
if default_extras:
|
||||
override_extras = self.config.get('override_extras', False)
|
||||
if not 'extras' in package_dict:
|
||||
package_dict['extras'] = {}
|
||||
for key, value in default_extras.iteritems():
|
||||
existing_extra = get_extra(key, package_dict)
|
||||
if existing_extra and not override_extras:
|
||||
continue # no need for the default
|
||||
if existing_extra:
|
||||
package_dict['extras'].remove(existing_extra)
|
||||
# Look for replacement strings
|
||||
if isinstance(value, basestring):
|
||||
value = value.format(
|
||||
harvest_source_id=harvest_object.job.source.id,
|
||||
harvest_source_url=
|
||||
harvest_object.job.source.url.strip('/'),
|
||||
harvest_source_title=
|
||||
harvest_object.job.source.title,
|
||||
harvest_job_id=harvest_object.job.id,
|
||||
harvest_object_id=harvest_object.id,
|
||||
dataset_id=package_dict['id'])
|
||||
|
||||
package_dict['extras'].append({'key': key, 'value': value})
|
||||
|
||||
for resource in package_dict.get('resources', []):
|
||||
# Clear remote url_type for resources (eg datastore, upload) as
|
||||
# we are only creating normal resources with links to the
|
||||
# remote ones
|
||||
resource.pop('url_type', None)
|
||||
|
||||
# Clear revision_id as the revision won't exist on this CKAN
|
||||
# and saving it will cause an IntegrityError with the foreign
|
||||
# key.
|
||||
resource.pop('revision_id', None)
|
||||
|
||||
result = self._create_or_update_package(
|
||||
package_dict, harvest_object, package_dict_form='package_show')
|
||||
|
||||
return result
|
||||
except ValidationError, e:
|
||||
self._save_object_error('Invalid package with GUID %s: %r' %
|
||||
(harvest_object.guid, e.error_dict),
|
||||
harvest_object, 'Import')
|
||||
except Exception, e:
|
||||
self._save_object_error('%s' % e, harvest_object, 'Import')
|
||||
|
||||
|
||||
class ContentFetchError(Exception):
|
||||
pass
|
||||
|
||||
class ContentNotFoundError(ContentFetchError):
|
||||
pass
|
||||
|
||||
class RemoteResourceError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class SearchError(Exception):
|
||||
pass
|
|
@ -0,0 +1,313 @@
|
|||
import re
|
||||
import urllib
|
||||
import urllib2
|
||||
import requests
|
||||
import urlparse
|
||||
import lxml
|
||||
from xml.dom.minidom import Document
|
||||
|
||||
import logging
|
||||
|
||||
from ckan import model
|
||||
|
||||
from ckan.plugins.core import SingletonPlugin, implements
|
||||
|
||||
from ckanext.harvest.interfaces import IHarvester
|
||||
from ckanext.harvest.model import HarvestObject
|
||||
from ckanext.harvest.model import HarvestObjectExtra as HOExtra
|
||||
|
||||
from ckanext.spatial.lib.csw_client import CswService
|
||||
from ckanext.spatial.harvesters.base import SpatialHarvester, text_traceback
|
||||
|
||||
|
||||
class CSWHarvester(SpatialHarvester, SingletonPlugin):
|
||||
'''
|
||||
A Harvester for CSW servers
|
||||
'''
|
||||
implements(IHarvester)
|
||||
|
||||
csw=None
|
||||
|
||||
def info(self):
|
||||
return {
|
||||
'name': 'csw',
|
||||
'title': 'CSW Server',
|
||||
'description': 'A server that implements OGC\'s Catalog Service for the Web (CSW) standard'
|
||||
}
|
||||
|
||||
|
||||
def get_original_url(self, harvest_object_id):
|
||||
obj = model.Session.query(HarvestObject).\
|
||||
filter(HarvestObject.id==harvest_object_id).\
|
||||
first()
|
||||
|
||||
parts = urlparse.urlparse(obj.source.url)
|
||||
|
||||
params = {
|
||||
'SERVICE': 'CSW',
|
||||
'VERSION': '2.0.2',
|
||||
'REQUEST': 'GetRecordById',
|
||||
'OUTPUTSCHEMA': 'http://www.isotc211.org/2005/gmd',
|
||||
'OUTPUTFORMAT':'application/xml' ,
|
||||
'ID': obj.guid
|
||||
}
|
||||
|
||||
if parts.username and parts.password:
|
||||
if parts.port is None:
|
||||
url = urlparse.urlunparse((
|
||||
parts.scheme,
|
||||
parts.hostname,
|
||||
parts.path,
|
||||
None,
|
||||
urllib.urlencode(params),
|
||||
None
|
||||
))
|
||||
else:
|
||||
url = urlparse.urlunparse((
|
||||
parts.scheme,
|
||||
parts.hostname,
|
||||
parts.port,
|
||||
parts.path,
|
||||
urllib.urlencode(params),
|
||||
None
|
||||
))
|
||||
else:
|
||||
url = urlparse.urlunparse((
|
||||
parts.scheme,
|
||||
parts.netloc,
|
||||
parts.path,
|
||||
None,
|
||||
urllib.urlencode(params),
|
||||
None
|
||||
))
|
||||
|
||||
return url
|
||||
|
||||
def output_schema(self):
|
||||
return 'gmd'
|
||||
|
||||
def gather_stage(self, harvest_job):
|
||||
log = logging.getLogger(__name__ + '.CSW.gather')
|
||||
log.debug('CswHarvester gather_stage for job: %r', harvest_job)
|
||||
# Get source URL
|
||||
url = harvest_job.source.url
|
||||
|
||||
parts = urlparse.urlsplit(url)
|
||||
if parts.username and parts.password:
|
||||
url_path = parts.path.rsplit('/', 1)[0]
|
||||
url_path = url_path + '/xml.user.login'
|
||||
if parts.port is None:
|
||||
auth_url = urlparse.urlunsplit((
|
||||
parts.scheme,
|
||||
parts.hostname,
|
||||
url_path,
|
||||
parts.query,
|
||||
parts.fragment
|
||||
))
|
||||
else:
|
||||
auth_url = urlparse.urlunsplit((
|
||||
parts.scheme,
|
||||
parts.hostname,
|
||||
parts.port,
|
||||
url_path,
|
||||
parts.query
|
||||
))
|
||||
log.debug('Authenticate against Geonetwork. User is %s and password is %s', parts.username, parts.password)
|
||||
auth_data = Document()
|
||||
root = auth_data.createElement('request')
|
||||
auth_data.appendChild(root)
|
||||
username_tag = auth_data.createElement('username')
|
||||
user_data = auth_data.createTextNode(parts.username)
|
||||
username_tag.appendChild(user_data)
|
||||
root.appendChild(username_tag)
|
||||
password_tag = auth_data.createElement('password')
|
||||
password_data = auth_data.createTextNode(parts.password)
|
||||
password_tag.appendChild(password_data)
|
||||
root.appendChild(password_tag)
|
||||
xml_auth_data = auth_data.toprettyxml(indent=" ")
|
||||
|
||||
req_headers = {'Content-Type': 'application/xml'}
|
||||
|
||||
sess = requests.Session()
|
||||
log.debug('Gather stage. Authorization to the geoserver, URL is %s', auth_url)
|
||||
req = sess.post(url=auth_url, data=xml_auth_data, headers=req_headers)
|
||||
log.debug('Gather stage. Geoserver Authorization cookie is %s', req.cookies)
|
||||
|
||||
if parts.username and parts.password:
|
||||
if parts.port is None:
|
||||
url = urlparse.urlunsplit((
|
||||
parts.scheme,
|
||||
parts.hostname,
|
||||
parts.path,
|
||||
parts.query,
|
||||
parts.fragment
|
||||
))
|
||||
else:
|
||||
url = urlparse.urlunsplit((
|
||||
parts.scheme,
|
||||
parts.hostname,
|
||||
parts.port,
|
||||
parts.path,
|
||||
parts.query
|
||||
))
|
||||
|
||||
self._set_source_config(harvest_job.source.config)
|
||||
|
||||
try:
|
||||
log.debug('Gather stage. Contacting the geoserver, URL is %s', url)
|
||||
res = sess.get(url)
|
||||
log.debug('Gather stage. Geoserver contacted, result is %s', res)
|
||||
self._setup_csw_client(sess.get(url))
|
||||
#self._setup_csw_client(url)
|
||||
except Exception, e:
|
||||
self._save_gather_error('Error contacting the CSW server: %s' % e, harvest_job)
|
||||
return None
|
||||
|
||||
query = model.Session.query(HarvestObject.guid, HarvestObject.package_id).\
|
||||
filter(HarvestObject.current==True).\
|
||||
filter(HarvestObject.harvest_source_id==harvest_job.source.id)
|
||||
guid_to_package_id = {}
|
||||
|
||||
for guid, package_id in query:
|
||||
guid_to_package_id[guid] = package_id
|
||||
|
||||
guids_in_db = set(guid_to_package_id.keys())
|
||||
|
||||
# extract cql filter if any
|
||||
cql = self.source_config.get('cql')
|
||||
|
||||
log.debug('Starting gathering for %s' % url)
|
||||
guids_in_harvest = set()
|
||||
try:
|
||||
for identifier in self.csw.getidentifiers(page=10, outputschema=self.output_schema(), cql=cql):
|
||||
try:
|
||||
log.info('Got identifier %s from the CSW', identifier)
|
||||
if identifier is None:
|
||||
log.error('CSW returned identifier %r, skipping...' % identifier)
|
||||
continue
|
||||
|
||||
guids_in_harvest.add(identifier)
|
||||
except Exception, e:
|
||||
self._save_gather_error('Error for the identifier %s [%r]' % (identifier,e), harvest_job)
|
||||
continue
|
||||
|
||||
|
||||
except Exception, e:
|
||||
log.error('Exception: %s' % text_traceback())
|
||||
self._save_gather_error('Error gathering the identifiers from the CSW server [%s]' % str(e), harvest_job)
|
||||
return None
|
||||
|
||||
new = guids_in_harvest - guids_in_db
|
||||
delete = guids_in_db - guids_in_harvest
|
||||
change = guids_in_db & guids_in_harvest
|
||||
|
||||
ids = []
|
||||
for guid in new:
|
||||
obj = HarvestObject(guid=guid, job=harvest_job,
|
||||
extras=[HOExtra(key='status', value='new')])
|
||||
obj.save()
|
||||
ids.append(obj.id)
|
||||
for guid in change:
|
||||
obj = HarvestObject(guid=guid, job=harvest_job,
|
||||
package_id=guid_to_package_id[guid],
|
||||
extras=[HOExtra(key='status', value='change')])
|
||||
obj.save()
|
||||
ids.append(obj.id)
|
||||
for guid in delete:
|
||||
obj = HarvestObject(guid=guid, job=harvest_job,
|
||||
package_id=guid_to_package_id[guid],
|
||||
extras=[HOExtra(key='status', value='delete')])
|
||||
model.Session.query(HarvestObject).\
|
||||
filter_by(guid=guid).\
|
||||
update({'current': False}, False)
|
||||
obj.save()
|
||||
ids.append(obj.id)
|
||||
|
||||
if len(ids) == 0:
|
||||
self._save_gather_error('No records received from the CSW server', harvest_job)
|
||||
return None
|
||||
|
||||
return ids
|
||||
|
||||
def fetch_stage(self,harvest_object):
|
||||
|
||||
# Check harvest object status
|
||||
status = self._get_object_extra(harvest_object, 'status')
|
||||
|
||||
if status == 'delete':
|
||||
# No need to fetch anything, just pass to the import stage
|
||||
return True
|
||||
|
||||
log = logging.getLogger(__name__ + '.CSW.fetch')
|
||||
log.debug('CswHarvester fetch_stage for object: %s', harvest_object.id)
|
||||
|
||||
url = harvest_object.source.url
|
||||
parts = urlparse.urlparse(url)
|
||||
if parts.username and parts.password:
|
||||
if parts.port is None:
|
||||
url = urlparse.urlunparse((
|
||||
parts.scheme,
|
||||
parts.hostname,
|
||||
parts.path,
|
||||
None,
|
||||
None,
|
||||
None
|
||||
))
|
||||
else:
|
||||
url = urlparse.urlunparse((
|
||||
parts.scheme,
|
||||
parts.hostname,
|
||||
parts.port,
|
||||
parts.path,
|
||||
None,
|
||||
None
|
||||
))
|
||||
else:
|
||||
url = urlparse.urlunparse((
|
||||
parts.scheme,
|
||||
parts.netloc,
|
||||
parts.path,
|
||||
None,
|
||||
None,
|
||||
None
|
||||
))
|
||||
|
||||
try:
|
||||
self._setup_csw_client(url)
|
||||
log.debug('Fetch stage. Contacting the geoserver, URL is %s', url)
|
||||
except Exception, e:
|
||||
self._save_object_error('Error contacting the CSW server: %s' % e,
|
||||
harvest_object)
|
||||
return False
|
||||
|
||||
identifier = harvest_object.guid
|
||||
try:
|
||||
record = self.csw.getrecordbyid([identifier], outputschema=self.output_schema())
|
||||
except Exception, e:
|
||||
self._save_object_error('Error getting the CSW record with GUID %s' % identifier, harvest_object)
|
||||
return False
|
||||
|
||||
if record is None:
|
||||
self._save_object_error('Empty record for GUID %s' % identifier,
|
||||
harvest_object)
|
||||
return False
|
||||
|
||||
try:
|
||||
# Save the fetch contents in the HarvestObject
|
||||
# Contents come from csw_client already declared and encoded as utf-8
|
||||
# Remove original XML declaration
|
||||
content = re.sub('<\?xml(.*)\?>', '', record['xml'])
|
||||
|
||||
harvest_object.content = content.strip()
|
||||
harvest_object.save()
|
||||
except Exception,e:
|
||||
self._save_object_error('Error saving the harvest object for GUID %s [%r]' % \
|
||||
(identifier, e), harvest_object)
|
||||
return False
|
||||
|
||||
log.debug('XML content saved (len %s)', len(record['xml']))
|
||||
return True
|
||||
|
||||
def _setup_csw_client(self, url):
|
||||
self.csw = CswService(url)
|
||||
|
|
@ -164,6 +164,16 @@
|
|||
notify: Restart CKAN
|
||||
tags: [ 'ckan', 'ckan_pages', 'ckan_plugins', 'ckan_geo_auth' ]
|
||||
|
||||
- name: Overwrite the ckanharvester.py ckanext plugin file to remove the authentication data from URLs
|
||||
copy: src=ckanharvester.py dest=/usr/lib/ckan/default/src/ckanext-harvest/ckanext/harvest/harvesters/ckanharvester.py owner={{ ckan_shell_user }} group={{ ckan_shell_user }} mode=0644 backup=yes
|
||||
notify: Restart CKAN
|
||||
tags: [ 'ckan', 'ckan_pages', 'ckan_plugins', 'ckan_ckanharvester' ]
|
||||
|
||||
- name: Overwrite the csw.py ckanext-spatial plugin file to remove the authentication data from URLs
|
||||
copy: src=csw.py dest=/usr/lib/ckan/default/src/ckanext-spatial/ckanext/spatial/harvesters/csw.py owner={{ ckan_shell_user }} group={{ ckan_shell_user }} mode=0644 backup=yes
|
||||
notify: Restart CKAN
|
||||
tags: [ 'ckan', 'ckan_pages', 'ckan_plugins', 'ckan_ckanext_spatial' ]
|
||||
|
||||
- name: Restart apache
|
||||
service: name=apache state=restarted enabled=yes
|
||||
when: ( ckan_install | changed )
|
||||
|
|
Loading…
Reference in New Issue