From 81090ad593b1bb1572c033989c86e79f795670e6 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 5 Jun 2024 10:03:33 +0200 Subject: [PATCH] [IE OAIPHM] added oozie workflow, minor changes, code formatting --- .../dhp/oa/oaipmh/IrishOaiExporterJob.java | 68 ++++++----- .../dhp/oa/oaipmh/oozie_app/workflow.xml | 106 ++++++++++++++++++ .../dhp/oa/oaipmh/DbSerializationTest.java | 14 +-- .../oa/oaipmh/IrishOaiExporterJobTest.java | 3 +- 4 files changed, 155 insertions(+), 36 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java index 433baf272..57f180fa0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java @@ -46,15 +46,16 @@ public class IrishOaiExporterJob { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString(XmlConverterJob.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json"))); + IOUtils + .toString( + XmlConverterJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); final String inputPath = parser.get("inputPath"); @@ -62,9 +63,9 @@ public class IrishOaiExporterJob { final String dbUser = parser.get("dbUser"); final String dbPwd = parser.get("dbPwd"); final int numConnections = Optional - .ofNullable(parser.get("numConnections")) - .map(Integer::valueOf) - .orElse(NUM_CONNECTIONS); + .ofNullable(parser.get("numConnections")) + .map(Integer::valueOf) + .orElse(NUM_CONNECTIONS); log.info("inputPath: '{}'", inputPath); log.info("dbUrl: '{}'", dbUrl); @@ -78,29 +79,31 @@ public class IrishOaiExporterJob { final SparkConf conf = new SparkConf(); conf.registerKryoClasses(new Class[] { - SerializableSolrInputDocument.class + SerializableSolrInputDocument.class }); final Encoder encoderTuple = Encoders.bean(TupleWrapper.class); final Encoder encoderOaiRecord = Encoders.bean(OaiRecordWrapper.class); + final String date = LocalDateTime.now().toString(); + log.info("Creating temporary table..."); runWithSparkSession(conf, isSparkSessionManaged, spark -> { final Dataset docs = spark - .read() - .schema(encoderTuple.schema()) - .json(inputPath) - .as(encoderTuple) - .map((MapFunction) TupleWrapper::getXml, Encoders.STRING()) - .map((MapFunction) IrishOaiExporterJob::asIrishOaiResult, encoderOaiRecord) - .filter((FilterFunction) obj -> (obj != null) && StringUtils.isNotBlank(obj.getId())); + .read() + .schema(encoderTuple.schema()) + .json(inputPath) + .as(encoderTuple) + .map((MapFunction) TupleWrapper::getXml, Encoders.STRING()) + .map((MapFunction) r -> asIrishOaiResult(r, date), encoderOaiRecord) + .filter((FilterFunction) obj -> (obj != null) && StringUtils.isNotBlank(obj.getId())); docs - .repartition(numConnections) - .write() - .mode(SaveMode.Overwrite) - .jdbc(dbUrl, TMP_OAI_TABLE, connectionProperties); + .repartition(numConnections) + .write() + .mode(SaveMode.Overwrite) + .jdbc(dbUrl, TMP_OAI_TABLE, connectionProperties); }); log.info("Temporary table created."); @@ -108,14 +111,15 @@ public class IrishOaiExporterJob { log.info("Updating OAI records..."); try (final Connection con = DriverManager.getConnection(dbUrl, dbUser, dbPwd)) { try (final Statement st = con.createStatement()) { - final String query = IOUtils.toString(IrishOaiExporterJob.class.getResourceAsStream("oai-finalize.sql")); + final String query = IOUtils + .toString(IrishOaiExporterJob.class.getResourceAsStream("oai-finalize.sql")); st.execute(query); } } log.info("DONE."); } - protected static OaiRecordWrapper asIrishOaiResult(final String xml) { + protected static OaiRecordWrapper asIrishOaiResult(final String xml, final String date) { try { final Document doc = DocumentHelper.parseText(xml); final OaiRecordWrapper r = new OaiRecordWrapper(); @@ -123,7 +127,7 @@ public class IrishOaiExporterJob { if (isValid(doc)) { r.setId(doc.valueOf("//*[local-name()='objIdentifier']").trim()); r.setBody(gzip(doc.selectSingleNode("//*[local-name()='entity']").asXML())); - r.setDate(LocalDateTime.now().toString()); + r.setDate(date); r.setSets(new ArrayList<>()); } return r; @@ -140,19 +144,25 @@ public class IrishOaiExporterJob { if (n != null) { for (final Object o : n.selectNodes(".//*[local-name()='datainfo']/*[local-name()='deletedbyinference']")) { - if ("true".equals(((Node) o).getText().trim())) { return false; } + if ("true".equals(((Node) o).getText().trim())) { + return false; + } } // verify the main country of the result for (final Object o : n.selectNodes("./*[local-name()='country']")) { - if ("IE".equals(((Node) o).valueOf("@classid").trim())) { return true; } + if ("IE".equals(((Node) o).valueOf("@classid").trim())) { + return true; + } } // verify the countries of the related organizations for (final Object o : n.selectNodes(".//*[local-name()='rel']")) { final String relType = ((Node) o).valueOf("./*[local-name() = 'to']/@type").trim(); final String relCountry = ((Node) o).valueOf("./*[local-name() = 'country']/@classid").trim(); - if ("organization".equals(relType) && "IE".equals(relCountry)) { return true; } + if ("organization".equals(relType) && "IE".equals(relCountry)) { + return true; + } } } return false; @@ -160,7 +170,9 @@ public class IrishOaiExporterJob { } protected static byte[] gzip(final String str) { - if (StringUtils.isBlank(str)) { return null; } + if (StringUtils.isBlank(str)) { + return null; + } try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { try (final GZIPOutputStream gzip = new GZIPOutputStream(baos)) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oozie_app/workflow.xml new file mode 100644 index 000000000..c4caad91e --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oozie_app/workflow.xml @@ -0,0 +1,106 @@ + + + + + inputPath + The path of the input records on HDFS + + + numConnections + number of connections to the postgres db (for the write operation) + + + dbUrl + the url of the database + + + dbUser + the user of the database + + + dbPwd + the password for the user of the database + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Irish OAI-PHM provision + eu.dnetlib.dhp.oa.oaipmh.IrishOaiExporterJob + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=8000 + + --inputPath${inputPath} + --numConnections${numConnections} + --dbUrl${dbUrl} + --dbUser${dbUser} + --dbPwd${dbPwd} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java index f33708f86..d487fda94 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java @@ -42,10 +42,10 @@ public class DbSerializationTest { conf.set("spark.driver.host", "localhost"); spark = SparkSession - .builder() - .appName("TEST") - .config(conf) - .getOrCreate(); + .builder() + .appName("TEST") + .config(conf) + .getOrCreate(); } @AfterAll @@ -79,9 +79,9 @@ public class DbSerializationTest { final Dataset docs = spark.createDataset(list, Encoders.bean(OaiRecordWrapper.class)); docs - .write() - .mode(SaveMode.Overwrite) - .jdbc(dbUrl, IrishOaiExporterJob.TMP_OAI_TABLE, connectionProperties); + .write() + .mode(SaveMode.Overwrite) + .jdbc(dbUrl, IrishOaiExporterJob.TMP_OAI_TABLE, connectionProperties); }); diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java index 57a32e246..c16f75e1d 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java @@ -10,6 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.Charset; +import java.time.LocalDateTime; import java.util.zip.GZIPInputStream; import org.apache.commons.io.IOUtils; @@ -23,7 +24,7 @@ public class IrishOaiExporterJobTest { @Test void testAsIrishOaiResult() throws Exception { final String xml = IOUtils.toString(getClass().getResourceAsStream("record_IE.xml")); - final OaiRecordWrapper res = IrishOaiExporterJob.asIrishOaiResult(xml); + final OaiRecordWrapper res = IrishOaiExporterJob.asIrishOaiResult(xml, LocalDateTime.now().toString()); assertNotNull(res.getId()); assertNotNull(res.getBody()); assertNotNull(res.getSets());