From d9532446eb7583be4b01eaa0ec2f12f822ec59d9 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Thu, 10 Dec 2020 16:14:16 +0100
Subject: [PATCH] imported more diffs from master branch; code formatting

---
 .../dhp/schema/common/ModelConstants.java     |  4 ++
 .../dhp/actionmanager/bipfinder/BipScore.java |  4 +-
 .../bipfinder/SparkAtomicActionScoreJob.java  |  1 -
 .../dhp/oa/dedup/GroupEntitiesSparkJob.java   |  4 +-
 .../dnetlib/doiboost/orcid/ORCIDToOAF.scala   | 37 +++++++------
 .../orcid/SparkConvertORCIDToOAF.scala        | 32 +++++------
 .../dhp/oa/provision/XmlIndexingJob.java      | 55 +++++++++++++------
 7 files changed, 83 insertions(+), 54 deletions(-)

diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
index 26149c62c..e72a0d69c 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
@@ -7,6 +7,10 @@ import eu.dnetlib.dhp.schema.oaf.Qualifier;
 
 public class ModelConstants {
 
+    public static final String ORCID = "orcid";
+    public static final String ORCID_PENDING = "orcid_pending";
+    public static final String ORCID_CLASSNAME = "Open Researcher and Contributor ID";
+
     public static String CROSSREF_ID = "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2";
     public static String DATACITE_ID = "10|openaire____::9e3be59865b2c1c335d32dae2fe7b254";
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipScore.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipScore.java
index da9a4bd0c..247546694 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipScore.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipScore.java
@@ -9,8 +9,8 @@ import java.util.List;
  */
 public class BipScore implements Serializable {
 
-    private String id; //doi
-    private List scoreList; //unit as given in the inputfile
+    private String id; // doi
+    private List scoreList; // unit as given in the inputfile
 
     public String getId() {
         return id;
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
index 23764f38e..50bda898c 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
@@ -144,7 +144,6 @@ public class SparkAtomicActionScoreJob implements Serializable {
 
     }
 
-
     private static List getMeasure(BipScore value) {
         return value
             .getScoreList()
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java
index 3f01db0fa..a3726d60a 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java
@@ -10,7 +10,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
@@ -22,6 +21,7 @@ import org.apache.spark.sql.expressions.Aggregator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.jayway.jsonpath.Configuration;
 import com.jayway.jsonpath.DocumentContext;
@@ -44,7 +44,7 @@ public class GroupEntitiesSparkJob {
 
     private final static String ID_JPATH = "$.id";
 
     private static ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 
     public static void main(String[] args) throws Exception {
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala
index 1e01ed16d..1d669f4b5 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/ORCIDToOAF.scala
@@ -1,7 +1,7 @@
 package eu.dnetlib.doiboost.orcid
 
-import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
-import eu.dnetlib.dhp.schema.oaf.{Author, Publication}
+import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication}
+import eu.dnetlib.dhp.schema.orcid.OrcidDOI
 import eu.dnetlib.doiboost.DoiBoostMappingUtil
 import eu.dnetlib.doiboost.DoiBoostMappingUtil.{ORCID, PID_TYPES, createSP, generateDataInfo, generateIdentifier}
 import org.apache.commons.lang.StringUtils
@@ -44,23 +44,19 @@ object ORCIDToOAF {
 
   }
 
-  def convertTOOAF(input:ORCIDElement) :Publication = {
-    val doi = input.doi
+  def convertTOOAF(input:OrcidDOI) :Publication = {
+    val doi = input.getDoi
     val pub:Publication = new Publication
-    pub.setPid(List(createSP(doi, "doi", PID_TYPES)).asJava)
+    pub.setPid(List(createSP(doi.toLowerCase, "doi", PID_TYPES)).asJava)
     pub.setDataInfo(generateDataInfo())
-
-    //IMPORTANT
-    //The old method pub.setId(IdentifierFactory.createIdentifier(pub))
-    //will be replaced using IdentifierFactory
     pub.setId(generateIdentifier(pub, doi.toLowerCase))
-    pub.setId(IdentifierFactory.createIdentifier(pub))
-
-
     try{
-      pub.setAuthor(input.authors.map(a=> {
-        generateAuthor(a.name, a.surname, a.creditName, a.oid)
-      }).asJava)
+
+      val l:List[Author]= input.getAuthors.asScala.map(a=> {
+        generateAuthor(a.getName, a.getSurname, a.getCreditName, a.getOid)
+      })(collection.breakOut)
+
+      pub.setAuthor(l.asJava)
       pub.setCollectedfrom(List(DoiBoostMappingUtil.createORIDCollectedFrom()).asJava)
       pub.setDataInfo(DoiBoostMappingUtil.generateDataInfo())
       pub
@@ -71,6 +67,13 @@
     }
   }
 
+  def generateOricPIDDatainfo():DataInfo = {
+    val di =DoiBoostMappingUtil.generateDataInfo("0.91")
+    di.getProvenanceaction.setClassid("sysimport:crosswalk:entityregistry")
+    di.getProvenanceaction.setClassname("Harvested")
+    di
+  }
+
   def generateAuthor(given: String, family: String, fullName:String, orcid: String): Author = {
     val a = new Author
     a.setName(given)
@@ -80,10 +83,10 @@ object ORCIDToOAF {
     else
       a.setFullname(s"$given $family")
     if (StringUtils.isNotBlank(orcid))
-      a.setPid(List(createSP(orcid, ORCID, PID_TYPES)).asJava)
+      a.setPid(List(createSP(orcid, ORCID, PID_TYPES, generateOricPIDDatainfo())).asJava)
 
     a
   }
 
 
-}
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala
index f1c7c58b4..f1d718d0d 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcid/SparkConvertORCIDToOAF.scala
@@ -45,24 +45,24 @@ object SparkConvertORCIDToOAF {
     Encoders.kryo(classOf[Publication])
   }
 
-def run(spark:SparkSession,sourcePath:String, targetPath:String):Unit = {
-  implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
-  implicit val mapOrcid: Encoder[OrcidDOI] = Encoders.kryo[OrcidDOI]
-  implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
+  def run(spark:SparkSession,sourcePath:String, targetPath:String):Unit = {
+    implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
+    implicit val mapOrcid: Encoder[OrcidDOI] = Encoders.kryo[OrcidDOI]
+    implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
 
-  val mapper = new ObjectMapper()
-  mapper.getDeserializationConfig.withFeatures(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+    val mapper = new ObjectMapper()
+    mapper.getDeserializationConfig.withFeatures(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
 
-  val dataset:Dataset[OrcidDOI] = spark.createDataset(spark.sparkContext.textFile(sourcePath).map(s => mapper.readValue(s,classOf[OrcidDOI])))
+    val dataset:Dataset[OrcidDOI] = spark.createDataset(spark.sparkContext.textFile(sourcePath).map(s => mapper.readValue(s,classOf[OrcidDOI])))
 
-  logger.info("Converting ORCID to OAF")
-  dataset.map(o => ORCIDToOAF.convertTOOAF(o)).filter(p=>p!=null)
-    .map(d => (d.getId, d))
-    .groupByKey(_._1)(Encoders.STRING)
-    .agg(getPublicationAggregator().toColumn)
-    .map(p => p._2)
-    .write.mode(SaveMode.Overwrite).save(targetPath)
-}
+    logger.info("Converting ORCID to OAF")
+    dataset.map(o => ORCIDToOAF.convertTOOAF(o)).filter(p=>p!=null)
+      .map(d => (d.getId, d))
+      .groupByKey(_._1)(Encoders.STRING)
+      .agg(getPublicationAggregator().toColumn)
+      .map(p => p._2)
+      .write.mode(SaveMode.Overwrite).save(targetPath)
+  }
 
 
   def main(args: Array[String]): Unit = {
@@ -85,4 +85,4 @@ def run(spark:SparkSession,sourcePath:String, targetPath:String):Unit = {
 
   }
 
-}
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
index 48538c059..9ff387c8c 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlIndexingJob.java
@@ -10,6 +10,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Optional;
 
+import javax.swing.text.html.Option;
 import javax.xml.transform.Transformer;
 import javax.xml.transform.TransformerException;
 import javax.xml.transform.stream.StreamResult;
@@ -42,6 +43,10 @@ public class XmlIndexingJob {
 
     private static final Logger log = LoggerFactory.getLogger(XmlIndexingJob.class);
 
+    public enum OutputFormat {
+        SOLR, HDFS
+    }
+
     private static final Integer DEFAULT_BATCH_SIZE = 1000;
 
     protected static final String DATE_FORMAT = "yyyy-MM-dd'T'hh:mm:ss'Z'";
@@ -52,6 +57,8 @@ public class XmlIndexingJob {
 
     private int batchSize;
 
+    private OutputFormat outputFormat;
+
     private String outputPath;
 
     private SparkSession spark;
@@ -80,14 +87,22 @@ public class XmlIndexingJob {
 
         final String outputPath = Optional
             .ofNullable(parser.get("outputPath"))
+            .map(StringUtils::trim)
             .orElse(null);
         log.info("outputPath: {}", outputPath);
 
-        final Integer batchSize = parser.getObjectMap().containsKey("batchSize")
-            ? Integer.valueOf(parser.get("batchSize"))
-            : DEFAULT_BATCH_SIZE;
+        final Integer batchSize = Optional
+            .ofNullable(parser.get("batchSize"))
+            .map(Integer::valueOf)
+            .orElse(DEFAULT_BATCH_SIZE);
         log.info("batchSize: {}", batchSize);
 
+        final OutputFormat outputFormat = Optional
+            .ofNullable(parser.get("outputFormat"))
+            .map(OutputFormat::valueOf)
+            .orElse(OutputFormat.SOLR);
+        log.info("outputFormat: {}", outputFormat);
+
         final SparkConf conf = new SparkConf();
         conf.registerKryoClasses(new Class[] {
             SerializableSolrInputDocument.class
@@ -100,15 +115,18 @@ public class XmlIndexingJob {
                 final String isLookupUrl = parser.get("isLookupUrl");
                 log.info("isLookupUrl: {}", isLookupUrl);
                 final ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
-                new XmlIndexingJob(spark, inputPath, format, batchSize, outputPath).run(isLookup);
+                new XmlIndexingJob(spark, inputPath, format, batchSize, outputFormat, outputPath).run(isLookup);
             });
     }
 
-    public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize, String outputPath) {
+    public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize,
+        OutputFormat outputFormat,
+        String outputPath) {
         this.spark = spark;
         this.inputPath = inputPath;
         this.format = format;
         this.batchSize = batchSize;
+        this.outputFormat = outputFormat;
         this.outputPath = outputPath;
     }
 
@@ -137,17 +155,22 @@ public class XmlIndexingJob {
             .map(s -> toIndexRecord(SaxonTransformerFactory.newInstance(indexRecordXslt), s))
             .map(s -> new StreamingInputDocumentFactory(version, dsId).parseDocument(s));
 
-        if (StringUtils.isNotBlank(outputPath)) {
-            spark
-                .createDataset(
-                    docs.map(s -> new SerializableSolrInputDocument(s)).rdd(),
-                    Encoders.kryo(SerializableSolrInputDocument.class))
-                .write()
-                .mode(SaveMode.Overwrite)
-                .parquet(outputPath);
-        } else {
-            final String collection = ProvisionConstants.getCollectionName(format);
-            SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
+        switch (outputFormat) {
+            case SOLR:
+                final String collection = ProvisionConstants.getCollectionName(format);
+                SolrSupport.indexDocs(zkHost, collection, batchSize, docs.rdd());
+                break;
+            case HDFS:
+                spark
+                    .createDataset(
+                        docs.map(s -> new SerializableSolrInputDocument(s)).rdd(),
+                        Encoders.kryo(SerializableSolrInputDocument.class))
+                    .write()
+                    .mode(SaveMode.Overwrite)
+                    .parquet(outputPath);
+                break;
+            default:
+                throw new IllegalArgumentException("invalid outputFormat: " + outputFormat);
         }
     }