From ed052a3476bf5c8980412b0d1b8387491d761ab2 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 6 May 2024 16:08:33 +0200 Subject: [PATCH] job for the population of the oai database --- .../dhp/oa/oaipmh/IrishOaiExporterJob.java | 156 ++++++++++++++++++ .../dhp/oa/oaipmh/OaiRecordWrapper.java | 50 ++++++ 2 files changed, 206 insertions(+) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java new file mode 100644 index 000000000..9a608b6fa --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java @@ -0,0 +1,156 @@ +package eu.dnetlib.dhp.oa.oaipmh; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Optional; +import java.util.Properties; +import java.util.zip.GZIPOutputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.dom4j.Document; +import org.dom4j.DocumentHelper; +import org.dom4j.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.provision.XmlConverterJob; +import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument; +import eu.dnetlib.dhp.oa.provision.model.TupleWrapper; + +public class IrishOaiExporterJob { + + private static final Logger log = LoggerFactory.getLogger(IrishOaiExporterJob.class); + + protected static final int NUM_CONNECTIONS = 20; + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString(XmlConverterJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + final String dbUrl = parser.get("dbUrl"); + final String dbTable = parser.get("dbTable"); + final String dbUser = parser.get("dbUser"); + final String dbPwd = parser.get("dbPwd"); + final int numConnections = Optional + .ofNullable(parser.get("numConnections")) + .map(Integer::valueOf) + .orElse(NUM_CONNECTIONS); + + log.info("inputPath: '{}'", inputPath); + log.info("dbUrl: '{}'", dbUrl); + log.info("dbUser: '{}'", dbUser); + log.info("table: '{}'", dbTable); + log.info("dbPwd: '{}'", "xxx"); + log.info("numPartitions: '{}'", numConnections); + + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPwd); + + final SparkConf conf = new SparkConf(); + conf.registerKryoClasses(new Class[] { + SerializableSolrInputDocument.class + }); + + final Encoder encoderTuple = Encoders.bean(TupleWrapper.class); + final Encoder encoderOaiRecord = Encoders.bean(OaiRecordWrapper.class); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + final Dataset docs = spark + .read() + .schema(encoderTuple.schema()) + .json(inputPath) + .as(encoderTuple) + .map((MapFunction) TupleWrapper::getXml, Encoders.STRING()) + .map((MapFunction) IrishOaiExporterJob::asIrishOaiResult, encoderOaiRecord) + .filter((FilterFunction) obj -> (obj != null) && StringUtils.isNotBlank(obj.getId())); + + docs.repartition(numConnections) + .write() + .mode(SaveMode.Overwrite) + .jdbc(dbUrl, dbTable, connectionProperties); + + }); + } + + private static OaiRecordWrapper asIrishOaiResult(final String xml) { + try { + final Document doc = DocumentHelper.parseText(xml); + final OaiRecordWrapper r = new OaiRecordWrapper(); + + if (isValid(doc)) { + r.setId(doc.valueOf("//*[local-name()='objIdentifier']").trim()); + r.setBody(gzip(xml)); + r.setDate(LocalDateTime.now()); + r.setSets(new ArrayList<>()); + } + return r; + } catch (final Exception e) { + log.error("Error parsing record: " + xml, e); + throw new RuntimeException("Error parsing record: " + xml, e); + } + } + + private static boolean isValid(final Document doc) { + + final Node n = doc.selectSingleNode("//*[local-name()='entity']/*[local-name()='result']"); + + if (n != null) { + for (final Object o : n.selectNodes(".//*[local-name()='datainfo']/*[local-name()='deletedbyinference']")) { + if ("true".equals(((Node) o).getText().trim())) { return false; } + } + + for (final Object o : n.selectNodes("./*[local-name()='country']")) { + if ("IE".equals(((Node) o).valueOf("@classid").trim())) { return true; } + } + + for (final Object o : n.selectNodes(".//*[local-name()='rel']")) { + final String relType = ((Node) o).valueOf("./[local-name() = 'to']/@type").trim(); + final String relCountry = ((Node) o).valueOf("./*[local-name() = 'country']/@classid").trim(); + if ("organization".equals(relType) && "IE".equals(relCountry)) { return true; } + } + } + return false; + + } + + private static byte[] gzip(final String str) { + if (StringUtils.isBlank(str)) { return null; } + + try { + final ByteArrayOutputStream obj = new ByteArrayOutputStream(); + final GZIPOutputStream gzip = new GZIPOutputStream(obj); + gzip.write(str.getBytes("UTF-8")); + gzip.flush(); + gzip.close(); + return obj.toByteArray(); + } catch (final IOException e) { + throw new RuntimeException("error in gzip", e); + } + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java new file mode 100644 index 000000000..4c2766754 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java @@ -0,0 +1,50 @@ +package eu.dnetlib.dhp.oa.oaipmh; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.util.List; + +public class OaiRecordWrapper implements Serializable { + + private static final long serialVersionUID = 8997046455575004880L; + + private String id; + private byte[] body; + private LocalDateTime date; + private List sets; + + public OaiRecordWrapper() {} + + public String getId() { + return this.id; + } + + public void setId(final String id) { + this.id = id; + } + + public byte[] getBody() { + return this.body; + } + + public void setBody(final byte[] body) { + this.body = body; + } + + public LocalDateTime getDate() { + return this.date; + } + + public void setDate(final LocalDateTime date) { + this.date = date; + } + + public List getSets() { + return this.sets; + } + + public void setSets(final List sets) { + this.sets = sets; + } + +}