spatial-d4science/bin/ckan_pycsw.py

292 lines
8.7 KiB
Python

import sys
import logging
import datetime
import io
import os
import argparse
from six.moves.configparser import SafeConfigParser
import requests
from lxml import etree
from pycsw.core import metadata, repository, util
import pycsw.core.config
import pycsw.core.admin
# Configure the root logger to emit bare messages (no level or timestamp
# prefix) at INFO level and above.
logging.basicConfig(format="%(message)s", level=logging.INFO)
# Module-level logger used by the functions in this script.
log = logging.getLogger(__name__)
def setup_db(pycsw_config):
    """Set up the pycsw database table and indexes, plus the CKAN columns.

    Reads the ``database`` and (optional) ``table`` options from the
    ``[repository]`` section of ``pycsw_config`` and delegates table
    creation to ``pycsw.core.admin.setup_db``, asking it to add two extra
    text columns: ``ckan_id`` (indexed) and ``ckan_modified``.

    :param pycsw_config: parsed pycsw configuration (a ConfigParser object).
    """
    from sqlalchemy import Column, Text

    database = pycsw_config.get("repository", "database")
    # ConfigParser.get() takes no positional default: on Python 3 a third
    # positional argument is a TypeError, on Python 2 it is interpreted as
    # ``raw``. Apply the "records" default explicitly instead.
    if pycsw_config.has_option("repository", "table"):
        table_name = pycsw_config.get("repository", "table")
    else:
        table_name = "records"

    ckan_columns = [
        Column("ckan_id", Text, index=True),
        Column("ckan_modified", Text),
    ]

    pycsw.core.admin.setup_db(
        database,
        table_name,
        "",
        create_plpythonu_functions=False,
        extra_columns=ckan_columns,
    )
def set_keywords(pycsw_config_file, pycsw_config, ckan_url, limit=20):
    """Set pycsw service metadata keywords from the top ``limit`` CKAN tags.

    Fetches ``<ckan_url>api/tag_counts``, keeps the ``limit`` entries with
    the highest count, stores their names as a comma-separated list in the
    ``identification_keywords`` option of the ``[metadata:main]`` section
    and writes the configuration back to ``pycsw_config_file``.

    :param pycsw_config_file: path of the pycsw configuration file to rewrite.
    :param pycsw_config: parsed pycsw configuration (a ConfigParser object).
    :param ckan_url: base URL of the CKAN site, expected to end with ``/``.
    :param limit: maximum number of tags to keep.
    :raises requests.HTTPError: if CKAN answers with an error status.
    """
    log.info("Fetching tags from %s", ckan_url)
    response = requests.get(ckan_url + "api/tag_counts")
    # Fail loudly on an HTTP error instead of trying to decode an error page.
    response.raise_for_status()
    tags = response.json()

    log.info("Deriving top %d tags", limit)
    # Each entry is read as a (name, count) pair: index 0 is used as the
    # keyword and index 1 as the sort key.
    # De-duplicate, then order by name first so that the stable sort by
    # count below breaks ties deterministically (set order is arbitrary).
    unique_tags = sorted(set(tuple(tag) for tag in tags))
    top_tags = sorted(unique_tags, key=lambda tag: tag[1], reverse=True)[:limit]
    keywords = ",".join("%s" % tag[0] for tag in top_tags)

    log.info("Setting tags in pycsw configuration file %s", pycsw_config_file)
    pycsw_config.set("metadata:main", "identification_keywords", keywords)
    # ConfigParser.write() emits text, so the file must be opened in text
    # mode; binary mode raises TypeError on Python 3.
    with open(pycsw_config_file, "w") as configfile:
        pycsw_config.write(configfile)
