From 4fa5671d16006f9afafb1e58361bf65e349f81bd Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 27 May 2021 16:22:07 +0200 Subject: [PATCH 01/11] first implementation of Hdfs Mdstores Importer --- .../raw/MigrateHdfsMdstoresApplication.java | 120 ++++++++++++++++++ .../migrate_hdfs_mstores_parameters.json | 32 +++++ 2 files changed, 152 insertions(+) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java new file mode 100644 index 000000000..b261b5e76 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java @@ -0,0 +1,120 @@ + +package eu.dnetlib.dhp.oa.graph.raw; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Arrays; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; +import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; + +public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication { + + private static final Logger log = LoggerFactory.getLogger(MigrateHdfsMdstoresApplication.class); + + private final String mdstoreManagerUrl; + + private final String format; + + private final String layout; + + private final String interpretation; + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString(MigrateHdfsMdstoresApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl"); + final String mdFormat = parser.get("mdFormat"); + final String mdLayout = parser.get("mdLayout"); + final String mdInterpretation = parser.get("mdInterpretation"); + + final String hdfsPath = parser.get("hdfsPath"); + + final SparkConf conf = new SparkConf(); + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + try (final MigrateHdfsMdstoresApplication app = + new MigrateHdfsMdstoresApplication(hdfsPath, mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation)) { + app.execute(spark); + } + }); + } + + public MigrateHdfsMdstoresApplication(final String hdfsPath, final String mdstoreManagerUrl, final String format, final String layout, + final String interpretation) throws Exception { + super(hdfsPath); + this.mdstoreManagerUrl = mdstoreManagerUrl; + this.format = format; + this.layout = layout; + this.interpretation = interpretation; + } + + public void execute(final SparkSession spark) throws Exception { + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + final Set paths = mdstorePaths(sc); + log.info("Found " + paths.size() + " not empty mdstores"); + + spark.read() + .parquet(paths.toArray(new String[paths.size()])) + .map((MapFunction) r -> r.getAs("body"), Encoders.STRING()) + .foreach(xml -> emit(xml, String.format("%s-%s-%s", format, layout, interpretation))); + } + + private Set mdstorePaths(final JavaSparkContext sc) throws Exception { + final String url = mdstoreManagerUrl + "/mdstores"; + final ObjectMapper objectMapper = new ObjectMapper(); + + final HttpGet req = new HttpGet(url); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + final String json = IOUtils.toString(response.getEntity().getContent()); + final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class); + return Arrays.stream(mdstores) + .filter(md -> md.getFormat().equalsIgnoreCase(format)) + .filter(md -> md.getLayout().equalsIgnoreCase(layout)) + .filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation)) + .filter(md -> StringUtils.isNotBlank(md.getHdfsPath())) + .filter(md -> StringUtils.isNotBlank(md.getCurrentVersion())) + .filter(md -> md.getSize() > 0) + .map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store") + .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) + .collect(Collectors.toSet()); + } + } + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json new file mode 100644 index 000000000..1d89017c5 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json @@ -0,0 +1,32 @@ +[ + { + "paramName": "p", + "paramLongName": "hdfsPath", + "paramDescription": "the path where storing the sequential file", + "paramRequired": true + }, + { + "paramName": "u", + "paramLongName": "mdstoreManagerUrl", + "paramDescription": "the MdstoreManager url", + "paramRequired": true + }, + { + "paramName": "f", + "paramLongName": "mdFormat", + "paramDescription": "metadata format", + "paramRequired": true + }, + { + "paramName": "l", + "paramLongName": "mdLayout", + "paramDescription": "metadata layout", + "paramRequired": true + }, + { + "paramName": "i", + "paramLongName": "mdInterpretation", + "paramDescription": "metadata interpretation", + "paramRequired": true + } +] \ No newline at end of file From ad56a44fda8ff76202c963326cfcb5e6a5b753f6 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 28 May 2021 14:45:39 +0200 Subject: [PATCH 02/11] save as gzipped sequence file --- .../raw/MigrateHdfsMdstoresApplication.java | 75 ++++++------ .../raw/MigrateMongoMdstoresApplication.java | 3 - .../oozie_app/config-default.xml | 18 +++ .../raw_hdfs_stores/oozie_app/workflow.xml | 109 ++++++++++++++++++ 4 files changed, 169 insertions(+), 36 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/config-default.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java index b261b5e76..3e78edf4c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java @@ -6,10 +6,14 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.Arrays; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; @@ -29,24 +33,18 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import eu.dnetlib.dhp.schema.mdstore.MDStoreWithInfo; +import scala.Tuple2; public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication { private static final Logger log = LoggerFactory.getLogger(MigrateHdfsMdstoresApplication.class); - private final String mdstoreManagerUrl; - - private final String format; - - private final String layout; - - private final String interpretation; - public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils - .toString(MigrateHdfsMdstoresApplication.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json"))); + .toString( + MigrateHdfsMdstoresApplication.class + .getResourceAsStream("/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json"))); parser.parseArgument(args); final Boolean isSparkSessionManaged = Optional @@ -62,39 +60,51 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication final String hdfsPath = parser.get("hdfsPath"); + final Set paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation); + final SparkConf conf = new SparkConf(); runWithSparkSession(conf, isSparkSessionManaged, spark -> { - try (final MigrateHdfsMdstoresApplication app = - new MigrateHdfsMdstoresApplication(hdfsPath, mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation)) { - app.execute(spark); - } + HdfsSupport.remove(hdfsPath, spark.sparkContext().hadoopConfiguration()); + processPaths(spark, hdfsPath, paths, String.format("%s-%s-%s", mdFormat, mdLayout, mdInterpretation)); }); } - public MigrateHdfsMdstoresApplication(final String hdfsPath, final String mdstoreManagerUrl, final String format, final String layout, - final String interpretation) throws Exception { - super(hdfsPath); - this.mdstoreManagerUrl = mdstoreManagerUrl; - this.format = format; - this.layout = layout; - this.interpretation = interpretation; - } - - public void execute(final SparkSession spark) throws Exception { + public static void processPaths(final SparkSession spark, + final String outputPath, + final Set paths, + final String type) throws Exception { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - final Set paths = mdstorePaths(sc); log.info("Found " + paths.size() + " not empty mdstores"); + paths.forEach(log::info); - spark.read() - .parquet(paths.toArray(new String[paths.size()])) + final String[] validPaths = paths + .stream() + .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) + .toArray(size -> new String[size]); + + spark + .read() + .parquet(validPaths) .map((MapFunction) r -> r.getAs("body"), Encoders.STRING()) - .foreach(xml -> emit(xml, String.format("%s-%s-%s", format, layout, interpretation))); + .toJavaRDD() + .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml))) + .coalesce(1) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + + /* + * .foreach(xml -> { try { writer.append(new Text(UUID.randomUUID() + ":" + type), new Text(xml)); } catch + * (final Exception e) { throw new RuntimeException(e); } }); + */ } - private Set mdstorePaths(final JavaSparkContext sc) throws Exception { - final String url = mdstoreManagerUrl + "/mdstores"; + private static Set mdstorePaths(final String mdstoreManagerUrl, + final String format, + final String layout, + final String interpretation) + throws Exception { + final String url = mdstoreManagerUrl + "/mdstores/"; final ObjectMapper objectMapper = new ObjectMapper(); final HttpGet req = new HttpGet(url); @@ -103,7 +113,8 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication try (final CloseableHttpResponse response = client.execute(req)) { final String json = IOUtils.toString(response.getEntity().getContent()); final MDStoreWithInfo[] mdstores = objectMapper.readValue(json, MDStoreWithInfo[].class); - return Arrays.stream(mdstores) + return Arrays + .stream(mdstores) .filter(md -> md.getFormat().equalsIgnoreCase(format)) .filter(md -> md.getLayout().equalsIgnoreCase(layout)) .filter(md -> md.getInterpretation().equalsIgnoreCase(interpretation)) @@ -111,10 +122,8 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication .filter(md -> StringUtils.isNotBlank(md.getCurrentVersion())) .filter(md -> md.getSize() > 0) .map(md -> md.getHdfsPath() + "/" + md.getCurrentVersion() + "/store") - .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration())) .collect(Collectors.toSet()); } } } - } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java index 9acdabb37..3f6afbeac 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.java @@ -10,9 +10,6 @@ import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientURI; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.MdstoreClient; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml new file mode 100644 index 000000000..2c24aeb1a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml @@ -0,0 +1,109 @@ + + + + + graphOutputPath + the target path to store raw graph + + + contentPath + path location to store (or reuse) content from the aggregator + + + mdstoreManagerUrl + the address of the Mdstore Manager + + + isLookupUrl + the address of the lookUp service + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + ImportODF_hdfs + eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --hdfsPath${contentPath}/odf_records_hdfs + --mdstoreManagerUrl${mdstoreManagerUrl} + --mdFormatODF + --mdLayoutstore + --mdInterpretationcleaned + + + + + + + + \ No newline at end of file From e9f2b6037c313ec9ad2b97191de34f46d7d2dc5a Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 31 May 2021 11:36:26 +0200 Subject: [PATCH 03/11] patch of mdstore records --- .../raw/AbstractMdRecordToOafMapper.java | 80 +++++++-- .../raw/MigrateHdfsMdstoresApplication.java | 32 +++- .../oozie_app/workflow-bak.xml | 157 ++++++++++++++++++ .../raw_hdfs_stores/oozie_app/workflow.xml | 50 +++++- .../dnetlib/dhp/oa/graph/raw/MappersTest.java | 82 +++++++-- .../dhp/oa/graph/raw/odf_from_hdfs.xml | 77 +++++++++ 6 files changed, 448 insertions(+), 30 deletions(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow-bak.xml create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/odf_from_hdfs.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index d15fbf37a..1beb616ba 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -1,24 +1,65 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static eu.dnetlib.dhp.schema.common.ModelConstants.*; -import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.*; +import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PID_TYPES; +import static eu.dnetlib.dhp.schema.common.ModelConstants.IS_PRODUCED_BY; +import static eu.dnetlib.dhp.schema.common.ModelConstants.OUTCOME; +import static eu.dnetlib.dhp.schema.common.ModelConstants.PRODUCES; +import static eu.dnetlib.dhp.schema.common.ModelConstants.REPOSITORY_PROVENANCE_ACTIONS; +import static eu.dnetlib.dhp.schema.common.ModelConstants.RESULT_PROJECT; +import static eu.dnetlib.dhp.schema.common.ModelConstants.UNKNOWN; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.createOpenaireId; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.dataInfo; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.field; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.journal; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.keyValue; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.listFields; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.oaiIProvenance; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.qualifier; +import static eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils.structuredProperty; -import java.util.*; -import java.util.stream.Collectors; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.dom4j.Document; import org.dom4j.DocumentFactory; import org.dom4j.DocumentHelper; import org.dom4j.Node; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.AccessRight; +import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.Context; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.GeoLocation; +import eu.dnetlib.dhp.schema.oaf.Instance; +import eu.dnetlib.dhp.schema.oaf.Journal; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.OAIProvenance; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Qualifier; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.Software; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; @@ -43,6 +84,8 @@ public abstract class AbstractMdRecordToOafMapper { protected static final Map nsContext = new HashMap<>(); + private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesApplication.class); + static { nsContext.put("dr", "http://www.driver-repository.eu/namespace/dr"); nsContext.put("dri", "http://www.driver-repository.eu/namespace/dri"); @@ -61,6 +104,9 @@ public abstract class AbstractMdRecordToOafMapper { } public List processMdRecord(final String xml) { + + // log.info("Processing record: " + xml); + try { DocumentFactory.getInstance().setXPathNamespaceURIs(nsContext); @@ -100,10 +146,10 @@ public abstract class AbstractMdRecordToOafMapper { } protected String getResultType(final Document doc, final List instances) { - String type = doc.valueOf("//dr:CobjCategory/@type"); + final String type = doc.valueOf("//dr:CobjCategory/@type"); if (StringUtils.isBlank(type) & vocs.vocabularyExists(ModelConstants.DNET_RESULT_TYPOLOGIES)) { - String instanceType = instances + final String instanceType = instances .stream() .map(i -> i.getInstancetype().getClassid()) .findFirst() @@ -158,8 +204,12 @@ public abstract class AbstractMdRecordToOafMapper { return oafs; } - private OafEntity createEntity(Document doc, String type, List instances, KeyValue collectedFrom, - DataInfo info, long lastUpdateTimestamp) { + private OafEntity createEntity(final Document doc, + final String type, + final List instances, + final KeyValue collectedFrom, + final DataInfo info, + final long lastUpdateTimestamp) { switch (type.toLowerCase()) { case "publication": final Publication p = new Publication(); @@ -219,9 +269,7 @@ public abstract class AbstractMdRecordToOafMapper { getRelation( docId, projectId, RESULT_PROJECT, OUTCOME, IS_PRODUCED_BY, entity, validationdDate)); res - .add( - getRelation( - projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity, validationdDate)); + .add(getRelation(projectId, docId, RESULT_PROJECT, OUTCOME, PRODUCES, entity, validationdDate)); } } @@ -411,10 +459,10 @@ public abstract class AbstractMdRecordToOafMapper { return Lists.newArrayList(id); } } - List idList = doc + final List idList = doc .selectNodes( "normalize-space(//*[local-name()='header']/*[local-name()='identifier' or local-name()='recordIdentifier']/text())"); - Set originalIds = Sets.newHashSet(idList); + final Set originalIds = Sets.newHashSet(idList); if (originalIds.isEmpty()) { throw new IllegalStateException("missing originalID on " + doc.asXML()); @@ -423,8 +471,8 @@ public abstract class AbstractMdRecordToOafMapper { } protected AccessRight prepareAccessRight(final Node node, final String xpath, final String schemeId) { - Qualifier qualifier = prepareQualifier(node.valueOf(xpath).trim(), schemeId); - AccessRight accessRight = new AccessRight(); + final Qualifier qualifier = prepareQualifier(node.valueOf(xpath).trim(), schemeId); + final AccessRight accessRight = new AccessRight(); accessRight.setClassid(qualifier.getClassid()); accessRight.setClassname(qualifier.getClassname()); accessRight.setSchemeid(qualifier.getSchemeid()); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java index 3e78edf4c..e85d4c577 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java @@ -3,7 +3,10 @@ package eu.dnetlib.dhp.oa.graph.raw; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.io.StringReader; +import java.text.SimpleDateFormat; import java.util.Arrays; +import java.util.Date; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -24,6 +27,11 @@ import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import org.dom4j.Document; +import org.dom4j.Element; +import org.dom4j.Namespace; +import org.dom4j.QName; +import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +46,8 @@ import scala.Tuple2; public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication { private static final Logger log = LoggerFactory.getLogger(MigrateHdfsMdstoresApplication.class); + private static final Namespace DRI_NS_PREFIX = new Namespace("dri", + "http://www.driver-repository.eu/namespace/dri"); public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -87,7 +97,7 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication spark .read() .parquet(validPaths) - .map((MapFunction) r -> r.getAs("body"), Encoders.STRING()) + .map((MapFunction) r -> enrichRecord(r), Encoders.STRING()) .toJavaRDD() .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml))) .coalesce(1) @@ -99,6 +109,26 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication */ } + private static String enrichRecord(final Row r) { + final String xml = r.getAs("body"); + + final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ"); + final String collDate = dateFormat.format(new Date((Long) r.getAs("dateOfCollection"))); + final String tranDate = dateFormat.format(new Date((Long) r.getAs("dateOfTransformation"))); + + try { + final Document doc = new SAXReader().read(new StringReader(xml)); + final Element head = (Element) doc.selectSingleNode("//*[local-name() = 'header']"); + head.addElement(new QName("objIdentifier", DRI_NS_PREFIX)).addText(r.getAs("id")); + head.addElement(new QName("dateOfCollection", DRI_NS_PREFIX)).addText(collDate); + head.addElement(new QName("dateOfTransformation", DRI_NS_PREFIX)).addText(tranDate); + return doc.asXML(); + } catch (final Exception e) { + log.error("Error patching record: " + xml); + throw new RuntimeException("Error patching record: " + xml, e); + } + } + private static Set mdstorePaths(final String mdstoreManagerUrl, final String format, final String layout, diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow-bak.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow-bak.xml new file mode 100644 index 000000000..bfe2dff0b --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow-bak.xml @@ -0,0 +1,157 @@ + + + + + graphOutputPath + the target path to store raw graph + + + contentPath + path location to store (or reuse) content from the aggregator + + + mdstoreManagerUrl + the address of the Mdstore Manager + + + isLookupUrl + the address of the lookUp service + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + yarn + cluster + ImportODF_hdfs + eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --hdfsPath${contentPath}/odf_records_hdfs + --mdstoreManagerUrl${mdstoreManagerUrl} + --mdFormatODF + --mdLayoutstore + --mdInterpretationcleaned + + + + + + + + yarn + cluster + GenerateEntities + eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePaths${contentPath}/odf_records_hdfs + --targetPath${workingDir}/entities + --isLookupUrl${isLookupUrl} + --shouldHashId${shouldHashId} + + + + + + + + yarn + cluster + GenerateGraph + eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --sourcePath${workingDir}/entities + --graphRawPath${workingDir}/graph_raw + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml index 2c24aeb1a..bfe2dff0b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow.xml @@ -100,10 +100,58 @@ --mdLayoutstore --mdInterpretationcleaned - + + + + yarn + cluster + GenerateEntities + eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --sourcePaths${contentPath}/odf_records_hdfs + --targetPath${workingDir}/entities + --isLookupUrl${isLookupUrl} + --shouldHashId${shouldHashId} + + + + + + + + yarn + cluster + GenerateGraph + eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=7680 + + --sourcePath${workingDir}/entities + --graphRawPath${workingDir}/graph_raw + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index 61ecff4bd..3211a5f2b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -1,7 +1,11 @@ package eu.dnetlib.dhp.oa.graph.raw; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.lenient; import java.io.IOException; @@ -21,7 +25,15 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.oa.graph.clean.GraphCleaningFunctionsTest; import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Author; +import eu.dnetlib.dhp.schema.oaf.Dataset; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.Instance; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Software; +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.utils.PidType; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -407,9 +419,10 @@ public class MappersTest { assertNotNull(d.getTitle()); assertEquals(1, d.getTitle().size()); - assertEquals( - "Validation of the Goodstrength System for Assessment of Abdominal Wall Strength in Patients With Incisional Hernia", - d.getTitle().get(0).getValue()); + assertEquals("Validation of the Goodstrength System for Assessment of Abdominal Wall Strength in Patients With Incisional Hernia", d + .getTitle() + .get(0) + .getValue()); assertNotNull(d.getDescription()); assertEquals(1, d.getDescription().size()); @@ -435,7 +448,7 @@ public class MappersTest { assertNotNull(d.getInstance()); assertTrue(d.getInstance().size() == 1); - Instance i = d.getInstance().get(0); + final Instance i = d.getInstance().get(0); assertNotNull(i.getAccessright()); assertEquals(ModelConstants.DNET_ACCESS_MODES, i.getAccessright().getSchemeid()); @@ -607,8 +620,7 @@ public class MappersTest { assertEquals("OPEN", p.getInstance().get(0).getAccessright().getClassid()); assertValidId(p.getInstance().get(0).getCollectedfrom().getKey()); assertValidId(p.getInstance().get(0).getHostedby().getKey()); - assertEquals( - "http://creativecommons.org/licenses/by/3.0/de/legalcode", p.getInstance().get(0).getLicense().getValue()); + assertEquals("http://creativecommons.org/licenses/by/3.0/de/legalcode", p.getInstance().get(0).getLicense().getValue()); assertEquals(1, p.getInstance().size()); assertNotNull(p.getInstance().get(0).getAlternateIdentifier()); @@ -633,7 +645,55 @@ public class MappersTest { System.out.println(p.getTitle().get(0).getValue()); } + @Test + void testOdfFromHdfs() throws IOException { + final String xml = IOUtils.toString(getClass().getResourceAsStream("odf_from_hdfs.xml")); + + final List list = new OdfToOafMapper(vocs, false, true).processMdRecord(xml); + + assertEquals(1, list.size()); + + System.out.println(list.get(0).getClass()); + + assertTrue(list.get(0) instanceof Dataset); + + final Dataset p = (Dataset) list.get(0); + + assertValidId(p.getId()); + assertTrue(p.getOriginalId().size() == 1); + assertEquals("df76e73f-0483-49a4-a9bb-63f2f985574a", p.getOriginalId().get(0)); + assertValidId(p.getCollectedfrom().get(0).getKey()); + assertTrue(p.getAuthor().size() > 0); + + final Optional author = p + .getAuthor() + .stream() + .findFirst(); + assertTrue(author.isPresent()); + + assertEquals("Museum Sønderjylland", author.get().getFullname()); + + assertTrue(p.getSubject().size() > 0); + assertTrue(p.getInstance().size() > 0); + + assertNotNull(p.getTitle()); + assertFalse(p.getTitle().isEmpty()); + + assertNotNull(p.getInstance()); + assertTrue(p.getInstance().size() > 0); + p + .getInstance() + .stream() + .forEach(i -> { + assertNotNull(i.getAccessright()); + assertEquals("UNKNOWN", i.getAccessright().getClassid()); + }); + assertEquals("UNKNOWN", p.getInstance().get(0).getRefereed().getClassid()); + } + private void assertValidId(final String id) { + System.out.println(id); + assertEquals(49, id.length()); assertEquals('|', id.charAt(2)); assertEquals(':', id.charAt(15)); @@ -642,14 +702,12 @@ public class MappersTest { private List vocs() throws IOException { return IOUtils - .readLines( - GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt")); + .readLines(GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt")); } private List synonyms() throws IOException { return IOUtils - .readLines( - GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt")); + .readLines(GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt")); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/odf_from_hdfs.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/odf_from_hdfs.xml new file mode 100644 index 000000000..b27e5930b --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/odf_from_hdfs.xml @@ -0,0 +1,77 @@ + +
+ df76e73f-0483-49a4-a9bb-63f2f985574a + 2020-09-30T08:17:54Z + eudat-b2find + 2021-05-20T13:43:52.888Z + test________::92fe3efa47883b2f3401e6a4bd92e9d7 + 2020-05-21T05:26:15.93Z + 2020-08-01T11:06:26.977Z +
+ + + + + Museum Sønderjylland + + + + 200202-124 Hjelmvrå + + + This record describes + ancient sites and monuments as well archaeological excavations + undertaken by Danish museums. Excerpt of the Danish description of + events: 1995-04-26: Ved en besigtigelse ud for stedet fandt Nørgård + en større mængde skår i skovens udkant, liggende i nogle + drængrøfter1995-04-26: Leif Nørgård, der er leder af Sønderjyllands + Amatørarkæologer, havde ved en samtale med en tidligere ansat på + motorvejsprojektet gennem Sønderjylland fået at vide, at man på + dette sted havde fundet "urner".1995-04-26: Ved en besigtigelse ud + for stedet fandt Nørgård en større mængde skår i skovens udkant, + liggende i nogle drængrøfter1995-04-26: Leif Nørgård, der er leder + af Sønderjyllands Amatørarkæologer, havde ved en samtale med en + tidligere ansat på motorvejsprojektet gennem Sønderjylland fået at + vide, at man på dette sted havde fundet "urner". + + + + (9.376 LON, 55.220 LAT) + + + + Enkeltfund + Settlement + Single find + Archaeology + + + http://www.kulturarv.dk/fundogfortidsminder/Lokalitet/136540/ + + 2020 + Slots- og Kulturstyrelsen (www.slks.dk) + Danish + + Public + + Dataset + + 0021 + 2020-01-01 + UNKNOWN + Danish + + + +
\ No newline at end of file From 03a510859aa29bfd64ab694e5c1840020fae5ef5 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Mon, 31 May 2021 14:10:51 +0200 Subject: [PATCH 04/11] removed coalesce(1) --- .../raw/MigrateHdfsMdstoresApplication.java | 2 +- .../dnetlib/dhp/oa/graph/raw/MappersTest.java | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java index e85d4c577..77fd28d77 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java @@ -100,7 +100,7 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication .map((MapFunction) r -> enrichRecord(r), Encoders.STRING()) .toJavaRDD() .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml))) - .coalesce(1) + // .coalesce(1) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); /* diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index 3211a5f2b..533237e5a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -419,10 +419,12 @@ public class MappersTest { assertNotNull(d.getTitle()); assertEquals(1, d.getTitle().size()); - assertEquals("Validation of the Goodstrength System for Assessment of Abdominal Wall Strength in Patients With Incisional Hernia", d - .getTitle() - .get(0) - .getValue()); + assertEquals( + "Validation of the Goodstrength System for Assessment of Abdominal Wall Strength in Patients With Incisional Hernia", + d + .getTitle() + .get(0) + .getValue()); assertNotNull(d.getDescription()); assertEquals(1, d.getDescription().size()); @@ -620,7 +622,8 @@ public class MappersTest { assertEquals("OPEN", p.getInstance().get(0).getAccessright().getClassid()); assertValidId(p.getInstance().get(0).getCollectedfrom().getKey()); assertValidId(p.getInstance().get(0).getHostedby().getKey()); - assertEquals("http://creativecommons.org/licenses/by/3.0/de/legalcode", p.getInstance().get(0).getLicense().getValue()); + assertEquals( + "http://creativecommons.org/licenses/by/3.0/de/legalcode", p.getInstance().get(0).getLicense().getValue()); assertEquals(1, p.getInstance().size()); assertNotNull(p.getInstance().get(0).getAlternateIdentifier()); @@ -702,12 +705,14 @@ public class MappersTest { private List vocs() throws IOException { return IOUtils - .readLines(GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt")); + .readLines( + GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/terms.txt")); } private List synonyms() throws IOException { return IOUtils - .readLines(GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt")); + .readLines( + GraphCleaningFunctionsTest.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/synonyms.txt")); } } From e95075026233ca563837bf0c85de6ead997cf0ac Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 1 Jun 2021 10:48:50 +0200 Subject: [PATCH 05/11] add nodes to import hdfs mdstores --- .../oa/graph/raw_all/oozie_app/workflow.xml | 84 ++++++++++++++++++- 1 file changed, 81 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml index 80f33bd53..0821f04ea 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml @@ -40,6 +40,16 @@ false should import content from the aggregator or reuse a previous version + + reuseODF_hdfs + false + should import content from the aggregator or reuse a previous version + + + reuseOAF_hdfs + false + should import content from the aggregator or reuse a previous version + contentPath path location to store (or reuse) content from the aggregator @@ -289,7 +299,7 @@ ${wf:conf('reuseOAF') eq false} - ${wf:conf('reuseOAF') eq true} + ${wf:conf('reuseOAF') eq true} @@ -324,10 +334,78 @@ --mdLayoutstore --mdInterpretationintersection - + + + + + ${wf:conf('reuseODF_hdfs') eq false} + ${wf:conf('reuseODF_hdfs') eq true} + + + + + + yarn + cluster + ImportODF_hdfs + eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --hdfsPath${contentPath}/odf_records_hdfs + --mdstoreManagerUrl${mdstoreManagerUrl} + --mdFormatODF + --mdLayoutstore + --mdInterpretationcleaned + + + + + + + + ${wf:conf('reuseOAF_hdfs') eq false} + ${wf:conf('reuseOAF_hdfs') eq true} + + + + + + + yarn + cluster + ImportOAF_hdfs + eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory ${sparkExecutorMemory} + --executor-cores ${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --hdfsPath${contentPath}/oaf_records_hdfs + --mdstoreManagerUrl${mdstoreManagerUrl} + --mdFormatOAF + --mdLayoutstore + --mdInterpretationcleaned + + + + + ${wf:conf('reuseDBOpenorgs') eq false} @@ -426,7 +504,7 @@ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --sourcePaths${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records + --sourcePaths${contentPath}/db_openaire,${contentPath}/db_openorgs,${contentPath}/oaf_records,${contentPath}/odf_records,${contentPath}/oaf_records_hdfs,${contentPath}/odf_records_hdfs --targetPath${workingDir}/entities --isLookupUrl${isLookupUrl} --shouldHashId${shouldHashId} From ede27498223963d3565498e2199bcb8419070fba Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Tue, 1 Jun 2021 12:42:43 +0200 Subject: [PATCH 06/11] orcid pid type --- .../dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index f441f2c2a..1beb616ba 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -76,7 +76,7 @@ public abstract class AbstractMdRecordToOafMapper { protected static final String DATACITE_SCHEMA_KERNEL_3 = "http://datacite.org/schema/kernel-3"; protected static final String DATACITE_SCHEMA_KERNEL_3_SLASH = "http://datacite.org/schema/kernel-3/"; protected static final Qualifier ORCID_PID_TYPE = qualifier( - ORCID_PENDING, ORCID_CLASSNAME, DNET_PID_TYPES, DNET_PID_TYPES); + "ORCID", "Open Researcher and Contributor ID", DNET_PID_TYPES, DNET_PID_TYPES); protected static final Qualifier MAG_PID_TYPE = qualifier( "MAGIdentifier", "Microsoft Academic Graph Identifier", DNET_PID_TYPES, DNET_PID_TYPES); From 32b0c27217e29e0f9a2aa57d63d4dbec2376a0b6 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Wed, 9 Jun 2021 18:36:11 +0200 Subject: [PATCH 07/11] Aggiornare 'dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java' fix in SQL query: while writing the blacklist constraint it used d.id to indicate the datasource id, but no alias for the datasource was defined. So I removed the alias --- .../PrepareResultInstRepoAssociation.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java index a41399627..3cf36e572 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java @@ -93,9 +93,9 @@ public class PrepareResultInstRepoAssociation { SparkSession spark, String datasourceOrganizationPath, List blacklist) { String blacklisted = ""; if (blacklist.size() > 0) { - blacklisted = " AND d.id != '" + blacklist.get(0) + "'"; + blacklisted = " AND id != '" + blacklist.get(0) + "'"; for (int i = 1; i < blacklist.size(); i++) { - blacklisted += " AND d.id != '" + blacklist.get(i) + "'"; + blacklisted += " AND id != '" + blacklist.get(i) + "'"; } } From dd997c49e06cc9586eb33a0ca4459d711cd0768e Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 10 Jun 2021 14:47:18 +0200 Subject: [PATCH 08/11] fix wrong relation id fix date thai ticket #6791 --- .../DataciteToOAFTransformation.scala | 41 +++++++++++++++---- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala index 979ab4371..5c49791d9 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala @@ -17,6 +17,7 @@ import org.json4s.jackson.JsonMethods.parse import java.nio.charset.CodingErrorAction import java.text.SimpleDateFormat import java.time.LocalDate +import java.time.chrono.ThaiBuddhistDate import java.time.format.DateTimeFormatter import java.util.{Date, Locale} import java.util.regex.Pattern @@ -164,6 +165,16 @@ object DataciteToOAFTransformation { d } + def fix_thai_date(input:String, format:String) :String = { + try { + val a_date = LocalDate.parse(input,DateTimeFormatter.ofPattern(format)) + val d = ThaiBuddhistDate.of(a_date.getYear, a_date.getMonth.getValue, a_date.getDayOfMonth) + LocalDate.from(d).toString + } catch { + case _: Throwable => "" + } + } + def getTypeQualifier(resourceType: String, resourceTypeGeneral: String, schemaOrg: String, vocabularies: VocabularyGroup): (Qualifier, Qualifier) = { if (resourceType != null && resourceType.nonEmpty) { val typeQualifier = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, resourceType) @@ -377,17 +388,31 @@ object DataciteToOAFTransformation { .map(d => d.get) if (a_date.isDefined) { - result.setEmbargoenddate(OafMapperUtils.field(a_date.get, null)) + if(doi.startsWith("10.14457")) + result.setEmbargoenddate(OafMapperUtils.field(fix_thai_date(a_date.get,"[yyyy-MM-dd]"), null)) + else + result.setEmbargoenddate(OafMapperUtils.field(a_date.get, null)) } if (i_date.isDefined && i_date.get.isDefined) { - result.setDateofacceptance(OafMapperUtils.field(i_date.get.get, null)) - result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(i_date.get.get, null)) + if(doi.startsWith("10.14457")) { + result.setDateofacceptance(OafMapperUtils.field(fix_thai_date(i_date.get.get,"[yyyy-MM-dd]"), null)) + result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(fix_thai_date(i_date.get.get,"[yyyy-MM-dd]"), null)) + } + else { + result.setDateofacceptance(OafMapperUtils.field(i_date.get.get, null)) + result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(i_date.get.get, null)) + } } else if (publication_year != null) { - result.setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null)) - result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null)) - } + if(doi.startsWith("10.14457")) { + result.setDateofacceptance(OafMapperUtils.field(fix_thai_date(s"01-01-$publication_year","[dd-MM-yyyy]"), null)) + result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(fix_thai_date(s"01-01-$publication_year","[dd-MM-yyyy]"), null)) + } else { + result.setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null)) + result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null)) + } + } result.setRelevantdate(dates.filter(d => d.date.isDefined && d.dateType.isDefined) .map(d => (extract_date(d.date.get), d.dateType.get)) @@ -468,11 +493,11 @@ object DataciteToOAFTransformation { JField("awardUri", JString(awardUri)) <- fundingReferences } yield awardUri - val relations: List[Relation] = awardUris.flatMap(a => get_projectRelation(a, result.getId)).filter(r => r != null) - fix_figshare(result) result.setId(IdentifierFactory.createIdentifier(result)) if (result.getId == null) return List() + val relations: List[Relation] = awardUris.flatMap(a => get_projectRelation(a, result.getId)).filter(r => r != null) + fix_figshare(result) if (relations != null && relations.nonEmpty) { List(result) ::: relations } From a900bfb87434f41eac1530e833565a56351afe70 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 11 Jun 2021 16:53:01 +0200 Subject: [PATCH 09/11] delegating the date parsing to https://github.com/sisyphsu/dateparser --- dhp-common/pom.xml | 4 +- .../oaf/utils/GraphCleaningFunctions.java | 82 +++++++++++-- .../schema/oaf/utils/OafMapperUtilsTest.java | 108 +++++++++++++++++- .../dhp/transformation/xslt/DateCleaner.java | 83 +------------- .../transformation/TransformationJobTest.java | 10 +- .../clean/GraphCleaningFunctionsTest.java | 2 + .../eu/dnetlib/dhp/oa/graph/clean/result.json | 2 +- pom.xml | 12 +- 8 files changed, 200 insertions(+), 103 deletions(-) diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml index b1494f649..74f31cf35 100644 --- a/dhp-common/pom.xml +++ b/dhp-common/pom.xml @@ -22,8 +22,8 @@ hadoop-common - commons-validator - commons-validator + com.github.sisyphsu + dateparser org.apache.spark diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java index da253c681..999272113 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java @@ -1,15 +1,23 @@ package eu.dnetlib.dhp.schema.oaf.utils; +import java.time.LocalDate; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.util.*; import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.validator.GenericValidator; +import org.jetbrains.annotations.NotNull; +import com.github.sisyphsu.dateparser.DateParserUtils; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import eu.dnetlib.dhp.schema.common.ModelConstants; @@ -119,14 +127,42 @@ public class GraphCleaningFunctions extends CleaningFunctions { } else if (value instanceof Relation) { Relation r = (Relation) value; - if (!isValidDate(r.getValidationDate())) { + Optional validationDate = doCleanDate(r.getValidationDate()); + if (validationDate.isPresent()) { + r.setValidationDate(validationDate.get()); + r.setValidated(true); + } else { r.setValidationDate(null); r.setValidated(false); } - } else if (value instanceof Result) { Result r = (Result) value; + + if (Objects.nonNull(r.getDateofacceptance())) { + Optional date = cleanDateField(r.getDateofacceptance()); + if (date.isPresent()) { + r.getDateofacceptance().setValue(date.get()); + } else { + r.setDateofacceptance(null); + } + } + if (Objects.nonNull(r.getRelevantdate())) { + r + .setRelevantdate( + r + .getRelevantdate() + .stream() + .filter(Objects::nonNull) + .filter(sp -> Objects.nonNull(sp.getQualifier())) + .filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid())) + .map(sp -> { + sp.setValue(GraphCleaningFunctions.cleanDate(sp.getValue())); + return sp; + }) + .filter(sp -> StringUtils.isNotBlank(sp.getValue())) + .collect(Collectors.toList())); + } if (Objects.nonNull(r.getPublisher()) && StringUtils.isBlank(r.getPublisher().getValue())) { r.setPublisher(null); } @@ -222,6 +258,14 @@ public class GraphCleaningFunctions extends CleaningFunctions { if (Objects.isNull(i.getRefereed())) { i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS)); } + if (Objects.nonNull(i.getDateofacceptance())) { + Optional date = cleanDateField(i.getDateofacceptance()); + if (date.isPresent()) { + i.getDateofacceptance().setValue(date.get()); + } else { + i.setDateofacceptance(null); + } + } } } if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) { @@ -300,10 +344,34 @@ public class GraphCleaningFunctions extends CleaningFunctions { return value; } - protected static boolean isValidDate(String date) { - return Stream - .of(ModelSupport.DATE_TIME_FORMATS) - .anyMatch(format -> GenericValidator.isDate(date, format, false)); + private static Optional cleanDateField(Field dateofacceptance) { + return Optional + .ofNullable(dateofacceptance) + .map(Field::getValue) + .map(GraphCleaningFunctions::cleanDate) + .filter(Objects::nonNull); + } + + protected static Optional doCleanDate(String date) { + return Optional.ofNullable(cleanDate(date)); + } + + public static String cleanDate(final String inputDate) { + + if (StringUtils.isBlank(inputDate)) { + return null; + } + + try { + final LocalDate date = DateParserUtils + .parseDate(inputDate.trim()) + .toInstant() + .atZone(ZoneId.systemDefault()) + .toLocalDate(); + return DateTimeFormatter.ofPattern(ModelSupport.DATE_FORMAT).format(date); + } catch (DateTimeParseException e) { + return null; + } } // HELPERS diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java index e8135f201..b2cc669fe 100644 --- a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java +++ b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java @@ -4,9 +4,12 @@ package eu.dnetlib.dhp.schema.oaf.utils; import static org.junit.jupiter.api.Assertions.*; import java.io.IOException; -import java.time.format.DateTimeParseException; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; import java.util.HashSet; import java.util.List; +import java.util.Locale; +import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; @@ -26,10 +29,105 @@ public class OafMapperUtilsTest { @Test public void testDateValidation() { - assertTrue(GraphCleaningFunctions.isValidDate("2016-05-07T12:41:19.202Z")); - assertTrue(GraphCleaningFunctions.isValidDate("2020-09-10 11:08:52")); - assertTrue(GraphCleaningFunctions.isValidDate("2016-04-05")); - assertFalse(GraphCleaningFunctions.isValidDate("2016 April 05")); + assertTrue(GraphCleaningFunctions.doCleanDate("2016-05-07T12:41:19.202Z ").isPresent()); + assertTrue(GraphCleaningFunctions.doCleanDate("2020-09-10 11:08:52 ").isPresent()); + assertTrue(GraphCleaningFunctions.doCleanDate(" 2016-04-05").isPresent()); + + assertEquals("2016-04-05", GraphCleaningFunctions.doCleanDate("2016 Apr 05").get()); + + assertEquals("2009-05-08", GraphCleaningFunctions.doCleanDate("May 8, 2009 5:57:51 PM").get()); + assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("oct 7, 1970").get()); + assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("oct 7, '70").get()); + assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("oct. 7, 1970").get()); + assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("oct. 7, 70").get()); + assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("Mon Jan 2 15:04:05 2006").get()); + assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("Mon Jan 2 15:04:05 MST 2006").get()); + assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("Mon Jan 02 15:04:05 -0700 2006").get()); + assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("Monday, 02-Jan-06 15:04:05 MST").get()); + assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("Mon, 02 Jan 2006 15:04:05 MST").get()); + assertEquals("2017-07-11", GraphCleaningFunctions.doCleanDate("Tue, 11 Jul 2017 16:28:13 +0200 (CEST)").get()); + assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("Mon, 02 Jan 2006 15:04:05 -0700").get()); + assertEquals("2018-01-04", GraphCleaningFunctions.doCleanDate("Thu, 4 Jan 2018 17:53:36 +0000").get()); + assertEquals("2015-08-10", GraphCleaningFunctions.doCleanDate("Mon Aug 10 15:44:11 UTC+0100 2015").get()); + assertEquals( + "2015-07-03", + GraphCleaningFunctions.doCleanDate("Fri Jul 03 2015 18:04:07 GMT+0100 (GMT Daylight Time)").get()); + assertEquals("2012-09-17", GraphCleaningFunctions.doCleanDate("September 17, 2012 10:09am").get()); + assertEquals("2012-09-17", GraphCleaningFunctions.doCleanDate("September 17, 2012 at 10:09am PST-08").get()); + assertEquals("2012-09-17", GraphCleaningFunctions.doCleanDate("September 17, 2012, 10:10:09").get()); + assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("October 7, 1970").get()); + assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("October 7th, 1970").get()); + assertEquals("2006-02-12", GraphCleaningFunctions.doCleanDate("12 Feb 2006, 19:17").get()); + assertEquals("2006-02-12", GraphCleaningFunctions.doCleanDate("12 Feb 2006 19:17").get()); + assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("7 oct 70").get()); + assertEquals("1970-10-07", GraphCleaningFunctions.doCleanDate("7 oct 1970").get()); + assertEquals("2013-02-03", GraphCleaningFunctions.doCleanDate("03 February 2013").get()); + assertEquals("2013-07-01", GraphCleaningFunctions.doCleanDate("1 July 2013").get()); + assertEquals("2013-02-03", GraphCleaningFunctions.doCleanDate("2013-Feb-03").get()); + assertEquals("2014-03-31", GraphCleaningFunctions.doCleanDate("3/31/2014").get()); + assertEquals("2014-03-31", GraphCleaningFunctions.doCleanDate("03/31/2014").get()); + assertEquals("1971-08-21", GraphCleaningFunctions.doCleanDate("08/21/71").get()); + assertEquals("1971-01-08", GraphCleaningFunctions.doCleanDate("8/1/71").get()); + assertEquals("2014-08-04", GraphCleaningFunctions.doCleanDate("4/8/2014 22:05").get()); + assertEquals("2014-08-04", GraphCleaningFunctions.doCleanDate("04/08/2014 22:05").get()); + assertEquals("2014-08-04", GraphCleaningFunctions.doCleanDate("4/8/14 22:05").get()); + assertEquals("2014-02-04", GraphCleaningFunctions.doCleanDate("04/2/2014 03:00:51").get()); + assertEquals("1965-08-08", GraphCleaningFunctions.doCleanDate("8/8/1965 12:00:00 AM").get()); + assertEquals("1965-08-08", GraphCleaningFunctions.doCleanDate("8/8/1965 01:00:01 PM").get()); + assertEquals("1965-08-08", GraphCleaningFunctions.doCleanDate("8/8/1965 01:00 PM").get()); + assertEquals("1965-08-08", GraphCleaningFunctions.doCleanDate("8/8/1965 1:00 PM").get()); + assertEquals("1965-08-08", GraphCleaningFunctions.doCleanDate("8/8/1965 12:00 AM").get()); + assertEquals("2014-02-04", GraphCleaningFunctions.doCleanDate("4/02/2014 03:00:51").get()); + assertEquals("2012-03-19", GraphCleaningFunctions.doCleanDate("03/19/2012 10:11:59").get()); + assertEquals("2012-03-19", GraphCleaningFunctions.doCleanDate("03/19/2012 10:11:59.3186369").get()); + assertEquals("2014-03-31", GraphCleaningFunctions.doCleanDate("2014/3/31").get()); + assertEquals("2014-03-31", GraphCleaningFunctions.doCleanDate("2014/03/31").get()); + assertEquals("2014-04-08", GraphCleaningFunctions.doCleanDate("2014/4/8 22:05").get()); + assertEquals("2014-04-08", GraphCleaningFunctions.doCleanDate("2014/04/08 22:05").get()); + assertEquals("2014-04-02", GraphCleaningFunctions.doCleanDate("2014/04/2 03:00:51").get()); + assertEquals("2014-04-02", GraphCleaningFunctions.doCleanDate("2014/4/02 03:00:51").get()); + assertEquals("2012-03-19", GraphCleaningFunctions.doCleanDate("2012/03/19 10:11:59").get()); + assertEquals("2012-03-19", GraphCleaningFunctions.doCleanDate("2012/03/19 10:11:59.3186369").get()); + assertEquals("2014-04-08", GraphCleaningFunctions.doCleanDate("2014年04月08日").get()); + assertEquals("2006-01-02", GraphCleaningFunctions.doCleanDate("2006-01-02T15:04:05+0000").get()); + assertEquals("2009-08-13", GraphCleaningFunctions.doCleanDate("2009-08-12T22:15:09-07:00").get()); + assertEquals("2009-08-12", GraphCleaningFunctions.doCleanDate("2009-08-12T22:15:09").get()); + assertEquals("2009-08-12", GraphCleaningFunctions.doCleanDate("2009-08-12T22:15:09Z").get()); + assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26 17:24:37.3186369").get()); + assertEquals("2012-08-03", GraphCleaningFunctions.doCleanDate("2012-08-03 18:31:59.257000000").get()); + assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26 17:24:37.123").get()); + assertEquals("2013-04-01", GraphCleaningFunctions.doCleanDate("2013-04-01 22:43").get()); + assertEquals("2013-04-01", GraphCleaningFunctions.doCleanDate("2013-04-01 22:43:22").get()); + assertEquals("2014-12-16", GraphCleaningFunctions.doCleanDate("2014-12-16 06:20:00 UTC").get()); + assertEquals("2014-12-16", GraphCleaningFunctions.doCleanDate("2014-12-16 06:20:00 GMT").get()); + assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26 05:24:37 PM").get()); + assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26 13:13:43 +0800").get()); + assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26 13:13:43 +0800 +08").get()); + assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26 13:13:44 +09:00").get()); + assertEquals("2012-08-03", GraphCleaningFunctions.doCleanDate("2012-08-03 18:31:59.257000000 +0000 UTC").get()); + assertEquals("2015-09-30", GraphCleaningFunctions.doCleanDate("2015-09-30 18:48:56.35272715 +0000 UTC").get()); + assertEquals("2015-02-18", GraphCleaningFunctions.doCleanDate("2015-02-18 00:12:00 +0000 GMT").get()); + assertEquals("2015-02-18", GraphCleaningFunctions.doCleanDate("2015-02-18 00:12:00 +0000 UTC").get()); + assertEquals( + "2015-02-08", GraphCleaningFunctions.doCleanDate("2015-02-08 03:02:00 +0300 MSK m=+0.000000001").get()); + assertEquals( + "2015-02-08", GraphCleaningFunctions.doCleanDate("2015-02-08 03:02:00.001 +0300 MSK m=+0.000000001").get()); + assertEquals("2017-07-19", GraphCleaningFunctions.doCleanDate("2017-07-19 03:21:51+00:00").get()); + assertEquals("2014-04-26", GraphCleaningFunctions.doCleanDate("2014-04-26").get()); + assertEquals("2014-04-01", GraphCleaningFunctions.doCleanDate("2014-04").get()); + assertEquals("2014-01-01", GraphCleaningFunctions.doCleanDate("2014").get()); + assertEquals("2014-05-11", GraphCleaningFunctions.doCleanDate("2014-05-11 08:20:13,787").get()); + assertEquals("2014-03-31", GraphCleaningFunctions.doCleanDate("3.31.2014").get()); + assertEquals("2014-03-31", GraphCleaningFunctions.doCleanDate("03.31.2014").get()); + assertEquals("1971-08-21", GraphCleaningFunctions.doCleanDate("08.21.71").get()); + assertEquals("2014-03-01", GraphCleaningFunctions.doCleanDate("2014.03").get()); + assertEquals("2014-03-30", GraphCleaningFunctions.doCleanDate("2014.03.30").get()); + assertEquals("2014-06-01", GraphCleaningFunctions.doCleanDate("20140601").get()); + assertEquals("2014-07-22", GraphCleaningFunctions.doCleanDate("20140722105203").get()); + assertEquals("2012-03-19", GraphCleaningFunctions.doCleanDate("1332151919").get()); + assertEquals("2013-11-12", GraphCleaningFunctions.doCleanDate("1384216367189").get()); + assertEquals("2013-11-12", GraphCleaningFunctions.doCleanDate("1384216367111222").get()); + assertEquals("2013-11-12", GraphCleaningFunctions.doCleanDate("1384216367111222333").get()); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/DateCleaner.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/DateCleaner.java index 6e337604f..9da0747e6 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/DateCleaner.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/DateCleaner.java @@ -10,87 +10,11 @@ import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; +import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions; import net.sf.saxon.s9api.*; public class DateCleaner implements ExtensionFunction, Serializable { - private final static List dateRegex = Arrays - .asList( - // Y-M-D - Pattern.compile("(18|19|20)\\d\\d([- /.])(0[1-9]|1[012])\\2(0[1-9]|[12][0-9]|3[01])", Pattern.MULTILINE), - // M-D-Y - Pattern - .compile( - "((0[1-9]|1[012])|([1-9]))([- /.])(0[1-9]|[12][0-9]|3[01])([- /.])(18|19|20)?\\d\\d", - Pattern.MULTILINE), - // D-M-Y - Pattern - .compile( - "(?:(?:31(/|-|\\.)(?:0?[13578]|1[02]|(?:Jan|Mar|May|Jul|Aug|Oct|Dec)))\\1|(?:(?:29|30)(/|-|\\.)(?:0?[1,3-9]|1[0-2]|(?:Jan|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec))\\2))(?:(?:1[6-9]|[2-9]\\d)?\\d{2})|(?:29(/|-|\\.)(?:0?2|(?:Feb))\\3(?:(?:(?:1[6-9]|[2-9]\\d)?(?:0[48]|[2468][048]|[13579][26])|(?:(?:16|[2468][048]|[3579][26])00))))|(?:0?[1-9]|1\\d|2[0-8])(/|-|\\.)(?:(?:0?[1-9]|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep))|(?:1[0-2]|(?:Oct|Nov|Dec)))\\4(?:(?:1[6-9]|[2-9]\\d)?\\d{2})", - Pattern.MULTILINE), - // Y - Pattern.compile("(19|20)\\d\\d", Pattern.MULTILINE)); - - private final static Pattern incompleteDateRegex = Pattern - .compile("^((18|19|20)\\d\\d){1}([- \\\\ \\/](0?[1-9]|1[012]))?", Pattern.MULTILINE); - - private final static List dformats = Arrays - .asList( - DateTimeFormatter - .ofPattern( - "[MM-dd-yyyy][MM/dd/yyyy][dd-MM-yy][dd-MMM-yyyy][dd/MMM/yyyy][dd-MMM-yy][dd/MMM/yy][dd-MM-yy][dd/MM/yy][dd-MM-yyyy][dd/MM/yyyy][yyyy-MM-dd][yyyy/MM/dd]", - Locale.ENGLISH), - DateTimeFormatter.ofPattern("[dd-MM-yyyy][dd/MM/yyyy]", Locale.ITALIAN)); - - public String clean(final String inputDate) { - - Optional cleanedDate = dateRegex - .stream() - .map( - p -> { - final Matcher matcher = p.matcher(inputDate); - if (matcher.find()) - return matcher.group(0); - else - return null; - }) - .filter(Objects::nonNull) - .map(m -> { - Optional cleanDate = dformats - .stream() - .map(f -> { - try { - LocalDate parsedDate = LocalDate.parse(m, f); - if (parsedDate != null) - return parsedDate.toString(); - else - return null; - } catch (Throwable e) { - return null; - } - } - - ) - .filter(Objects::nonNull) - .findAny(); - - return cleanDate.orElse(null); - }) - .filter(Objects::nonNull) - .findAny(); - - if (cleanedDate.isPresent()) - return cleanedDate.get(); - - final Matcher matcher = incompleteDateRegex.matcher(inputDate); - if (matcher.find()) { - final Integer year = Integer.parseInt(matcher.group(1)); - final Integer month = Integer.parseInt(matcher.group(4) == null ? "01" : matcher.group(4)); - return String.format("%d-%02d-01", year, month); - } - return null; - } - @Override public QName getName() { return new QName(QNAME_BASE_URI + "/dateISO", "dateISO"); @@ -117,4 +41,9 @@ public class DateCleaner implements ExtensionFunction, Serializable { final String currentValue = xdmValues[0].itemAt(0).getStringValue(); return new XdmAtomicValue(clean(currentValue)); } + + // for backward compatibility with the existing unit tests + public String clean(String date) { + return GraphCleaningFunctions.cleanDate(date); + } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java index 0fdc89533..948a8f93b 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java @@ -51,11 +51,11 @@ public class TransformationJobTest extends AbstractVocabularyTest { @DisplayName("Test Date cleaner") public void testDateCleaner() throws Exception { DateCleaner dc = new DateCleaner(); - assertEquals(dc.clean("20/09/1982"), "1982-09-20"); - assertEquals(dc.clean("20-09-2002"), "2002-09-20"); - assertEquals(dc.clean("2002-09-20"), "2002-09-20"); - assertEquals(dc.clean("2002-9"), "2002-09-01"); - assertEquals(dc.clean("2021"), "2021-01-01"); + assertEquals("1982-09-20", dc.clean("20/09/1982")); + assertEquals("2002-09-20", dc.clean("20-09-2002")); + assertEquals("2002-09-20", dc.clean("2002-09-20")); + assertEquals("2002-09-01", dc.clean("2002-9")); + assertEquals("2021-01-01", dc.clean("2021")); } @Test diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/GraphCleaningFunctionsTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/GraphCleaningFunctionsTest.java index c23354e25..b196d1948 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/GraphCleaningFunctionsTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/GraphCleaningFunctionsTest.java @@ -129,6 +129,8 @@ public class GraphCleaningFunctionsTest { assertEquals("CLOSED", p_cleaned.getBestaccessright().getClassid()); assertNull(p_out.getPublisher()); + assertEquals("1970-10-07", p_cleaned.getDateofacceptance().getValue()); + final List pci = p_cleaned.getInstance(); assertNotNull(pci); assertEquals(1, pci.size()); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json index 8670c10f1..6795ccf1b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/clean/result.json @@ -301,7 +301,7 @@ }, "trust": "0.9" }, - "value": "2016-01-01" + "value": "7 oct 1970" }, "dateofcollection": "", "dateoftransformation": "2020-04-22T12:34:08.009Z", diff --git a/pom.xml b/pom.xml index 5b96816d9..4272acae0 100644 --- a/pom.xml +++ b/pom.xml @@ -200,11 +200,11 @@ ${dhp.commons.lang.version} - - commons-validator - commons-validator - 1.7 - + + com.github.sisyphsu + dateparser + 1.0.7 + com.google.guava @@ -736,7 +736,7 @@ 3.3.3 3.4.2 [2.12,3.0) - [2.5.11] + [2.5.12-SNAPSHOT] [4.0.3] [6.0.5] [3.1.6] From 10bd6ca194e7bf22677d23ee6a07acb50f974b90 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 11 Jun 2021 16:59:56 +0200 Subject: [PATCH 10/11] depending on dhp-schemas:2.5.12 (release) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4272acae0..ac92c6c97 100644 --- a/pom.xml +++ b/pom.xml @@ -736,7 +736,7 @@ 3.3.3 3.4.2 [2.12,3.0) - [2.5.12-SNAPSHOT] + [2.5.12] [4.0.3] [6.0.5] [3.1.6] From 2039bb9f5fdbc949d00ffe7fc6d78d8890f0bc15 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Mon, 14 Jun 2021 09:40:50 +0200 Subject: [PATCH 11/11] orcid / orcid_pending cleaning backported from master branch --- .../oaf/utils/GraphCleaningFunctions.java | 81 ++++++--- .../schema/oaf/utils/OafMapperUtilsTest.java | 5 + .../raw/AbstractMdRecordToOafMapper.java | 4 +- .../raw/MigrateHdfsMdstoresApplication.java | 5 - .../oozie_app/workflow-bak.xml | 157 ------------------ .../dnetlib/dhp/oa/graph/raw/MappersTest.java | 4 +- 6 files changed, 67 insertions(+), 189 deletions(-) delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow-bak.xml diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java index 999272113..7088e56e1 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java @@ -26,8 +26,9 @@ import eu.dnetlib.dhp.schema.oaf.*; public class GraphCleaningFunctions extends CleaningFunctions { + public static final String ORCID_CLEANING_REGEX = ".*([0-9]{4}).*[-–—−=].*([0-9]{4}).*[-–—−=].*([0-9]{4}).*[-–—−=].*([0-9x]{4})"; + public static final int ORCID_LEN = 19; public static final String CLEANING_REGEX = "(?:\\n|\\r|\\t)"; - public static final String ORCID_PREFIX_REGEX = "^http(s?):\\/\\/orcid\\.org\\/"; public static final String INVALID_AUTHOR_REGEX = ".*deactivated.*"; public static final String TITLE_FILTER_REGEX = "[.*test.*\\W\\d]"; public static final int TITLE_FILTER_RESIDUAL_LENGTH = 10; @@ -281,7 +282,27 @@ public class GraphCleaningFunctions extends CleaningFunctions { } } if (Objects.nonNull(r.getAuthor())) { - final List authors = Lists.newArrayList(); + r + .setAuthor( + r + .getAuthor() + .stream() + .filter(a -> Objects.nonNull(a)) + .filter(a -> StringUtils.isNotBlank(a.getFullname())) + .filter(a -> StringUtils.isNotBlank(a.getFullname().replaceAll("[\\W]", ""))) + .collect(Collectors.toList())); + + boolean nullRank = r + .getAuthor() + .stream() + .anyMatch(a -> Objects.isNull(a.getRank())); + if (nullRank) { + int i = 1; + for (Author author : r.getAuthor()) { + author.setRank(i++); + } + } + for (Author a : r.getAuthor()) { if (Objects.isNull(a.getPid())) { a.setPid(Lists.newArrayList()); @@ -295,40 +316,52 @@ public class GraphCleaningFunctions extends CleaningFunctions { .filter(p -> Objects.nonNull(p.getQualifier())) .filter(p -> StringUtils.isNotBlank(p.getValue())) .map(p -> { - p.setValue(p.getValue().trim().replaceAll(ORCID_PREFIX_REGEX, "")); + // hack to distinguish orcid from orcid_pending + String pidProvenance = Optional + .ofNullable(p.getDataInfo()) + .map( + d -> Optional + .ofNullable(d.getProvenanceaction()) + .map(Qualifier::getClassid) + .orElse("")) + .orElse(""); + if (p + .getQualifier() + .getClassid() + .toLowerCase() + .contains(ModelConstants.ORCID)) { + if (pidProvenance + .equals(ModelConstants.SYSIMPORT_CROSSWALK_ENTITYREGISTRY)) { + p.getQualifier().setClassid(ModelConstants.ORCID); + } else { + p.getQualifier().setClassid(ModelConstants.ORCID_PENDING); + } + final String orcid = p + .getValue() + .trim() + .toLowerCase() + .replaceAll(ORCID_CLEANING_REGEX, "$1-$2-$3-$4"); + if (orcid.length() == ORCID_LEN) { + p.setValue(orcid); + } else { + p.setValue(""); + } + } return p; }) .filter(p -> StringUtils.isNotBlank(p.getValue())) .collect( Collectors .toMap( - StructuredProperty::getValue, Function.identity(), (p1, p2) -> p1, + p -> p.getQualifier().getClassid() + p.getValue(), + Function.identity(), + (p1, p2) -> p1, LinkedHashMap::new)) .values() .stream() .collect(Collectors.toList())); } - if (StringUtils.isBlank(a.getFullname())) { - if (StringUtils.isNotBlank(a.getName()) && StringUtils.isNotBlank(a.getSurname())) { - a.setFullname(a.getSurname() + ", " + a.getName()); - } - } - if (StringUtils.isNotBlank(a.getFullname()) && isValidAuthorName(a)) { - authors.add(a); - } } - - boolean nullRank = authors - .stream() - .anyMatch(a -> Objects.isNull(a.getRank())); - if (nullRank) { - int i = 1; - for (Author author : authors) { - author.setRank(i++); - } - } - r.setAuthor(authors); - } if (value instanceof Publication) { diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java index b2cc669fe..eefa1e9a3 100644 --- a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java +++ b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java @@ -131,6 +131,11 @@ public class OafMapperUtilsTest { } + @Test + public void testDate() { + System.out.println(GraphCleaningFunctions.cleanDate("23-FEB-1998")); + } + @Test public void testMergePubs() throws IOException { Publication p1 = read("publication_1.json", Publication.class); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index 1beb616ba..1e80dfd46 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -76,7 +76,9 @@ public abstract class AbstractMdRecordToOafMapper { protected static final String DATACITE_SCHEMA_KERNEL_3 = "http://datacite.org/schema/kernel-3"; protected static final String DATACITE_SCHEMA_KERNEL_3_SLASH = "http://datacite.org/schema/kernel-3/"; protected static final Qualifier ORCID_PID_TYPE = qualifier( - "ORCID", "Open Researcher and Contributor ID", DNET_PID_TYPES, DNET_PID_TYPES); + ModelConstants.ORCID_PENDING, + ModelConstants.ORCID_CLASSNAME, + DNET_PID_TYPES, DNET_PID_TYPES); protected static final Qualifier MAG_PID_TYPE = qualifier( "MAGIdentifier", "Microsoft Academic Graph Identifier", DNET_PID_TYPES, DNET_PID_TYPES); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java index 77fd28d77..f4e783edc 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.java @@ -102,11 +102,6 @@ public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml))) // .coalesce(1) .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); - - /* - * .foreach(xml -> { try { writer.append(new Text(UUID.randomUUID() + ":" + type), new Text(xml)); } catch - * (final Exception e) { throw new RuntimeException(e); } }); - */ } private static String enrichRecord(final Row r) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow-bak.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow-bak.xml deleted file mode 100644 index bfe2dff0b..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_hdfs_stores/oozie_app/workflow-bak.xml +++ /dev/null @@ -1,157 +0,0 @@ - - - - - graphOutputPath - the target path to store raw graph - - - contentPath - path location to store (or reuse) content from the aggregator - - - mdstoreManagerUrl - the address of the Mdstore Manager - - - isLookupUrl - the address of the lookUp service - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - oozieActionShareLibForSpark2 - oozie action sharelib for spark 2.* - - - spark2ExtraListeners - com.cloudera.spark.lineage.NavigatorAppListener - spark 2.* extra listeners classname - - - spark2SqlQueryExecutionListeners - com.cloudera.spark.lineage.NavigatorQueryListener - spark 2.* sql query execution listeners classname - - - spark2YarnHistoryServerAddress - spark 2.* yarn history server address - - - spark2EventLogDir - spark 2.* event log dir location - - - - - ${jobTracker} - ${nameNode} - - - mapreduce.job.queuename - ${queueName} - - - oozie.launcher.mapred.job.queue.name - ${oozieLauncherQueueName} - - - oozie.action.sharelib.for.spark - ${oozieActionShareLibForSpark2} - - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - yarn - cluster - ImportODF_hdfs - eu.dnetlib.dhp.oa.graph.raw.MigrateHdfsMdstoresApplication - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores ${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --hdfsPath${contentPath}/odf_records_hdfs - --mdstoreManagerUrl${mdstoreManagerUrl} - --mdFormatODF - --mdLayoutstore - --mdInterpretationcleaned - - - - - - - - yarn - cluster - GenerateEntities - eu.dnetlib.dhp.oa.graph.raw.GenerateEntitiesApplication - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores ${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - - --sourcePaths${contentPath}/odf_records_hdfs - --targetPath${workingDir}/entities - --isLookupUrl${isLookupUrl} - --shouldHashId${shouldHashId} - - - - - - - - yarn - cluster - GenerateGraph - eu.dnetlib.dhp.oa.graph.raw.DispatchEntitiesApplication - dhp-graph-mapper-${projectVersion}.jar - - --executor-memory ${sparkExecutorMemory} - --executor-cores ${sparkExecutorCores} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --sourcePath${workingDir}/entities - --graphRawPath${workingDir}/graph_raw - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index 08a596db5..5b229a625 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -195,8 +195,8 @@ public class MappersTest { .findFirst() .get(); assertEquals("0000-0001-6651-1178", pid.getValue()); - assertEquals("ORCID", pid.getQualifier().getClassid()); - assertEquals("Open Researcher and Contributor ID", pid.getQualifier().getClassname()); + assertEquals(ModelConstants.ORCID_PENDING, pid.getQualifier().getClassid()); + assertEquals(ModelConstants.ORCID_CLASSNAME, pid.getQualifier().getClassname()); assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemeid()); assertEquals(ModelConstants.DNET_PID_TYPES, pid.getQualifier().getSchemename()); assertEquals("Votsi,Nefta", author.get().getFullname());