Compare commits

...

27 Commits

Author SHA1 Message Date
Miriam Baglioni 1fce7d5a0f [Person] remove the isolated nodes from the person set 2024-10-25 10:05:17 +02:00
Miriam Baglioni 32f444984e [person] - 2024-10-24 17:51:42 +02:00
Miriam Baglioni a7699558ed [person] - 2024-10-24 16:15:12 +02:00
Miriam Baglioni 01679c935a [person] added test class to be implemented 2024-10-24 15:27:06 +02:00
Miriam Baglioni c773421cc7 [person] added new substep in propagation worflow main 2024-10-24 14:44:13 +02:00
Miriam Baglioni cf07ed9058 [person] refactoring 2024-10-24 14:35:14 +02:00
Miriam Baglioni c921cf7ee0 [personEntity] removed the deletedbyinference results (not indexed, but still in the graph). Changed the writing mode: append instead of overwrite 2024-10-24 09:57:20 +02:00
Miriam Baglioni aac5eb3499 [personEntity] changed the data info for the relations with projects. added missing parameters to the job.properties file 2024-10-22 11:54:16 +02:00
Miriam Baglioni 821540f94a [personEntity] updated the property file to include also the db parameters. The same for the wf definition. Refactoring for compilation 2024-10-22 10:13:30 +02:00
Miriam Baglioni 09a2c93fc7 [personEntity] added relations with projects extracting the info from the database 2024-10-21 16:21:15 +02:00
Miriam Baglioni ce4ee1189f [personEntity] create entity for each profile in orcid even without works. Added validated true to each relation coming from orcid data 2024-10-21 14:38:15 +02:00
Claudio Atzori 4f0463d779 [graph provision] person serialisation, limit the number of authorships and coauthorships before expanding the payloads 2024-09-24 14:54:34 +02:00
Claudio Atzori d1cadc77c9 [graph provision] person serialisation, limit the number of authorships and coauthorships before expanding the payloads 2024-09-24 10:57:20 +02:00
Michele Artini 0e89d4a1cf fixed a bug with topic ENRICH/MORE/SUBJECT/ARXIV 2024-09-24 08:57:49 +02:00
Michele Artini 7f81673f3c removed the deletedByInference=true filter 2024-09-23 15:27:43 +02:00
Claudio Atzori e0ff84baf0 [graph provision] person serialisation, limit the number of authorships and coauthorships before expanding the payloads 2024-09-23 10:29:46 +02:00
Claudio Atzori 5f86c93be6 [graph provision] person serialisation 2024-09-20 12:20:00 +02:00
Claudio Atzori 23e0ab3a7c run mergeResultsOfDifferentTypes only when checkDelegatedAuthority is true 2024-09-17 15:36:10 +02:00
Miriam Baglioni 45605f93ae merging with branch beta 2024-08-12 18:03:10 +02:00
Miriam Baglioni 5a7ba77271 [Person]fix issue in affiliation relation id construction for person (missing ::) 2024-08-12 18:01:15 +02:00
Miriam Baglioni 8c185a7b1a resolving conflicts 2024-08-05 17:14:11 +02:00
Claudio Atzori e16616b964 added dataInfo to person records 2024-08-05 15:57:37 +02:00
Miriam Baglioni 985ca15264 [openaire-affiliation]removes matchings without DOI 2024-08-05 12:10:40 +02:00
Claudio Atzori 0bf76f2a34 [graph provision] added person to the graph2hive workflow 2024-08-05 09:35:07 +02:00
Claudio Atzori 975d44cac7 [graph provision] added person to the provision workflow 2024-08-02 16:14:10 +02:00
Claudio Atzori 6bdb8643e6 ActionManager promote: allow to ingest person records in a graph that did not contain them, bumped dhp-schemas version 2024-07-31 11:02:22 +02:00
Claudio Atzori 9486e21a44 copy or process the person records throughout the graph pipeline 2024-07-30 14:25:31 +02:00
47 changed files with 1560 additions and 131 deletions

View File

@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.actionmanager.personentity;
+package eu.dnetlib.dhp.common.person;
import java.util.Arrays;
import java.util.Iterator;
@@ -61,7 +61,7 @@ public class CoAuthorshipIterator implements Iterator<Relation> {
private Relation getRelation(String orcid1, String orcid2) {
String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1);
String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2);
-return OafMapperUtils
+Relation relation = OafMapperUtils
.getRelation(
source, target, ModelConstants.PERSON_PERSON_RELTYPE,
ModelConstants.PERSON_PERSON_SUBRELTYPE,
@@ -76,5 +76,7 @@ public class CoAuthorshipIterator implements Iterator<Relation> {
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
"0.91"),
null);
+relation.setValidated(true);
+return relation;
}
}

View File

@@ -1,12 +1,9 @@
-package eu.dnetlib.dhp.actionmanager.personentity;
+package eu.dnetlib.dhp.common.person;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.List;
-import eu.dnetlib.dhp.schema.oaf.Relation;
public class Coauthors implements Serializable {
private List<String> coauthors;

View File

@@ -363,6 +363,8 @@ public class GraphCleaningFunctions extends CleaningFunctions {
// nothing to clean here
} else if (value instanceof Project) {
// nothing to clean here
+} else if (value instanceof Person) {
+// nothing to clean here
} else if (value instanceof Organization) {
Organization o = (Organization) value;
if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) {

View File

@@ -30,6 +30,7 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
public class MergeUtils {
public static <T extends Oaf> T mergeById(String s, Iterator<T> oafEntityIterator) {
return mergeGroup(s, oafEntityIterator, true);
}
@@ -88,7 +89,7 @@
private static Oaf mergeEntities(Oaf left, Oaf right, boolean checkDelegatedAuthority) {
if (sameClass(left, right, Result.class)) {
-if (!left.getClass().equals(right.getClass()) || checkDelegatedAuthority) {
+if (checkDelegatedAuthority) {
return mergeResultsOfDifferentTypes((Result) left, (Result) right);
}

View File

@@ -1,6 +1,12 @@
package eu.dnetlib.dhp.schema.oaf.utils;
+import java.util.Map;
+import com.google.common.collect.Maps;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
public class ModelHardLimits {
private ModelHardLimits() {
@@ -19,6 +25,12 @@
public static final int MAX_ABSTRACT_LENGTH = 150000;
public static final int MAX_RELATED_ABSTRACT_LENGTH = 500;
public static final int MAX_INSTANCES = 10;
+public static final Map<String, Long> MAX_RELATIONS_BY_RELCLASS = Maps.newHashMap();
+static {
+MAX_RELATIONS_BY_RELCLASS.put(ModelConstants.PERSON_PERSON_HASCOAUTHORED, 500L);
+MAX_RELATIONS_BY_RELCLASS.put(ModelConstants.RESULT_PERSON_HASAUTHORED, 500L);
+}
public static String getCollectionName(String format) {
return format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION;
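Note: these caps back the "[graph provision] person serialisation, limit the number of authorships and coauthorships before expanding the payloads" commits listed above. A minimal sketch, not part of this changeset, of how a provision step could enforce them on a list of relations (the helper class and method names below are hypothetical):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;

public class RelationCapSketch {

	// Hypothetical helper: keep at most MAX_RELATIONS_BY_RELCLASS.get(relClass)
	// relations per relClass; relClasses without a configured cap are left untouched.
	public static List<Relation> capByRelClass(List<Relation> rels) {
		Map<String, List<Relation>> byRelClass = rels
			.stream()
			.collect(Collectors.groupingBy(Relation::getRelClass));

		return byRelClass
			.entrySet()
			.stream()
			.flatMap(e -> e
				.getValue()
				.stream()
				.limit(ModelHardLimits.MAX_RELATIONS_BY_RELCLASS.getOrDefault(e.getKey(), Long.MAX_VALUE)))
			.collect(Collectors.toList());
	}
}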

View File

@@ -151,12 +151,17 @@ public class PromoteActionPayloadForGraphTableJob {
SparkSession spark, String path, Class<G> rowClazz) {
logger.info("Reading graph table from path: {}", path);
-return spark
-.read()
-.textFile(path)
-.map(
-(MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz),
-Encoders.bean(rowClazz));
+if (HdfsSupport.exists(path, spark.sparkContext().hadoopConfiguration())) {
+return spark
+.read()
+.textFile(path)
+.map(
+(MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz),
+Encoders.bean(rowClazz));
+} else {
+logger.info("Found empty graph table from path: {}", path);
+return spark.emptyDataset(Encoders.bean(rowClazz));
+}
}
private static <A extends Oaf> Dataset<A> readActionPayload(
@@ -223,7 +228,7 @@ public class PromoteActionPayloadForGraphTableJob {
rowClazz,
actionPayloadClazz);
-if (shouldGroupById) {
+if (Boolean.TRUE.equals(shouldGroupById)) {
return PromoteActionPayloadFunctions
.groupGraphTableByIdAndMerge(
joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz);
@@ -250,6 +255,8 @@
return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Relation());
case "eu.dnetlib.dhp.schema.oaf.Software":
return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Software());
+case "eu.dnetlib.dhp.schema.oaf.Person":
+return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Person());
default:
throw new RuntimeException("unknown class: " + clazz.getCanonicalName());
}

View File

@@ -50,7 +50,7 @@ public class PromoteActionPayloadFunctions {
PromoteAction.Strategy promoteActionStrategy,
Class<G> rowClazz,
Class<A> actionPayloadClazz) {
-if (!isSubClass(rowClazz, actionPayloadClazz)) {
+if (Boolean.FALSE.equals(isSubClass(rowClazz, actionPayloadClazz))) {
throw new RuntimeException(
"action payload type must be the same or be a super type of table row type");
}
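As a reminder of what this guard checks, a small hedged sketch of ModelSupport.isSubClass with the row/payload pairing used elsewhere in this changeset (the wrapper class below is hypothetical):

import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;

public class SubClassCheckSketch {
	public static void main(String[] args) {
		// A Publication graph table can be promoted with a Result action payload:
		// the payload type is a super type of the table row type, so the guard passes.
		System.out.println(ModelSupport.isSubClass(Publication.class, Result.class)); // true

		// The reverse pairing is what the guard above rejects with a RuntimeException.
		System.out.println(ModelSupport.isSubClass(Result.class, Publication.class)); // false
	}
}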

View File

@@ -7,3 +7,4 @@ promote_action_payload_for_project_table classpath eu/dnetlib/dhp/actionmanager/
promote_action_payload_for_publication_table classpath eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app
promote_action_payload_for_relation_table classpath eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app
promote_action_payload_for_software_table classpath eu/dnetlib/dhp/actionmanager/wf/software/oozie_app
+promote_action_payload_for_person_table classpath eu/dnetlib/dhp/actionmanager/wf/person/oozie_app

View File

@@ -148,6 +148,7 @@
<path start="PromoteActionPayloadForPublicationTable"/>
<path start="PromoteActionPayloadForRelationTable"/>
<path start="PromoteActionPayloadForSoftwareTable"/>
+<path start="PromoteActionPayloadForPersonTable"/>
</fork>
<action name="PromoteActionPayloadForDatasetTable">
@@ -270,6 +271,21 @@
<error to="Kill"/>
</action>
+<action name="PromoteActionPayloadForPersonTable">
+<sub-workflow>
+<app-path>${wf:appPath()}/promote_action_payload_for_person_table</app-path>
+<propagate-configuration/>
+<configuration>
+<property>
+<name>inputActionPayloadRootPath</name>
+<value>${workingDir}/action_payload_by_type</value>
+</property>
+</configuration>
+</sub-workflow>
+<ok to="JoinPromote"/>
+<error to="Kill"/>
+</action>
<join name="JoinPromote" to="End"/>
<end name="End"/>

View File

@@ -0,0 +1,129 @@
<workflow-app name="promote_action_payload_for_person_table" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>activePromotePersonActionPayload</name>
<description>when true will promote actions with eu.dnetlib.dhp.schema.oaf.Person payload</description>
</property>
<property>
<name>inputGraphRootPath</name>
<description>root location of input materialized graph</description>
</property>
<property>
<name>inputActionPayloadRootPath</name>
<description>root location of action payloads to promote</description>
</property>
<property>
<name>outputGraphRootPath</name>
<description>root location for output materialized graph</description>
</property>
<property>
<name>mergeAndGetStrategy</name>
<description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="DecisionPromotePersonActionPayload"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="DecisionPromotePersonActionPayload">
<switch>
<case to="PromotePersonActionPayloadForPersonTable">
${(activePromotePersonActionPayload eq "true") and
(fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Person')) eq "true")}
</case>
<default to="SkipPromotePersonActionPayloadForPersonTable"/>
</switch>
</decision>
<action name="PromotePersonActionPayloadForPersonTable">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>PromotePersonActionPayloadForPersonTable</name>
<class>eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob</class>
<jar>dhp-actionmanager-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--inputGraphTablePath</arg><arg>${inputGraphRootPath}/person</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
<arg>--inputActionPayloadPath</arg><arg>${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Person</arg>
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/person</arg>
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="SkipPromotePersonActionPayloadForPersonTable">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${outputGraphRootPath}/person"/>
</prepare>
<arg>-pb</arg>
<arg>${inputGraphRootPath}/person</arg>
<arg>${outputGraphRootPath}/person</arg>
</distcp>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
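A hedged sketch of the properties one might pass when enabling this sub-workflow; every path below is a placeholder rather than a value taken from this changeset, and the two strategy values are the ones exercised by the PromoteResultWithMeasuresTest further down:

activePromotePersonActionPayload=true
inputGraphRootPath=/tmp/graph/00_graph_raw
inputActionPayloadRootPath=/tmp/actionsets/action_payload_by_type
outputGraphRootPath=/tmp/graph/01_graph_with_person
mergeAndGetStrategy=MERGE_FROM_AND_GET
promoteActionStrategy=ENRICH
sparkDriverMemory=4G
sparkExecutorMemory=8G
sparkExecutorCores=4

Note that the decision node above only fires when the action payload root contains a clazz=eu.dnetlib.dhp.schema.oaf.Person subdirectory; otherwise the distcp fallback copies the person table unchanged.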

View File

@@ -0,0 +1,210 @@
/*
* Copyright (c) 2024.
* SPDX-FileCopyrightText: © 2023 Consiglio Nazionale delle Ricerche
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
package eu.dnetlib.dhp.actionmanager.promote;
import static eu.dnetlib.dhp.common.FunctionalInterfaceSupport.*;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
import static org.apache.spark.sql.functions.*;
import static org.junit.jupiter.api.Assertions.*;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
public class PromoteResultWithMeasuresTest {
private static final Logger log = LoggerFactory.getLogger(PromoteResultWithMeasuresTest.class);
private static SparkSession spark;
private static Path tempDir;
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@BeforeAll
public static void beforeAll() throws IOException {
tempDir = Files.createTempDirectory(PromoteResultWithMeasuresTest.class.getSimpleName());
log.info("using work dir {}", tempDir);
SparkConf conf = new SparkConf();
conf.setMaster("local[*]");
conf.setAppName(PromoteResultWithMeasuresTest.class.getSimpleName());
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", tempDir.toString());
conf.set("hive.metastore.warehouse.dir", tempDir.resolve("warehouse").toString());
spark = SparkSession.builder().config(conf).getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
spark.stop();
FileUtils.deleteDirectory(tempDir.toFile());
}
@Test
void testPromoteResultWithMeasures_job() throws Exception {
final String inputGraphTablePath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/promote/measures/graph")
.getPath();
final String inputActionPayloadPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/promote/measures/actionPayloads")
.getPath();
final String actionPayloadsPath = tempDir.resolve("actionPayloads").toString();
spark
.read()
.text(inputActionPayloadPath)
.withColumn("payload", col("value"))
.select("payload")
.write()
.parquet(actionPayloadsPath);
final Path outputGraphTablePath = tempDir.resolve("outputGraphTablePath");
PromoteActionPayloadForGraphTableJob
.main(new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--graphTableClassName", Publication.class.getCanonicalName(),
"--inputGraphTablePath", inputGraphTablePath,
"--inputActionPayloadPath", actionPayloadsPath,
"--actionPayloadClassName", Result.class.getCanonicalName(),
"--outputGraphTablePath", outputGraphTablePath.toString(),
"--mergeAndGetStrategy", MergeAndGet.Strategy.MERGE_FROM_AND_GET.toString(),
"--promoteActionStrategy", PromoteAction.Strategy.ENRICH.toString(),
"--shouldGroupById", "true"
});
assertFalse(isDirEmpty(outputGraphTablePath));
final Encoder<Publication> pubEncoder = Encoders.bean(Publication.class);
List<Publication> results = spark
.read()
.schema(pubEncoder.schema())
.json(outputGraphTablePath.toString())
.as(pubEncoder)
.collectAsList();
verify(results);
}
@Test
void testPromoteResultWithMeasures_internal() throws JsonProcessingException {
Dataset<Publication> rowDS = spark
.read()
.schema(Encoders.bean(Publication.class).schema())
.json("src/test/resources/eu/dnetlib/dhp/actionmanager/promote/measures/graph")
.as(Encoders.bean(Publication.class));
Dataset<Result> actionPayloadDS = spark
.read()
.schema(Encoders.bean(Result.class).schema())
.json("src/test/resources/eu/dnetlib/dhp/actionmanager/promote/measures/actionPayloads")
.as(Encoders.bean(Result.class));
final MergeAndGet.Strategy mergeFromAndGet = MergeAndGet.Strategy.MERGE_FROM_AND_GET;
final SerializableSupplier<Function<Publication, String>> rowIdFn = ModelSupport::idFn;
final SerializableSupplier<BiFunction<Publication, Result, Publication>> mergeAndGetFn = MergeAndGet
.functionFor(mergeFromAndGet);
final SerializableSupplier<Publication> zeroFn = () -> Publication.class
.cast(new eu.dnetlib.dhp.schema.oaf.Publication());
final SerializableSupplier<Function<Publication, Boolean>> isNotZeroFn = PromoteResultWithMeasuresTest::isNotZeroFnUsingIdOrSourceAndTarget;
Dataset<Publication> joinedResults = PromoteActionPayloadFunctions
.joinGraphTableWithActionPayloadAndMerge(
rowDS,
actionPayloadDS,
rowIdFn,
ModelSupport::idFn,
mergeAndGetFn,
PromoteAction.Strategy.ENRICH,
Publication.class,
Result.class);
SerializableSupplier<BiFunction<Publication, Publication, Publication>> mergeRowsAndGetFn = MergeAndGet
.functionFor(mergeFromAndGet);
Dataset<Publication> mergedResults = PromoteActionPayloadFunctions
.groupGraphTableByIdAndMerge(
joinedResults, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, Publication.class);
verify(mergedResults.collectAsList());
}
private static void verify(List<Publication> results) throws JsonProcessingException {
assertNotNull(results);
assertEquals(1, results.size());
Result r = results.get(0);
log.info(OBJECT_MAPPER.writeValueAsString(r));
assertNotNull(r.getMeasures());
assertFalse(r.getMeasures().isEmpty());
assertTrue(
r
.getMeasures()
.stream()
.map(Measure::getId)
.collect(Collectors.toCollection(HashSet::new))
.containsAll(
Lists
.newArrayList(
"downloads", "views", "influence", "popularity", "influence_alt", "popularity_alt",
"impulse")));
}
private static <T extends Oaf> Function<T, Boolean> isNotZeroFnUsingIdOrSourceAndTarget() {
return t -> {
if (isSubClass(t, Relation.class)) {
final Relation rel = (Relation) t;
return StringUtils.isNotBlank(rel.getSource()) && StringUtils.isNotBlank(rel.getTarget());
}
return StringUtils.isNotBlank(((OafEntity) t).getId());
};
}
private static boolean isDirEmpty(final Path directory) throws IOException {
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(directory)) {
return !dirStream.iterator().hasNext();
}
}
}

View File

@@ -0,0 +1,3 @@
{"collectedfrom":null,"dataInfo":null,"lastupdatetimestamp":null,"id":"50|doi_dedup___::02317b7093277ec8aa0311d5c6a25b9b","originalId":null,"pid":null,"dateofcollection":null,"dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":[{"id":"downloads","unit":[{"key":"opendoar____::358aee4cc897452c00244351e4d91f69||ZENODO","value":"125","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:usage_counts","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"views","unit":[{"key":"opendoar____::358aee4cc897452c00244351e4d91f69||ZENODO","value":"35","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:usage_counts","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]}],"context":null,"processingchargeamount":null,"processingchargecurrency":null,"author":null,"resulttype":null,"metaResourceType":null,"language":null,"country":null,"subject":null,"title":null,"relevantdate":null,"description":null,"dateofacceptance":null,"publisher":null,"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"externalReference":null,"instance":null,"eoscifguidelines":null,"openAccessColor":null,"publiclyFunded":null,"transformativeAgreement":null,"isGreen":null,"isInDiamondJournal":null}
{"collectedfrom":null,"dataInfo":null,"lastupdatetimestamp":null,"id":"50|doi_dedup___::02317b7093277ec8aa0311d5c6a25b9b","originalId":null,"pid":null,"dateofcollection":null,"dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":[{"id":"influence","unit":[{"key":"score","value":"3.1167566E-9","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C5","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"popularity","unit":[{"key":"score","value":"7.335433E-9","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C4","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"influence_alt","unit":[{"key":"score","value":"4","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C5","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"popularity_alt","unit":[{"key":"score","value":"2.96","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C4","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]},{"id":"impulse","unit":[{"key":"score","value":"4","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},{"key":"class","value":"C5","dataInfo":{"invisible":false,"inferred":true,"deletedbyinference":false,"trust":"","inferenceprovenance":"update","provenanceaction":{"classid":"measure:bip","classname":"Inferred by 
OpenAIRE","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}]}],"context":null,"processingchargeamount":null,"processingchargecurrency":null,"author":null,"resulttype":null,"metaResourceType":null,"language":null,"country":null,"subject":null,"title":null,"relevantdate":null,"description":null,"dateofacceptance":null,"publisher":null,"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"externalReference":null,"instance":null,"eoscifguidelines":null,"openAccessColor":null,"publiclyFunded":null,"transformativeAgreement":null,"isGreen":null,"isInDiamondJournal":null}
{"collectedfrom":null,"dataInfo":null,"lastupdatetimestamp":null,"id":"50|doi_dedup___::02317b7093277ec8aa0311d5c6a25b9b","originalId":null,"pid":null,"dateofcollection":null,"dateoftransformation":null,"extraInfo":null,"oaiprovenance":null,"measures":null,"context":null,"processingchargeamount":null,"processingchargecurrency":null,"author":null,"resulttype":null,"metaResourceType":null,"language":null,"country":null,"subject":null,"title":null,"relevantdate":null,"description":null,"dateofacceptance":null,"publisher":null,"embargoenddate":null,"source":null,"fulltext":null,"format":null,"contributor":null,"resourcetype":null,"coverage":null,"bestaccessright":null,"externalReference":null,"instance":null,"eoscifguidelines":null,"openAccessColor":"hybrid","publiclyFunded":false,"transformativeAgreement":null,"isGreen":true,"isInDiamondJournal":false}

View File

@@ -2,15 +2,25 @@
package eu.dnetlib.dhp.actionmanager.personentity;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+import static org.apache.spark.sql.functions.*;
+import java.io.BufferedWriter;
import java.io.IOException;
+import java.io.OutputStreamWriter;
import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.*;
import java.util.stream.Collectors;
+import eu.dnetlib.dhp.common.person.CoAuthorshipIterator;
+import eu.dnetlib.dhp.common.person.Coauthors;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -28,10 +38,12 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.orcid.model.Author;
import eu.dnetlib.dhp.collection.orcid.model.Employment;
import eu.dnetlib.dhp.collection.orcid.model.Work;
+import eu.dnetlib.dhp.common.DbClient;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Person;
import eu.dnetlib.dhp.schema.oaf.Relation;
@@ -44,7 +56,7 @@ import scala.Tuple2;
public class ExtractPerson implements Serializable {
private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class);
+private static final String QUERY = "SELECT * FROM project_person WHERE pid_type = 'ORCID'";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String OPENAIRE_PREFIX = "openaire____";
private static final String SEPARATOR = "::";
@@ -61,6 +73,41 @@ public class ExtractPerson implements Serializable {
private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______";
public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid";
public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID";
public static final String FUNDER_AUTHORS_CLASSID = "sysimport:crosswalk:funderdatabase";
public static final String FUNDER_AUTHORS_CLASSNAME = "Imported from Funder Database";
public static final String OPENAIRE_DATASOURCE_ID = "10|infrastruct_::f66f1bd369679b5b077dcdf006089556";
public static final String OPENAIRE_DATASOURCE_NAME = "OpenAIRE";
public static List<KeyValue> collectedfromOpenAIRE = OafMapperUtils
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
public static final DataInfo ORCIDDATAINFO = OafMapperUtils
.dataInfo(
false,
null,
false,
false,
OafMapperUtils
.qualifier(
ORCID_AUTHORS_CLASSID,
ORCID_AUTHORS_CLASSNAME,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
"0.91");
public static final DataInfo FUNDERDATAINFO = OafMapperUtils
.dataInfo(
false,
null,
false,
false,
OafMapperUtils
.qualifier(
FUNDER_AUTHORS_CLASSID,
FUNDER_AUTHORS_CLASSNAME,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
"0.91");
public static void main(final String[] args) throws IOException, ParseException {
@@ -91,19 +138,130 @@
final String workingDir = parser.get("workingDir");
log.info("workingDir {}", workingDir);
+final String dbUrl = parser.get("postgresUrl");
+final String dbUser = parser.get("postgresUser");
+final String dbPassword = parser.get("postgresPassword");
+final String hdfsNameNode = parser.get("hdfsNameNode");
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
-createActionSet(spark, inputPath, outputPath, workingDir);
+extractInfoForActionSetFromORCID(spark, inputPath, workingDir);
+extractInfoForActionSetFromProjects(
+spark, inputPath, workingDir, dbUrl, dbUser, dbPassword, workingDir + "/project", hdfsNameNode);
+createActionSet(spark, outputPath, workingDir);
});
}
-private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) {
+private static void extractInfoForActionSetFromProjects(SparkSession spark, String inputPath, String workingDir,
String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", hdfsNameNode);
FileSystem fileSystem = FileSystem.get(conf);
Path hdfsWritePath = new Path(hdfsPath);
FSDataOutputStream fos = fileSystem.create(hdfsWritePath);
try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
dbClient.processResults(QUERY, rs -> writeRelation(getRelationWithProject(rs), writer));
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static Relation getRelationWithProject(ResultSet rs) {
try {
return getProjectRelation(
rs.getString("project"), rs.getString("pid"),
rs.getString("role"));
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}
private static Relation getProjectRelation(String project, String orcid, String role) {
String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
String target = project.substring(0, 14)
+ IdentifierFactory.md5(project.substring(15));
List<KeyValue> properties = new ArrayList<>();
Relation relation = OafMapperUtils
.getRelation(
source, target, ModelConstants.PROJECT_PERSON_RELTYPE, ModelConstants.PROJECT_PERSON_SUBRELTYPE,
ModelConstants.PROJECT_PERSON_PARTICIPATES,
collectedfromOpenAIRE,
FUNDERDATAINFO,
null);
relation.setValidated(true);
if (StringUtil.isNotBlank(role)) {
KeyValue kv = new KeyValue();
kv.setKey("role");
kv.setValue(role);
properties.add(kv);
}
if (!properties.isEmpty())
relation.setProperties(properties);
return relation;
}
protected static void writeRelation(final Relation relation, BufferedWriter writer) {
try {
writer.write(OBJECT_MAPPER.writeValueAsString(relation));
writer.newLine();
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
private static void createActionSet(SparkSession spark, String outputPath, String workingDir) {
Dataset<Person> people;
people = spark
.read()
.textFile(workingDir + "/people")
.map(
(MapFunction<String, Person>) value -> OBJECT_MAPPER
.readValue(value, Person.class),
Encoders.bean(Person.class));
people
.toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p))
.union(
getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r)))
.union(
getRelations(spark, workingDir + "/coauthorship")
.toJavaRDD()
.map(r -> new AtomicAction(r.getClass(), r)))
.union(
getRelations(spark, workingDir + "/affiliation")
.toJavaRDD()
.map(r -> new AtomicAction(r.getClass(), r)))
.union(
getRelations(spark, workingDir + "/project")
.toJavaRDD()
.map(r -> new AtomicAction(r.getClass(), r)))
.mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
}
private static void extractInfoForActionSetFromORCID(SparkSession spark, String inputPath, String workingDir) {
Dataset<Author> authors = spark
.read()
.parquet(inputPath + "Authors")
@@ -129,18 +287,13 @@
.parquet(inputPath + "Employments")
.as(Encoders.bean(Employment.class));
-Dataset<Author> peopleToMap = authors
-.joinWith(works, authors.col("orcid").equalTo(works.col("orcid")))
-.map((MapFunction<Tuple2<Author, Work>, Author>) t2 -> t2._1(), Encoders.bean(Author.class))
-.groupByKey((MapFunction<Author, String>) a -> a.getOrcid(), Encoders.STRING())
-.mapGroups((MapGroupsFunction<String, Author, Author>) (k, it) -> it.next(), Encoders.bean(Author.class));
Dataset<Employment> employment = employmentDataset
-.joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid")))
+.joinWith(authors, employmentDataset.col("orcid").equalTo(authors.col("orcid")))
.map((MapFunction<Tuple2<Employment, Author>, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class));
-Dataset<Person> people;
-peopleToMap.map((MapFunction<Author, Person>) op -> {
+// Mapping all the orcid profiles even if the profile has no visible works
+authors.map((MapFunction<Author, Person>) op -> {
Person person = new Person();
person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX));
person
@@ -193,6 +346,7 @@
ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, null));
person.setDateofcollection(op.getLastModifiedDate());
person.setOriginalId(Arrays.asList(op.getOrcid()));
+person.setDataInfo(ORCIDDATAINFO);
return person;
}, Encoders.bean(Person.class))
.write()
@@ -246,34 +400,6 @@
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(workingDir + "/affiliation");
-people = spark
-.read()
-.textFile(workingDir + "/people")
-.map(
-(MapFunction<String, Person>) value -> OBJECT_MAPPER
-.readValue(value, Person.class),
-Encoders.bean(Person.class));
-people.show(false);
-people
-.toJavaRDD()
-.map(p -> new AtomicAction(p.getClass(), p))
-.union(
-getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r)))
-.union(
-getRelations(spark, workingDir + "/coauthorship")
-.toJavaRDD()
-.map(r -> new AtomicAction(r.getClass(), r)))
-.union(
-getRelations(spark, workingDir + "/affiliation")
-.toJavaRDD()
-.map(r -> new AtomicAction(r.getClass(), r)))
-.mapToPair(
-aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
-new Text(OBJECT_MAPPER.writeValueAsString(aa))))
-.saveAsHadoopFile(
-outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
}
private static Dataset<Relation> getRelations(SparkSession spark, String path) {
@@ -297,7 +423,7 @@
}
private static Relation getAffiliationRelation(Employment row) {
-String source = PERSON_PREFIX + IdentifierFactory.md5(row.getOrcid());
+String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(row.getOrcid());
String target = ROR_PREFIX
+ IdentifierFactory.md5(PidCleaner.normalizePidValue("ROR", row.getAffiliationId().getValue()));
List<KeyValue> properties = new ArrayList<>();
@@ -307,15 +433,9 @@
source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE,
ModelConstants.ORG_PERSON_PARTICIPATES,
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
-OafMapperUtils
-.dataInfo(
-false, null, false, false,
-OafMapperUtils
-.qualifier(
-ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS,
-ModelConstants.DNET_PROVENANCE_ACTIONS),
-"0.91"),
+ORCIDDATAINFO,
null);
+relation.setValidated(true);
if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())) {
KeyValue kv = new KeyValue();
@@ -336,45 +456,6 @@
}
-private static Collection<? extends Relation> getCoAuthorshipRelations(String orcid1, String orcid2) {
-String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid1);
-String target = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid2);
-return Arrays
-.asList(
-OafMapperUtils
-.getRelation(
-source, target, ModelConstants.PERSON_PERSON_RELTYPE,
-ModelConstants.PERSON_PERSON_SUBRELTYPE,
-ModelConstants.PERSON_PERSON_HASCOAUTHORED,
-Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
-OafMapperUtils
-.dataInfo(
-false, null, false, false,
-OafMapperUtils
-.qualifier(
-ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
-ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
-"0.91"),
-null),
-OafMapperUtils
-.getRelation(
-target, source, ModelConstants.PERSON_PERSON_RELTYPE,
-ModelConstants.PERSON_PERSON_SUBRELTYPE,
-ModelConstants.PERSON_PERSON_HASCOAUTHORED,
-Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
-OafMapperUtils
-.dataInfo(
-false, null, false, false,
-OafMapperUtils
-.qualifier(
-ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
-ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
-"0.91"),
-null));
-}
private static @NotNull Iterator<Relation> getAuthorshipRelationIterator(Work w) {
if (Optional.ofNullable(w.getPids()).isPresent())
@@ -417,21 +498,15 @@
default:
return null;
}
-return OafMapperUtils
+Relation relation = OafMapperUtils
.getRelation(
source, target, ModelConstants.RESULT_PERSON_RELTYPE,
ModelConstants.RESULT_PERSON_SUBRELTYPE,
ModelConstants.RESULT_PERSON_HASAUTHORED,
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
-OafMapperUtils
-.dataInfo(
-false, null, false, false,
-OafMapperUtils
-.qualifier(
-ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS,
-ModelConstants.DNET_PROVENANCE_ACTIONS),
-"0.91"),
+ORCIDDATAINFO,
null);
+relation.setValidated(true);
+return relation;
}
}
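For orientation, a hedged sketch of how ExtractPerson could be invoked with the newly introduced database arguments, mirroring the main(String[]) style used in PromoteResultWithMeasuresTest above. All values are placeholders, a reachable Postgres and HDFS are assumed, and --isSparkSessionManaged is assumed to be declared among the job parameters not shown in the excerpt below:

import eu.dnetlib.dhp.actionmanager.personentity.ExtractPerson;

public class ExtractPersonInvocationSketch {
	public static void main(String[] args) throws Exception {
		// Placeholder values only; the long parameter names match as_parameters.json below.
		ExtractPerson.main(new String[] {
			"--isSparkSessionManaged", Boolean.FALSE.toString(),
			"--inputPath", "/data/orcid_2023/tables/",
			"--outputPath", "/tmp/person/actionSet",
			"--workingDir", "/tmp/person/working",
			"--hdfsNameNode", "hdfs://nameservice1",
			"--postgresUrl", "jdbc:postgresql://localhost:5432/dnet_openaireplus",
			"--postgresUser", "dnet",
			"--postgresPassword", "***"
		});
	}
}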

View File

@@ -21,5 +21,30 @@
"paramLongName": "workingDir",
"paramDescription": "the hdfs name node",
"paramRequired": false
+},
+{
+"paramName": "pu",
+"paramLongName": "postgresUrl",
+"paramDescription": "the hdfs name node",
+"paramRequired": false
+},
+{
+"paramName": "ps",
+"paramLongName": "postgresUser",
+"paramDescription": "the hdfs name node",
+"paramRequired": false
+},
+{
+"paramName": "pp",
+"paramLongName": "postgresPassword",
+"paramDescription": "the hdfs name node",
+"paramRequired": false
+},{
+"paramName": "nn",
+"paramLongName": "hdfsNameNode",
+"paramDescription": "the hdfs name node",
+"paramRequired": false
}
]

View File

@@ -1,2 +1,5 @@
inputPath=/data/orcid_2023/tables/
outputPath=/user/miriam.baglioni/peopleAS
+postgresUrl=jdbc:postgresql://beta.services.openaire.eu:5432/dnet_openaireplus
+postgresUser=dnet
+postgresPassword=dnetPwd

View File

@@ -9,6 +9,18 @@
<name>outputPath</name>
<description>the path where to store the actionset</description>
</property>
+<property>
+<name>postgresUrl</name>
+<description>the path where to store the actionset</description>
+</property>
+<property>
+<name>postgresUser</name>
+<description>the path where to store the actionset</description>
+</property>
+<property>
+<name>postgresPassword</name>
+<description>the path where to store the actionset</description>
+</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@@ -102,6 +114,10 @@
<arg>--inputPath</arg><arg>${inputPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
+<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
+<arg>--postgresUrl</arg><arg>${postgresUrl}</arg>
+<arg>--postgresUser</arg><arg>${postgresUser}</arg>
+<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@@ -63,6 +63,7 @@
<path start="copy_software"/>
<path start="copy_datasource"/>
<path start="copy_project"/>
+<path start="copy_person"/>
<path start="copy_organization"/>
</fork>
@@ -120,6 +121,15 @@
<error to="Kill"/>
</action>
+<action name="copy_person">
+<distcp xmlns="uri:oozie:distcp-action:0.2">
+<arg>${nameNode}/${sourcePath}/person</arg>
+<arg>${nameNode}/${outputPath}/person</arg>
+</distcp>
+<ok to="wait"/>
+<error to="Kill"/>
+</action>
<action name="copy_datasource">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/datasource</arg>

View File

@@ -70,9 +70,8 @@ public class PrepareRelatedProjectsJob {
final Dataset<Relation> rels = ClusterUtils
.loadRelations(graphPath, spark)
-.filter((FilterFunction<Relation>) r -> r.getDataInfo().getDeletedbyinference())
-.filter((FilterFunction<Relation>) r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT))
-.filter((FilterFunction<Relation>) r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
+.filter((FilterFunction<Relation>) r -> ModelConstants.RESULT_PROJECT.equals(r.getRelType()))
+.filter((FilterFunction<Relation>) r -> !BrokerConstants.IS_MERGED_IN_CLASS.equals(r.getRelClass()))
.filter((FilterFunction<Relation>) r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter((FilterFunction<Relation>) r -> !ClusterUtils.isDedupRoot(r.getTarget()));

View File

@@ -53,7 +53,7 @@ public class EnrichMoreSubject extends UpdateMatcher<OaBrokerTypedValue> {
.collect(Collectors.toSet());
return source
-.getPids()
+.getSubjects()
.stream()
.filter(s -> !existingSubjects.contains(subjectAsString(s)))
.collect(Collectors.toList());

View File

@@ -0,0 +1,60 @@
package eu.dnetlib.dhp.broker.oa.matchers.simple;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
public class EnrichMoreSubjectTest {
final EnrichMoreSubject matcher = new EnrichMoreSubject();
@BeforeEach
void setUp() throws Exception {
}
@Test
void testFindDifferences_1() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
final List<OaBrokerTypedValue> list = this.matcher.findDifferences(source, target);
assertTrue(list.isEmpty());
}
@Test
void testFindDifferences_2() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
source.setSubjects(Arrays.asList(new OaBrokerTypedValue("arxiv", "subject_01")));
final List<OaBrokerTypedValue> list = this.matcher.findDifferences(source, target);
assertEquals(1, list.size());
}
@Test
void testFindDifferences_3() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
target.setSubjects(Arrays.asList(new OaBrokerTypedValue("arxiv", "subject_01")));
final List<OaBrokerTypedValue> list = this.matcher.findDifferences(source, target);
assertTrue(list.isEmpty());
}
@Test
void testFindDifferences_4() {
final OaBrokerMainEntity source = new OaBrokerMainEntity();
final OaBrokerMainEntity target = new OaBrokerMainEntity();
source.setSubjects(Arrays.asList(new OaBrokerTypedValue("arxiv", "subject_01")));
target.setSubjects(Arrays.asList(new OaBrokerTypedValue("arxiv", "subject_01")));
final List<OaBrokerTypedValue> list = this.matcher.findDifferences(source, target);
assertTrue(list.isEmpty());
}
}

View File

@@ -48,12 +48,7 @@
<groupId>io.github.classgraph</groupId>
<artifactId>classgraph</artifactId>
</dependency>
-<dependency>
-<groupId>eu.dnetlib.dhp</groupId>
-<artifactId>dhp-aggregation</artifactId>
-<version>1.2.5-SNAPSHOT</version>
-<scope>compile</scope>
-</dependency>
</dependencies>

View File

@@ -0,0 +1,295 @@
package eu.dnetlib.dhp.person;
import static com.ibm.icu.text.PluralRules.Operand.w;
import static eu.dnetlib.dhp.PropagationConstant.*;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.person.CoAuthorshipIterator;
import eu.dnetlib.dhp.common.person.Coauthors;
import eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import scala.Tuple2;
public class SparkExtractPersonRelations {
private static final Logger log = LoggerFactory.getLogger(SparkCountryPropagationJob.class);
private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______";
public static final DataInfo DATAINFO = OafMapperUtils
.dataInfo(
false,
"openaire",
true,
false,
OafMapperUtils
.qualifier(
ModelConstants.SYSIMPORT_CROSSWALK_REPOSITORY,
ModelConstants.SYSIMPORT_CROSSWALK_REPOSITORY,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
"0.85");
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkCountryPropagationJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/wf/subworkflows/person/input_personpropagation_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String sourcePath = parser.get("sourcePath");
log.info("sourcePath: {}", sourcePath);
final String workingPath = parser.get("outputPath");
log.info("workingPath: {}", workingPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
extractRelations(
spark,
sourcePath,
workingPath);
removeIsolatedPerson(spark, sourcePath, workingPath);
});
}
private static void removeIsolatedPerson(SparkSession spark, String sourcePath, String workingPath) {
Dataset<Person> personDataset = spark.read().schema(Encoders.bean(Person.class).schema())
.json(sourcePath + "person")
.as(Encoders.bean(Person.class));
Dataset<Relation> relationDataset = spark.read().schema(Encoders.bean(Relation.class).schema())
.json(sourcePath + "relation")
.as(Encoders.bean(Relation.class));
personDataset.join(relationDataset, personDataset.col("id").equalTo(relationDataset.col("source")), "left_semi")
.write()
.option("compression","gzip")
.mode(SaveMode.Overwrite)
.json(workingPath + "person");
spark.read().schema(Encoders.bean(Person.class).schema())
.json(workingPath + "person")
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.json(sourcePath + "person");
}
private static void extractRelations(SparkSession spark, String sourcePath, String workingPath) {
Dataset<Tuple2<String, Relation>> relationDataset = spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(sourcePath + "relation")
.as(Encoders.bean(Relation.class))
.map(
(MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(
r.getSource() + r.getRelClass() + r.getTarget(), r),
Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class)));
ModelSupport.entityTypes
.keySet()
.stream()
.filter(ModelSupport::isResult)
.forEach(
e -> {
// 1. search for results having orcid_pending and orcid in the set of pids for the authors
Dataset<Result> resultWithOrcids = spark
.read()
.schema(Encoders.bean(Result.class).schema())
.json(sourcePath + e.name())
.as(Encoders.bean(Result.class))
.filter(
(FilterFunction<Result>) r -> !r.getDataInfo().getDeletedbyinference() &&
!r.getDataInfo().getInvisible() &&
Optional
.ofNullable(r.getAuthor())
.isPresent())
.filter(
(FilterFunction<Result>) r -> r
.getAuthor()
.stream()
.anyMatch(
a -> Optional
.ofNullable(
a
.getPid())
.isPresent() &&
a
.getPid()
.stream()
.anyMatch(
p -> Arrays
.asList("orcid", "orcid_pending")
.contains(p.getQualifier().getClassid().toLowerCase()))));
// 2. create authorship relations between the result identifier and the person entity with
// orcid_pending.
Dataset<Tuple2<String, Relation>> newRelations = resultWithOrcids
.flatMap(
(FlatMapFunction<Result, Relation>) r -> getAuthorshipRelations(r),
Encoders.bean(Relation.class))
// .groupByKey((MapFunction<Relation, String>) r-> r.getSource()+r.getTarget(), Encoders.STRING() )
// .mapGroups((MapGroupsFunction<String, Relation, Relation>) (k,it) -> it.next(), Encoders.bean(Relation.class) )
.map(
(MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(
r.getSource() + r.getRelClass() + r.getTarget(), r),
Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class)));
newRelations
.joinWith(relationDataset, newRelations.col("_1").equalTo(relationDataset.col("_1")), "left")
.map((MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, Relation>>, Relation>) t2 -> {
if (t2._2() == null)
return t2._1()._2();
return null;
}, Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r != null)
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(workingPath);
// 2.1 store in a separate location the relation between the person and the pids for the result?
// 3. create co_authorship relations between the pairs of authors with orcid/orcid_pending pids
newRelations = resultWithOrcids
.map((MapFunction<Result, Coauthors>) r -> getAuthorsPidList(r), Encoders.bean(Coauthors.class))
.flatMap(
(FlatMapFunction<Coauthors, Relation>) c -> new CoAuthorshipIterator(c.getCoauthors()),
Encoders.bean(Relation.class))
.groupByKey(
(MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(),
Encoders.bean(Relation.class))
.map(
(MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(
r.getSource() + r.getRelClass() + r.getTarget(), r),
Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class)));
newRelations
.joinWith(relationDataset, newRelations.col("_1").equalTo(relationDataset.col("_1")), "left")
.map((MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, Relation>>, Relation>) t2 -> {
if (t2._2() == null)
return t2._1()._2();
return null;
}, Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) r -> r != null)
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(workingPath);
});
spark
.read()
.schema(Encoders.bean(Relation.class).schema())
.json(workingPath)
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(sourcePath + "relation");
}
private static Coauthors getAuthorsPidList(Result r) {
Coauthors coauth = new Coauthors();
coauth
.setCoauthors(
r
.getAuthor()
.stream()
.filter(
a -> a
.getPid()
.stream()
.anyMatch(
p -> Arrays.asList("orcid", "orcid_pending").contains(p.getQualifier().getClassid())))
.map(a -> {
Optional<StructuredProperty> tmp = a
.getPid()
.stream()
.filter(p -> p.getQualifier().getClassid().equalsIgnoreCase("orcid"))
.findFirst();
if (tmp.isPresent())
return tmp.get().getValue();
tmp = a
.getPid()
.stream()
.filter(p -> p.getQualifier().getClassid().equalsIgnoreCase("orcid_pending"))
.findFirst();
if (tmp.isPresent())
return tmp.get().getValue();
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList()));
return coauth;
}
private static Iterator<Relation> getAuthorshipRelations(Result r) {
List<Relation> relationList = new ArrayList<>();
for (Author a : r.getAuthor())
relationList.addAll(a.getPid().stream().map(p -> {
if (p.getQualifier().getClassid().equalsIgnoreCase("orcid_pending"))
return getRelation(p.getValue(), r.getId());
return null;
})
.filter(Objects::nonNull)
.collect(Collectors.toList()));
return relationList.iterator();
}
private static Relation getRelation(String orcid, String resultId) {
String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
Relation relation = OafMapperUtils
.getRelation(
source, resultId, ModelConstants.RESULT_PERSON_RELTYPE,
ModelConstants.RESULT_PERSON_SUBRELTYPE,
ModelConstants.RESULT_PERSON_HASAUTHORED,
null, // collectedfrom = null
DATAINFO,
null);
return relation;
}
}
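Note: in extractRelations above, each batch of candidate relations is left-joined with the existing relation dataset on the composite key source + relClass + target, and the rows that find a match are discarded; this is effectively a left anti join. A minimal, self-contained sketch of that pattern (the class name and the sample keys are invented for illustration; only Spark SQL is assumed on the classpath):

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LeftAntiJoinSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.master("local[*]")
			.appName("LeftAntiJoinSketch")
			.getOrCreate();

		// keys built as source + relClass + target, mirroring extractRelations
		Dataset<Row> candidate = spark
			.createDataset(
				Arrays.asList("p1::hasAuthored::r1", "p2::hasAuthored::r2"), Encoders.STRING())
			.toDF("key");
		Dataset<Row> existing = spark
			.createDataset(Arrays.asList("p1::hasAuthored::r1"), Encoders.STRING())
			.toDF("key");

		// keep only the candidate relations that are not already in the graph
		candidate
			.join(existing, candidate.col("key").equalTo(existing.col("key")), "left_anti")
			.show(false);

		spark.stop();
	}
}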

View File

@@ -7,4 +7,5 @@ community_organization classpath eu/dnetlib/dhp/wf/subworkflows/resulttocommunit
result_project classpath eu/dnetlib/dhp/wf/subworkflows/projecttoresult/oozie_app
community_project classpath eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app
community_sem_rel classpath eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/oozie_app
country_propagation classpath eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app
person_propagation classpath eu/dnetlib/dhp/wf/subworkflows/person/oozie_app

View File

@@ -122,6 +122,7 @@
<case to="community_project">${wf:conf('resumeFrom') eq 'CommunityProject'}</case>
<case to="community_sem_rel">${wf:conf('resumeFrom') eq 'CommunitySemanticRelation'}</case>
<case to="country_propagation">${wf:conf('resumeFrom') eq 'CountryPropagation'}</case>
<case to="person_propagation">${wf:conf('resumeFrom') eq 'PersonPropagation'}</case>
<default to="orcid_propagation"/>
</switch>
</decision>
@@ -291,10 +292,24 @@
</property>
</configuration>
</sub-workflow>
<ok to="person_propagation" />
<error to="Kill" />
</action>
<action name="person_propagation">
<sub-workflow>
<app-path>${wf:appPath()}/person_propagation
</app-path>
<propagate-configuration/>
<configuration>
<property>
<name>sourcePath</name>
<value>${outputPath}</value>
</property>
</configuration>
</sub-workflow>
<ok to="country_propagation" />
<error to="Kill" />
</action>
<action name="country_propagation">
<sub-workflow>
<app-path>${wf:appPath()}/country_propagation
@@ -319,6 +334,8 @@
<error to="Kill" />
</action>
<end name="End"/>
</workflow-app>

View File

@@ -34,6 +34,7 @@
<path start="copy_organization"/>
<path start="copy_projects"/>
<path start="copy_datasources"/>
<path start="copy_persons"/>
</fork>
<action name="copy_relation">
@@ -80,6 +81,17 @@
<error to="Kill"/>
</action>
<action name="copy_persons">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/person</arg>
<arg>${nameNode}/${outputPath}/person</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<join name="copy_wait" to="fork_prepare_assoc_step1"/>
<fork name="fork_prepare_assoc_step1">

View File

@@ -0,0 +1,21 @@
[
{
"paramName":"s",
"paramLongName":"sourcePath",
"paramDescription": "the path of the sequencial file to read",
"paramRequired": true
},
{
"paramName": "out",
"paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files",
"paramRequired": true
},
{
"paramName": "ssm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "true if the spark session is managed, false otherwise",
"paramRequired": false
}
]

View File

@@ -0,0 +1 @@
sourcePath=/tmp/miriam/13_graph_copy

View File

@@ -0,0 +1,58 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
</property>
<property>
<name>sparkExecutorNumber</name>
<value>4</value>
</property>
<property>
<name>sparkDriverMemory</name>
<value>15G</value>
</property>
<property>
<name>sparkExecutorMemory</name>
<value>5G</value>
</property>
<property>
<name>sparkExecutorCores</name>
<value>4</value>
</property>
<property>
<name>spark2MaxExecutors</name>
<value>50</value>
</property>
</configuration>

View File

@@ -0,0 +1,68 @@
<workflow-app name="person_propagation" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the source path</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reset_outputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="reset_outputpath">
<fs>
<delete path="${workingDir}"/>
<mkdir path="${workingDir}"/>
</fs>
<ok to="extract_person_relation_from_graph"/>
<error to="Kill"/>
</action>
<action name="extract_person_relation_from_graph">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>personPropagation</name>
<class>eu.dnetlib.dhp.person.SparkExtractPersonRelations</class>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@@ -0,0 +1,95 @@
package eu.dnetlib.dhp.person;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob;
import scala.Tuple2;
public class PersonPropagationJobTest {
private static final Logger log = LoggerFactory.getLogger(PersonPropagationJobTest.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static SparkSession spark;
private static Path workingDir;
@BeforeAll
public static void beforeAll() throws IOException {
workingDir = Files.createTempDirectory(PersonPropagationJobTest.class.getSimpleName());
log.info("using work dir {}", workingDir);
SparkConf conf = new SparkConf();
conf.setAppName(PersonPropagationJobTest.class.getSimpleName());
conf.setMaster("local[*]");
conf.set("spark.driver.host", "localhost");
conf.set("hive.metastore.local", "true");
conf.set("spark.ui.enabled", "false");
conf.set("spark.sql.warehouse.dir", workingDir.toString());
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
spark = SparkSession
.builder()
.appName(PersonPropagationJobTest.class.getSimpleName())
.config(conf)
.getOrCreate();
}
@AfterAll
public static void afterAll() throws IOException {
FileUtils.deleteDirectory(workingDir.toFile());
spark.stop();
}
@Test
void testPersonPropagation() throws Exception {
final String sourcePath = getClass()
.getResource("/eu/dnetlib/dhp/personpropagation/graph")
.getPath();
SparkExtractPersonRelations
.main(
new String[] {
"--isSparkSessionManaged", Boolean.FALSE.toString(),
"--sourcePath", sourcePath,
"--outputPath", workingDir.toString()
});
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<Relation> tmp = sc
.textFile(workingDir.toString() + "/relation")
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
// TODO write assertions and find relevant information for the resource files
}
}

File diff suppressed because one or more lines are too long

View File

@@ -89,6 +89,14 @@
<arg>${nameNode}/${graphPath}/project</arg>
<arg>${nameNode}/${targetPath}/project</arg>
</distcp>
<ok to="copy_person"/>
<error to="Kill"/>
</action>
<action name="copy_person">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${graphPath}/person</arg>
<arg>${nameNode}/${targetPath}/person</arg>
</distcp>
<ok to="copy_relation"/>
<error to="Kill"/>
</action>

View File

@@ -142,6 +142,7 @@
<path start="clean_datasource"/>
<path start="clean_organization"/>
<path start="clean_project"/>
<path start="clean_person"/>
<path start="clean_relation"/>
</fork>
@@ -390,6 +391,41 @@
<error to="Kill"/>
</action>
<action name="clean_person">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Clean person</name>
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=2000
</spark-opts>
<arg>--inputPath</arg><arg>${graphInputPath}/person</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/person</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--contextId</arg><arg>${contextId}</arg>
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
<arg>--country</arg><arg>${country}</arg>
<arg>--verifyCountryParam</arg><arg>${verifyCountryParam}</arg>
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
<arg>--deepClean</arg><arg>${shouldClean}</arg>
</spark>
<ok to="wait_clean"/>
<error to="Kill"/>
</action>
<action name="clean_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>

View File

@@ -102,6 +102,7 @@
<path start="import_datasource"/>
<path start="import_organization"/>
<path start="import_project"/>
<path start="import_person"/>
<path start="import_relation"/>
</fork>
@@ -308,6 +309,35 @@
<error to="Kill"/>
</action>
<action name="import_person">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Import table person</name>
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableImporterJob</class>
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
--conf spark.sql.shuffle.partitions=1000
</spark-opts>
<arg>--inputPath</arg><arg>${inputPath}/person</arg>
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
<arg>--numPartitions</arg><arg>1000</arg>
</spark>
<ok to="join_import"/>
<error to="Kill"/>
</action>
<action name="import_relation">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>

View File

@@ -32,4 +32,11 @@ class ORCIDAuthorMatchersTest {
// assertTrue(AuthorsMatchRevised.compare("孙林 Sun Lin", "孙林")); // not yet implemented
}
@Test def testDocumentationNames(): Unit = {
assertTrue(matchOrderedTokenAndAbbreviations("James C. A. Miller-Jones", "James Antony Miller-Jones"))
}
@Test def testDocumentationNames2(): Unit = {
assertTrue(matchOrderedTokenAndAbbreviations("James C. A. Miller-Jones", "James Antony Miller Jones"))
}
}

View File

@@ -231,6 +231,14 @@ public class CreateRelatedEntitiesJob_phase1 {
if (!f.isEmpty()) {
re.setFundingtree(f.stream().map(Field::getValue).collect(Collectors.toList()));
}
break;
case person:
final Person person = (Person) entity;
re.setGivenName(person.getGivenName());
re.setFamilyName(person.getFamilyName());
re.setAlternativeNames(person.getAlternativeNames());
break;
}
return re;

View File

@@ -2,10 +2,12 @@
package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits.MAX_RELATIONS_BY_RELCLASS;
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
@@ -15,11 +17,13 @@ import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.util.LongAccumulator;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@@ -27,11 +31,13 @@ import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
import eu.dnetlib.dhp.schema.solr.SolrRecord;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -124,6 +130,9 @@ public class PayloadConverterJob {
.map(Oaf::getDataInfo)
.map(DataInfo::getDeletedbyinference)
.orElse(false))
.map(
(MapFunction<JoinedEntity, JoinedEntity>) PayloadConverterJob::pruneRelatedEntities,
Encoders.kryo(JoinedEntity.class))
.map(
(MapFunction<JoinedEntity, Tuple2<String, SolrRecord>>) je -> new Tuple2<>(
recordFactory.build(je, validateXML),
@@ -139,6 +148,32 @@
.json(outputPath);
}
/**
* This function iterates through the RelatedEntityWrapper(s) associated to the JoinedEntity and rules out
* those exceeding the maximum allowed frequency defined in eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits#MAX_RELATIONS_BY_RELCLASS
*/
private static JoinedEntity pruneRelatedEntities(JoinedEntity je) {
Map<String, Long> freqs = Maps.newHashMap();
List<RelatedEntityWrapper> rew = Lists.newArrayList();
if (je.getLinks() != null) {
je.getLinks().forEach(link -> {
final String relClass = link.getRelation().getRelClass();
final Long count = freqs.getOrDefault(relClass, 0L);
final Long max = MAX_RELATIONS_BY_RELCLASS.getOrDefault(relClass, Long.MAX_VALUE);
if (count <= max) {
rew.add(link);
freqs.put(relClass, freqs.getOrDefault(relClass, 0L) + 1);
}
});
je.setLinks(rew);
}
return je;
}
private static void removeOutputDir(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
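For reference, the pruning above boils down to a per-relClass counter with a cap. A simplified, stand-alone sketch of the same idea (the class name, the sample data and the cap of 2 are invented for illustration; it keeps at most cap links per class rather than reproducing the exact boundary condition used above):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RelClassCapSketch {

	public static void main(String[] args) {
		// relation classes of the links attached to a single entity
		List<String> links = Arrays.asList("hasAuthor", "hasAuthor", "hasAuthor", "isProducedBy", "hasAuthor");
		long cap = 2L; // hypothetical limit; the real values come from ModelHardLimits#MAX_RELATIONS_BY_RELCLASS

		Map<String, Long> freqs = new HashMap<>();
		List<String> kept = new ArrayList<>();
		for (String relClass : links) {
			long count = freqs.getOrDefault(relClass, 0L);
			if (count < cap) { // keep at most 'cap' links per relation class
				kept.add(relClass);
				freqs.put(relClass, count + 1);
			}
		}

		System.out.println(kept); // prints [hasAuthor, hasAuthor, isProducedBy]
	}
}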

View File

@@ -37,6 +37,8 @@ import eu.dnetlib.dhp.schema.solr.Measure;
import eu.dnetlib.dhp.schema.solr.OpenAccessColor;
import eu.dnetlib.dhp.schema.solr.OpenAccessRoute;
import eu.dnetlib.dhp.schema.solr.Organization;
import eu.dnetlib.dhp.schema.solr.Person;
import eu.dnetlib.dhp.schema.solr.PersonTopic;
import eu.dnetlib.dhp.schema.solr.Pid;
import eu.dnetlib.dhp.schema.solr.Project;
import eu.dnetlib.dhp.schema.solr.Result;
@@ -89,6 +91,8 @@ public class ProvisionModelSupport {
r.setOrganization(mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) e));
} else if (e instanceof eu.dnetlib.dhp.schema.oaf.Project) {
r.setProject(mapProject((eu.dnetlib.dhp.schema.oaf.Project) e, vocs));
} else if (e instanceof eu.dnetlib.dhp.schema.oaf.Person) {
r.setPerson(mapPerson((eu.dnetlib.dhp.schema.oaf.Person) e));
}
r
.setLinks(
@@ -185,6 +189,18 @@
return ps;
}
private static Person mapPerson(eu.dnetlib.dhp.schema.oaf.Person p) {
Person ps = new Person();
ps.setFamilyName(p.getFamilyName());
ps.setGivenName(p.getGivenName());
ps.setAlternativeNames(p.getAlternativeNames());
ps.setBiography(p.getBiography());
ps.setConsent(p.getConsent());
// ps.setSubject(...));
return ps;
}
private static Funding mapFunding(List<String> fundingtree, VocabularyGroup vocs) {
SAXReader reader = new SAXReader();
return Optional

View File

@@ -51,6 +51,11 @@ public class RelatedEntity implements Serializable {
private Qualifier contracttype;
private List<String> fundingtree;
// person
private String givenName;
private String familyName;
private List<String> alternativeNames;
public String getId() {
return id;
}
@@ -251,6 +256,30 @@
this.fundingtree = fundingtree;
}
public String getGivenName() {
return givenName;
}
public void setGivenName(String givenName) {
this.givenName = givenName;
}
public String getFamilyName() {
return familyName;
}
public void setFamilyName(String familyName) {
this.familyName = familyName;
}
public List<String> getAlternativeNames() {
return alternativeNames;
}
public void setAlternativeNames(List<String> alternativeNames) {
this.alternativeNames = alternativeNames;
}
@Override
public boolean equals(Object o) {
if (this == o)
@@ -280,7 +309,10 @@
&& Objects.equal(code, that.code)
&& Objects.equal(acronym, that.acronym)
&& Objects.equal(contracttype, that.contracttype)
&& Objects.equal(fundingtree, that.fundingtree)
&& Objects.equal(givenName, that.givenName)
&& Objects.equal(familyName, that.familyName)
&& Objects.equal(alternativeNames, that.alternativeNames);
}
@Override
@@ -309,6 +341,9 @@
code,
acronym,
contracttype,
fundingtree,
familyName,
givenName,
alternativeNames);
}
}

View File

@@ -20,7 +20,6 @@ import javax.xml.transform.*;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import eu.dnetlib.dhp.oa.provision.model.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -42,6 +41,7 @@ import com.google.common.collect.Sets;
import com.mycila.xmltool.XMLDoc;
import com.mycila.xmltool.XMLTag;
import eu.dnetlib.dhp.oa.provision.model.*;
import eu.dnetlib.dhp.schema.common.*;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Result;
@@ -1035,6 +1035,48 @@ public class XmlRecordFactory implements Serializable {
.collect(Collectors.toList()));
}
break;
case person:
final Person person = (Person) entity;
if (person.getGivenName() != null) {
metadata.add(XmlSerializationUtils.asXmlElement("givenname", person.getGivenName()));
}
if (person.getFamilyName() != null) {
metadata.add(XmlSerializationUtils.asXmlElement("familyname", person.getFamilyName()));
}
if (person.getAlternativeNames() != null) {
metadata
.addAll(
person
.getAlternativeNames()
.stream()
.map(altName -> XmlSerializationUtils.asXmlElement("alternativename", altName))
.collect(Collectors.toList()));
}
if (person.getBiography() != null) {
metadata.add(XmlSerializationUtils.asXmlElement("biography", person.getBiography()));
}
if (person.getSubject() != null) {
metadata
.addAll(
person
.getSubject()
.stream()
.map(pt -> {
List<Tuple2<String, String>> attrs = Lists.newArrayList();
attrs.add(new Tuple2<>("schema", pt.getSchema()));
attrs.add(new Tuple2<>("value", pt.getValue()));
attrs.add(new Tuple2<>("fromYear", String.valueOf(pt.getFromYear())));
attrs.add(new Tuple2<>("toYear", String.valueOf(pt.getToYear())));
return XmlSerializationUtils.asXmlElement("subject", attrs);
})
.collect(Collectors.toList()));
}
if (person.getConsent() != null) {
metadata.add(XmlSerializationUtils.asXmlElement("consent", String.valueOf(person.getConsent())));
}
break;
default:
throw new IllegalArgumentException("invalid entity type: " + type);
@@ -1240,6 +1282,25 @@
.collect(Collectors.toList()));
}
break;
case person:
if (isNotBlank(re.getGivenName())) {
metadata.add(XmlSerializationUtils.asXmlElement("givenname", re.getGivenName()));
}
if (isNotBlank(re.getFamilyName())) {
metadata.add(XmlSerializationUtils.asXmlElement("familyname", re.getFamilyName()));
}
if (re.getAlternativeNames() != null && !re.getAlternativeNames().isEmpty()) {
metadata
.addAll(
re
.getAlternativeNames()
.stream()
.map(name -> XmlSerializationUtils.asXmlElement("alternativename", name))
.collect(Collectors.toList()));
}
break;
default:
throw new IllegalArgumentException("invalid target type: " + targetType);
}

View File

@@ -180,6 +180,7 @@
<path start="join_relation_datasource"/>
<path start="join_relation_organization"/>
<path start="join_relation_project"/>
<path start="join_relation_person"/>
</fork>
<action name="join_relation_publication">
@@ -378,6 +379,34 @@
<error to="Kill"/>
</action>
<action name="join_relation_person">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = person.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.executor.memoryOverhead=${sparkExecutorMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/person</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/person</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<join name="wait_joins" to="fork_join_all_entities"/>
<fork name="fork_join_all_entities">
@@ -388,6 +417,7 @@
<path start="join_datasource_relations"/>
<path start="join_organization_relations"/>
<path start="join_project_relations"/>
<path start="join_person_relations"/>
</fork>
<action name="join_publication_relations">
@@ -593,6 +623,35 @@
<error to="Kill"/>
</action>
<action name="join_person_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[person.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.executor.memoryOverhead=${sparkExecutorMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/person</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/person</arg>
<arg>--numPartitions</arg><arg>10000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<join name="wait_join_phase2" to="create_payloads"/>
<action name="create_payloads">

View File

@@ -937,7 +937,7 @@
<commons.logging.version>1.1.3</commons.logging.version>
<commons-validator.version>1.7</commons-validator.version>
<dateparser.version>1.0.7</dateparser.version>
<dhp-schemas.version>[7.0.1]</dhp-schemas.version>
<dhp-schemas.version>[7.0.2]</dhp-schemas.version>
<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
<dhp.guava.version>11.0.2</dhp.guava.version>