def load(pycsw_config, ckan_url):
    """Synchronise the pycsw records table with the harvested CKAN datasets.

    The function
      1. pages through the CKAN search API collecting, for every dataset
         that has a harvest object, its id, modification date, harvest
         object id and metadata source;
      2. reads the ``ckan_id`` / ``ckan_modified`` pairs already stored in
         the pycsw table;
      3. deletes rows whose dataset is no longer in CKAN, inserts records
         for datasets that are new, and updates rows whose dataset has a
         newer modification date.

    :param pycsw_config: parsed pycsw configuration (a ConfigParser object).
    :param ckan_url: base URL of the CKAN site, expected to end with ``/``.
    :raises requests.HTTPError: if a search request returns an error status.
    :raises RuntimeError: if the search API does not return a JSON object,
        or if updating a changed record fails.
    """
    database = pycsw_config.get("repository", "database")
    # ConfigParser.get() takes no positional default: on Python 3 a third
    # positional argument is a TypeError, on Python 2 it is interpreted as
    # ``raw``. Apply the "records" default explicitly instead.
    if pycsw_config.has_option("repository", "table"):
        table_name = pycsw_config.get("repository", "table")
    else:
        table_name = "records"

    context = pycsw.core.config.StaticContext()
    repo = repository.Repository(database, context, table=table_name)

    log.info(
        "Started gathering CKAN datasets identifiers: {0}".format(
            str(datetime.datetime.now())
        )
    )

    # --- 1. Gather the datasets known to CKAN, one page at a time ---------
    page_size = 1000
    query = (
        'api/search/dataset?qjson={"fl":"id,metadata_modified,'
        'extras_harvest_object_id,extras_metadata_source", '
        '"q":"harvest_object_id:[\\"\\" TO *]", "limit":%d, "start":%d}'
    )

    start = 0
    gathered_records = {}
    while True:
        response = requests.get(ckan_url + query % (page_size, start))
        # Abort on an HTTP error: continuing with a partial listing would
        # make the missing datasets look deleted and remove their records.
        response.raise_for_status()

        listing = response.json()
        if not isinstance(listing, dict):
            raise RuntimeError("Wrong API response: %s" % listing)

        results = listing.get("results")
        if not results:
            break

        for result in results:
            extras = result["extras"]
            gathered_records[result["id"]] = {
                "metadata_modified": result["metadata_modified"],
                "harvest_object_id": extras["harvest_object_id"],
                "source": extras.get("metadata_source"),
            }

        start += page_size
        log.debug("Gathered %s", start)

    log.info(
        "Gather finished ({0} datasets): {1}".format(
            len(gathered_records), str(datetime.datetime.now())
        )
    )

    # --- 2. Read what is already stored in pycsw --------------------------
    existing_records = {}
    stored = repo.session.query(repo.dataset.ckan_id, repo.dataset.ckan_modified)
    for row in stored:
        existing_records[row[0]] = row[1]
    repo.session.close()

    # --- 3. Work out the differences --------------------------------------
    gathered_ids = set(gathered_records)
    existing_ids = set(existing_records)

    new = gathered_ids - existing_ids
    deleted = existing_ids - gathered_ids
    changed = set()
    for key in gathered_ids & existing_ids:
        stored_modified = existing_records[key]
        # A row without a stored modification date cannot be compared
        # (str > None is a TypeError on Python 3); treat it as outdated.
        if (
            stored_modified is None
            or gathered_records[key]["metadata_modified"] > stored_modified
        ):
            changed.add(key)

    # --- 4. Apply deletions, insertions and updates -----------------------
    for ckan_id in deleted:
        try:
            repo.session.begin()
            repo.session.query(repo.dataset.ckan_id).filter_by(
                ckan_id=ckan_id
            ).delete()
            log.info("Deleted %s", ckan_id)
            repo.session.commit()
        except Exception:
            repo.session.rollback()
            raise

    for ckan_id in new:
        ckan_info = gathered_records[ckan_id]
        record = get_record(context, repo, ckan_url, ckan_id, ckan_info)
        if not record:
            log.info("Skipped record %s", ckan_id)
            continue
        try:
            repo.insert(record, "local", util.get_today_and_now())
            log.info("Inserted %s", ckan_id)
        except Exception as err:
            # Best effort: a record that cannot be inserted is reported and
            # the remaining records are still processed.
            log.error("ERROR: not inserted %s Error:%s", ckan_id, err)

    for ckan_id in changed:
        ckan_info = gathered_records[ckan_id]
        record = get_record(context, repo, ckan_url, ckan_id, ckan_info)
        if not record:
            # Report the skip the same way the insertion loop does.
            log.info("Skipped record %s", ckan_id)
            continue

        # Map every attribute of the freshly parsed record onto the matching
        # column of the dataset model, leaving out SQLAlchemy's bookkeeping.
        update_dict = {
            getattr(repo.dataset, key): getattr(record, key)
            for key in record.__dict__
            if key != "_sa_instance_state"
        }
        try:
            repo.session.begin()
            repo.session.query(repo.dataset).filter_by(ckan_id=ckan_id).update(
                update_dict
            )
            repo.session.commit()
            log.info("Changed %s", ckan_id)
        except Exception as err:
            repo.session.rollback()
            raise RuntimeError("ERROR: %s" % str(err))
