diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/person/CoAuthorshipIterator.java similarity index 95% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java rename to dhp-common/src/main/java/eu/dnetlib/dhp/common/person/CoAuthorshipIterator.java index 76e4c4851..853f223d0 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/CoAuthorshipIterator.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/person/CoAuthorshipIterator.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.actionmanager.personentity; +package eu.dnetlib.dhp.common.person; import java.util.Arrays; import java.util.Iterator; @@ -61,7 +61,7 @@ public class CoAuthorshipIterator implements Iterator { private Relation getRelation(String orcid1, String orcid2) { String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1); String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2); - return OafMapperUtils + Relation relation = OafMapperUtils .getRelation( source, target, ModelConstants.PERSON_PERSON_RELTYPE, ModelConstants.PERSON_PERSON_SUBRELTYPE, @@ -76,5 +76,7 @@ public class CoAuthorshipIterator implements Iterator { ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), "0.91"), null); + relation.setValidated(true); + return relation; } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/person/Coauthors.java similarity index 70% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java rename to dhp-common/src/main/java/eu/dnetlib/dhp/common/person/Coauthors.java index 17f46d5c7..ff9324d2e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/Coauthors.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/person/Coauthors.java @@ -1,12 +1,9 @@ -package eu.dnetlib.dhp.actionmanager.personentity; +package eu.dnetlib.dhp.common.person; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; -import eu.dnetlib.dhp.schema.oaf.Relation; - public class Coauthors implements Serializable { private List coauthors; diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java index dfa9c5ad0..fdfd63a15 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java @@ -363,6 +363,8 @@ public class GraphCleaningFunctions extends CleaningFunctions { // nothing to clean here } else if (value instanceof Project) { // nothing to clean here + } else if (value instanceof Person) { + // nothing to clean here } else if (value instanceof Organization) { Organization o = (Organization) value; if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) { diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ModelHardLimits.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ModelHardLimits.java index 74cd1b42a..761b9170f 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ModelHardLimits.java +++ 
b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/ModelHardLimits.java @@ -1,6 +1,12 @@ package eu.dnetlib.dhp.schema.oaf.utils; +import java.util.Map; + +import com.google.common.collect.Maps; + +import eu.dnetlib.dhp.schema.common.ModelConstants; + public class ModelHardLimits { private ModelHardLimits() { @@ -20,6 +26,12 @@ public class ModelHardLimits { public static final int MAX_ABSTRACT_LENGTH = 150000; public static final int MAX_RELATED_ABSTRACT_LENGTH = 500; public static final int MAX_INSTANCES = 10; + public static final Map MAX_RELATIONS_BY_RELCLASS = Maps.newHashMap(); + + static { + MAX_RELATIONS_BY_RELCLASS.put(ModelConstants.PERSON_PERSON_HASCOAUTHORED, 500L); + MAX_RELATIONS_BY_RELCLASS.put(ModelConstants.RESULT_PERSON_HASAUTHORED, 500L); + } public static String getCollectionName(String format) { return format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION; diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java index 56cbda4d6..f72fd4269 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java @@ -151,12 +151,17 @@ public class PromoteActionPayloadForGraphTableJob { SparkSession spark, String path, Class rowClazz) { logger.info("Reading graph table from path: {}", path); - return spark - .read() - .textFile(path) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, rowClazz), - Encoders.bean(rowClazz)); + if (HdfsSupport.exists(path, spark.sparkContext().hadoopConfiguration())) { + return spark + .read() + .textFile(path) + .map( + (MapFunction) value -> OBJECT_MAPPER.readValue(value, rowClazz), + Encoders.bean(rowClazz)); + } else { + logger.info("Found empty graph table from path: {}", path); + return spark.emptyDataset(Encoders.bean(rowClazz)); + } } private static Dataset readActionPayload( @@ -223,7 +228,7 @@ public class PromoteActionPayloadForGraphTableJob { rowClazz, actionPayloadClazz); - if (shouldGroupById) { + if (Boolean.TRUE.equals(shouldGroupById)) { return PromoteActionPayloadFunctions .groupGraphTableByIdAndMerge( joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz); @@ -250,6 +255,8 @@ public class PromoteActionPayloadForGraphTableJob { return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Relation()); case "eu.dnetlib.dhp.schema.oaf.Software": return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Software()); + case "eu.dnetlib.dhp.schema.oaf.Person": + return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Person()); default: throw new RuntimeException("unknown class: " + clazz.getCanonicalName()); } diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java index f0b094240..a3b975d0a 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadFunctions.java @@ -50,7 +50,7 @@ public class PromoteActionPayloadFunctions { 
PromoteAction.Strategy promoteActionStrategy, Class rowClazz, Class actionPayloadClazz) { - if (!isSubClass(rowClazz, actionPayloadClazz)) { + if (Boolean.FALSE.equals(isSubClass(rowClazz, actionPayloadClazz))) { throw new RuntimeException( "action payload type must be the same or be a super type of table row type"); } diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/import.txt b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/import.txt index dd8f5e14e..14409a42a 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/import.txt +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/import.txt @@ -7,3 +7,4 @@ promote_action_payload_for_project_table classpath eu/dnetlib/dhp/actionmanager/ promote_action_payload_for_publication_table classpath eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app promote_action_payload_for_relation_table classpath eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app promote_action_payload_for_software_table classpath eu/dnetlib/dhp/actionmanager/wf/software/oozie_app +promote_action_payload_for_person_table classpath eu/dnetlib/dhp/actionmanager/wf/person/oozie_app diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml index 65ddd402b..7ccfb342e 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/main/oozie_app/workflow.xml @@ -148,6 +148,7 @@ + @@ -270,6 +271,21 @@ + + + ${wf:appPath()}/promote_action_payload_for_person_table + + + + inputActionPayloadRootPath + ${workingDir}/action_payload_by_type + + + + + + + diff --git a/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/person/oozie_app/workflow.xml b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/person/oozie_app/workflow.xml new file mode 100644 index 000000000..1bacd09f1 --- /dev/null +++ b/dhp-workflows/dhp-actionmanager/src/main/resources/eu/dnetlib/dhp/actionmanager/wf/person/oozie_app/workflow.xml @@ -0,0 +1,129 @@ + + + + activePromotePersonActionPayload + when true will promote actions with eu.dnetlib.dhp.schema.oaf.Person payload + + + inputGraphRootPath + root location of input materialized graph + + + inputActionPayloadRootPath + root location of action payloads to promote + + + outputGraphRootPath + root location for output materialized graph + + + mergeAndGetStrategy + strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + 
spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + ${(activePromotePersonActionPayload eq "true") and + (fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Person')) eq "true")} + + + + + + + + yarn-cluster + cluster + PromotePersonActionPayloadForPersonTable + eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob + dhp-actionmanager-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=${sparkExecutorMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --inputGraphTablePath${inputGraphRootPath}/person + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Person + --inputActionPayloadPath${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Person + --actionPayloadClassNameeu.dnetlib.dhp.schema.oaf.Person + --outputGraphTablePath${outputGraphRootPath}/person + --mergeAndGetStrategy${mergeAndGetStrategy} + --promoteActionStrategy${promoteActionStrategy} + + + + + + + + + + + -pb + ${inputGraphRootPath}/person + ${outputGraphRootPath}/person + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java index debf7e38e..1131f85e9 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/personentity/ExtractPerson.java @@ -2,16 +2,23 @@ package eu.dnetlib.dhp.actionmanager.personentity; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static org.apache.spark.sql.functions.*; +import java.io.BufferedWriter; import java.io.IOException; +import java.io.OutputStreamWriter; import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.*; import java.util.stream.Collectors; import org.apache.commons.cli.ParseException; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -21,6 +28,7 @@ import org.apache.spark.sql.*; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.spark_project.jetty.util.StringUtil; import com.fasterxml.jackson.databind.ObjectMapper; @@ -28,10 +36,14 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.collection.orcid.model.Author; import eu.dnetlib.dhp.collection.orcid.model.Employment; import 
eu.dnetlib.dhp.collection.orcid.model.Work; +import eu.dnetlib.dhp.common.DbClient; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.common.person.CoAuthorshipIterator; +import eu.dnetlib.dhp.common.person.Coauthors; import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Person; import eu.dnetlib.dhp.schema.oaf.Relation; @@ -44,7 +56,7 @@ import scala.Tuple2; public class ExtractPerson implements Serializable { private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class); - + private static final String QUERY = "SELECT * FROM project_person WHERE pid_type = 'ORCID'"; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String OPENAIRE_PREFIX = "openaire____"; private static final String SEPARATOR = "::"; @@ -61,6 +73,41 @@ public class ExtractPerson implements Serializable { private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______"; public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid"; public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID"; + public static final String FUNDER_AUTHORS_CLASSID = "sysimport:crosswalk:funderdatabase"; + public static final String FUNDER_AUTHORS_CLASSNAME = "Imported from Funder Database"; + public static final String OPENAIRE_DATASOURCE_ID = "10|infrastruct_::f66f1bd369679b5b077dcdf006089556"; + public static final String OPENAIRE_DATASOURCE_NAME = "OpenAIRE"; + + public static List collectedfromOpenAIRE = OafMapperUtils + .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); + + public static final DataInfo ORCIDDATAINFO = OafMapperUtils + .dataInfo( + false, + null, + false, + false, + OafMapperUtils + .qualifier( + ORCID_AUTHORS_CLASSID, + ORCID_AUTHORS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "0.91"); + + public static final DataInfo FUNDERDATAINFO = OafMapperUtils + .dataInfo( + false, + null, + false, + false, + OafMapperUtils + .qualifier( + FUNDER_AUTHORS_CLASSID, + FUNDER_AUTHORS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "0.91"); public static void main(final String[] args) throws IOException, ParseException { @@ -91,19 +138,130 @@ public class ExtractPerson implements Serializable { final String workingDir = parser.get("workingDir"); log.info("workingDir {}", workingDir); + final String dbUrl = parser.get("postgresUrl"); + final String dbUser = parser.get("postgresUser"); + final String dbPassword = parser.get("postgresPassword"); + + final String hdfsNameNode = parser.get("hdfsNameNode"); + SparkConf conf = new SparkConf(); runWithSparkSession( conf, isSparkSessionManaged, spark -> { HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - createActionSet(spark, inputPath, outputPath, workingDir); + extractInfoForActionSetFromORCID(spark, inputPath, workingDir); + extractInfoForActionSetFromProjects( + spark, inputPath, workingDir, dbUrl, dbUser, dbPassword, workingDir + "/project", hdfsNameNode); + createActionSet(spark, outputPath, workingDir); }); } - private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) { + private static void extractInfoForActionSetFromProjects(SparkSession 
spark, String inputPath, String workingDir, + String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode) throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", hdfsNameNode); + + FileSystem fileSystem = FileSystem.get(conf); + Path hdfsWritePath = new Path(hdfsPath); + FSDataOutputStream fos = fileSystem.create(hdfsWritePath); + try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) { + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) { + dbClient.processResults(QUERY, rs -> writeRelation(getRelationWithProject(rs), writer)); + } + + } catch (IOException e) { + e.printStackTrace(); + } + + } + + public static Relation getRelationWithProject(ResultSet rs) { + try { + return getProjectRelation( + rs.getString("project"), rs.getString("pid"), + rs.getString("role")); + } catch (final SQLException e) { + throw new RuntimeException(e); + } + } + + private static Relation getProjectRelation(String project, String orcid, String role) { + + String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid); + String target = project.substring(0, 14) + + IdentifierFactory.md5(project.substring(15)); + List properties = new ArrayList<>(); + + Relation relation = OafMapperUtils + .getRelation( + source, target, ModelConstants.PROJECT_PERSON_RELTYPE, ModelConstants.PROJECT_PERSON_SUBRELTYPE, + ModelConstants.PROJECT_PERSON_PARTICIPATES, + collectedfromOpenAIRE, + FUNDERDATAINFO, + null); + relation.setValidated(true); + + if (StringUtil.isNotBlank(role)) { + KeyValue kv = new KeyValue(); + kv.setKey("role"); + kv.setValue(role); + properties.add(kv); + } + + if (!properties.isEmpty()) + relation.setProperties(properties); + return relation; + + } + + protected static void writeRelation(final Relation relation, BufferedWriter writer) { + try { + writer.write(OBJECT_MAPPER.writeValueAsString(relation)); + writer.newLine(); + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + + private static void createActionSet(SparkSession spark, String outputPath, String workingDir) { + + Dataset people; + people = spark + .read() + .textFile(workingDir + "/people") + .map( + (MapFunction) value -> OBJECT_MAPPER + .readValue(value, Person.class), + Encoders.bean(Person.class)); + + people + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)) + .union( + getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r))) + .union( + getRelations(spark, workingDir + "/coauthorship") + .toJavaRDD() + .map(r -> new AtomicAction(r.getClass(), r))) + .union( + getRelations(spark, workingDir + "/affiliation") + .toJavaRDD() + .map(r -> new AtomicAction(r.getClass(), r))) + .union( + getRelations(spark, workingDir + "/project") + .toJavaRDD() + .map(r -> new AtomicAction(r.getClass(), r))) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile( + outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); + } + + private static void extractInfoForActionSetFromORCID(SparkSession spark, String inputPath, String workingDir) { Dataset authors = spark .read() .parquet(inputPath + "Authors") @@ -129,18 +287,13 @@ public class ExtractPerson implements Serializable { .parquet(inputPath + "Employments") .as(Encoders.bean(Employment.class)); - Dataset peopleToMap = authors - .joinWith(works, 
authors.col("orcid").equalTo(works.col("orcid"))) - .map((MapFunction, Author>) t2 -> t2._1(), Encoders.bean(Author.class)) - .groupByKey((MapFunction) a -> a.getOrcid(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Author.class)); - Dataset employment = employmentDataset - .joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid"))) + .joinWith(authors, employmentDataset.col("orcid").equalTo(authors.col("orcid"))) .map((MapFunction, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class)); - Dataset people; - peopleToMap.map((MapFunction) op -> { + // Mapping all the orcid profiles even if the profile has no visible works + + authors.map((MapFunction) op -> { Person person = new Person(); person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX)); person @@ -193,6 +346,7 @@ public class ExtractPerson implements Serializable { ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, null)); person.setDateofcollection(op.getLastModifiedDate()); person.setOriginalId(Arrays.asList(op.getOrcid())); + person.setDataInfo(ORCIDDATAINFO); return person; }, Encoders.bean(Person.class)) .write() @@ -246,34 +400,6 @@ public class ExtractPerson implements Serializable { .option("compression", "gzip") .mode(SaveMode.Overwrite) .json(workingDir + "/affiliation"); - - people = spark - .read() - .textFile(workingDir + "/people") - .map( - (MapFunction) value -> OBJECT_MAPPER - .readValue(value, Person.class), - Encoders.bean(Person.class)); - - people.show(false); - people - .toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p)) - .union( - getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r))) - .union( - getRelations(spark, workingDir + "/coauthorship") - .toJavaRDD() - .map(r -> new AtomicAction(r.getClass(), r))) - .union( - getRelations(spark, workingDir + "/affiliation") - .toJavaRDD() - .map(r -> new AtomicAction(r.getClass(), r))) - .mapToPair( - aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile( - outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); } private static Dataset getRelations(SparkSession spark, String path) { @@ -307,23 +433,17 @@ public class ExtractPerson implements Serializable { source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE, ModelConstants.ORG_PERSON_PARTICIPATES, Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), - OafMapperUtils - .dataInfo( - false, null, false, false, - OafMapperUtils - .qualifier( - ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS), - "0.91"), + ORCIDDATAINFO, null); + relation.setValidated(true); - if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtils.isNotBlank(row.getStartDate())) { + if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())) { KeyValue kv = new KeyValue(); kv.setKey("startDate"); kv.setValue(row.getStartDate()); properties.add(kv); } - if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtils.isNotBlank(row.getEndDate())) { + if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) { KeyValue kv = new KeyValue(); kv.setKey("endDate"); kv.setValue(row.getEndDate()); @@ -336,45 +456,6 @@ public class ExtractPerson implements 
Serializable { } - private static Collection getCoAuthorshipRelations(String orcid1, String orcid2) { - String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid1); - String target = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid2); - - return Arrays - .asList( - OafMapperUtils - .getRelation( - source, target, ModelConstants.PERSON_PERSON_RELTYPE, - ModelConstants.PERSON_PERSON_SUBRELTYPE, - ModelConstants.PERSON_PERSON_HASCOAUTHORED, - Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), - OafMapperUtils - .dataInfo( - false, null, false, false, - OafMapperUtils - .qualifier( - ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), - "0.91"), - null), - OafMapperUtils - .getRelation( - target, source, ModelConstants.PERSON_PERSON_RELTYPE, - ModelConstants.PERSON_PERSON_SUBRELTYPE, - ModelConstants.PERSON_PERSON_HASCOAUTHORED, - Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), - OafMapperUtils - .dataInfo( - false, null, false, false, - OafMapperUtils - .qualifier( - ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), - "0.91"), - null)); - - } - private static @NotNull Iterator getAuthorshipRelationIterator(Work w) { if (Optional.ofNullable(w.getPids()).isPresent()) @@ -417,21 +498,15 @@ public class ExtractPerson implements Serializable { default: return null; } - - return OafMapperUtils + Relation relation = OafMapperUtils .getRelation( source, target, ModelConstants.RESULT_PERSON_RELTYPE, ModelConstants.RESULT_PERSON_SUBRELTYPE, ModelConstants.RESULT_PERSON_HASAUTHORED, Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)), - OafMapperUtils - .dataInfo( - false, null, false, false, - OafMapperUtils - .qualifier( - ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS), - "0.91"), + ORCIDDATAINFO, null); + relation.setValidated(true); + return relation; } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json index 5175552e7..1894a6beb 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/as_parameters.json @@ -21,5 +21,30 @@ "paramLongName": "workingDir", "paramDescription": "the hdfs name node", "paramRequired": false +}, + { + "paramName": "pu", + "paramLongName": "postgresUrl", + "paramDescription": "the postgres URL", + "paramRequired": false + }, + + { + "paramName": "ps", + "paramLongName": "postgresUser", + "paramDescription": "the postgres user", + "paramRequired": false + }, + { + "paramName": "pp", + "paramLongName": "postgresPassword", + "paramDescription": "the postgres password", + "paramRequired": false +},{ + "paramName": "nn", + "paramLongName": "hdfsNameNode", + "paramDescription": "the hdfs name node", + "paramRequired": false } + ]
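With the four parameters added above, a local invocation of ExtractPerson would be wired roughly as follows — a hedged sketch only, where every value is a placeholder rather than anything taken from this patch:

    // Hedged example: passing the new CLI arguments through to the job's main().
    public class ExtractPersonLocalRun {
        public static void main(String[] unused) throws Exception {
            String[] args = {
                "--isSparkSessionManaged", "false",
                "--inputPath", "/data/orcid/tables/",
                "--outputPath", "/tmp/person_actionset",
                "--workingDir", "/tmp/person_working",
                "--hdfsNameNode", "hdfs://nameservice1",
                "--postgresUrl", "jdbc:postgresql://db.example.org:5432/dnet_openaireplus",
                "--postgresUser", "dnet",
                "--postgresPassword", "***" // placeholder
            };
            eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson.main(args);
        }
    }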
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties index d2269718c..b9325bcb7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/job.properties @@ -1,2 +1,5 @@ inputPath=/data/orcid_2023/tables/ -outputPath=/user/miriam.baglioni/peopleAS \ No newline at end of file +outputPath=/user/miriam.baglioni/peopleAS +postgresUrl=jdbc:postgresql://beta.services.openaire.eu:5432/dnet_openaireplus +postgresUser=dnet +postgresPassword=dnetPwd \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml index 166e7bb9c..5b613a76a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/personentity/oozie_app/workflow.xml @@ -9,6 +9,18 @@ outputPath the path where to store the actionset + + postgresUrl + the postgres URL + + + postgresUser + the postgres user + + + postgresPassword + the postgres password + sparkDriverMemory memory for driver process @@ -102,6 +114,10 @@ --inputPath${inputPath} --outputPath${outputPath} --workingDir${workingDir} + --hdfsNameNode${nameNode} + --postgresUrl${postgresUrl} + --postgresUser${postgresUser} + --postgresPassword${postgresPassword} diff --git a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml index dd7827da4..563a549f3 100644 --- a/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-blacklist/src/main/resources/eu/dnetlib/dhp/blacklist/oozie_app/workflow.xml @@ -63,6 +63,7 @@ + @@ -120,6 +121,15 @@ + + + ${nameNode}/${sourcePath}/person + ${nameNode}/${outputPath}/person + + + + + ${nameNode}/${sourcePath}/datasource diff --git a/dhp-workflows/dhp-enrichment/pom.xml b/dhp-workflows/dhp-enrichment/pom.xml index 9698dee03..41f57e6df 100644 --- a/dhp-workflows/dhp-enrichment/pom.xml +++ b/dhp-workflows/dhp-enrichment/pom.xml @@ -48,12 +48,7 @@ io.github.classgraph classgraph - - eu.dnetlib.dhp - dhp-aggregation - 1.2.5-SNAPSHOT - compile - +
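The class added below keys each candidate relation by source + relClass + target and left-joins it against the existing relation table, keeping only the tuples with no match. The same check can be expressed more directly as a Spark left-anti join; a minimal sketch under that assumption (the helper class and method names are hypothetical, not part of the patch):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;

    import eu.dnetlib.dhp.schema.oaf.Relation;

    class RelationDedup {
        // Keeps only candidate relations whose (source, relClass, target) triple
        // is not already present in the graph's relation table.
        static Dataset<Relation> keepOnlyNew(Dataset<Relation> candidate, Dataset<Relation> existing) {
            return candidate
                .join(
                    existing,
                    candidate.col("source").equalTo(existing.col("source"))
                        .and(candidate.col("relClass").equalTo(existing.col("relClass")))
                        .and(candidate.col("target").equalTo(existing.col("target"))),
                    "left_anti")
                .as(Encoders.bean(Relation.class));
        }
    }

Either formulation avoids re-emitting authorship or co-authorship relations already present in the graph.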
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/person/SparkExtractPersonRelations.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/person/SparkExtractPersonRelations.java new file mode 100644 index 000000000..5a63c9991 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/person/SparkExtractPersonRelations.java @@ -0,0 +1,302 @@ + +package eu.dnetlib.dhp.person; + +import static eu.dnetlib.dhp.PropagationConstant.*; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.*; +import org.apache.spark.sql.Dataset; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.person.CoAuthorshipIterator; +import eu.dnetlib.dhp.common.person.Coauthors; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import scala.Tuple2; + +public class SparkExtractPersonRelations { + + private static final Logger log = LoggerFactory.getLogger(SparkExtractPersonRelations.class); + private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______"; + + public static final DataInfo DATAINFO = OafMapperUtils + .dataInfo( + false, + "openaire", + true, + false, + OafMapperUtils + .qualifier( + ModelConstants.SYSIMPORT_CROSSWALK_REPOSITORY, + ModelConstants.SYSIMPORT_CROSSWALK_REPOSITORY, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "0.85"); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + SparkExtractPersonRelations.class + .getResourceAsStream( + "/eu/dnetlib/dhp/wf/subworkflows/person/input_personpropagation_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String sourcePath = parser.get("sourcePath"); + log.info("sourcePath: {}", sourcePath); + + final String workingPath = parser.get("outputPath"); + log.info("workingPath: {}", workingPath); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + + extractRelations( + spark, + sourcePath, + workingPath); + removeIsolatedPerson(spark, sourcePath, workingPath); + }); + } + + private static void removeIsolatedPerson(SparkSession spark, String sourcePath, String workingPath) { + Dataset personDataset = spark + .read() + .schema(Encoders.bean(Person.class).schema()) + .json(sourcePath + "person") + .as(Encoders.bean(Person.class)); + + Dataset relationDataset = spark + .read() + .schema(Encoders.bean(Relation.class).schema()) + .json(sourcePath + "relation") + .as(Encoders.bean(Relation.class)); + + personDataset + .join(relationDataset, personDataset.col("id").equalTo(relationDataset.col("source")), "left_semi") + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(workingPath + "person"); + + spark + .read() + .schema(Encoders.bean(Person.class).schema()) + .json(workingPath + "person") + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(sourcePath + "person"); + } + + private static void extractRelations(SparkSession spark, String sourcePath, String workingPath) { + + Dataset> relationDataset = spark + .read() + .schema(Encoders.bean(Relation.class).schema()) + .json(sourcePath + "relation") + .as(Encoders.bean(Relation.class)) + .map( + (MapFunction>) r -> new Tuple2<>( + r.getSource() + r.getRelClass() + r.getTarget(), r), + Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class))); + 
ModelSupport.entityTypes + .keySet() + .stream() + .filter(ModelSupport::isResult) + .forEach( + e -> { + // 1. search for results having orcid_pending and orcid in the set of pids for the authors + Dataset resultWithOrcids = spark + .read() + .schema(Encoders.bean(Result.class).schema()) + .json(sourcePath + e.name()) + .as(Encoders.bean(Result.class)) + .filter( + (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && + !r.getDataInfo().getInvisible() && + Optional + .ofNullable(r.getAuthor()) + .isPresent()) + .filter( + (FilterFunction) r -> r + .getAuthor() + .stream() + .anyMatch( + a -> Optional + .ofNullable( + a + .getPid()) + .isPresent() && + a + .getPid() + .stream() + .anyMatch( + p -> Arrays + .asList("orcid", "orcid_pending") + .contains(p.getQualifier().getClassid().toLowerCase())))); + // 2. create authorship relations between the result identifier and the person entity with + // orcid_pending. + Dataset> newRelations = resultWithOrcids + .flatMap( + (FlatMapFunction) r -> getAuthorshipRelations(r), + Encoders.bean(Relation.class)) +// .groupByKey((MapFunction) r-> r.getSource()+r.getTarget(), Encoders.STRING() ) +// .mapGroups((MapGroupsFunction) (k,it) -> it.next(), Encoders.bean(Relation.class) ) + .map( + (MapFunction>) r -> new Tuple2<>( + r.getSource() + r.getRelClass() + r.getTarget(), r), + Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class))); + newRelations + .joinWith(relationDataset, newRelations.col("_1").equalTo(relationDataset.col("_1")), "left") + .map((MapFunction, Tuple2>, Relation>) t2 -> { + if (t2._2() == null) + return t2._1()._2(); + return null; + }, Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r != null) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json(workingPath); + + // 2.1 store in a separate location the relation between the person and the pids for the result? + + // 3. 
create co_authorship relations between the pairs of authors with orcid/orcid_pending pids + newRelations = resultWithOrcids + .map((MapFunction) r -> getAuthorsPidList(r), Encoders.bean(Coauthors.class)) + .flatMap( + (FlatMapFunction) c -> new CoAuthorshipIterator(c.getCoauthors()), + Encoders.bean(Relation.class)) + .groupByKey( + (MapFunction) r -> r.getSource() + r.getTarget(), Encoders.STRING()) + .mapGroups( + (MapGroupsFunction) (k, it) -> it.next(), + Encoders.bean(Relation.class)) + .map( + (MapFunction>) r -> new Tuple2<>( + r.getSource() + r.getRelClass() + r.getTarget(), r), + Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class))); + newRelations + .joinWith(relationDataset, newRelations.col("_1").equalTo(relationDataset.col("_1")), "left") + .map((MapFunction, Tuple2>, Relation>) t2 -> { + if (t2._2() == null) + return t2._1()._2(); + return null; + }, Encoders.bean(Relation.class)) + .filter((FilterFunction) r -> r != null) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json(workingPath); + + }); + spark + .read() + .schema(Encoders.bean(Relation.class).schema()) + .json(workingPath) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json(sourcePath + "relation"); + + } + + private static Coauthors getAuthorsPidList(Result r) { + Coauthors coauth = new Coauthors(); + coauth + .setCoauthors( + r + .getAuthor() + .stream() + .filter(a -> Objects.nonNull(a.getPid())) + .filter( + a -> a + .getPid() + .stream() + .anyMatch( + p -> Arrays.asList("orcid", "orcid_pending").contains(p.getQualifier().getClassid()))) + .map(a -> { + Optional tmp = a + .getPid() + .stream() + .filter(p -> p.getQualifier().getClassid().equalsIgnoreCase("orcid")) + .findFirst(); + if (tmp.isPresent()) + return tmp.get().getValue(); + tmp = a + .getPid() + .stream() + .filter(p -> p.getQualifier().getClassid().equalsIgnoreCase("orcid_pending")) + .findFirst(); + if (tmp.isPresent()) + return tmp.get().getValue(); + + return null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toList())); + return coauth; + + } + + private static Iterator getAuthorshipRelations(Result r) { + List relationList = new ArrayList<>(); + for (Author a : r.getAuthor()) { + if (Objects.isNull(a.getPid())) + continue; + relationList.addAll(a.getPid().stream().map(p -> { + + if (p.getQualifier().getClassid().equalsIgnoreCase("orcid_pending")) + return getRelation(p.getValue(), r.getId()); + return null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toList())); + } + + return relationList.iterator(); + } + + private static Relation getRelation(String orcid, String resultId) { + + String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid); + + Relation relation = OafMapperUtils + .getRelation( + source, resultId, ModelConstants.RESULT_PERSON_RELTYPE, + ModelConstants.RESULT_PERSON_SUBRELTYPE, + ModelConstants.RESULT_PERSON_HASAUTHORED, + null, // collectedfrom = null + DATAINFO, + null); + + return relation; + } + +}
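A sizing note on the two relation-producing branches above: CoAuthorshipIterator (moved to dhp-common at the top of this patch) emits a directed hasCoAuthored relation for each ordered pair of ORCIDs, matching the pair of relations the removed getCoAuthorshipRelations helper used to build, so a single result with n ORCID-equipped authors contributes n * (n - 1) relations. A quick arithmetic check (helper name hypothetical):

    // Directed co-authorship relations generated by one result with n ORCID authors.
    static long coauthorshipRelationCount(int n) {
        return (long) n * (n - 1); // both directions of every unordered pair
    }
    // coauthorshipRelationCount(23) == 506

Growth of this order is presumably what the new 500-entry caps for PERSON_PERSON_HASCOAUTHORED and RESULT_PERSON_HASAUTHORED in ModelHardLimits are meant to bound.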
diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/import.txt b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/import.txt index b20259414..8922b6ac6 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/import.txt +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/import.txt @@ -7,4 +7,5 @@ community_organization classpath eu/dnetlib/dhp/wf/subworkflows/resulttocommunit result_project classpath eu/dnetlib/dhp/wf/subworkflows/projecttoresult/oozie_app community_project classpath eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app community_sem_rel classpath eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/oozie_app -country_propagation classpath eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app \ No newline at end of file +country_propagation classpath eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app +person_propagation classpath eu/dnetlib/dhp/wf/subworkflows/person/oozie_app \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml index 8e91707b6..4351cd595 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/main/oozie_app/workflow.xml @@ -122,6 +122,7 @@ ${wf:conf('resumeFrom') eq 'CommunityProject'} ${wf:conf('resumeFrom') eq 'CommunitySemanticRelation'} ${wf:conf('resumeFrom') eq 'CountryPropagation'} + ${wf:conf('resumeFrom') eq 'PersonPropagation'} @@ -291,10 +292,24 @@ + + + + + + ${wf:appPath()}/person_propagation + + + + + sourcePath + ${outputPath} + + + - ${wf:appPath()}/country_propagation @@ -319,6 +334,8 @@ + + diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml index ba3633e07..8eaa79c53 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/orcidtoresultfromsemrel/oozie_app/workflow.xml @@ -34,6 +34,7 @@ + @@ -80,6 +81,17 @@ + + + ${jobTracker} + ${nameNode} + ${nameNode}/${sourcePath}/person + ${nameNode}/${outputPath}/person + + + + + diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/person/input_personpropagation_parameters.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/person/input_personpropagation_parameters.json new file mode 100644 index 000000000..df65d5320 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/person/input_personpropagation_parameters.json @@ -0,0 +1,21 @@ +[ + { + "paramName":"s", + "paramLongName":"sourcePath", + "paramDescription": "the path of the sequential file to read", + "paramRequired": true + }, + { + "paramName": "out", + "paramLongName": "outputPath", + "paramDescription": "the path used to store temporary output files", + "paramRequired": true + }, + + { + "paramName": "ssm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "true if the spark session is managed, false otherwise", + "paramRequired": false + } +] diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/person/job.properties b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/person/job.properties new file mode 100644 index 000000000..61bd3d121 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/person/job.properties @@ -0,0 +1 @@ +sourcePath=/tmp/miriam/13_graph_copy \ No newline at end of file diff --git
a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/person/oozie_app/config-default.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/person/oozie_app/config-default.xml new file mode 100644 index 000000000..1cb0b8a5e --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/person/oozie_app/config-default.xml @@ -0,0 +1,58 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + + hive_metastore_uris + thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083 + + + spark2YarnHistoryServerAddress + http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089 + + + spark2EventLogDir + /user/spark/spark2ApplicationHistory + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + + + sparkExecutorNumber + 4 + + + sparkDriverMemory + 15G + + + sparkExecutorMemory + 5G + + + sparkExecutorCores + 4 + + + spark2MaxExecutors + 50 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/person/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/person/oozie_app/workflow.xml new file mode 100644 index 000000000..c9b914384 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/wf/subworkflows/person/oozie_app/workflow.xml @@ -0,0 +1,68 @@ + + + + sourcePath + the source path + + + + + + ${jobTracker} + ${nameNode} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + + + + + + + + yarn + cluster + personPropagation + eu.dnetlib.dhp.person.SparkExtractPersonRelations + dhp-enrichment-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.speculation=false + --conf spark.hadoop.mapreduce.map.speculative=false + --conf spark.hadoop.mapreduce.reduce.speculative=false + --conf spark.sql.shuffle.partitions=7680 + + --sourcePath${sourcePath}/ + --outputPath${workingDir}/relation + + + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/person/PersonPropagationJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/person/PersonPropagationJobTest.java new file mode 100644 index 000000000..ff6350325 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/person/PersonPropagationJobTest.java @@ -0,0 +1,93 @@ + +package eu.dnetlib.dhp.person; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import 
org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob; +import eu.dnetlib.dhp.schema.oaf.*; +import scala.Tuple2; + +public class PersonPropagationJobTest { + + private static final Logger log = LoggerFactory.getLogger(PersonPropagationJobTest.class); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static SparkSession spark; + + private static Path workingDir; + + @BeforeAll + public static void beforeAll() throws IOException { + workingDir = Files.createTempDirectory(PersonPropagationJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + conf.setAppName(PersonPropagationJobTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + conf.set("spark.sql.warehouse.dir", workingDir.toString()); + conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(PersonPropagationJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + void testPersonPropagation() throws Exception { + final String sourcePath = getClass() + .getResource("/eu/dnetlib/dhp/person/graph") + .getPath(); + + SparkExtractPersonRelations + .main( + new String[] { + "--isSparkSessionManaged", Boolean.FALSE.toString(), + "--sourcePath", sourcePath, + "--outputPath", workingDir.toString() + }); + + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/relation") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); + + // TODO write assertions and find relevant information for the resource files + } + +} diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/person/graph/dataset/part-00000 b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/person/graph/dataset/part-00000 new file mode 100644 index 000000000..e69de29bb diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/person/graph/otherresearchproduct/part-00000 b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/person/graph/otherresearchproduct/part-00000 new file mode 100644 index 000000000..47a3fdccb --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/person/graph/otherresearchproduct/part-00000 @@ -0,0 +1 @@ +{"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "resourcetype": {"classid": "Taxonomic treatment", "classname": "Taxonomic treatment", "schemename": "dnet:dataCite_resource", "schemeid": "dnet:dataCite_resource"}, "pid": [{"dataInfo": {"invisible": false, "trust": "0.9", 
"deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "doi", "classname": "Digital Object Identifier", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "10.5281/zenodo.10249277"}, {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "doi", "classname": "Digital Object Identifier", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "10.5281/zenodo.10249277"}, {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "doi", "classname": "Digital Object Identifier", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "10.5281/zenodo.10249277"}], "bestaccessright": {"classid": "OPEN", "classname": "Open Access", "schemename": "dnet:access_modes", "schemeid": "dnet:access_modes"}, "relevantdate": [{"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "issued", "classname": "issued", "schemename": "dnet:dataCite_date", "schemeid": "dnet:dataCite_date"}, "value": "2023-11-28"}], "contributor": [], "id": "50|doi_________::fa6db8629c4a8d13ec21e445b309d1c8", "description": [{"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "value": "11.1 Saltia papposa (Forrsk.) Moq., Prodr. [A. P. de Candolle] 13(2): 325. 1849 \\u2261 Achyranthes papposa Forssk., Fl. Aegypt.-Arab.: 48. 1775. Lectotype (designate here): \\u2014 YEMEN. Zabid, s.d., Forssk\\u00e5l 205 (C10001569!, image of the lectotype available at https://plants.jstor. org/stable/viewer/10.5555/al.ap.specimen.c10001569?page=1); isolectotypes: C10001570! (image of the isolectotype available at https://plants.jstor.org/stable/viewer/10.5555/al.ap.specimen.c10001570?page=1) and BM000950560! (image of the isolectotype available at https://data.nhm.ac.uk/object/c634a45c-983a-42f3-9c4d-1d1b06f5f88b/1691539200000). Typification of the name Achyranthes papposa:\\u2014 Forssk\\u00e5l (1775: 48) published Achyranthes papposa by giving a short diagnosis (\\u201c foliis alternis; crassiusculis; lineari-cuneatis, obtusis \\u201d) and a detailed description; the provenance [\\u201c Zeb\\u00edd \\u201d (currently Zabid), a city of W-Yemen] is reported [see also Forssk\\u00e5l (1775: CVII) who indicated \\u201cMi.\\u201d as provenance of A. papposa, \\u201cMi.\\u201d meaning \\u201cMontium Regionis Inferior\\u201d (Forssk\\u00e5l 1775: CI)]. We traced two specimens at C, where Forsskal\\u2019s herbarium and types are mostly preserved (HUH Index of Botanists 2013c), i.e. viz. 
C10001569 and C10001570, both collected at Zabid; a further specimen is kept at BM (BM000950560) and it was annotated by Frank Nigel Hepper as an isotype. These three specimens are part of the original material for A. papposa (see also Hepper & Friis 1994). C10001569 bears a branch of a plant with more flowers than in C10001570. Since the morphology of the flowers is important to identify Saltia papposa (Townsend 1993), we here designate C10001569 as the lectotype of A. papposa. C10001570 and BM000950560 are isolectotypes. Chorology:\\u2015 Endemic to the Arabian Peninsula (Saudi Arabia and Yemen; POWO 2023). Occurrence in Saudi Arabia:\\u2015 Doubtfully in Makkah (Miller & Cope 1996). We did not trace any specimen collected in Saudi Arabia, but it is not impossible that Saltia papposa occurs in the country, probably in the south-eastern coastal area (Jazan?) (see also Ghazanfar & Fisher 2013: 178\\u2013179)."}], "eoscifguidelines": [], "author": [{"surname": "Hassan", "name": "Walaa A.", "pid": [{"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "orcid_pending", "classname": "Open Researcher and Contributor ID", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "0000-0001-7605-9058"}], "rank": 1, "affiliation": [{"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "value": "Botany and Microbiology Department, Faculty of Science, Beni-Suef University, Beni-Suef, Egypt & azmeyw @ gmail. com; https: // orcid. org / 0000 - 0001 - 7605 - 9058"}], "fullname": "Hassan, Walaa A."}, {"surname": "Al-Shaye", "name": "Najla A.", "pid": [{"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "orcid_pending", "classname": "Open Researcher and Contributor ID", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "0000-0002-0447-8613"}], "rank": 2, "affiliation": [{"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "value": "Department of Biology, College of Science, Princess Nourah bint Abdulrahman University, Riyadh, Saudi Arabia & naaalshaye @ pnu. edu. sa; https: // orcid. 
org / 0000 - 0002 - 0447 - 8613"}], "fullname": "Al-Shaye, Najla A."}, {"surname": "Iamonico", "name": "Duilio", "pid": [{"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "orcid_pending", "classname": "Open Researcher and Contributor ID", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "0000-0001-5491-7568"}], "rank": 3, "affiliation": [{"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "value": "Department of Environmental Biology, Univeristy of Rome Sapienza, 00185 Rome, Italy & duilio. iamonico @ uniroma 1. it; https: // orcid. org / 0000 - 0001 - 5491 - 7568"}], "fullname": "Iamonico, Duilio"}], "contactgroup": [], "collectedfrom": [{"value": "ZENODO", "key": "10|opendoar____::358aee4cc897452c00244351e4d91f69"}], "instance": [{"refereed": {"classid": "0002", "classname": "nonPeerReviewed", "schemename": "dnet:review_levels", "schemeid": "dnet:review_levels"}, "hostedby": {"value": "ZENODO", "key": "10|opendoar____::358aee4cc897452c00244351e4d91f69"}, "license": {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "value": "CC 0"}, "url": ["http://dx.doi.org/10.5281/zenodo.10249277", "http://treatment.plazi.org/id/97224201FFE29001FF4C6AB685F912EB"], "pid": [{"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "doi", "classname": "Digital Object Identifier", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "10.5281/zenodo.10249277"}, {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "doi", "classname": "Digital Object Identifier", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "10.5281/zenodo.10249277"}, {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "doi", "classname": "Digital Object Identifier", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "10.5281/zenodo.10249277"}], "instanceTypeMapping": [{"originalType": "Taxonomic treatment", "vocabularyName": "openaire::coar_resource_types_3_1"}], "alternateIdentifier": [{"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", 
"schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "oai", "classname": "Open Archives Initiative", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "oai:zenodo.org:10249277"}], "dateofacceptance": {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "value": "2023-11-28"}, "collectedfrom": {"value": "ZENODO", "key": "10|opendoar____::358aee4cc897452c00244351e4d91f69"}, "accessright": {"classid": "OPEN", "classname": "Open Access", "schemename": "dnet:access_modes", "schemeid": "dnet:access_modes"}, "instancetype": {"classid": "0020", "classname": "Other ORP type", "schemename": "dnet:publication_resource", "schemeid": "dnet:publication_resource"}}], "dateofcollection": "2023-12-21T22:24:48+0000", "fulltext": [], "dateoftransformation": "2024-01-18T06:50:15.691Z", "dateofacceptance": {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "value": "2023-11-28"}, "format": [], "tool": [], "subject": [{"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "keyword", "classname": "keyword", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "Tracheophyta"}, {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "keyword", "classname": "keyword", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "Magnoliopsida"}, {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "keyword", "classname": "keyword", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "Amaranthaceae"}, {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "keyword", "classname": "keyword", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "Saltia"}, {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, 
"qualifier": {"classid": "keyword", "classname": "keyword", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "Saltia papposa"}, {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "keyword", "classname": "keyword", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "Biodiversity"}, {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "keyword", "classname": "keyword", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "Plantae"}, {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "keyword", "classname": "keyword", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "Caryophyllales"}, {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "keyword", "classname": "keyword", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "Taxonomy"}], "coverage": [], "externalReference": [], "publisher": {"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "value": "Zenodo"}, "lastupdatetimestamp": 1721832280654, "language": {"classid": "und", "classname": "Undetermined", "schemename": "dnet:languages", "schemeid": "dnet:languages"}, "resulttype": {"classid": "other", "classname": "other", "schemename": "dnet:result_typologies", "schemeid": "dnet:result_typologies"}, "country": [], "extraInfo": [], "originalId": ["oai:zenodo.org:10249277", "50|od______2659::42fc9730cd6f5de3b0e3bfacdc347177"], "contactperson": [], "source": [], "context": [], "title": [{"dataInfo": {"invisible": false, "trust": "0.9", "deletedbyinference": false, "inferred": false, "provenanceaction": {"classid": "sysimport:crosswalk:repository", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "qualifier": {"classid": "main title", "classname": "main title", "schemename": "dnet:dataCite_title", "schemeid": "dnet:dataCite_title"}, "value": "Saltia papposa Moq., Prodr."}]} \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/person/graph/publication/part-00000 
b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/person/graph/publication/part-00000 new file mode 100644 index 000000000..af1b5d55c --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/person/graph/publication/part-00000 @@ -0,0 +1 @@ +{"dataInfo": {"invisible": false, "trust": "0.91", "deletedbyinference": true, "inferred": false, "provenanceaction": {"classid": "sysimport:actionset", "classname": "Harvested", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}}, "resourcetype": {"classid": "publication", "classname": "publication", "schemename": "dnet:result_typologies", "schemeid": "dnet:result_typologies"}, "pid": [{"qualifier": {"classid": "doi", "classname": "Digital Object Identifier", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "10.11646/phytotaxa.379.3.5"}], "bestaccessright": {"classid": "UNKNOWN", "classname": "not available", "schemename": "dnet:access_modes", "schemeid": "dnet:access_modes"}, "relevantdate": [{"qualifier": {"classid": "created", "classname": "created", "schemename": "dnet:dataCite_date", "schemeid": "dnet:dataCite_date"}, "value": "2018-11-29"}, {"qualifier": {"classid": "published-online", "classname": "published-online", "schemename": "dnet:dataCite_date", "schemeid": "dnet:dataCite_date"}, "value": "2018-11-29"}], "collectedfrom": [{"value": "Crossref", "key": "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2"}], "id": "50|doi_________::b2eae15cfe9b0d7f416b6dcfc84c09f9", "description": [{"value": "As part of the ongoing studies on the genus Polycarpon Linnaeus (1759: 859, 881) (see e.g., Iamonico 2015a, 2015b, 2015c, Iamonico & Domina 2015), and on the Italian loci classici (see e.g., Peruzzi et al. 2015, Brundu et al. 2015, Domina et al. 2016, Di Gristina et al. 2017, Domina et al. 2017, 2018a, 2018b), we present here a note regarding Hagaea alsinifolia Bivona-Bernardi (1815: 7\\u20138) [currently accepted (see Bartolucci et al. 2018) as Polycarpon tetraphyllum Linnaeus (1759: 881) subsp. alsinifolium (Biv.) 
Ball (1877: 370)]."}], "lastupdatetimestamp": 1648743612067, "author": [{"surname": "IAMONICO", "fullname": "DUILIO IAMONICO", "pid": [], "name": "DUILIO", "rank": 1}, {"surname": "DOMINA", "fullname": "GIANNIANTONIO DOMINA", "pid": [], "name": "GIANNIANTONIO", "rank": 2}], "instance": [{"refereed": {"classid": "0001", "classname": "peerReviewed", "schemename": "dnet:review_levels", "schemeid": "dnet:review_levels"}, "hostedby": {"dataInfo": {"invisible": false, "deletedbyinference": false}, "value": "Phytotaxa", "key": "10|issn___print::9336d3bbf63c241b54726a55fa38c0ef"}, "url": ["https://doi.org/10.11646/phytotaxa.379.3.5"], "pid": [{"qualifier": {"classid": "doi", "classname": "Digital Object Identifier", "schemename": "dnet:pid_types", "schemeid": "dnet:pid_types"}, "value": "10.11646/phytotaxa.379.3.5"}], "instanceTypeMapping": [{"originalType": "journal-article", "typeLabel": "research article", "vocabularyName": "openaire::coar_resource_types_3_1", "typeCode": "http://purl.org/coar/resource_type/c_2df8fbb1"}, {"originalType": "http://purl.org/coar/resource_type/c_2df8fbb1", "typeLabel": "Article", "vocabularyName": "openaire::user_resource_types", "typeCode": "Article"}], "dateofacceptance": {"value": "2018-11-29"}, "collectedfrom": {"value": "Crossref", "key": "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2"}, "accessright": {"classid": "UNKNOWN", "classname": "not available", "schemename": "dnet:access_modes", "schemeid": "dnet:access_modes"}, "instancetype": {"classid": "0001", "classname": "Article", "schemename": "dnet:publication_resource", "schemeid": "dnet:publication_resource"}}], "dateofcollection": "2024-07-26T02:32:47.105", "metaResourceType": {"classid": "Research Literature", "classname": "Research Literature", "schemename": "openaire::meta_resource_types", "schemeid": "openaire::meta_resource_types"}, "context": [], "journal": {"issnPrinted": "1179-3155", "vol": "379", "sp": "267", "issnOnline": "1179-3163", "name": "Phytotaxa"}, "subject": [{"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "subject:fos", "classname": "Inferred by OpenAIRE", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": true, "inferenceprovenance": "update", "invisible": false, "trust": ""}, "qualifier": {"classid": "FOS", "classname": "Fields of Science and Technology classification", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "0106 biological sciences"}, {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "subject:fos", "classname": "Inferred by OpenAIRE", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": true, "inferenceprovenance": "update", "invisible": false, "trust": ""}, "qualifier": {"classid": "FOS", "classname": "Fields of Science and Technology classification", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "0301 basic medicine"}, {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "subject:fos", "classname": "Inferred by OpenAIRE", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": true, "inferenceprovenance": "update", "invisible": false, "trust": ""}, "qualifier": {"classid": "FOS", "classname": "Fields of Science and Technology classification", "schemename": "dnet:subject_classification_typologies", "schemeid": 
"dnet:subject_classification_typologies"}, "value": "03 medical and health sciences"}, {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "subject:fos", "classname": "Inferred by OpenAIRE", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": true, "inferenceprovenance": "update", "invisible": false, "trust": "0.5"}, "qualifier": {"classid": "FOS", "classname": "Fields of Science and Technology classification", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "03010801 Mycology/Symbiosis"}, {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "subject:fos", "classname": "Inferred by OpenAIRE", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": true, "inferenceprovenance": "update", "invisible": false, "trust": "0.5"}, "qualifier": {"classid": "FOS", "classname": "Fields of Science and Technology classification", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "030108 mycology & parasitology"}, {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "subject:fos", "classname": "Inferred by OpenAIRE", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": true, "inferenceprovenance": "update", "invisible": false, "trust": "0.5"}, "qualifier": {"classid": "FOS", "classname": "Fields of Science and Technology classification", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "010603 evolutionary biology"}, {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "subject:fos", "classname": "Inferred by OpenAIRE", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": true, "inferenceprovenance": "update", "invisible": false, "trust": ""}, "qualifier": {"classid": "FOS", "classname": "Fields of Science and Technology classification", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "01 natural sciences"}, {"dataInfo": {"deletedbyinference": false, "provenanceaction": {"classid": "subject:fos", "classname": "Inferred by OpenAIRE", "schemename": "dnet:provenanceActions", "schemeid": "dnet:provenanceActions"}, "inferred": true, "inferenceprovenance": "update", "invisible": false, "trust": "0.5"}, "qualifier": {"classid": "FOS", "classname": "Fields of Science and Technology classification", "schemename": "dnet:subject_classification_typologies", "schemeid": "dnet:subject_classification_typologies"}, "value": "01060304 Pollination/Angiosperms"}], "externalReference": [], "publisher": {"value": "Magnolia Press"}, "eoscifguidelines": [], "language": {"classid": "und", "classname": "Undetermined", "schemename": "dnet:languages", "schemeid": "dnet:languages"}, "resulttype": {"classid": "publication", "classname": "publication", "schemename": "dnet:result_typologies", "schemeid": "dnet:result_typologies"}, "country": [], "title": [{"qualifier": {"classid": "main title", "classname": "main title", "schemename": "dnet:dataCite_title", "schemeid": "dnet:dataCite_title"}, "value": "Epitypification of Hagaea alsinifolia (Polyycarpon tetraphyllum subsp. 
Alsinifolium, Caryophyllaceae)"}], "originalId": ["10.11646/phytotaxa.379.3.5", "50|doiboost____|b2eae15cfe9b0d7f416b6dcfc84c09f9"], "source": [{"value": "Crossref"}], "dateofacceptance": {"value": "2018-11-29"}} \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/person/graph/relation/part-00000 b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/person/graph/relation/part-00000 new file mode 100644 index 000000000..e69de29bb diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/person/graph/software/part-00000 b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/person/graph/software/part-00000 new file mode 100644 index 000000000..e69de29bb diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index 00505fedc..ed7d3aa1b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -522,10 +522,10 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i case "resultOrganization_affiliation_isAuthorInstitutionOf": if (!"organization".equals(sourceType)) { throw new IllegalStateException( - String - .format( - "invalid claim, sourceId: %s, targetId: %s, semantics: %s", sourceId, targetId, - semantics)); + String + .format( + "invalid claim, sourceId: %s, targetId: %s, semantics: %s", sourceId, targetId, + semantics)); } r1 = setRelationSemantic(r1, RESULT_ORGANIZATION, AFFILIATION, IS_AUTHOR_INSTITUTION_OF); r2 = setRelationSemantic(r2, RESULT_ORGANIZATION, AFFILIATION, HAS_AUTHOR_INSTITUTION); @@ -533,10 +533,10 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i case "resultOrganization_affiliation_hasAuthorInstitution": if (!"organization".equals(targetType)) { throw new IllegalStateException( - String - .format( - "invalid claim, sourceId: %s, targetId: %s, semantics: %s", sourceId, targetId, - semantics)); + String + .format( + "invalid claim, sourceId: %s, targetId: %s, semantics: %s", sourceId, targetId, + semantics)); } r1 = setRelationSemantic(r1, RESULT_ORGANIZATION, AFFILIATION, HAS_AUTHOR_INSTITUTION); r2 = setRelationSemantic(r2, RESULT_ORGANIZATION, AFFILIATION, IS_AUTHOR_INSTITUTION_OF); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml index 72fc9e338..4031da15a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/enrich/orcid/oozie_app/workflow.xml @@ -89,6 +89,14 @@ ${nameNode}/${graphPath}/project ${nameNode}/${targetPath}/project + + + + + + ${nameNode}/${graphPath}/person + ${nameNode}/${targetPath}/person + diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml index 4188cb018..2512fc5bc 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml +++ 
b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml @@ -142,6 +142,7 @@ + @@ -390,6 +391,41 @@ + + + yarn + cluster + Clean person + eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=${sparkExecutorMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=2000 + + --inputPath${graphInputPath}/person + --outputPath${graphOutputPath}/person + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Person + --isLookupUrl${isLookupUrl} + --contextId${contextId} + --verifyParam${verifyParam} + --country${country} + --verifyCountryParam${verifyCountryParam} + --hostedBy${workingDir}/working/hostedby + --collectedfrom${collectedfrom} + --masterDuplicatePath${workingDir}/masterduplicate + --deepClean${shouldClean} + + + + + yarn diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml index eec67fc5c..872ef8a2d 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/hive/oozie_app/workflow.xml @@ -102,6 +102,7 @@ + @@ -308,6 +309,35 @@ + + + yarn + cluster + Import table person + eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableImporterJob + dhp-graph-mapper-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.executor.memoryOverhead=${sparkExecutorMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + --conf spark.sql.shuffle.partitions=1000 + + --inputPath${inputPath}/person + --hiveDbName${hiveDbName} + --classNameeu.dnetlib.dhp.schema.oaf.Person + --hiveMetastoreUris${hiveMetastoreUris} + --numPartitions1000 + + + + + yarn diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java index c4d1b6b58..c2852267c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java @@ -16,8 +16,6 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.common.RelationInverse; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.BeforeEach; @@ -32,6 +30,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import 
eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.common.RelationInverse; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; @@ -366,6 +366,7 @@ class MigrateDbEntitiesApplicationTest { assertValidId(r1.getCollectedfrom().get(0).getKey()); assertValidId(r2.getCollectedfrom().get(0).getKey()); } + @Test void testProcessClaims_affiliation() throws Exception { final List fields = prepareMocks("claimsrel_resultset_affiliation.json"); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index add1c80fa..a7421ebd1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -233,6 +233,14 @@ public class CreateRelatedEntitiesJob_phase1 { if (!f.isEmpty()) { re.setFundingtree(f.stream().map(Field::getValue).collect(Collectors.toList())); } + break; + case person: + final Person person = (Person) entity; + + re.setGivenName(person.getGivenName()); + re.setFamilyName(person.getFamilyName()); + re.setAlternativeNames(person.getAlternativeNames()); + break; } return re; diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PayloadConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PayloadConverterJob.java index 351526336..2593ef6fe 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PayloadConverterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PayloadConverterJob.java @@ -2,10 +2,12 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import static eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits.MAX_RELATIONS_BY_RELCLASS; import static eu.dnetlib.dhp.utils.DHPUtils.toSeq; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import org.apache.commons.io.IOUtils; @@ -15,11 +17,13 @@ import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.apache.spark.util.LongAccumulator; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import eu.dnetlib.dhp.application.ArgumentApplicationParser; @@ -27,11 +31,13 @@ import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; +import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; import eu.dnetlib.dhp.oa.provision.model.TupleWrapper; import eu.dnetlib.dhp.oa.provision.utils.ContextMapper; import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Oaf; +import 
eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits; import eu.dnetlib.dhp.schema.solr.SolrRecord; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; @@ -124,6 +130,9 @@ public class PayloadConverterJob { .map(Oaf::getDataInfo) .map(DataInfo::getDeletedbyinference) .orElse(false)) + .map( + (MapFunction) PayloadConverterJob::pruneRelatedEntities, + Encoders.kryo(JoinedEntity.class)) .map( (MapFunction>) je -> new Tuple2<>( recordFactory.build(je, validateXML), @@ -139,6 +148,32 @@ .json(outputPath); } + /** + * This method iterates over the RelatedEntityWrapper(s) associated with the JoinedEntity and rules out + * those exceeding the maximum allowed frequency per relClass defined in eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits#MAX_RELATIONS_BY_RELCLASS + */ + private static JoinedEntity pruneRelatedEntities(JoinedEntity je) { + Map freqs = Maps.newHashMap(); + List rew = Lists.newArrayList(); + + if (je.getLinks() != null) { + je.getLinks().forEach(link -> { + final String relClass = link.getRelation().getRelClass(); + + final Long count = freqs.getOrDefault(relClass, 0L); + final Long max = MAX_RELATIONS_BY_RELCLASS.getOrDefault(relClass, Long.MAX_VALUE); + + if (count < max) { + rew.add(link); + freqs.put(relClass, count + 1); + } + }); + je.setLinks(rew); + } + + return je; + } + private static void removeOutputDir(final SparkSession spark, final String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); }
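The pruneRelatedEntities step above is, at its core, a bounded frequency counter over the links of a joined entity: links are kept in encounter order until their relation class reaches the configured cap, and classes without a cap are unbounded. A minimal self-contained sketch of the same idea, using a hypothetical Link type, plain JDK collections, and made-up relation class labels in place of the project's JoinedEntity/RelatedEntityWrapper model (illustration only, not the project's API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RelationPruningSketch {

	// Hypothetical stand-in for RelatedEntityWrapper: only the relation class matters here.
	static class Link {
		final String relClass;

		Link(String relClass) {
			this.relClass = relClass;
		}
	}

	// Keeps at most maxPerClass.get(relClass) links per relation class; unlisted classes are unbounded.
	static List<Link> prune(List<Link> links, Map<String, Long> maxPerClass) {
		Map<String, Long> freqs = new HashMap<>();
		List<Link> kept = new ArrayList<>();
		for (Link link : links) {
			long count = freqs.getOrDefault(link.relClass, 0L);
			long max = maxPerClass.getOrDefault(link.relClass, Long.MAX_VALUE);
			if (count < max) { // strictly less-than: once the cap is reached, further links are dropped
				kept.add(link);
				freqs.put(link.relClass, count + 1);
			}
		}
		return kept;
	}

	public static void main(String[] args) {
		// Example relation class labels, not the model's constants.
		Map<String, Long> limits = new HashMap<>();
		limits.put("hasCoAuthored", 2L);
		List<Link> links = new ArrayList<>();
		links.add(new Link("hasCoAuthored"));
		links.add(new Link("hasCoAuthored"));
		links.add(new Link("hasCoAuthored"));
		links.add(new Link("isProducedBy"));
		System.out.println(prune(links, limits).size()); // prints 3: two hasCoAuthored plus one isProducedBy
	}
}

Note the strictly-less-than comparison: with a less-than-or-equal test the method would keep one link more than the configured maximum per relation class.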
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java index 69aa940c9..738d75189 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java @@ -5,7 +5,6 @@ import java.io.StringReader; import java.util.*; import java.util.stream.Collectors; -import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits; import org.apache.commons.lang3.StringUtils; import org.dom4j.Document; import org.dom4j.DocumentException; @@ -24,6 +23,7 @@ import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory; +import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits; import eu.dnetlib.dhp.schema.solr.*; import eu.dnetlib.dhp.schema.solr.AccessRight; import eu.dnetlib.dhp.schema.solr.Author; @@ -38,6 +38,8 @@ import eu.dnetlib.dhp.schema.solr.Measure; import eu.dnetlib.dhp.schema.solr.OpenAccessColor; import eu.dnetlib.dhp.schema.solr.OpenAccessRoute; import eu.dnetlib.dhp.schema.solr.Organization; +import eu.dnetlib.dhp.schema.solr.Person; +import eu.dnetlib.dhp.schema.solr.PersonTopic; import eu.dnetlib.dhp.schema.solr.Pid; import eu.dnetlib.dhp.schema.solr.Project; import eu.dnetlib.dhp.schema.solr.Result; @@ -90,6 +92,8 @@ public class ProvisionModelSupport { r.setOrganization(mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) e)); } else if (e instanceof eu.dnetlib.dhp.schema.oaf.Project) { r.setProject(mapProject((eu.dnetlib.dhp.schema.oaf.Project) e, vocs)); + } else if (e instanceof eu.dnetlib.dhp.schema.oaf.Person) { + r.setPerson(mapPerson((eu.dnetlib.dhp.schema.oaf.Person) e)); } r .setLinks( @@ -152,11 +156,16 @@ public class ProvisionModelSupport { rr.setResulttype(mapQualifier(re.getResulttype())); rr.setTitle(Optional.ofNullable(re.getTitle()).map(StructuredProperty::getValue).orElse(null)); rr.setDescription(StringUtils.left(re.getDescription(), ModelHardLimits.MAX_RELATED_ABSTRACT_LENGTH)); - rr.setAuthor(Optional.ofNullable(re.getAuthor()) - .map(aa -> aa.stream() - .limit(ModelHardLimits.MAX_RELATED_AUTHORS) - .collect(Collectors.toList())) - .orElse(null)); + rr + .setAuthor( + Optional + .ofNullable(re.getAuthor()) + .map( + aa -> aa + .stream() + .limit(ModelHardLimits.MAX_RELATED_AUTHORS) + .collect(Collectors.toList())) + .orElse(null)); if (relation.getValidated() == null) { relation.setValidated(false); @@ -192,6 +201,18 @@ public class ProvisionModelSupport { return ps; } + private static Person mapPerson(eu.dnetlib.dhp.schema.oaf.Person p) { + Person ps = new Person(); + ps.setFamilyName(p.getFamilyName()); + ps.setGivenName(p.getGivenName()); + ps.setAlternativeNames(p.getAlternativeNames()); + ps.setBiography(p.getBiography()); + ps.setConsent(p.getConsent()); + // ps.setSubject(...)); + + return ps; + } + private static Funding mapFunding(List fundingtree, VocabularyGroup vocs) { SAXReader reader = new SAXReader(); return Optional @@ -688,10 +709,10 @@ public class ProvisionModelSupport { .map( a -> Author .newInstance( - StringUtils.left(a.getFullname(), ModelHardLimits.MAX_AUTHOR_FULLNAME_LENGTH), - a.getName(), - a.getSurname(), - a.getRank(), asPid(a.getPid()))) + StringUtils.left(a.getFullname(), ModelHardLimits.MAX_AUTHOR_FULLNAME_LENGTH), + a.getName(), + a.getSurname(), + a.getRank(), asPid(a.getPid()))) .collect(Collectors.toList())) .orElse(null); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java index ee010910c..2a6332857 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java @@ -51,6 +51,11 @@ public class RelatedEntity implements Serializable { private Qualifier contracttype; private List fundingtree; + // person + private String givenName; + private String familyName; + private List alternativeNames; + public String getId() { return id; } @@ -251,6 +256,30 @@ public class RelatedEntity implements Serializable { this.fundingtree = fundingtree; } + public String getGivenName() { + return givenName; + } + + public void setGivenName(String givenName) { + this.givenName = givenName; + } + + public String getFamilyName() { + return familyName; + } + + public void setFamilyName(String familyName) { + this.familyName = familyName; + } + + public List getAlternativeNames() { + return alternativeNames; + } + + public void setAlternativeNames(List alternativeNames) { + this.alternativeNames = alternativeNames; + } + @Override public boolean equals(Object o) { if (this == o) @@ -280,7 +309,10 @@ public class RelatedEntity implements Serializable { && Objects.equal(code, that.code) && Objects.equal(acronym, that.acronym) && Objects.equal(contracttype, that.contracttype) - && Objects.equal(fundingtree, that.fundingtree); + && Objects.equal(fundingtree, that.fundingtree) + && Objects.equal(givenName, that.givenName) + && Objects.equal(familyName, that.familyName) + && Objects.equal(alternativeNames,
that.alternativeNames); } @Override @@ -309,6 +341,9 @@ public class RelatedEntity implements Serializable { code, acronym, contracttype, - fundingtree); + fundingtree, + familyName, + givenName, + alternativeNames); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index 3c8f5cef5..97d2d3989 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -1035,6 +1035,48 @@ public class XmlRecordFactory implements Serializable { .collect(Collectors.toList())); } + break; + case person: + final Person person = (Person) entity; + + if (person.getGivenName() != null) { + metadata.add(XmlSerializationUtils.asXmlElement("givenname", person.getGivenName())); + } + if (person.getFamilyName() != null) { + metadata.add(XmlSerializationUtils.asXmlElement("familyname", person.getFamilyName())); + } + if (person.getAlternativeNames() != null) { + metadata .addAll( person .getAlternativeNames() .stream() .map(altName -> XmlSerializationUtils.asXmlElement("alternativename", altName)) .collect(Collectors.toList())); + } + if (person.getBiography() != null) { + metadata.add(XmlSerializationUtils.asXmlElement("biography", person.getBiography())); + } + if (person.getSubject() != null) { + metadata .addAll( person .getSubject() .stream() .map(pt -> { List> attrs = Lists.newArrayList(); attrs.add(new Tuple2<>("schema", pt.getSchema())); attrs.add(new Tuple2<>("value", pt.getValue())); attrs.add(new Tuple2<>("fromYear", String.valueOf(pt.getFromYear()))); attrs.add(new Tuple2<>("toYear", String.valueOf(pt.getToYear()))); return XmlSerializationUtils.asXmlElement("subject", attrs); }) .collect(Collectors.toList())); + } + if (person.getConsent() != null) { + metadata.add(XmlSerializationUtils.asXmlElement("consent", String.valueOf(person.getConsent()))); + } + break; default: throw new IllegalArgumentException("invalid entity type: " + type); @@ -1240,6 +1282,25 @@ .collect(Collectors.toList())); } break; + + case person: + + if (isNotBlank(re.getGivenName())) { + metadata.add(XmlSerializationUtils.asXmlElement("givenname", re.getGivenName())); + } + if (isNotBlank(re.getFamilyName())) { + metadata.add(XmlSerializationUtils.asXmlElement("familyname", re.getFamilyName())); + } + if (re.getAlternativeNames() != null && !re.getAlternativeNames().isEmpty()) { + metadata .addAll( re .getAlternativeNames() .stream() .map(name -> XmlSerializationUtils.asXmlElement("alternativename", name)) .collect(Collectors.toList())); + } + break; default: throw new IllegalArgumentException("invalid target type: " + targetType); }
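The person branches added to XmlRecordFactory above follow the factory's existing pattern: one XML element per scalar field, a repeated element per list entry, and attribute pairs for structured values such as the subject topics. A minimal sketch of that subject serialization, with a hand-rolled escape helper and a simplified Topic class standing in for the project's PersonTopic (illustration only; the real code delegates to XmlSerializationUtils):

import java.util.ArrayList;
import java.util.List;

public class PersonSubjectXmlSketch {

	// Simplified stand-in for PersonTopic: a subject with a scheme and a year range.
	static class Topic {
		final String schema;
		final String value;
		final int fromYear;
		final int toYear;

		Topic(String schema, String value, int fromYear, int toYear) {
			this.schema = schema;
			this.value = value;
			this.fromYear = fromYear;
			this.toYear = toYear;
		}
	}

	// Minimal XML attribute escaping; enough for this sketch.
	static String escape(String s) {
		return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace("\"", "&quot;");
	}

	// Mirrors the attribute-based element construction used for person subjects.
	static String subjectElement(Topic t) {
		return String.format(
			"<subject schema=\"%s\" value=\"%s\" fromYear=\"%d\" toYear=\"%d\"/>",
			escape(t.schema), escape(t.value), t.fromYear, t.toYear);
	}

	public static void main(String[] args) {
		List<Topic> topics = new ArrayList<>();
		topics.add(new Topic("FOS", "01 natural sciences", 2018, 2023));
		// prints: <subject schema="FOS" value="01 natural sciences" fromYear="2018" toYear="2023"/>
		topics.forEach(t -> System.out.println(subjectElement(t)));
	}
}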
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 15d3b6300..879911ccc 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -180,6 +180,7 @@ + @@ -378,6 +379,34 @@ + + + yarn + cluster + Join[relation.target = person.id] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.executor.memoryOverhead=${sparkExecutorMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=5000 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputRelationsPath${workingDir}/relation + --inputEntityPath${inputGraphRootPath}/person + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Person + --outputPath${workingDir}/join_partial/person + + + + + @@ -388,6 +417,7 @@ + @@ -593,6 +623,35 @@ + + + yarn + cluster + Join[person.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.executor.memoryOverhead=${sparkExecutorMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=5000 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputEntityPath${inputGraphRootPath}/person + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Person + --inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities/person + --numPartitions10000 + + + +
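For context, the two provision workflow nodes added above (Join[relation.target = person.id] followed by Join[person.id = relatedEntity.source]) wire the person table into the same two-phase join dataflow already used for the other entity types. A minimal Spark sketch of that flow under assumed inputs (placeholder paths and flat JSON carrying only the fields referenced below; not the actual CreateRelatedEntitiesJob implementation):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PersonJoinSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("person-join-sketch").getOrCreate();

		// Assumed inputs: relations with (source, target, relClass), persons with (id, givenName, familyName).
		Dataset<Row> relations = spark.read().json("/tmp/graph/relation");
		Dataset<Row> persons = spark.read().json("/tmp/graph/person");

		// Phase 1 (relation.target = person.id): decorate each relation pointing at a person
		// with the person's fields, keyed by the relation source.
		Dataset<Row> joinPartial = relations
			.join(persons, relations.col("target").equalTo(persons.col("id")))
			.select(
				relations.col("source"),
				relations.col("relClass"),
				persons.col("givenName"),
				persons.col("familyName"));

		// Phase 2 (person.id = relatedEntity.source): attach the partial rows back to the
		// root entities they describe, yielding one joined record per person.
		Dataset<Row> joinEntities = persons
			.join(joinPartial, persons.col("id").equalTo(joinPartial.col("source")), "left_outer");

		joinEntities.show(false);
		spark.stop();
	}
}

Note that in the workflow the phase-2 node reads the whole ${workingDir}/join_partial directory rather than join_partial/person, since the partial join results for all related entity types are accumulated there before phase 2 runs.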