From ed052a3476bf5c8980412b0d1b8387491d761ab2 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 6 May 2024 16:08:33 +0200 Subject: [PATCH 1/7] job for the population of the oai database --- .../dhp/oa/oaipmh/IrishOaiExporterJob.java | 156 ++++++++++++++++++ .../dhp/oa/oaipmh/OaiRecordWrapper.java | 50 ++++++ 2 files changed, 206 insertions(+) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java new file mode 100644 index 000000000..9a608b6fa --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java @@ -0,0 +1,156 @@ +package eu.dnetlib.dhp.oa.oaipmh; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Optional; +import java.util.Properties; +import java.util.zip.GZIPOutputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.dom4j.Document; +import org.dom4j.DocumentHelper; +import org.dom4j.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.provision.XmlConverterJob; +import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument; +import eu.dnetlib.dhp.oa.provision.model.TupleWrapper; + +public class IrishOaiExporterJob { + + private static final Logger log = LoggerFactory.getLogger(IrishOaiExporterJob.class); + + protected static final int NUM_CONNECTIONS = 20; + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString(XmlConverterJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + final String dbUrl = parser.get("dbUrl"); + final String dbTable = parser.get("dbTable"); + final String dbUser = parser.get("dbUser"); + final String dbPwd = parser.get("dbPwd"); + final int numConnections = Optional + .ofNullable(parser.get("numConnections")) + .map(Integer::valueOf) + .orElse(NUM_CONNECTIONS); + + log.info("inputPath: '{}'", inputPath); + log.info("dbUrl: '{}'", dbUrl); + log.info("dbUser: '{}'", dbUser); + log.info("table: '{}'", dbTable); + log.info("dbPwd: '{}'", "xxx"); + log.info("numPartitions: '{}'", numConnections); + + final Properties connectionProperties = new Properties(); + connectionProperties.put("user", dbUser); + connectionProperties.put("password", dbPwd); + + final SparkConf conf = new SparkConf(); + conf.registerKryoClasses(new Class[] { + SerializableSolrInputDocument.class + }); + + final Encoder encoderTuple = Encoders.bean(TupleWrapper.class); + final Encoder encoderOaiRecord = Encoders.bean(OaiRecordWrapper.class); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + final Dataset docs = spark + .read() + .schema(encoderTuple.schema()) + .json(inputPath) + .as(encoderTuple) + .map((MapFunction) TupleWrapper::getXml, Encoders.STRING()) + .map((MapFunction) IrishOaiExporterJob::asIrishOaiResult, encoderOaiRecord) + .filter((FilterFunction) obj -> (obj != null) && StringUtils.isNotBlank(obj.getId())); + + docs.repartition(numConnections) + .write() + .mode(SaveMode.Overwrite) + .jdbc(dbUrl, dbTable, connectionProperties); + + }); + } + + private static OaiRecordWrapper asIrishOaiResult(final String xml) { + try { + final Document doc = DocumentHelper.parseText(xml); + final OaiRecordWrapper r = new OaiRecordWrapper(); + + if (isValid(doc)) { + r.setId(doc.valueOf("//*[local-name()='objIdentifier']").trim()); + r.setBody(gzip(xml)); + r.setDate(LocalDateTime.now()); + r.setSets(new ArrayList<>()); + } + return r; + } catch (final Exception e) { + log.error("Error parsing record: " + xml, e); + throw new RuntimeException("Error parsing record: " + xml, e); + } + } + + private static boolean isValid(final Document doc) { + + final Node n = doc.selectSingleNode("//*[local-name()='entity']/*[local-name()='result']"); + + if (n != null) { + for (final Object o : n.selectNodes(".//*[local-name()='datainfo']/*[local-name()='deletedbyinference']")) { + if ("true".equals(((Node) o).getText().trim())) { return false; } + } + + for (final Object o : n.selectNodes("./*[local-name()='country']")) { + if ("IE".equals(((Node) o).valueOf("@classid").trim())) { return true; } + } + + for (final Object o : n.selectNodes(".//*[local-name()='rel']")) { + final String relType = ((Node) o).valueOf("./[local-name() = 'to']/@type").trim(); + final String relCountry = ((Node) o).valueOf("./*[local-name() = 'country']/@classid").trim(); + if ("organization".equals(relType) && "IE".equals(relCountry)) { return true; } + } + } + return false; + + } + + private static byte[] gzip(final String str) { + if (StringUtils.isBlank(str)) { return null; } + + try { + final ByteArrayOutputStream obj = new ByteArrayOutputStream(); + final GZIPOutputStream gzip = new GZIPOutputStream(obj); + gzip.write(str.getBytes("UTF-8")); + gzip.flush(); + gzip.close(); + return obj.toByteArray(); + } catch (final IOException e) { + throw new RuntimeException("error in gzip", e); + } + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java new file mode 100644 index 000000000..4c2766754 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java @@ -0,0 +1,50 @@ +package eu.dnetlib.dhp.oa.oaipmh; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.util.List; + +public class OaiRecordWrapper implements Serializable { + + private static final long serialVersionUID = 8997046455575004880L; + + private String id; + private byte[] body; + private LocalDateTime date; + private List sets; + + public OaiRecordWrapper() {} + + public String getId() { + return this.id; + } + + public void setId(final String id) { + this.id = id; + } + + public byte[] getBody() { + return this.body; + } + + public void setBody(final byte[] body) { + this.body = body; + } + + public LocalDateTime getDate() { + return this.date; + } + + public void setDate(final LocalDateTime date) { + this.date = date; + } + + public List getSets() { + return this.sets; + } + + public void setSets(final List sets) { + this.sets = sets; + } + +} From aa40e53c19acf6c8007b7819bea3e65ba642e057 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 7 May 2024 08:01:19 +0200 Subject: [PATCH 2/7] oai exporter parameters --- .../input_params_irish_oai_exporter.json | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json new file mode 100644 index 000000000..99a12927b --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json @@ -0,0 +1,38 @@ +[ + { + "paramName": "i", + "paramLongName": "inputPath", + "paramDescription": "The path of the input records on HDFS", + "paramRequired": true + }, + { + "paramName": "nc", + "paramLongName": "numConnections", + "paramDescription": "number of connections to the postgres db (for the write operation)", + "paramRequired": false + }, + { + "paramName": "du", + "paramLongName": "dbUrl", + "paramDescription": "the url of the database", + "paramRequired": true + }, + { + "paramName": "dusr", + "paramLongName": "dbUser", + "paramDescription": "the user of the database", + "paramRequired": true + }, + { + "paramName": "t", + "paramLongName": "dbTable", + "paramDescription": "the name of the table in the database", + "paramRequired": true + }, + { + "paramName": "dpwd", + "paramLongName": "dbPwd", + "paramDescription": "the password for the user of the database", + "paramRequired": true + } +] From 70bf6ac41561d487109a04ef60b0659a8785d989 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 7 May 2024 09:36:26 +0200 Subject: [PATCH 3/7] oai exporter tests --- .../dhp/oa/oaipmh/IrishOaiExporterJob.java | 11 ++- .../oa/oaipmh/IrishOaiExporterJobTest.java | 93 +++++++++++++++++++ .../eu/dnetlib/dhp/oa/oaipmh/record_IE.xml | 89 ++++++++++++++++++ .../dhp/oa/oaipmh/record_IE_deleted.xml | 89 ++++++++++++++++++ .../eu/dnetlib/dhp/oa/oaipmh/record_IT.xml | 66 +++++++++++++ 5 files changed, 344 insertions(+), 4 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java create mode 100644 dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/oaipmh/record_IE.xml create mode 100644 dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/oaipmh/record_IE_deleted.xml create mode 100644 dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/oaipmh/record_IT.xml diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java index 9a608b6fa..e2ae890e5 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java @@ -98,7 +98,7 @@ public class IrishOaiExporterJob { }); } - private static OaiRecordWrapper asIrishOaiResult(final String xml) { + protected static OaiRecordWrapper asIrishOaiResult(final String xml) { try { final Document doc = DocumentHelper.parseText(xml); final OaiRecordWrapper r = new OaiRecordWrapper(); @@ -116,21 +116,24 @@ public class IrishOaiExporterJob { } } - private static boolean isValid(final Document doc) { + protected static boolean isValid(final Document doc) { final Node n = doc.selectSingleNode("//*[local-name()='entity']/*[local-name()='result']"); if (n != null) { + for (final Object o : n.selectNodes(".//*[local-name()='datainfo']/*[local-name()='deletedbyinference']")) { if ("true".equals(((Node) o).getText().trim())) { return false; } } + // verify the main country of the result for (final Object o : n.selectNodes("./*[local-name()='country']")) { if ("IE".equals(((Node) o).valueOf("@classid").trim())) { return true; } } + // verify the countries of the related organizations for (final Object o : n.selectNodes(".//*[local-name()='rel']")) { - final String relType = ((Node) o).valueOf("./[local-name() = 'to']/@type").trim(); + final String relType = ((Node) o).valueOf("./*[local-name() = 'to']/@type").trim(); final String relCountry = ((Node) o).valueOf("./*[local-name() = 'country']/@classid").trim(); if ("organization".equals(relType) && "IE".equals(relCountry)) { return true; } } @@ -139,7 +142,7 @@ public class IrishOaiExporterJob { } - private static byte[] gzip(final String str) { + protected static byte[] gzip(final String str) { if (StringUtils.isBlank(str)) { return null; } try { diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java new file mode 100644 index 000000000..6140b0907 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java @@ -0,0 +1,93 @@ +package eu.dnetlib.dhp.oa.oaipmh; + +import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.zip.GZIPInputStream; + +import org.apache.commons.io.IOUtils; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.io.SAXReader; +import org.junit.jupiter.api.Test; + +class IrishOaiExporterJobTest { + + @Test + void testAsIrishOaiResult() throws Exception { + final String xml = IOUtils.toString(getClass().getResourceAsStream("record_IE.xml")); + final OaiRecordWrapper res = IrishOaiExporterJob.asIrishOaiResult(xml); + assertNotNull(res.getId()); + assertNotNull(res.getBody()); + assertNotNull(res.getSets()); + assertNotNull(res.getDate()); + assertEquals("dedup_wf_002::532be02f990b479a1da46d71f1a4c3f0", res.getId()); + assertTrue(res.getBody().length > 0); + assertTrue(res.getSets().isEmpty()); + } + + @Test + void testIsValid_IE() throws DocumentException { + final Document doc = new SAXReader().read(getClass().getResourceAsStream("record_IE.xml")); + assertTrue(IrishOaiExporterJob.isValid(doc)); + } + + @Test + void testIsValid_invalid_country() throws DocumentException { + final Document doc = new SAXReader().read(getClass().getResourceAsStream("record_IT.xml")); + assertFalse(IrishOaiExporterJob.isValid(doc)); + } + + @Test + void testIsValid_deleted() throws DocumentException { + final Document doc = new SAXReader().read(getClass().getResourceAsStream("record_IE_deleted.xml")); + assertFalse(IrishOaiExporterJob.isValid(doc)); + } + + @Test + void testGzip_simple() { + final String message = ""; + final byte[] bytes = IrishOaiExporterJob.gzip(message); + assertNotNull(bytes); + assertTrue(bytes.length > 0); + assertEquals(message, decompress(bytes)); + } + + @Test + void testGzip_empty() { + assertNull(IrishOaiExporterJob.gzip("")); + assertNull(IrishOaiExporterJob.gzip(null)); + } + + private static String decompress(final byte[] compressed) { + final StringBuilder outStr = new StringBuilder(); + if ((compressed == null) || (compressed.length == 0)) { return null; } + try { + if (isCompressed(compressed)) { + final GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(compressed)); + final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gis, "UTF-8")); + + String line; + while ((line = bufferedReader.readLine()) != null) { + outStr.append(line); + } + } else { + outStr.append(compressed); + } + return outStr.toString(); + } catch (final IOException e) { + throw new RuntimeException("error in gunzip", e); + } + } + + private static boolean isCompressed(final byte[] compressed) { + return (compressed[0] == (byte) GZIPInputStream.GZIP_MAGIC) && (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8)); + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/oaipmh/record_IE.xml b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/oaipmh/record_IE.xml new file mode 100644 index 000000000..01b7334f8 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/oaipmh/record_IE.xml @@ -0,0 +1,89 @@ + + +
+ dedup_wf_002::532be02f990b479a1da46d71f1a4c3f0 + 2023-03-31T18:37:45.599Z + 2023-03-31T18:45:52.701Z +
+ + + + + + 50|od______6005::55a12e2e0fee45ce8005633c6c17fe9f + oai:repository.wit.ie:3029 + 50|od_______934::e7162a5632264cd622ee7180ca66fdce + oai:generic.eprints.org:3029 + 50|od_______934::55a12e2e0fee45ce8005633c6c17fe9f + + + + + + http://repository.wit.ie/3029/1/Research%20Day%202015%20-%20Poster%20Tadhg%20Blommerde.pdf + A service innovation capability maturity model for SMEs + + Blommerde, Tadhg + Lynch, Patrick + + 2015-04-28 + There is general consensus that service innovations are prerequisite to sustained competitive advantage and are an essential mechanism for responding to changes in customer needs and the operating environment of firms (Giannopoulou et al., 2011; Stryja et al., 2013). Services have been described as ubiquitous in their role of generating economic growth and wellbeing and represent over 70% of employment and GDP in developed nations (Janssen et al., 2012; Mustak, 2014). As a consequence, service innovations must be a core ambition of all countries, regions, and firms wishing to remain competitive (van Ark et al., 2003). While acknowledging the importance of once-off innovations, more critical still is the capability to repeatedly introduce and exploit service innovations (Siguaw et al., 2006). This is generally referred to as service innovation capability (SIC) and describes the repeatable routines and behaviours that organisations have in place to transform ideas and knowledge into innovations (Basterretxea and Martínez, 2012). However, despite links between SIC and continuous, sustainable, and consistent service innovations, there is evidence that many organisations struggle with its effective management (Adams et al., 2006; den Hertog et al., 2010). This is often attributed to the lack of formal guidance available and the absence of metrics to determine an organisation’s SIC performance (Hogan et al., 2011; Szczygielski, 2011). Maturity modelling research in this discipline remains at an embryonic stage, thus far presenting only conceptual and opaque discussions that fail to address the necessity for an assessment and strategic management framework (Gryszkiewicz et al., 2013; Hipp and Grupp, 2005). Therefore, the purpose of this ongoing research project is to evaluate the maturity of an organisation’s SIC to inform its effective management and enhancement. To achieve this it dimensionalises the concept into four constituent capabilities, specifically, strategising, customer involvement, knowledge management, and networking (Blommerde and Lynch, 2014). The study then tracks the maturity of these capabilities as they progress through eight evolutionary plateaus towards a fully developed or optimal state. This is accomplished through a capability maturity model that enables organisations to rapidly diagnose key areas of strength and weakness to systematically cultivate behaviours that leverage their untapped innovative potential (Wendler, 2012; Essmann and du Preez, 2010). As a result of the immense knowledge vacuum characteristic of this discipline, it is anticipated that this ongoing research project will make a substantial contribution to both academic understanding and take strides towards filling the void in practical support (Rapaccini et al., 2013). It expands the service innovation literature by detailing key service innovation levers, bolsters the discipline through clear definitions of terminology, provides a powerful explanation of the development of SICs, and operationalises the dynamic capabilities view through a novel self-assessment reference model (Jochem et al., 2011). The next step in the project is the evaluation of the, as yet, conceptual service innovation capability maturity model. Adopting a positivistic philosophical stance, the study proposes the use of structural equation modelling on data gathered through an extensive survey to confirm the model and support theoretical assumptions. + RIKON (Research in Inovation, Knowledge & Organisational Networks) + + application/pdf + + + false + false + true + + + + true + false + 0.8 + dedup-result-decisiontree-v4 + + + + + openorgs____::54cd984fc7d3b153ec2181f985041f02 + + WIT + South East Technological University + + + + + A service innovation capability maturity model for SMEs + 2015-04-28 + + + + A service innovation capability maturity model for SMEs + 2015-04-28 + + + + A service innovation capability maturity model for SMEs + 2015-04-28 + + + + + + + 2015-04-28 + + + http://repository.wit.ie/3029/1/Research%20Day%202015%20-%20Poster%20Tadhg%20Blommerde.pdf + + http://repository.wit.ie/3029/ + + + + + + +
+
\ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/oaipmh/record_IE_deleted.xml b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/oaipmh/record_IE_deleted.xml new file mode 100644 index 000000000..00d225aa5 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/oaipmh/record_IE_deleted.xml @@ -0,0 +1,89 @@ + + +
+ dedup_wf_002::532be02f990b479a1da46d71f1a4c3f0 + 2023-03-31T18:37:45.599Z + 2023-03-31T18:45:52.701Z +
+ + + + + + 50|od______6005::55a12e2e0fee45ce8005633c6c17fe9f + oai:repository.wit.ie:3029 + 50|od_______934::e7162a5632264cd622ee7180ca66fdce + oai:generic.eprints.org:3029 + 50|od_______934::55a12e2e0fee45ce8005633c6c17fe9f + + + + + + http://repository.wit.ie/3029/1/Research%20Day%202015%20-%20Poster%20Tadhg%20Blommerde.pdf + A service innovation capability maturity model for SMEs + + Blommerde, Tadhg + Lynch, Patrick + + 2015-04-28 + There is general consensus that service innovations are prerequisite to sustained competitive advantage and are an essential mechanism for responding to changes in customer needs and the operating environment of firms (Giannopoulou et al., 2011; Stryja et al., 2013). Services have been described as ubiquitous in their role of generating economic growth and wellbeing and represent over 70% of employment and GDP in developed nations (Janssen et al., 2012; Mustak, 2014). As a consequence, service innovations must be a core ambition of all countries, regions, and firms wishing to remain competitive (van Ark et al., 2003). While acknowledging the importance of once-off innovations, more critical still is the capability to repeatedly introduce and exploit service innovations (Siguaw et al., 2006). This is generally referred to as service innovation capability (SIC) and describes the repeatable routines and behaviours that organisations have in place to transform ideas and knowledge into innovations (Basterretxea and Martínez, 2012). However, despite links between SIC and continuous, sustainable, and consistent service innovations, there is evidence that many organisations struggle with its effective management (Adams et al., 2006; den Hertog et al., 2010). This is often attributed to the lack of formal guidance available and the absence of metrics to determine an organisation’s SIC performance (Hogan et al., 2011; Szczygielski, 2011). Maturity modelling research in this discipline remains at an embryonic stage, thus far presenting only conceptual and opaque discussions that fail to address the necessity for an assessment and strategic management framework (Gryszkiewicz et al., 2013; Hipp and Grupp, 2005). Therefore, the purpose of this ongoing research project is to evaluate the maturity of an organisation’s SIC to inform its effective management and enhancement. To achieve this it dimensionalises the concept into four constituent capabilities, specifically, strategising, customer involvement, knowledge management, and networking (Blommerde and Lynch, 2014). The study then tracks the maturity of these capabilities as they progress through eight evolutionary plateaus towards a fully developed or optimal state. This is accomplished through a capability maturity model that enables organisations to rapidly diagnose key areas of strength and weakness to systematically cultivate behaviours that leverage their untapped innovative potential (Wendler, 2012; Essmann and du Preez, 2010). As a result of the immense knowledge vacuum characteristic of this discipline, it is anticipated that this ongoing research project will make a substantial contribution to both academic understanding and take strides towards filling the void in practical support (Rapaccini et al., 2013). It expands the service innovation literature by detailing key service innovation levers, bolsters the discipline through clear definitions of terminology, provides a powerful explanation of the development of SICs, and operationalises the dynamic capabilities view through a novel self-assessment reference model (Jochem et al., 2011). The next step in the project is the evaluation of the, as yet, conceptual service innovation capability maturity model. Adopting a positivistic philosophical stance, the study proposes the use of structural equation modelling on data gathered through an extensive survey to confirm the model and support theoretical assumptions. + RIKON (Research in Inovation, Knowledge & Organisational Networks) + + application/pdf + + + false + false + true + + + + true + true + 0.8 + dedup-result-decisiontree-v4 + + + + + openorgs____::54cd984fc7d3b153ec2181f985041f02 + + WIT + South East Technological University + + + + + A service innovation capability maturity model for SMEs + 2015-04-28 + + + + A service innovation capability maturity model for SMEs + 2015-04-28 + + + + A service innovation capability maturity model for SMEs + 2015-04-28 + + + + + + + 2015-04-28 + + + http://repository.wit.ie/3029/1/Research%20Day%202015%20-%20Poster%20Tadhg%20Blommerde.pdf + + http://repository.wit.ie/3029/ + + + + + + +
+
\ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/oaipmh/record_IT.xml b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/oaipmh/record_IT.xml new file mode 100644 index 000000000..7649589d1 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/oaipmh/record_IT.xml @@ -0,0 +1,66 @@ + + +
+ od_______310::02365c51a0ed7cbb54b2bbc7c0426d1b + 2024-04-06T06:05:16+0000 + 2024-04-06T06:56:01.776Z +
+ + + + + 50|od_______310::02365c51a0ed7cbb54b2bbc7c0426d1b + oai:flore.unifi.it:2158/608965 + 2158/608965 + + + + + + Estorsione (art. 629) + + MACRI', FRANCESCO + + 2011-01-01 + + 2011-01-01 + 2011-01-01 + 2015-04-28 + UTET + + + + false + false + 0.9 + null + + + + + openorgs____::41406edad82942e9e0b29317b8a847e2 + University of Florence + + University of Florence + + + + + + + + 2011-01-01 + + 2158/608965 + http://hdl.handle.net/2158/608965 + + + https://hdl.handle.net/2158/608965 + + + + + + +
+
\ No newline at end of file From e234848af8b0a313a0c8b3988d2ceb4f425edc78 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Wed, 8 May 2024 10:00:53 +0200 Subject: [PATCH 4/7] oaf record: xpath for root --- .../main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java index e2ae890e5..fff5d015d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java @@ -105,7 +105,7 @@ public class IrishOaiExporterJob { if (isValid(doc)) { r.setId(doc.valueOf("//*[local-name()='objIdentifier']").trim()); - r.setBody(gzip(xml)); + r.setBody(gzip(doc.selectSingleNode("//*[local-name()='entity']").asXML())); r.setDate(LocalDateTime.now()); r.setSets(new ArrayList<>()); } From c9a327bc5094f48c08f4d7d0b3274378d8d8c63f Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Wed, 8 May 2024 11:34:08 +0200 Subject: [PATCH 5/7] refactoring of gzip method --- .../dhp/oa/oaipmh/IrishOaiExporterJob.java | 14 +++++------ .../oa/oaipmh/IrishOaiExporterJobTest.java | 24 +++++-------------- 2 files changed, 13 insertions(+), 25 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java index fff5d015d..b59f0ae73 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java @@ -4,6 +4,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.Charset; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Optional; @@ -145,15 +146,14 @@ public class IrishOaiExporterJob { protected static byte[] gzip(final String str) { if (StringUtils.isBlank(str)) { return null; } - try { - final ByteArrayOutputStream obj = new ByteArrayOutputStream(); - final GZIPOutputStream gzip = new GZIPOutputStream(obj); - gzip.write(str.getBytes("UTF-8")); - gzip.flush(); - gzip.close(); - return obj.toByteArray(); + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + try (final GZIPOutputStream gzip = new GZIPOutputStream(baos)) { + IOUtils.write(str.getBytes(Charset.defaultCharset()), gzip); + } + return baos.toByteArray(); } catch (final IOException e) { throw new RuntimeException("error in gzip", e); } } + } diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java index 6140b0907..e33c701c5 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java @@ -6,10 +6,9 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStreamReader; +import java.nio.charset.Charset; import java.util.zip.GZIPInputStream; import org.apache.commons.io.IOUtils; @@ -57,7 +56,7 @@ class IrishOaiExporterJobTest { final byte[] bytes = IrishOaiExporterJob.gzip(message); assertNotNull(bytes); assertTrue(bytes.length > 0); - assertEquals(message, decompress(bytes)); + assertEquals(message, gunzip(bytes)); } @Test @@ -66,22 +65,11 @@ class IrishOaiExporterJobTest { assertNull(IrishOaiExporterJob.gzip(null)); } - private static String decompress(final byte[] compressed) { - final StringBuilder outStr = new StringBuilder(); + public static String gunzip(final byte[] compressed) { if ((compressed == null) || (compressed.length == 0)) { return null; } - try { - if (isCompressed(compressed)) { - final GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(compressed)); - final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gis, "UTF-8")); - - String line; - while ((line = bufferedReader.readLine()) != null) { - outStr.append(line); - } - } else { - outStr.append(compressed); - } - return outStr.toString(); + if (!isCompressed(compressed)) { return new String(compressed); } + try (final GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(compressed))) { + return IOUtils.toString(gis, Charset.defaultCharset()); } catch (final IOException e) { throw new RuntimeException("error in gunzip", e); } From 2b3b5fe9a172bb1fafb4815a5c52aa9fcaff6644 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Wed, 15 May 2024 14:13:16 +0200 Subject: [PATCH 6/7] oai finalization and test --- .../dhp/oa/oaipmh/IrishOaiExporterJob.java | 26 ++++- .../dhp/oa/oaipmh/OaiRecordWrapper.java | 11 ++- .../input_params_irish_oai_exporter.json | 6 -- .../eu/dnetlib/dhp/oa/oaipmh/oai-finalize.sql | 12 +++ .../dhp/oa/oaipmh/DbSerializationTest.java | 97 +++++++++++++++++++ .../oa/oaipmh/IrishOaiExporterJobTest.java | 14 ++- 6 files changed, 146 insertions(+), 20 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oai-finalize.sql create mode 100644 dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java index b59f0ae73..433baf272 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.oa.oaipmh; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; @@ -5,6 +6,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.Charset; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Optional; @@ -37,6 +41,8 @@ public class IrishOaiExporterJob { protected static final int NUM_CONNECTIONS = 20; + public static final String TMP_OAI_TABLE = "temp_oai_data"; + public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -53,7 +59,6 @@ public class IrishOaiExporterJob { final String inputPath = parser.get("inputPath"); final String dbUrl = parser.get("dbUrl"); - final String dbTable = parser.get("dbTable"); final String dbUser = parser.get("dbUser"); final String dbPwd = parser.get("dbPwd"); final int numConnections = Optional @@ -64,7 +69,6 @@ public class IrishOaiExporterJob { log.info("inputPath: '{}'", inputPath); log.info("dbUrl: '{}'", dbUrl); log.info("dbUser: '{}'", dbUser); - log.info("table: '{}'", dbTable); log.info("dbPwd: '{}'", "xxx"); log.info("numPartitions: '{}'", numConnections); @@ -80,6 +84,7 @@ public class IrishOaiExporterJob { final Encoder encoderTuple = Encoders.bean(TupleWrapper.class); final Encoder encoderOaiRecord = Encoders.bean(OaiRecordWrapper.class); + log.info("Creating temporary table..."); runWithSparkSession(conf, isSparkSessionManaged, spark -> { final Dataset docs = spark @@ -91,12 +96,23 @@ public class IrishOaiExporterJob { .map((MapFunction) IrishOaiExporterJob::asIrishOaiResult, encoderOaiRecord) .filter((FilterFunction) obj -> (obj != null) && StringUtils.isNotBlank(obj.getId())); - docs.repartition(numConnections) + docs + .repartition(numConnections) .write() .mode(SaveMode.Overwrite) - .jdbc(dbUrl, dbTable, connectionProperties); + .jdbc(dbUrl, TMP_OAI_TABLE, connectionProperties); }); + log.info("Temporary table created."); + + log.info("Updating OAI records..."); + try (final Connection con = DriverManager.getConnection(dbUrl, dbUser, dbPwd)) { + try (final Statement st = con.createStatement()) { + final String query = IOUtils.toString(IrishOaiExporterJob.class.getResourceAsStream("oai-finalize.sql")); + st.execute(query); + } + } + log.info("DONE."); } protected static OaiRecordWrapper asIrishOaiResult(final String xml) { @@ -107,7 +123,7 @@ public class IrishOaiExporterJob { if (isValid(doc)) { r.setId(doc.valueOf("//*[local-name()='objIdentifier']").trim()); r.setBody(gzip(doc.selectSingleNode("//*[local-name()='entity']").asXML())); - r.setDate(LocalDateTime.now()); + r.setDate(LocalDateTime.now().toString()); r.setSets(new ArrayList<>()); } return r; diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java index 4c2766754..2fdf32c96 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/OaiRecordWrapper.java @@ -1,7 +1,7 @@ + package eu.dnetlib.dhp.oa.oaipmh; import java.io.Serializable; -import java.time.LocalDateTime; import java.util.List; public class OaiRecordWrapper implements Serializable { @@ -10,10 +10,11 @@ public class OaiRecordWrapper implements Serializable { private String id; private byte[] body; - private LocalDateTime date; + private String date; private List sets; - public OaiRecordWrapper() {} + public OaiRecordWrapper() { + } public String getId() { return this.id; @@ -31,11 +32,11 @@ public class OaiRecordWrapper implements Serializable { this.body = body; } - public LocalDateTime getDate() { + public String getDate() { return this.date; } - public void setDate(final LocalDateTime date) { + public void setDate(final String date) { this.date = date; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json index 99a12927b..86b2bb0d3 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json @@ -23,12 +23,6 @@ "paramDescription": "the user of the database", "paramRequired": true }, - { - "paramName": "t", - "paramLongName": "dbTable", - "paramDescription": "the name of the table in the database", - "paramRequired": true - }, { "paramName": "dpwd", "paramLongName": "dbPwd", diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oai-finalize.sql b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oai-finalize.sql new file mode 100644 index 000000000..1ec0dfee0 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oai-finalize.sql @@ -0,0 +1,12 @@ +BEGIN; + +DELETE FROM oai_data; + +INSERT INTO oai_data(id, body, date, sets) SELECT + id, + body, + date::timestamp, + sets +FROM temp_oai_data; + +COMMIT; diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java new file mode 100644 index 000000000..f33708f86 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java @@ -0,0 +1,97 @@ + +package eu.dnetlib.dhp.oa.oaipmh; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +@Disabled +public class DbSerializationTest { + + private static SparkSession spark; + + public static final String dbUrl = "jdbc:postgresql://localhost:5432/db_test"; + public static final String dbUser = null; + public static final String dbPwd = null; + + @BeforeAll + public static void beforeAll() throws IOException { + + final SparkConf conf = new SparkConf(); + conf.setAppName("TEST"); + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + + spark = SparkSession + .builder() + .appName("TEST") + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + spark.stop(); + } + + @Test + public void testDatabaseSerialization() throws Exception { + final Properties connectionProperties = new Properties(); + if (dbUser != null) { + connectionProperties.put("user", dbUser); + } + if (dbPwd != null) { + connectionProperties.put("password", dbPwd); + } + + runWithSparkSession(new SparkConf(), false, spark -> { + + final List list = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + final OaiRecordWrapper r = new OaiRecordWrapper(); + r.setId("record_" + i); + r.setBody("jsahdjkahdjahdajad".getBytes()); + r.setDate(LocalDateTime.now().toString()); + r.setSets(Arrays.asList()); + list.add(r); + } + + final Dataset docs = spark.createDataset(list, Encoders.bean(OaiRecordWrapper.class)); + + docs + .write() + .mode(SaveMode.Overwrite) + .jdbc(dbUrl, IrishOaiExporterJob.TMP_OAI_TABLE, connectionProperties); + + }); + + try (final Connection con = DriverManager.getConnection(dbUrl, dbUser, dbPwd)) { + try (final Statement st = con.createStatement()) { + final String query = IOUtils.toString(getClass().getResourceAsStream("oai-finalize.sql")); + st.execute(query); + } + } + + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java index e33c701c5..57a32e246 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java @@ -1,3 +1,4 @@ + package eu.dnetlib.dhp.oa.oaipmh; import static org.junit.Assert.assertNull; @@ -17,7 +18,7 @@ import org.dom4j.DocumentException; import org.dom4j.io.SAXReader; import org.junit.jupiter.api.Test; -class IrishOaiExporterJobTest { +public class IrishOaiExporterJobTest { @Test void testAsIrishOaiResult() throws Exception { @@ -66,8 +67,12 @@ class IrishOaiExporterJobTest { } public static String gunzip(final byte[] compressed) { - if ((compressed == null) || (compressed.length == 0)) { return null; } - if (!isCompressed(compressed)) { return new String(compressed); } + if ((compressed == null) || (compressed.length == 0)) { + return null; + } + if (!isCompressed(compressed)) { + return new String(compressed); + } try (final GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(compressed))) { return IOUtils.toString(gis, Charset.defaultCharset()); } catch (final IOException e) { @@ -76,6 +81,7 @@ class IrishOaiExporterJobTest { } private static boolean isCompressed(final byte[] compressed) { - return (compressed[0] == (byte) GZIPInputStream.GZIP_MAGIC) && (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8)); + return (compressed[0] == (byte) GZIPInputStream.GZIP_MAGIC) + && (compressed[1] == (byte) (GZIPInputStream.GZIP_MAGIC >> 8)); } } From 81090ad593b1bb1572c033989c86e79f795670e6 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 5 Jun 2024 10:03:33 +0200 Subject: [PATCH 7/7] [IE OAIPHM] added oozie workflow, minor changes, code formatting --- .../dhp/oa/oaipmh/IrishOaiExporterJob.java | 68 ++++++----- .../dhp/oa/oaipmh/oozie_app/workflow.xml | 106 ++++++++++++++++++ .../dhp/oa/oaipmh/DbSerializationTest.java | 14 +-- .../oa/oaipmh/IrishOaiExporterJobTest.java | 3 +- 4 files changed, 155 insertions(+), 36 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java index 433baf272..57f180fa0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJob.java @@ -46,15 +46,16 @@ public class IrishOaiExporterJob { public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString(XmlConverterJob.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json"))); + IOUtils + .toString( + XmlConverterJob.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/oaipmh/input_params_irish_oai_exporter.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); final String inputPath = parser.get("inputPath"); @@ -62,9 +63,9 @@ public class IrishOaiExporterJob { final String dbUser = parser.get("dbUser"); final String dbPwd = parser.get("dbPwd"); final int numConnections = Optional - .ofNullable(parser.get("numConnections")) - .map(Integer::valueOf) - .orElse(NUM_CONNECTIONS); + .ofNullable(parser.get("numConnections")) + .map(Integer::valueOf) + .orElse(NUM_CONNECTIONS); log.info("inputPath: '{}'", inputPath); log.info("dbUrl: '{}'", dbUrl); @@ -78,29 +79,31 @@ public class IrishOaiExporterJob { final SparkConf conf = new SparkConf(); conf.registerKryoClasses(new Class[] { - SerializableSolrInputDocument.class + SerializableSolrInputDocument.class }); final Encoder encoderTuple = Encoders.bean(TupleWrapper.class); final Encoder encoderOaiRecord = Encoders.bean(OaiRecordWrapper.class); + final String date = LocalDateTime.now().toString(); + log.info("Creating temporary table..."); runWithSparkSession(conf, isSparkSessionManaged, spark -> { final Dataset docs = spark - .read() - .schema(encoderTuple.schema()) - .json(inputPath) - .as(encoderTuple) - .map((MapFunction) TupleWrapper::getXml, Encoders.STRING()) - .map((MapFunction) IrishOaiExporterJob::asIrishOaiResult, encoderOaiRecord) - .filter((FilterFunction) obj -> (obj != null) && StringUtils.isNotBlank(obj.getId())); + .read() + .schema(encoderTuple.schema()) + .json(inputPath) + .as(encoderTuple) + .map((MapFunction) TupleWrapper::getXml, Encoders.STRING()) + .map((MapFunction) r -> asIrishOaiResult(r, date), encoderOaiRecord) + .filter((FilterFunction) obj -> (obj != null) && StringUtils.isNotBlank(obj.getId())); docs - .repartition(numConnections) - .write() - .mode(SaveMode.Overwrite) - .jdbc(dbUrl, TMP_OAI_TABLE, connectionProperties); + .repartition(numConnections) + .write() + .mode(SaveMode.Overwrite) + .jdbc(dbUrl, TMP_OAI_TABLE, connectionProperties); }); log.info("Temporary table created."); @@ -108,14 +111,15 @@ public class IrishOaiExporterJob { log.info("Updating OAI records..."); try (final Connection con = DriverManager.getConnection(dbUrl, dbUser, dbPwd)) { try (final Statement st = con.createStatement()) { - final String query = IOUtils.toString(IrishOaiExporterJob.class.getResourceAsStream("oai-finalize.sql")); + final String query = IOUtils + .toString(IrishOaiExporterJob.class.getResourceAsStream("oai-finalize.sql")); st.execute(query); } } log.info("DONE."); } - protected static OaiRecordWrapper asIrishOaiResult(final String xml) { + protected static OaiRecordWrapper asIrishOaiResult(final String xml, final String date) { try { final Document doc = DocumentHelper.parseText(xml); final OaiRecordWrapper r = new OaiRecordWrapper(); @@ -123,7 +127,7 @@ public class IrishOaiExporterJob { if (isValid(doc)) { r.setId(doc.valueOf("//*[local-name()='objIdentifier']").trim()); r.setBody(gzip(doc.selectSingleNode("//*[local-name()='entity']").asXML())); - r.setDate(LocalDateTime.now().toString()); + r.setDate(date); r.setSets(new ArrayList<>()); } return r; @@ -140,19 +144,25 @@ public class IrishOaiExporterJob { if (n != null) { for (final Object o : n.selectNodes(".//*[local-name()='datainfo']/*[local-name()='deletedbyinference']")) { - if ("true".equals(((Node) o).getText().trim())) { return false; } + if ("true".equals(((Node) o).getText().trim())) { + return false; + } } // verify the main country of the result for (final Object o : n.selectNodes("./*[local-name()='country']")) { - if ("IE".equals(((Node) o).valueOf("@classid").trim())) { return true; } + if ("IE".equals(((Node) o).valueOf("@classid").trim())) { + return true; + } } // verify the countries of the related organizations for (final Object o : n.selectNodes(".//*[local-name()='rel']")) { final String relType = ((Node) o).valueOf("./*[local-name() = 'to']/@type").trim(); final String relCountry = ((Node) o).valueOf("./*[local-name() = 'country']/@classid").trim(); - if ("organization".equals(relType) && "IE".equals(relCountry)) { return true; } + if ("organization".equals(relType) && "IE".equals(relCountry)) { + return true; + } } } return false; @@ -160,7 +170,9 @@ public class IrishOaiExporterJob { } protected static byte[] gzip(final String str) { - if (StringUtils.isBlank(str)) { return null; } + if (StringUtils.isBlank(str)) { + return null; + } try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { try (final GZIPOutputStream gzip = new GZIPOutputStream(baos)) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oozie_app/workflow.xml new file mode 100644 index 000000000..c4caad91e --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/oaipmh/oozie_app/workflow.xml @@ -0,0 +1,106 @@ + + + + + inputPath + The path of the input records on HDFS + + + numConnections + number of connections to the postgres db (for the write operation) + + + dbUrl + the url of the database + + + dbUser + the user of the database + + + dbPwd + the password for the user of the database + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + Irish OAI-PHM provision + eu.dnetlib.dhp.oa.oaipmh.IrishOaiExporterJob + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=8000 + + --inputPath${inputPath} + --numConnections${numConnections} + --dbUrl${dbUrl} + --dbUser${dbUser} + --dbPwd${dbPwd} + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java index f33708f86..d487fda94 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/DbSerializationTest.java @@ -42,10 +42,10 @@ public class DbSerializationTest { conf.set("spark.driver.host", "localhost"); spark = SparkSession - .builder() - .appName("TEST") - .config(conf) - .getOrCreate(); + .builder() + .appName("TEST") + .config(conf) + .getOrCreate(); } @AfterAll @@ -79,9 +79,9 @@ public class DbSerializationTest { final Dataset docs = spark.createDataset(list, Encoders.bean(OaiRecordWrapper.class)); docs - .write() - .mode(SaveMode.Overwrite) - .jdbc(dbUrl, IrishOaiExporterJob.TMP_OAI_TABLE, connectionProperties); + .write() + .mode(SaveMode.Overwrite) + .jdbc(dbUrl, IrishOaiExporterJob.TMP_OAI_TABLE, connectionProperties); }); diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java index 57a32e246..c16f75e1d 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/oaipmh/IrishOaiExporterJobTest.java @@ -10,6 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.Charset; +import java.time.LocalDateTime; import java.util.zip.GZIPInputStream; import org.apache.commons.io.IOUtils; @@ -23,7 +24,7 @@ public class IrishOaiExporterJobTest { @Test void testAsIrishOaiResult() throws Exception { final String xml = IOUtils.toString(getClass().getResourceAsStream("record_IE.xml")); - final OaiRecordWrapper res = IrishOaiExporterJob.asIrishOaiResult(xml); + final OaiRecordWrapper res = IrishOaiExporterJob.asIrishOaiResult(xml, LocalDateTime.now().toString()); assertNotNull(res.getId()); assertNotNull(res.getBody()); assertNotNull(res.getSets());