def clear(pycsw_config):
    """Remove every row from the pycsw records table.

    The connection URL and the table name are read from the ``database``
    and (optional) ``table`` options of the ``[repository]`` section.

    :param pycsw_config: parsed pycsw configuration (a ConfigParser object).
    """
    from sqlalchemy import create_engine, MetaData, Table

    database = pycsw_config.get("repository", "database")
    # ConfigParser.get() takes no positional default: on Python 3 a third
    # positional argument is a TypeError, on Python 2 it is interpreted as
    # ``raw``. Apply the "records" default explicitly instead.
    if pycsw_config.has_option("repository", "table"):
        table_name = pycsw_config.get("repository", "table")
    else:
        table_name = "records"

    log.debug("Creating engine")
    engine = create_engine(database)
    records = Table(table_name, MetaData())

    # Run the DELETE on an explicit connection inside a transaction that is
    # committed on success. Bound MetaData and the implicit
    # ``statement.execute()`` are not available in SQLAlchemy 2.0.
    with engine.begin() as connection:
        connection.execute(records.delete())

    log.info("Table cleared")
def get_record(context, repo, ckan_url, ckan_id, ckan_info):
    """Build a pycsw record from the harvest object of a CKAN dataset.

    Downloads ``<ckan_url>harvest/object/<harvest_object_id>``, parses the
    response as XML and hands it to ``pycsw.core.metadata.parse_record``.
    The resulting record is tagged with the CKAN id and modification date.

    :param context: pycsw static context passed through to ``parse_record``.
    :param repo: pycsw repository passed through to ``parse_record``.
    :param ckan_url: base URL of the CKAN site, expected to end with ``/``.
    :param ckan_id: id of the CKAN dataset the record belongs to.
    :param ckan_info: dict with the keys ``harvest_object_id``, ``source``
        and ``metadata_modified`` for that dataset.
    :returns: the parsed record, or ``None`` when the dataset comes from an
        ``arcgis`` source or its document cannot be fetched or parsed.
    """
    # Datasets from an "arcgis" source are never turned into records, so
    # bail out before spending an HTTP round trip on them.
    if ckan_info["source"] == "arcgis":
        return None

    url = ckan_url + "harvest/object/%s" % ckan_info["harvest_object_id"]
    try:
        response = requests.get(url)
        response.raise_for_status()
    except requests.RequestException as err:
        # Same best-effort policy as for parse failures below: report the
        # problem and let the caller skip this dataset.
        log.error("Could not fetch harvest object for %s, Error: %s", ckan_id, err)
        return None

    try:
        xml = etree.parse(io.BytesIO(response.content))
    except Exception as err:
        log.error("Could not parse xml doc from %s, Error: %s", ckan_id, err)
        return None

    try:
        record = metadata.parse_record(context, xml, repo)[0]
    except Exception as err:
        log.error("Could not extract metadata from %s, Error: %s", ckan_id, err)
        return None

    # Fall back to the CKAN id when the document carries no identifier.
    if not record.identifier:
        record.identifier = ckan_id
    record.ckan_id = ckan_id
    record.ckan_modified = ckan_info["metadata_modified"]

    return record
