diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java
index 9f8727d308..1d47808ef1 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/orcidnodoi/SparkGenEnrichedOrcidWorks.java
@@ -4,6 +4,7 @@ package eu.dnetlib.doiboost.orcidnodoi;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -32,10 +33,7 @@ import com.google.gson.JsonParser;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.action.AtomicAction;
 import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.orcid.AuthorData;
-import eu.dnetlib.dhp.schema.orcid.AuthorSummary;
-import eu.dnetlib.dhp.schema.orcid.Work;
-import eu.dnetlib.dhp.schema.orcid.WorkDetail;
+import eu.dnetlib.dhp.schema.orcid.*;
 import eu.dnetlib.doiboost.orcid.json.JsonHelper;
 import eu.dnetlib.doiboost.orcid.util.HDFSUtil;
 import eu.dnetlib.doiboost.orcidnodoi.oaf.PublicationToOaf;
@@ -111,6 +109,10 @@ public class SparkGenEnrichedOrcidWorks {
 						Encoders.bean(WorkDetail.class));
 				logger.info("Works data loaded: " + workDataset.count());
 
+				final LongAccumulator warnNotFoundContributors = spark
+					.sparkContext()
+					.longAccumulator("warnNotFoundContributors");
+
 				JavaRDD<Tuple2<String, String>> enrichedWorksRDD = workDataset
 					.joinWith(
 						authorDataset,
@@ -119,7 +121,21 @@
 						(MapFunction<Tuple2<WorkDetail, AuthorData>, Tuple2<String, String>>) value -> {
 							WorkDetail w = value._1;
 							AuthorData a = value._2;
-							AuthorMatcher.match(a, w.getContributors());
+							if (w.getContributors() == null
+								|| (w.getContributors() != null && w.getContributors().size() == 0)) {
+								Contributor c = new Contributor();
+								c.setName(a.getName());
+								c.setSurname(a.getSurname());
+								c.setCreditName(a.getCreditName());
+								c.setOid(a.getOid());
+								List<Contributor> contributors = Arrays.asList(c);
+								w.setContributors(contributors);
+								if (warnNotFoundContributors != null) {
+									warnNotFoundContributors.add(1);
+								}
+							} else {
+								AuthorMatcher.match(a, w.getContributors());
+							}
 							return new Tuple2<>(a.getOid(), JsonHelper.createOidWork(w));
 						},
 						Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
@@ -172,7 +188,7 @@ public class SparkGenEnrichedOrcidWorks {
 							OBJECT_MAPPER.writeValueAsString(new AtomicAction<>(Publication.class, p)))))
 					.mapToPair(t -> new Tuple2(new Text(t._1()), new Text(t._2())))
 					.saveAsNewAPIHadoopFile(
-						workingPath.concat(outputEnrichedWorksPath),
+						outputEnrichedWorksPath,
 						Text.class,
 						Text.class,
 						SequenceFileOutputFormat.class,
@@ -180,6 +196,7 @@
 
 				logger.info("parsedPublications: " + parsedPublications.value().toString());
 				logger.info("enrichedPublications: " + enrichedPublications.value().toString());
+				logger.info("warnNotFoundContributors: " + warnNotFoundContributors.value().toString());
 				logger.info("errorsGeneric: " + errorsGeneric.value().toString());
 				logger.info("errorsInvalidTitle: " + errorsInvalidTitle.value().toString());
 				logger.info("errorsNotFoundAuthors: " + errorsNotFoundAuthors.value().toString());
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcidnodoi/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcidnodoi/oozie_app/workflow.xml
index 365c4d5b4f..04ca05af2b 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcidnodoi/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/orcidnodoi/oozie_app/workflow.xml
@@ -7,9 +7,14 @@
         </property>
         <property>
            <name>outputPath</name>
+           <value>/data/orcid_activities_2020/no_doi_dataset_prod/</value>
            <description>path where to store the action set</description>
        </property>
-
+       <property>
+           <name>processOutputPath</name>
+           <value>/data/orcid_activities_2020/process_no_doi_dataset_prod</value>
+           <description>temporary path where to store the action set</description>
+       </property>
        <property>
            <name>spark2GenNoDoiDatasetMaxExecutors</name>
            <value>40</value>
@@ -66,7 +71,7 @@
 
    <action name="ResetGenOrcidNoDoiDataset">
        <fs>
-           <delete path='${workingPath}/no_doi_dataset'/>
+           <delete path='${processOutputPath}'/>
        </fs>
        <ok to="GenOrcidNoDoiDataset"/>
        <error to="Kill"/>
@@ -92,7 +97,7 @@
            <arg>--workingPath</arg><arg>${workingPath}/</arg>
            <arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
            <arg>--orcidDataFolder</arg><arg>last_orcid_dataset</arg>
-           <arg>--outputEnrichedWorksPath</arg><arg>no_doi_dataset</arg>
+           <arg>--outputEnrichedWorksPath</arg><arg>${processOutputPath}</arg>
        </spark>
        <ok to="importOrcidNoDoi"/>
        <error to="Kill"/>
@@ -100,7 +105,7 @@
 
    <action name="importOrcidNoDoi">
        <distcp xmlns="uri:oozie:distcp-action:0.2">
-           <arg>${workingPath}/no_doi_dataset/*</arg>
+           <arg>${processOutputPath}/*</arg>
            <arg>${outputPath}</arg>
        </distcp>
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
index 6a7ee78031..4dafd4fa36 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala
@@ -1,11 +1,10 @@
 package eu.dnetlib.dhp.sx.graph.scholix
 
-import eu.dnetlib.dhp.schema.oaf.{Dataset, Relation, Result, StructuredProperty}
-import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixCollectedFrom, ScholixEntityId, ScholixIdentifier, ScholixRelationship, ScholixResource}
+import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Result, StructuredProperty}
+import eu.dnetlib.dhp.schema.sx.scholix._
 import eu.dnetlib.dhp.schema.sx.summary.{CollectedFromType, SchemeValue, ScholixSummary, Typology}
 import eu.dnetlib.dhp.utils.DHPUtils
 
-import org.apache.spark.sql.Encoders.bean
 import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.{Encoder, Encoders}
 import org.json4s
@@ -301,14 +300,14 @@
     if (r.getPid == null || r.getPid.isEmpty)
       return null
 
-    val pids:List[ScholixIdentifier] = extractTypedIdentifierFromInstance(r)
-    if (pids.isEmpty)
+    val persistentIdentifiers:List[ScholixIdentifier] = extractTypedIdentifierFromInstance(r)
+    if (persistentIdentifiers.isEmpty)
       return null
-    s.setLocalIdentifier(pids.asJava)
-    if (r.isInstanceOf[Dataset])
-      s.setTypology(Typology.dataset)
-    else
+    s.setLocalIdentifier(persistentIdentifiers.asJava)
+    if (r.isInstanceOf[Publication])
       s.setTypology(Typology.publication)
+    else
+      s.setTypology(Typology.dataset)
 
     s.setSubType(r.getInstance().get(0).getInstancetype.getClassname)