# Help text for the command line. Its first non-blank line is the one-line
# summary of the tool; the rest lists the commands and their options.
# Every example uses the script's real file name, ckan_pycsw.py.
usage = """
Manages the CKAN-pycsw integration

    python ckan_pycsw.py setup [-p]
        Setups the necessary pycsw table on the db.

    python ckan_pycsw.py set_keywords [-p] -u
        Sets pycsw server metadata keywords from CKAN site tag list.

    python ckan_pycsw.py load [-p] -u
        Loads CKAN datasets as records into the pycsw db.

    python ckan_pycsw.py clear [-p]
        Removes all records from the pycsw table.

All commands require the pycsw configuration file. By default it will try
to find a file called 'default.cfg' in the same directory, but you'll
probably need to provide the actual location via the -p option:

    python ckan_pycsw.py setup -p /etc/ckan/default/pycsw.cfg

The load command requires a CKAN URL from where the datasets will be pulled:

    python ckan_pycsw.py load -p /etc/ckan/default/pycsw.cfg -u http://localhost

"""
def _load_config(file_path):
    """Parse the pycsw configuration file at ``file_path``.

    :param file_path: path (absolute or relative) of the configuration file.
    :returns: a ``SafeConfigParser`` loaded with the file's contents.
    :raises AssertionError: if the path does not exist or cannot be read as
        a file (for example because it is a directory).
    """
    abs_path = os.path.abspath(file_path)
    if not os.path.exists(abs_path):
        raise AssertionError("pycsw config file {0} does not exist.".format(abs_path))

    config = SafeConfigParser()
    # read() silently skips paths it cannot open and returns the list of
    # files it actually parsed; an empty list means nothing was loaded, which
    # would otherwise only surface later as a missing-section error.
    if not config.read(abs_path):
        raise AssertionError(
            "pycsw config file {0} could not be read.".format(abs_path)
        )
    return config
if __name__ == "__main__":
    # Command-line entry point: parse the arguments, load the pycsw
    # configuration and dispatch to the function implementing the command.

    # Use the first non-blank line of the usage text as the description.
    # (The previous `"\n".split(usage)[0]` split the newline character by
    # the usage text and therefore always evaluated to "\n".)
    parser = argparse.ArgumentParser(
        description=usage.strip().split("\n")[0], usage=usage
    )
    parser.add_argument("command", help="Command to perform")

    parser.add_argument(
        "-p",
        "--pycsw_config",
        action="store",
        default="default.cfg",
        help="pycsw config file to use.",
    )

    parser.add_argument(
        "-u",
        "--ckan_url",
        action="store",
        help="CKAN instance to import the datasets from.",
    )

    # Called without any argument: show the usage text and signal an error.
    if len(sys.argv) <= 1:
        parser.print_usage()
        sys.exit(1)

    arg = parser.parse_args()
    pycsw_config = _load_config(arg.pycsw_config)

    if arg.command == "setup":
        setup_db(pycsw_config)
    elif arg.command in ["load", "set_keywords"]:
        if not arg.ckan_url:
            raise AssertionError("You need to provide a CKAN URL with -u or --ckan_url")
        # Normalise to exactly one trailing slash; the command functions
        # append API paths directly to this base URL.
        ckan_url = arg.ckan_url.rstrip("/") + "/"
        if arg.command == "load":
            load(pycsw_config, ckan_url)
        else:
            set_keywords(arg.pycsw_config, pycsw_config, ckan_url)
    elif arg.command == "clear":
        clear(pycsw_config)
    else:
        print("Unknown command {0}".format(arg.command))
        sys.exit(1)