merged person_through_the_graph & code formatting
This commit is contained in:
commit
e4abe55988
|
@ -1,5 +1,5 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||
package eu.dnetlib.dhp.common.person;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
|
@ -61,7 +61,7 @@ public class CoAuthorshipIterator implements Iterator<Relation> {
|
|||
private Relation getRelation(String orcid1, String orcid2) {
|
||||
String source = PERSON_PREFIX + IdentifierFactory.md5(orcid1);
|
||||
String target = PERSON_PREFIX + IdentifierFactory.md5(orcid2);
|
||||
return OafMapperUtils
|
||||
Relation relation = OafMapperUtils
|
||||
.getRelation(
|
||||
source, target, ModelConstants.PERSON_PERSON_RELTYPE,
|
||||
ModelConstants.PERSON_PERSON_SUBRELTYPE,
|
||||
|
@ -76,5 +76,7 @@ public class CoAuthorshipIterator implements Iterator<Relation> {
|
|||
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"0.91"),
|
||||
null);
|
||||
relation.setValidated(true);
|
||||
return relation;
|
||||
}
|
||||
}
|
|
@ -1,12 +1,9 @@
|
|||
|
||||
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||
package eu.dnetlib.dhp.common.person;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
||||
public class Coauthors implements Serializable {
|
||||
private List<String> coauthors;
|
||||
|
|
@ -363,6 +363,8 @@ public class GraphCleaningFunctions extends CleaningFunctions {
|
|||
// nothing to clean here
|
||||
} else if (value instanceof Project) {
|
||||
// nothing to clean here
|
||||
} else if (value instanceof Person) {
|
||||
// nothing to clean here
|
||||
} else if (value instanceof Organization) {
|
||||
Organization o = (Organization) value;
|
||||
if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) {
|
||||
|
|
|
@ -1,6 +1,12 @@
|
|||
|
||||
package eu.dnetlib.dhp.schema.oaf.utils;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
|
||||
public class ModelHardLimits {
|
||||
|
||||
private ModelHardLimits() {
|
||||
|
@ -20,6 +26,12 @@ public class ModelHardLimits {
|
|||
public static final int MAX_ABSTRACT_LENGTH = 150000;
|
||||
public static final int MAX_RELATED_ABSTRACT_LENGTH = 500;
|
||||
public static final int MAX_INSTANCES = 10;
|
||||
public static final Map<String, Long> MAX_RELATIONS_BY_RELCLASS = Maps.newHashMap();
|
||||
|
||||
static {
|
||||
MAX_RELATIONS_BY_RELCLASS.put(ModelConstants.PERSON_PERSON_HASCOAUTHORED, 500L);
|
||||
MAX_RELATIONS_BY_RELCLASS.put(ModelConstants.RESULT_PERSON_HASAUTHORED, 500L);
|
||||
}
|
||||
|
||||
public static String getCollectionName(String format) {
|
||||
return format + SEPARATOR + LAYOUT + SEPARATOR + INTERPRETATION;
|
||||
|
|
|
@ -151,12 +151,17 @@ public class PromoteActionPayloadForGraphTableJob {
|
|||
SparkSession spark, String path, Class<G> rowClazz) {
|
||||
logger.info("Reading graph table from path: {}", path);
|
||||
|
||||
if (HdfsSupport.exists(path, spark.sparkContext().hadoopConfiguration())) {
|
||||
return spark
|
||||
.read()
|
||||
.textFile(path)
|
||||
.map(
|
||||
(MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz),
|
||||
Encoders.bean(rowClazz));
|
||||
} else {
|
||||
logger.info("Found empty graph table from path: {}", path);
|
||||
return spark.emptyDataset(Encoders.bean(rowClazz));
|
||||
}
|
||||
}
|
||||
|
||||
private static <A extends Oaf> Dataset<A> readActionPayload(
|
||||
|
@ -223,7 +228,7 @@ public class PromoteActionPayloadForGraphTableJob {
|
|||
rowClazz,
|
||||
actionPayloadClazz);
|
||||
|
||||
if (shouldGroupById) {
|
||||
if (Boolean.TRUE.equals(shouldGroupById)) {
|
||||
return PromoteActionPayloadFunctions
|
||||
.groupGraphTableByIdAndMerge(
|
||||
joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz);
|
||||
|
@ -250,6 +255,8 @@ public class PromoteActionPayloadForGraphTableJob {
|
|||
return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Relation());
|
||||
case "eu.dnetlib.dhp.schema.oaf.Software":
|
||||
return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Software());
|
||||
case "eu.dnetlib.dhp.schema.oaf.Person":
|
||||
return () -> clazz.cast(new eu.dnetlib.dhp.schema.oaf.Person());
|
||||
default:
|
||||
throw new RuntimeException("unknown class: " + clazz.getCanonicalName());
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ public class PromoteActionPayloadFunctions {
|
|||
PromoteAction.Strategy promoteActionStrategy,
|
||||
Class<G> rowClazz,
|
||||
Class<A> actionPayloadClazz) {
|
||||
if (!isSubClass(rowClazz, actionPayloadClazz)) {
|
||||
if (Boolean.FALSE.equals(isSubClass(rowClazz, actionPayloadClazz))) {
|
||||
throw new RuntimeException(
|
||||
"action payload type must be the same or be a super type of table row type");
|
||||
}
|
||||
|
|
|
@ -7,3 +7,4 @@ promote_action_payload_for_project_table classpath eu/dnetlib/dhp/actionmanager/
|
|||
promote_action_payload_for_publication_table classpath eu/dnetlib/dhp/actionmanager/wf/publication/oozie_app
|
||||
promote_action_payload_for_relation_table classpath eu/dnetlib/dhp/actionmanager/wf/relation/oozie_app
|
||||
promote_action_payload_for_software_table classpath eu/dnetlib/dhp/actionmanager/wf/software/oozie_app
|
||||
promote_action_payload_for_person_table classpath eu/dnetlib/dhp/actionmanager/wf/person/oozie_app
|
||||
|
|
|
@ -148,6 +148,7 @@
|
|||
<path start="PromoteActionPayloadForPublicationTable"/>
|
||||
<path start="PromoteActionPayloadForRelationTable"/>
|
||||
<path start="PromoteActionPayloadForSoftwareTable"/>
|
||||
<path start="PromoteActionPayloadForPersonTable"/>
|
||||
</fork>
|
||||
|
||||
<action name="PromoteActionPayloadForDatasetTable">
|
||||
|
@ -270,6 +271,21 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="PromoteActionPayloadForPersonTable">
|
||||
<sub-workflow>
|
||||
<app-path>${wf:appPath()}/promote_action_payload_for_person_table</app-path>
|
||||
<propagate-configuration/>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>inputActionPayloadRootPath</name>
|
||||
<value>${workingDir}/action_payload_by_type</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</sub-workflow>
|
||||
<ok to="JoinPromote"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="JoinPromote" to="End"/>
|
||||
|
||||
<end name="End"/>
|
||||
|
|
|
@ -0,0 +1,129 @@
|
|||
<workflow-app name="promote_action_payload_for_person_table" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>activePromotePersonActionPayload</name>
|
||||
<description>when true will promote actions with eu.dnetlib.dhp.schema.oaf.Person payload</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>inputGraphRootPath</name>
|
||||
<description>root location of input materialized graph</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>inputActionPayloadRootPath</name>
|
||||
<description>root location of action payloads to promote</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>outputGraphRootPath</name>
|
||||
<description>root location for output materialized graph</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>mergeAndGetStrategy</name>
|
||||
<description>strategy for merging graph table objects with action payload instances, MERGE_FROM_AND_GET or SELECT_NEWER_AND_GET</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<description>memory for individual executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<description>number of cores used by single executor</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozieActionShareLibForSpark2</name>
|
||||
<description>oozie action sharelib for spark 2.*</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
<description>spark 2.* extra listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
<description>spark 2.* sql query execution listeners classname</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<description>spark 2.* yarn history server address</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<description>spark 2.* event log dir location</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="DecisionPromotePersonActionPayload"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<decision name="DecisionPromotePersonActionPayload">
|
||||
<switch>
|
||||
<case to="PromotePersonActionPayloadForPersonTable">
|
||||
${(activePromotePersonActionPayload eq "true") and
|
||||
(fs:exists(concat(concat(concat(concat(wf:conf('nameNode'),'/'),wf:conf('inputActionPayloadRootPath')),'/'),'clazz=eu.dnetlib.dhp.schema.oaf.Person')) eq "true")}
|
||||
</case>
|
||||
<default to="SkipPromotePersonActionPayloadForPersonTable"/>
|
||||
</switch>
|
||||
</decision>
|
||||
|
||||
<action name="PromotePersonActionPayloadForPersonTable">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PromotePersonActionPayloadForPersonTable</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob</class>
|
||||
<jar>dhp-actionmanager-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--inputGraphTablePath</arg><arg>${inputGraphRootPath}/person</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
|
||||
<arg>--inputActionPayloadPath</arg><arg>${inputActionPayloadRootPath}/clazz=eu.dnetlib.dhp.schema.oaf.Person</arg>
|
||||
<arg>--actionPayloadClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
|
||||
<arg>--outputGraphTablePath</arg><arg>${outputGraphRootPath}/person</arg>
|
||||
<arg>--mergeAndGetStrategy</arg><arg>${mergeAndGetStrategy}</arg>
|
||||
<arg>--promoteActionStrategy</arg><arg>${promoteActionStrategy}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="SkipPromotePersonActionPayloadForPersonTable">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<prepare>
|
||||
<delete path="${outputGraphRootPath}/person"/>
|
||||
</prepare>
|
||||
<arg>-pb</arg>
|
||||
<arg>${inputGraphRootPath}/person</arg>
|
||||
<arg>${outputGraphRootPath}/person</arg>
|
||||
</distcp>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -2,16 +2,23 @@
|
|||
package eu.dnetlib.dhp.actionmanager.personentity;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
import static org.apache.spark.sql.functions.*;
|
||||
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.Serializable;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.cli.ParseException;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.compress.BZip2Codec;
|
||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||
|
@ -21,6 +28,7 @@ import org.apache.spark.sql.*;
|
|||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.spark_project.jetty.util.StringUtil;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
|
@ -28,10 +36,14 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|||
import eu.dnetlib.dhp.collection.orcid.model.Author;
|
||||
import eu.dnetlib.dhp.collection.orcid.model.Employment;
|
||||
import eu.dnetlib.dhp.collection.orcid.model.Work;
|
||||
import eu.dnetlib.dhp.common.DbClient;
|
||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||
import eu.dnetlib.dhp.common.person.CoAuthorshipIterator;
|
||||
import eu.dnetlib.dhp.common.person.Coauthors;
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.Person;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
@ -44,7 +56,7 @@ import scala.Tuple2;
|
|||
|
||||
public class ExtractPerson implements Serializable {
|
||||
private static final Logger log = LoggerFactory.getLogger(ExtractPerson.class);
|
||||
|
||||
private static final String QUERY = "SELECT * FROM project_person WHERE pid_type = 'ORCID'";
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
private static final String OPENAIRE_PREFIX = "openaire____";
|
||||
private static final String SEPARATOR = "::";
|
||||
|
@ -61,6 +73,41 @@ public class ExtractPerson implements Serializable {
|
|||
private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______";
|
||||
public static final String ORCID_AUTHORS_CLASSID = "sysimport:crosswalk:orcid";
|
||||
public static final String ORCID_AUTHORS_CLASSNAME = "Imported from ORCID";
|
||||
public static final String FUNDER_AUTHORS_CLASSID = "sysimport:crosswalk:funderdatabase";
|
||||
public static final String FUNDER_AUTHORS_CLASSNAME = "Imported from Funder Database";
|
||||
public static final String OPENAIRE_DATASOURCE_ID = "10|infrastruct_::f66f1bd369679b5b077dcdf006089556";
|
||||
public static final String OPENAIRE_DATASOURCE_NAME = "OpenAIRE";
|
||||
|
||||
public static List<KeyValue> collectedfromOpenAIRE = OafMapperUtils
|
||||
.listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME);
|
||||
|
||||
public static final DataInfo ORCIDDATAINFO = OafMapperUtils
|
||||
.dataInfo(
|
||||
false,
|
||||
null,
|
||||
false,
|
||||
false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
ORCID_AUTHORS_CLASSID,
|
||||
ORCID_AUTHORS_CLASSNAME,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"0.91");
|
||||
|
||||
public static final DataInfo FUNDERDATAINFO = OafMapperUtils
|
||||
.dataInfo(
|
||||
false,
|
||||
null,
|
||||
false,
|
||||
false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
FUNDER_AUTHORS_CLASSID,
|
||||
FUNDER_AUTHORS_CLASSNAME,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"0.91");
|
||||
|
||||
public static void main(final String[] args) throws IOException, ParseException {
|
||||
|
||||
|
@ -91,19 +138,130 @@ public class ExtractPerson implements Serializable {
|
|||
final String workingDir = parser.get("workingDir");
|
||||
log.info("workingDir {}", workingDir);
|
||||
|
||||
final String dbUrl = parser.get("postgresUrl");
|
||||
final String dbUser = parser.get("postgresUser");
|
||||
final String dbPassword = parser.get("postgresPassword");
|
||||
|
||||
final String hdfsNameNode = parser.get("hdfsNameNode");
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
|
||||
createActionSet(spark, inputPath, outputPath, workingDir);
|
||||
extractInfoForActionSetFromORCID(spark, inputPath, workingDir);
|
||||
extractInfoForActionSetFromProjects(
|
||||
spark, inputPath, workingDir, dbUrl, dbUser, dbPassword, workingDir + "/project", hdfsNameNode);
|
||||
createActionSet(spark, outputPath, workingDir);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private static void createActionSet(SparkSession spark, String inputPath, String outputPath, String workingDir) {
|
||||
private static void extractInfoForActionSetFromProjects(SparkSession spark, String inputPath, String workingDir,
|
||||
String dbUrl, String dbUser, String dbPassword, String hdfsPath, String hdfsNameNode) throws IOException {
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.set("fs.defaultFS", hdfsNameNode);
|
||||
|
||||
FileSystem fileSystem = FileSystem.get(conf);
|
||||
Path hdfsWritePath = new Path(hdfsPath);
|
||||
FSDataOutputStream fos = fileSystem.create(hdfsWritePath);
|
||||
try (DbClient dbClient = new DbClient(dbUrl, dbUser, dbPassword)) {
|
||||
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
|
||||
dbClient.processResults(QUERY, rs -> writeRelation(getRelationWithProject(rs), writer));
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static Relation getRelationWithProject(ResultSet rs) {
|
||||
try {
|
||||
return getProjectRelation(
|
||||
rs.getString("project"), rs.getString("pid"),
|
||||
rs.getString("role"));
|
||||
} catch (final SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static Relation getProjectRelation(String project, String orcid, String role) {
|
||||
|
||||
String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
|
||||
String target = project.substring(0, 14)
|
||||
+ IdentifierFactory.md5(project.substring(15));
|
||||
List<KeyValue> properties = new ArrayList<>();
|
||||
|
||||
Relation relation = OafMapperUtils
|
||||
.getRelation(
|
||||
source, target, ModelConstants.PROJECT_PERSON_RELTYPE, ModelConstants.PROJECT_PERSON_SUBRELTYPE,
|
||||
ModelConstants.PROJECT_PERSON_PARTICIPATES,
|
||||
collectedfromOpenAIRE,
|
||||
FUNDERDATAINFO,
|
||||
null);
|
||||
relation.setValidated(true);
|
||||
|
||||
if (StringUtil.isNotBlank(role)) {
|
||||
KeyValue kv = new KeyValue();
|
||||
kv.setKey("role");
|
||||
kv.setValue(role);
|
||||
properties.add(kv);
|
||||
}
|
||||
|
||||
if (!properties.isEmpty())
|
||||
relation.setProperties(properties);
|
||||
return relation;
|
||||
|
||||
}
|
||||
|
||||
protected static void writeRelation(final Relation relation, BufferedWriter writer) {
|
||||
try {
|
||||
writer.write(OBJECT_MAPPER.writeValueAsString(relation));
|
||||
writer.newLine();
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void createActionSet(SparkSession spark, String outputPath, String workingDir) {
|
||||
|
||||
Dataset<Person> people;
|
||||
people = spark
|
||||
.read()
|
||||
.textFile(workingDir + "/people")
|
||||
.map(
|
||||
(MapFunction<String, Person>) value -> OBJECT_MAPPER
|
||||
.readValue(value, Person.class),
|
||||
Encoders.bean(Person.class));
|
||||
|
||||
people
|
||||
.toJavaRDD()
|
||||
.map(p -> new AtomicAction(p.getClass(), p))
|
||||
.union(
|
||||
getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r)))
|
||||
.union(
|
||||
getRelations(spark, workingDir + "/coauthorship")
|
||||
.toJavaRDD()
|
||||
.map(r -> new AtomicAction(r.getClass(), r)))
|
||||
.union(
|
||||
getRelations(spark, workingDir + "/affiliation")
|
||||
.toJavaRDD()
|
||||
.map(r -> new AtomicAction(r.getClass(), r)))
|
||||
.union(
|
||||
getRelations(spark, workingDir + "/project")
|
||||
.toJavaRDD()
|
||||
.map(r -> new AtomicAction(r.getClass(), r)))
|
||||
.mapToPair(
|
||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||
.saveAsHadoopFile(
|
||||
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
|
||||
}
|
||||
|
||||
private static void extractInfoForActionSetFromORCID(SparkSession spark, String inputPath, String workingDir) {
|
||||
Dataset<Author> authors = spark
|
||||
.read()
|
||||
.parquet(inputPath + "Authors")
|
||||
|
@ -129,18 +287,13 @@ public class ExtractPerson implements Serializable {
|
|||
.parquet(inputPath + "Employments")
|
||||
.as(Encoders.bean(Employment.class));
|
||||
|
||||
Dataset<Author> peopleToMap = authors
|
||||
.joinWith(works, authors.col("orcid").equalTo(works.col("orcid")))
|
||||
.map((MapFunction<Tuple2<Author, Work>, Author>) t2 -> t2._1(), Encoders.bean(Author.class))
|
||||
.groupByKey((MapFunction<Author, String>) a -> a.getOrcid(), Encoders.STRING())
|
||||
.mapGroups((MapGroupsFunction<String, Author, Author>) (k, it) -> it.next(), Encoders.bean(Author.class));
|
||||
|
||||
Dataset<Employment> employment = employmentDataset
|
||||
.joinWith(peopleToMap, employmentDataset.col("orcid").equalTo(peopleToMap.col("orcid")))
|
||||
.joinWith(authors, employmentDataset.col("orcid").equalTo(authors.col("orcid")))
|
||||
.map((MapFunction<Tuple2<Employment, Author>, Employment>) t2 -> t2._1(), Encoders.bean(Employment.class));
|
||||
|
||||
Dataset<Person> people;
|
||||
peopleToMap.map((MapFunction<Author, Person>) op -> {
|
||||
// Mapping all the orcid profiles even if the profile has no visible works
|
||||
|
||||
authors.map((MapFunction<Author, Person>) op -> {
|
||||
Person person = new Person();
|
||||
person.setId(DHPUtils.generateIdentifier(op.getOrcid(), PERSON_PREFIX));
|
||||
person
|
||||
|
@ -193,6 +346,7 @@ public class ExtractPerson implements Serializable {
|
|||
ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, null));
|
||||
person.setDateofcollection(op.getLastModifiedDate());
|
||||
person.setOriginalId(Arrays.asList(op.getOrcid()));
|
||||
person.setDataInfo(ORCIDDATAINFO);
|
||||
return person;
|
||||
}, Encoders.bean(Person.class))
|
||||
.write()
|
||||
|
@ -246,34 +400,6 @@ public class ExtractPerson implements Serializable {
|
|||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingDir + "/affiliation");
|
||||
|
||||
people = spark
|
||||
.read()
|
||||
.textFile(workingDir + "/people")
|
||||
.map(
|
||||
(MapFunction<String, Person>) value -> OBJECT_MAPPER
|
||||
.readValue(value, Person.class),
|
||||
Encoders.bean(Person.class));
|
||||
|
||||
people.show(false);
|
||||
people
|
||||
.toJavaRDD()
|
||||
.map(p -> new AtomicAction(p.getClass(), p))
|
||||
.union(
|
||||
getRelations(spark, workingDir + "/authorship").toJavaRDD().map(r -> new AtomicAction(r.getClass(), r)))
|
||||
.union(
|
||||
getRelations(spark, workingDir + "/coauthorship")
|
||||
.toJavaRDD()
|
||||
.map(r -> new AtomicAction(r.getClass(), r)))
|
||||
.union(
|
||||
getRelations(spark, workingDir + "/affiliation")
|
||||
.toJavaRDD()
|
||||
.map(r -> new AtomicAction(r.getClass(), r)))
|
||||
.mapToPair(
|
||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||
.saveAsHadoopFile(
|
||||
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
|
||||
}
|
||||
|
||||
private static Dataset<Relation> getRelations(SparkSession spark, String path) {
|
||||
|
@ -307,23 +433,17 @@ public class ExtractPerson implements Serializable {
|
|||
source, target, ModelConstants.ORG_PERSON_RELTYPE, ModelConstants.ORG_PERSON_SUBRELTYPE,
|
||||
ModelConstants.ORG_PERSON_PARTICIPATES,
|
||||
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||
OafMapperUtils
|
||||
.dataInfo(
|
||||
false, null, false, false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"0.91"),
|
||||
ORCIDDATAINFO,
|
||||
null);
|
||||
relation.setValidated(true);
|
||||
|
||||
if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtils.isNotBlank(row.getStartDate())) {
|
||||
if (Optional.ofNullable(row.getStartDate()).isPresent() && StringUtil.isNotBlank(row.getStartDate())) {
|
||||
KeyValue kv = new KeyValue();
|
||||
kv.setKey("startDate");
|
||||
kv.setValue(row.getStartDate());
|
||||
properties.add(kv);
|
||||
}
|
||||
if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtils.isNotBlank(row.getEndDate())) {
|
||||
if (Optional.ofNullable(row.getEndDate()).isPresent() && StringUtil.isNotBlank(row.getEndDate())) {
|
||||
KeyValue kv = new KeyValue();
|
||||
kv.setKey("endDate");
|
||||
kv.setValue(row.getEndDate());
|
||||
|
@ -336,45 +456,6 @@ public class ExtractPerson implements Serializable {
|
|||
|
||||
}
|
||||
|
||||
private static Collection<? extends Relation> getCoAuthorshipRelations(String orcid1, String orcid2) {
|
||||
String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid1);
|
||||
String target = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid2);
|
||||
|
||||
return Arrays
|
||||
.asList(
|
||||
OafMapperUtils
|
||||
.getRelation(
|
||||
source, target, ModelConstants.PERSON_PERSON_RELTYPE,
|
||||
ModelConstants.PERSON_PERSON_SUBRELTYPE,
|
||||
ModelConstants.PERSON_PERSON_HASCOAUTHORED,
|
||||
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||
OafMapperUtils
|
||||
.dataInfo(
|
||||
false, null, false, false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"0.91"),
|
||||
null),
|
||||
OafMapperUtils
|
||||
.getRelation(
|
||||
target, source, ModelConstants.PERSON_PERSON_RELTYPE,
|
||||
ModelConstants.PERSON_PERSON_SUBRELTYPE,
|
||||
ModelConstants.PERSON_PERSON_HASCOAUTHORED,
|
||||
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||
OafMapperUtils
|
||||
.dataInfo(
|
||||
false, null, false, false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"0.91"),
|
||||
null));
|
||||
|
||||
}
|
||||
|
||||
private static @NotNull Iterator<Relation> getAuthorshipRelationIterator(Work w) {
|
||||
|
||||
if (Optional.ofNullable(w.getPids()).isPresent())
|
||||
|
@ -417,21 +498,15 @@ public class ExtractPerson implements Serializable {
|
|||
default:
|
||||
return null;
|
||||
}
|
||||
|
||||
return OafMapperUtils
|
||||
Relation relation = OafMapperUtils
|
||||
.getRelation(
|
||||
source, target, ModelConstants.RESULT_PERSON_RELTYPE,
|
||||
ModelConstants.RESULT_PERSON_SUBRELTYPE,
|
||||
ModelConstants.RESULT_PERSON_HASAUTHORED,
|
||||
Arrays.asList(OafMapperUtils.keyValue(orcidKey, ModelConstants.ORCID_DS)),
|
||||
OafMapperUtils
|
||||
.dataInfo(
|
||||
false, null, false, false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
ORCID_AUTHORS_CLASSID, ORCID_AUTHORS_CLASSNAME, ModelConstants.DNET_PROVENANCE_ACTIONS,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"0.91"),
|
||||
ORCIDDATAINFO,
|
||||
null);
|
||||
relation.setValidated(true);
|
||||
return relation;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,5 +21,30 @@
|
|||
"paramLongName": "workingDir",
|
||||
"paramDescription": "the hdfs name node",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "pu",
|
||||
"paramLongName": "postgresUrl",
|
||||
"paramDescription": "the hdfs name node",
|
||||
"paramRequired": false
|
||||
},
|
||||
|
||||
{
|
||||
"paramName": "ps",
|
||||
"paramLongName": "postgresUser",
|
||||
"paramDescription": "the hdfs name node",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "pp",
|
||||
"paramLongName": "postgresPassword",
|
||||
"paramDescription": "the hdfs name node",
|
||||
"paramRequired": false
|
||||
},{
|
||||
"paramName": "nn",
|
||||
"paramLongName": "hdfsNameNode",
|
||||
"paramDescription": "the hdfs name node",
|
||||
"paramRequired": false
|
||||
}
|
||||
|
||||
]
|
||||
|
|
|
@ -1,2 +1,5 @@
|
|||
inputPath=/data/orcid_2023/tables/
|
||||
outputPath=/user/miriam.baglioni/peopleAS
|
||||
postgresUrl=jdbc:postgresql://beta.services.openaire.eu:5432/dnet_openaireplus
|
||||
postgresUser=dnet
|
||||
postgresPassword=dnetPwd
|
|
@ -9,6 +9,18 @@
|
|||
<name>outputPath</name>
|
||||
<description>the path where to store the actionset</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresUrl</name>
|
||||
<description>the path where to store the actionset</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresUser</name>
|
||||
<description>the path where to store the actionset</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>postgresPassword</name>
|
||||
<description>the path where to store the actionset</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<description>memory for driver process</description>
|
||||
|
@ -102,6 +114,10 @@
|
|||
<arg>--inputPath</arg><arg>${inputPath}</arg>
|
||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
||||
<arg>--workingDir</arg><arg>${workingDir}</arg>
|
||||
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
|
||||
<arg>--postgresUrl</arg><arg>${postgresUrl}</arg>
|
||||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
|
|
|
@ -63,6 +63,7 @@
|
|||
<path start="copy_software"/>
|
||||
<path start="copy_datasource"/>
|
||||
<path start="copy_project"/>
|
||||
<path start="copy_person"/>
|
||||
<path start="copy_organization"/>
|
||||
</fork>
|
||||
|
||||
|
@ -120,6 +121,15 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="copy_person">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<arg>${nameNode}/${sourcePath}/person</arg>
|
||||
<arg>${nameNode}/${outputPath}/person</arg>
|
||||
</distcp>
|
||||
<ok to="wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="copy_datasource">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<arg>${nameNode}/${sourcePath}/datasource</arg>
|
||||
|
|
|
@ -48,12 +48,7 @@
|
|||
<groupId>io.github.classgraph</groupId>
|
||||
<artifactId>classgraph</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>eu.dnetlib.dhp</groupId>
|
||||
<artifactId>dhp-aggregation</artifactId>
|
||||
<version>1.2.5-SNAPSHOT</version>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
|
|
@ -0,0 +1,302 @@
|
|||
|
||||
package eu.dnetlib.dhp.person;
|
||||
|
||||
import static com.ibm.icu.text.PluralRules.Operand.w;
|
||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.FilterFunction;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.common.person.CoAuthorshipIterator;
|
||||
import eu.dnetlib.dhp.common.person.Coauthors;
|
||||
import eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class SparkExtractPersonRelations {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkCountryPropagationJob.class);
|
||||
private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______";
|
||||
|
||||
public static final DataInfo DATAINFO = OafMapperUtils
|
||||
.dataInfo(
|
||||
false,
|
||||
"openaire",
|
||||
true,
|
||||
false,
|
||||
OafMapperUtils
|
||||
.qualifier(
|
||||
ModelConstants.SYSIMPORT_CROSSWALK_REPOSITORY,
|
||||
ModelConstants.SYSIMPORT_CROSSWALK_REPOSITORY,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS,
|
||||
ModelConstants.DNET_PROVENANCE_ACTIONS),
|
||||
"0.85");
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
String jsonConfiguration = IOUtils
|
||||
.toString(
|
||||
SparkCountryPropagationJob.class
|
||||
.getResourceAsStream(
|
||||
"/eu/dnetlib/dhp/wf/subworkflows/person/input_personpropagation_parameters.json"));
|
||||
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||
|
||||
parser.parseArgument(args);
|
||||
|
||||
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
String sourcePath = parser.get("sourcePath");
|
||||
log.info("sourcePath: {}", sourcePath);
|
||||
|
||||
final String workingPath = parser.get("outputPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
runWithSparkSession(
|
||||
conf,
|
||||
isSparkSessionManaged,
|
||||
spark -> {
|
||||
|
||||
extractRelations(
|
||||
spark,
|
||||
sourcePath,
|
||||
workingPath);
|
||||
removeIsolatedPerson(spark, sourcePath, workingPath);
|
||||
});
|
||||
}
|
||||
|
||||
private static void removeIsolatedPerson(SparkSession spark, String sourcePath, String workingPath) {
|
||||
Dataset<Person> personDataset = spark
|
||||
.read()
|
||||
.schema(Encoders.bean(Person.class).schema())
|
||||
.json(sourcePath + "person")
|
||||
.as(Encoders.bean(Person.class));
|
||||
|
||||
Dataset<Relation> relationDataset = spark
|
||||
.read()
|
||||
.schema(Encoders.bean(Relation.class).schema())
|
||||
.json(sourcePath + "relation")
|
||||
.as(Encoders.bean(Relation.class));
|
||||
|
||||
personDataset
|
||||
.join(relationDataset, personDataset.col("id").equalTo(relationDataset.col("source")), "left_semi")
|
||||
.write()
|
||||
.option("compression", "gzip")
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(workingPath + "person");
|
||||
|
||||
spark
|
||||
.read()
|
||||
.schema(Encoders.bean(Person.class).schema())
|
||||
.json(workingPath + "person")
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.option("compression", "gzip")
|
||||
.json(sourcePath + "person");
|
||||
}
|
||||
|
||||
private static void extractRelations(SparkSession spark, String sourcePath, String workingPath) {
|
||||
|
||||
Dataset<Tuple2<String, Relation>> relationDataset = spark
|
||||
.read()
|
||||
.schema(Encoders.bean(Relation.class).schema())
|
||||
.json(sourcePath + "relation")
|
||||
.as(Encoders.bean(Relation.class))
|
||||
.map(
|
||||
(MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(
|
||||
r.getSource() + r.getRelClass() + r.getTarget(), r),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class)));
|
||||
|
||||
ModelSupport.entityTypes
|
||||
.keySet()
|
||||
.stream()
|
||||
.filter(ModelSupport::isResult)
|
||||
.forEach(
|
||||
e -> {
|
||||
// 1. search for results having orcid_pending and orcid in the set of pids for the authors
|
||||
Dataset<Result> resultWithOrcids = spark
|
||||
.read()
|
||||
.schema(Encoders.bean(Result.class).schema())
|
||||
.json(sourcePath + e.name())
|
||||
.as(Encoders.bean(Result.class))
|
||||
.filter(
|
||||
(FilterFunction<Result>) r -> !r.getDataInfo().getDeletedbyinference() &&
|
||||
!r.getDataInfo().getInvisible() &&
|
||||
Optional
|
||||
.ofNullable(r.getAuthor())
|
||||
.isPresent())
|
||||
.filter(
|
||||
(FilterFunction<Result>) r -> r
|
||||
.getAuthor()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
a -> Optional
|
||||
.ofNullable(
|
||||
a
|
||||
.getPid())
|
||||
.isPresent() &&
|
||||
a
|
||||
.getPid()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
p -> Arrays
|
||||
.asList("orcid", "orcid_pending")
|
||||
.contains(p.getQualifier().getClassid().toLowerCase()))));
|
||||
// 2. create authorship relations between the result identifier and the person entity with
|
||||
// orcid_pending.
|
||||
Dataset<Tuple2<String, Relation>> newRelations = resultWithOrcids
|
||||
.flatMap(
|
||||
(FlatMapFunction<Result, Relation>) r -> getAuthorshipRelations(r),
|
||||
Encoders.bean(Relation.class))
|
||||
// .groupByKey((MapFunction<Relation, String>) r-> r.getSource()+r.getTarget(), Encoders.STRING() )
|
||||
// .mapGroups((MapGroupsFunction<String, Relation, Relation>) (k,it) -> it.next(), Encoders.bean(Relation.class) )
|
||||
.map(
|
||||
(MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(
|
||||
r.getSource() + r.getRelClass() + r.getTarget(), r),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class)));
|
||||
newRelations
|
||||
.joinWith(relationDataset, newRelations.col("_1").equalTo(relationDataset.col("_1")), "left")
|
||||
.map((MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, Relation>>, Relation>) t2 -> {
|
||||
if (t2._2() == null)
|
||||
return t2._1()._2();
|
||||
return null;
|
||||
}, Encoders.bean(Relation.class))
|
||||
.filter((FilterFunction<Relation>) r -> r != null)
|
||||
.write()
|
||||
.mode(SaveMode.Append)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath);
|
||||
|
||||
// 2.1 store in a separate location the relation between the person and the pids for the result?
|
||||
|
||||
// 3. create co_authorship relations between the pairs of authors with orcid/orcid_pending pids
|
||||
newRelations = resultWithOrcids
|
||||
.map((MapFunction<Result, Coauthors>) r -> getAuthorsPidList(r), Encoders.bean(Coauthors.class))
|
||||
.flatMap(
|
||||
(FlatMapFunction<Coauthors, Relation>) c -> new CoAuthorshipIterator(c.getCoauthors()),
|
||||
Encoders.bean(Relation.class))
|
||||
.groupByKey(
|
||||
(MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
|
||||
.mapGroups(
|
||||
(MapGroupsFunction<String, Relation, Relation>) (k, it) -> it.next(),
|
||||
Encoders.bean(Relation.class))
|
||||
.map(
|
||||
(MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(
|
||||
r.getSource() + r.getRelClass() + r.getTarget(), r),
|
||||
Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class)));
|
||||
newRelations
|
||||
.joinWith(relationDataset, newRelations.col("_1").equalTo(relationDataset.col("_1")), "left")
|
||||
.map((MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, Relation>>, Relation>) t2 -> {
|
||||
if (t2._2() == null)
|
||||
return t2._1()._2();
|
||||
return null;
|
||||
}, Encoders.bean(Relation.class))
|
||||
.filter((FilterFunction<Relation>) r -> r != null)
|
||||
.write()
|
||||
.mode(SaveMode.Append)
|
||||
.option("compression", "gzip")
|
||||
.json(workingPath);
|
||||
|
||||
});
|
||||
spark
|
||||
.read()
|
||||
.schema(Encoders.bean(Relation.class).schema())
|
||||
.json(workingPath)
|
||||
.write()
|
||||
.mode(SaveMode.Append)
|
||||
.option("compression", "gzip")
|
||||
.json(sourcePath + "relation");
|
||||
|
||||
}
|
||||
|
||||
private static Coauthors getAuthorsPidList(Result r) {
|
||||
Coauthors coauth = new Coauthors();
|
||||
coauth
|
||||
.setCoauthors(
|
||||
r
|
||||
.getAuthor()
|
||||
.stream()
|
||||
.filter(
|
||||
a -> a
|
||||
.getPid()
|
||||
.stream()
|
||||
.anyMatch(
|
||||
p -> Arrays.asList("orcid", "orcid_pending").contains(p.getQualifier().getClassid())))
|
||||
.map(a -> {
|
||||
Optional<StructuredProperty> tmp = a
|
||||
.getPid()
|
||||
.stream()
|
||||
.filter(p -> p.getQualifier().getClassid().equalsIgnoreCase("orcid"))
|
||||
.findFirst();
|
||||
if (tmp.isPresent())
|
||||
return tmp.get().getValue();
|
||||
tmp = a
|
||||
.getPid()
|
||||
.stream()
|
||||
.filter(p -> p.getQualifier().getClassid().equalsIgnoreCase("orcid_pending"))
|
||||
.findFirst();
|
||||
if (tmp.isPresent())
|
||||
return tmp.get().getValue();
|
||||
|
||||
return null;
|
||||
})
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList()));
|
||||
return coauth;
|
||||
|
||||
}
|
||||
|
||||
private static Iterator<Relation> getAuthorshipRelations(Result r) {
|
||||
List<Relation> relationList = new ArrayList<>();
|
||||
for (Author a : r.getAuthor())
|
||||
|
||||
relationList.addAll(a.getPid().stream().map(p -> {
|
||||
|
||||
if (p.getQualifier().getClassid().equalsIgnoreCase("orcid_pending"))
|
||||
return getRelation(p.getValue(), r.getId());
|
||||
return null;
|
||||
})
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
return relationList.iterator();
|
||||
}
|
||||
|
||||
private static Relation getRelation(String orcid, String resultId) {
|
||||
|
||||
String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
|
||||
|
||||
Relation relation = OafMapperUtils
|
||||
.getRelation(
|
||||
source, resultId, ModelConstants.RESULT_PERSON_RELTYPE,
|
||||
ModelConstants.RESULT_PERSON_SUBRELTYPE,
|
||||
ModelConstants.RESULT_PERSON_HASAUTHORED,
|
||||
null, // collectedfrom = null
|
||||
DATAINFO,
|
||||
null);
|
||||
|
||||
return relation;
|
||||
}
|
||||
|
||||
}
|
|
@ -8,3 +8,4 @@ result_project classpath eu/dnetlib/dhp/wf/subworkflows/projecttoresult/oozie_ap
|
|||
community_project classpath eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromproject/oozie_app
|
||||
community_sem_rel classpath eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/oozie_app
|
||||
country_propagation classpath eu/dnetlib/dhp/wf/subworkflows/countrypropagation/oozie_app
|
||||
person_propagation classpath eu/dnetlib/dhp/wf/subworkflows/person/oozie_app
|
|
@ -122,6 +122,7 @@
|
|||
<case to="community_project">${wf:conf('resumeFrom') eq 'CommunityProject'}</case>
|
||||
<case to="community_sem_rel">${wf:conf('resumeFrom') eq 'CommunitySemanticRelation'}</case>
|
||||
<case to="country_propagation">${wf:conf('resumeFrom') eq 'CountryPropagation'}</case>
|
||||
<case to="person_propagation">${wf:conf('resumeFrom') eq 'PersonPropagation'}</case>
|
||||
<default to="orcid_propagation"/>
|
||||
</switch>
|
||||
</decision>
|
||||
|
@ -291,10 +292,24 @@
|
|||
</property>
|
||||
</configuration>
|
||||
</sub-workflow>
|
||||
<ok to="person_propagation" />
|
||||
<error to="Kill" />
|
||||
</action>
|
||||
<action name="person_propagation">
|
||||
<sub-workflow>
|
||||
<app-path>${wf:appPath()}/person_propagation
|
||||
</app-path>
|
||||
<propagate-configuration/>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>sourcePath</name>
|
||||
<value>${outputPath}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</sub-workflow>
|
||||
<ok to="country_propagation" />
|
||||
<error to="Kill" />
|
||||
</action>
|
||||
|
||||
<action name="country_propagation">
|
||||
<sub-workflow>
|
||||
<app-path>${wf:appPath()}/country_propagation
|
||||
|
@ -319,6 +334,8 @@
|
|||
<error to="Kill" />
|
||||
</action>
|
||||
|
||||
|
||||
|
||||
<end name="End"/>
|
||||
|
||||
</workflow-app>
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
<path start="copy_organization"/>
|
||||
<path start="copy_projects"/>
|
||||
<path start="copy_datasources"/>
|
||||
<path start="copy_persons"/>
|
||||
</fork>
|
||||
|
||||
<action name="copy_relation">
|
||||
|
@ -80,6 +81,17 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="copy_persons">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<arg>${nameNode}/${sourcePath}/person</arg>
|
||||
<arg>${nameNode}/${outputPath}/person</arg>
|
||||
</distcp>
|
||||
<ok to="copy_wait"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="copy_wait" to="fork_prepare_assoc_step1"/>
|
||||
|
||||
<fork name="fork_prepare_assoc_step1">
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
[
|
||||
{
|
||||
"paramName":"s",
|
||||
"paramLongName":"sourcePath",
|
||||
"paramDescription": "the path of the sequencial file to read",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "out",
|
||||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the path used to store temporary output files",
|
||||
"paramRequired": true
|
||||
},
|
||||
|
||||
{
|
||||
"paramName": "ssm",
|
||||
"paramLongName": "isSparkSessionManaged",
|
||||
"paramDescription": "true if the spark session is managed, false otherwise",
|
||||
"paramRequired": false
|
||||
}
|
||||
]
|
|
@ -0,0 +1 @@
|
|||
sourcePath=/tmp/miriam/13_graph_copy
|
|
@ -0,0 +1,58 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>hive_metastore_uris</name>
|
||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2YarnHistoryServerAddress</name>
|
||||
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2EventLogDir</name>
|
||||
<value>/user/spark/spark2ApplicationHistory</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2ExtraListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2SqlQueryExecutionListeners</name>
|
||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorNumber</name>
|
||||
<value>4</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkDriverMemory</name>
|
||||
<value>15G</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorMemory</name>
|
||||
<value>5G</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>sparkExecutorCores</name>
|
||||
<value>4</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>spark2MaxExecutors</name>
|
||||
<value>50</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,68 @@
|
|||
<workflow-app name="person_propagation" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>sourcePath</name>
|
||||
<description>the source path</description>
|
||||
</property>
|
||||
|
||||
</parameters>
|
||||
|
||||
<global>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<configuration>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>${oozieActionShareLibForSpark2}</value>
|
||||
</property>
|
||||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="reset_outputpath"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="reset_outputpath">
|
||||
<fs>
|
||||
<delete path="${workingDir}"/>
|
||||
<mkdir path="${workingDir}"/>
|
||||
</fs>
|
||||
<ok to="extract_person_relation_from_graph"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="extract_person_relation_from_graph">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>personPropagation</name>
|
||||
<class>eu.dnetlib.dhp.person.SparkExtractPersonRelations</class>
|
||||
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.speculation=false
|
||||
--conf spark.hadoop.mapreduce.map.speculative=false
|
||||
--conf spark.hadoop.mapreduce.reduce.speculative=false
|
||||
--conf spark.sql.shuffle.partitions=7680
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
|
||||
<end name="End"/>
|
||||
|
||||
</workflow-app>
|
|
@ -0,0 +1,93 @@
|
|||
|
||||
package eu.dnetlib.dhp.person;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class PersonPropagationJobTest {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PersonPropagationJobTest.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private static SparkSession spark;
|
||||
|
||||
private static Path workingDir;
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() throws IOException {
|
||||
workingDir = Files.createTempDirectory(PersonPropagationJobTest.class.getSimpleName());
|
||||
log.info("using work dir {}", workingDir);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
conf.setAppName(PersonPropagationJobTest.class.getSimpleName());
|
||||
|
||||
conf.setMaster("local[*]");
|
||||
conf.set("spark.driver.host", "localhost");
|
||||
conf.set("hive.metastore.local", "true");
|
||||
conf.set("spark.ui.enabled", "false");
|
||||
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
||||
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
||||
|
||||
spark = SparkSession
|
||||
.builder()
|
||||
.appName(PersonPropagationJobTest.class.getSimpleName())
|
||||
.config(conf)
|
||||
.getOrCreate();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void afterAll() throws IOException {
|
||||
FileUtils.deleteDirectory(workingDir.toFile());
|
||||
spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testPersonPropagation() throws Exception {
|
||||
final String sourcePath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/personpropagation/graph")
|
||||
.getPath();
|
||||
|
||||
SparkExtractPersonRelations
|
||||
.main(
|
||||
new String[] {
|
||||
"--isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"--sourcePath", sourcePath,
|
||||
"--outputPath", workingDir.toString()
|
||||
});
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Relation> tmp = sc
|
||||
.textFile(workingDir.toString() + "/relation")
|
||||
.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
|
||||
|
||||
// TODO write assertions and find relevant information for hte resource files
|
||||
}
|
||||
|
||||
}
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -89,6 +89,14 @@
|
|||
<arg>${nameNode}/${graphPath}/project</arg>
|
||||
<arg>${nameNode}/${targetPath}/project</arg>
|
||||
</distcp>
|
||||
<ok to="copy_person"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
<action name="copy_person">
|
||||
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||
<arg>${nameNode}/${graphPath}/person</arg>
|
||||
<arg>${nameNode}/${targetPath}/person</arg>
|
||||
</distcp>
|
||||
<ok to="copy_relation"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
|
|
@ -142,6 +142,7 @@
|
|||
<path start="clean_datasource"/>
|
||||
<path start="clean_organization"/>
|
||||
<path start="clean_project"/>
|
||||
<path start="clean_person"/>
|
||||
<path start="clean_relation"/>
|
||||
</fork>
|
||||
|
||||
|
@ -390,6 +391,41 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="clean_person">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Clean person</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.clean.CleanGraphSparkJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=2000
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${graphInputPath}/person</arg>
|
||||
<arg>--outputPath</arg><arg>${graphOutputPath}/person</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--contextId</arg><arg>${contextId}</arg>
|
||||
<arg>--verifyParam</arg><arg>${verifyParam}</arg>
|
||||
<arg>--country</arg><arg>${country}</arg>
|
||||
<arg>--verifyCountryParam</arg><arg>${verifyCountryParam}</arg>
|
||||
<arg>--hostedBy</arg><arg>${workingDir}/working/hostedby</arg>
|
||||
<arg>--collectedfrom</arg><arg>${collectedfrom}</arg>
|
||||
<arg>--masterDuplicatePath</arg><arg>${workingDir}/masterduplicate</arg>
|
||||
<arg>--deepClean</arg><arg>${shouldClean}</arg>
|
||||
</spark>
|
||||
<ok to="wait_clean"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="clean_relation">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
|
|
|
@ -102,6 +102,7 @@
|
|||
<path start="import_datasource"/>
|
||||
<path start="import_organization"/>
|
||||
<path start="import_project"/>
|
||||
<path start="import_person"/>
|
||||
<path start="import_relation"/>
|
||||
</fork>
|
||||
|
||||
|
@ -308,6 +309,35 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="import_person">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Import table person</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.hive.GraphHiveTableImporterJob</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
--conf spark.sql.shuffle.partitions=1000
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${inputPath}/person</arg>
|
||||
<arg>--hiveDbName</arg><arg>${hiveDbName}</arg>
|
||||
<arg>--className</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
|
||||
<arg>--hiveMetastoreUris</arg><arg>${hiveMetastoreUris}</arg>
|
||||
<arg>--numPartitions</arg><arg>1000</arg>
|
||||
</spark>
|
||||
<ok to="join_import"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="import_relation">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
|
|
|
@ -16,8 +16,6 @@ import java.util.Objects;
|
|||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.common.RelationInverse;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
|
@ -32,6 +30,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
|
||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.common.RelationInverse;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
|
||||
|
||||
|
@ -366,6 +366,7 @@ class MigrateDbEntitiesApplicationTest {
|
|||
assertValidId(r1.getCollectedfrom().get(0).getKey());
|
||||
assertValidId(r2.getCollectedfrom().get(0).getKey());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testProcessClaims_affiliation() throws Exception {
|
||||
final List<TypedField> fields = prepareMocks("claimsrel_resultset_affiliation.json");
|
||||
|
|
|
@ -233,6 +233,14 @@ public class CreateRelatedEntitiesJob_phase1 {
|
|||
if (!f.isEmpty()) {
|
||||
re.setFundingtree(f.stream().map(Field::getValue).collect(Collectors.toList()));
|
||||
}
|
||||
break;
|
||||
case person:
|
||||
final Person person = (Person) entity;
|
||||
|
||||
re.setGivenName(person.getGivenName());
|
||||
re.setFamilyName(person.getFamilyName());
|
||||
re.setAlternativeNames(person.getAlternativeNames());
|
||||
|
||||
break;
|
||||
}
|
||||
return re;
|
||||
|
|
|
@ -2,10 +2,12 @@
|
|||
package eu.dnetlib.dhp.oa.provision;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
import static eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits.MAX_RELATIONS_BY_RELCLASS;
|
||||
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
@ -15,11 +17,13 @@ import org.apache.spark.api.java.function.FilterFunction;
|
|||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
|
@ -27,11 +31,13 @@ import eu.dnetlib.dhp.common.HdfsSupport;
|
|||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
|
||||
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
|
||||
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
|
||||
import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;
|
||||
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
|
||||
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
|
||||
import eu.dnetlib.dhp.schema.solr.SolrRecord;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
|
@ -124,6 +130,9 @@ public class PayloadConverterJob {
|
|||
.map(Oaf::getDataInfo)
|
||||
.map(DataInfo::getDeletedbyinference)
|
||||
.orElse(false))
|
||||
.map(
|
||||
(MapFunction<JoinedEntity, JoinedEntity>) PayloadConverterJob::pruneRelatedEntities,
|
||||
Encoders.kryo(JoinedEntity.class))
|
||||
.map(
|
||||
(MapFunction<JoinedEntity, Tuple2<String, SolrRecord>>) je -> new Tuple2<>(
|
||||
recordFactory.build(je, validateXML),
|
||||
|
@ -139,6 +148,32 @@ public class PayloadConverterJob {
|
|||
.json(outputPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* This function iterates through the RelatedEntityWrapper(s) associated to the JoinedEntity and rules out
|
||||
* those exceeding the maximum allowed frequency defined in eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits#MAX_RELATIONS_BY_RELCLASS
|
||||
*/
|
||||
private static JoinedEntity pruneRelatedEntities(JoinedEntity je) {
|
||||
Map<String, Long> freqs = Maps.newHashMap();
|
||||
List<RelatedEntityWrapper> rew = Lists.newArrayList();
|
||||
|
||||
if (je.getLinks() != null) {
|
||||
je.getLinks().forEach(link -> {
|
||||
final String relClass = link.getRelation().getRelClass();
|
||||
|
||||
final Long count = freqs.getOrDefault(relClass, 0L);
|
||||
final Long max = MAX_RELATIONS_BY_RELCLASS.getOrDefault(relClass, Long.MAX_VALUE);
|
||||
|
||||
if (count <= max) {
|
||||
rew.add(link);
|
||||
freqs.put(relClass, freqs.getOrDefault(relClass, 0L) + 1);
|
||||
}
|
||||
});
|
||||
je.setLinks(rew);
|
||||
}
|
||||
|
||||
return je;
|
||||
}
|
||||
|
||||
private static void removeOutputDir(final SparkSession spark, final String path) {
|
||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@ import java.io.StringReader;
|
|||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.DocumentException;
|
||||
|
@ -24,6 +23,7 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
|
|||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
|
||||
import eu.dnetlib.dhp.schema.solr.*;
|
||||
import eu.dnetlib.dhp.schema.solr.AccessRight;
|
||||
import eu.dnetlib.dhp.schema.solr.Author;
|
||||
|
@ -38,6 +38,8 @@ import eu.dnetlib.dhp.schema.solr.Measure;
|
|||
import eu.dnetlib.dhp.schema.solr.OpenAccessColor;
|
||||
import eu.dnetlib.dhp.schema.solr.OpenAccessRoute;
|
||||
import eu.dnetlib.dhp.schema.solr.Organization;
|
||||
import eu.dnetlib.dhp.schema.solr.Person;
|
||||
import eu.dnetlib.dhp.schema.solr.PersonTopic;
|
||||
import eu.dnetlib.dhp.schema.solr.Pid;
|
||||
import eu.dnetlib.dhp.schema.solr.Project;
|
||||
import eu.dnetlib.dhp.schema.solr.Result;
|
||||
|
@ -90,6 +92,8 @@ public class ProvisionModelSupport {
|
|||
r.setOrganization(mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) e));
|
||||
} else if (e instanceof eu.dnetlib.dhp.schema.oaf.Project) {
|
||||
r.setProject(mapProject((eu.dnetlib.dhp.schema.oaf.Project) e, vocs));
|
||||
} else if (e instanceof eu.dnetlib.dhp.schema.oaf.Person) {
|
||||
r.setPerson(mapPerson((eu.dnetlib.dhp.schema.oaf.Person) e));
|
||||
}
|
||||
r
|
||||
.setLinks(
|
||||
|
@ -152,8 +156,13 @@ public class ProvisionModelSupport {
|
|||
rr.setResulttype(mapQualifier(re.getResulttype()));
|
||||
rr.setTitle(Optional.ofNullable(re.getTitle()).map(StructuredProperty::getValue).orElse(null));
|
||||
rr.setDescription(StringUtils.left(re.getDescription(), ModelHardLimits.MAX_RELATED_ABSTRACT_LENGTH));
|
||||
rr.setAuthor(Optional.ofNullable(re.getAuthor())
|
||||
.map(aa -> aa.stream()
|
||||
rr
|
||||
.setAuthor(
|
||||
Optional
|
||||
.ofNullable(re.getAuthor())
|
||||
.map(
|
||||
aa -> aa
|
||||
.stream()
|
||||
.limit(ModelHardLimits.MAX_RELATED_AUTHORS)
|
||||
.collect(Collectors.toList()))
|
||||
.orElse(null));
|
||||
|
@ -192,6 +201,18 @@ public class ProvisionModelSupport {
|
|||
return ps;
|
||||
}
|
||||
|
||||
private static Person mapPerson(eu.dnetlib.dhp.schema.oaf.Person p) {
|
||||
Person ps = new Person();
|
||||
ps.setFamilyName(p.getFamilyName());
|
||||
ps.setGivenName(p.getGivenName());
|
||||
ps.setAlternativeNames(p.getAlternativeNames());
|
||||
ps.setBiography(p.getBiography());
|
||||
ps.setConsent(p.getConsent());
|
||||
// ps.setSubject(...));
|
||||
|
||||
return ps;
|
||||
}
|
||||
|
||||
private static Funding mapFunding(List<String> fundingtree, VocabularyGroup vocs) {
|
||||
SAXReader reader = new SAXReader();
|
||||
return Optional
|
||||
|
|
|
@ -51,6 +51,11 @@ public class RelatedEntity implements Serializable {
|
|||
private Qualifier contracttype;
|
||||
private List<String> fundingtree;
|
||||
|
||||
// person
|
||||
private String givenName;
|
||||
private String familyName;
|
||||
private List<String> alternativeNames;
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
}
|
||||
|
@ -251,6 +256,30 @@ public class RelatedEntity implements Serializable {
|
|||
this.fundingtree = fundingtree;
|
||||
}
|
||||
|
||||
public String getGivenName() {
|
||||
return givenName;
|
||||
}
|
||||
|
||||
public void setGivenName(String givenName) {
|
||||
this.givenName = givenName;
|
||||
}
|
||||
|
||||
public String getFamilyName() {
|
||||
return familyName;
|
||||
}
|
||||
|
||||
public void setFamilyName(String familyName) {
|
||||
this.familyName = familyName;
|
||||
}
|
||||
|
||||
public List<String> getAlternativeNames() {
|
||||
return alternativeNames;
|
||||
}
|
||||
|
||||
public void setAlternativeNames(List<String> alternativeNames) {
|
||||
this.alternativeNames = alternativeNames;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o)
|
||||
|
@ -280,7 +309,10 @@ public class RelatedEntity implements Serializable {
|
|||
&& Objects.equal(code, that.code)
|
||||
&& Objects.equal(acronym, that.acronym)
|
||||
&& Objects.equal(contracttype, that.contracttype)
|
||||
&& Objects.equal(fundingtree, that.fundingtree);
|
||||
&& Objects.equal(fundingtree, that.fundingtree)
|
||||
&& Objects.equal(givenName, that.givenName)
|
||||
&& Objects.equal(familyName, that.familyName)
|
||||
&& Objects.equal(alternativeNames, that.alternativeNames);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -309,6 +341,9 @@ public class RelatedEntity implements Serializable {
|
|||
code,
|
||||
acronym,
|
||||
contracttype,
|
||||
fundingtree);
|
||||
fundingtree,
|
||||
familyName,
|
||||
givenName,
|
||||
alternativeNames);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1035,6 +1035,48 @@ public class XmlRecordFactory implements Serializable {
|
|||
.collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
break;
|
||||
case person:
|
||||
final Person person = (Person) entity;
|
||||
|
||||
if (person.getGivenName() != null) {
|
||||
metadata.add(XmlSerializationUtils.asXmlElement("givenname", person.getGivenName()));
|
||||
}
|
||||
if (person.getFamilyName() != null) {
|
||||
metadata.add(XmlSerializationUtils.asXmlElement("familyname", person.getFamilyName()));
|
||||
}
|
||||
if (person.getAlternativeNames() != null) {
|
||||
metadata
|
||||
.addAll(
|
||||
person
|
||||
.getAlternativeNames()
|
||||
.stream()
|
||||
.map(altName -> XmlSerializationUtils.asXmlElement("alternativename", altName))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
if (person.getBiography() != null) {
|
||||
metadata.add(XmlSerializationUtils.asXmlElement("biography", person.getBiography()));
|
||||
}
|
||||
if (person.getSubject() != null) {
|
||||
metadata
|
||||
.addAll(
|
||||
person
|
||||
.getSubject()
|
||||
.stream()
|
||||
.map(pt -> {
|
||||
List<Tuple2<String, String>> attrs = Lists.newArrayList();
|
||||
attrs.add(new Tuple2<>("schema", pt.getSchema()));
|
||||
attrs.add(new Tuple2<>("value", pt.getValue()));
|
||||
attrs.add(new Tuple2<>("fromYear", String.valueOf(pt.getFromYear())));
|
||||
attrs.add(new Tuple2<>("toYear", String.valueOf(pt.getToYear())));
|
||||
return XmlSerializationUtils.asXmlElement("subject", attrs);
|
||||
})
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
if (person.getConsent() != null) {
|
||||
metadata.add(XmlSerializationUtils.asXmlElement("consent", String.valueOf(person.getConsent())));
|
||||
}
|
||||
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid entity type: " + type);
|
||||
|
@ -1240,6 +1282,25 @@ public class XmlRecordFactory implements Serializable {
|
|||
.collect(Collectors.toList()));
|
||||
}
|
||||
break;
|
||||
|
||||
case person:
|
||||
|
||||
if (isNotBlank(re.getGivenName())) {
|
||||
metadata.add(XmlSerializationUtils.asXmlElement("givenname", re.getGivenName()));
|
||||
}
|
||||
if (isNotBlank(re.getFamilyName())) {
|
||||
metadata.add(XmlSerializationUtils.asXmlElement("familyname", re.getFamilyName()));
|
||||
}
|
||||
if (re.getAlternativeNames() != null && !re.getAlternativeNames().isEmpty()) {
|
||||
metadata
|
||||
.addAll(
|
||||
re
|
||||
.getAlternativeNames()
|
||||
.stream()
|
||||
.map(name -> XmlSerializationUtils.asXmlElement("alternativename", name))
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("invalid target type: " + targetType);
|
||||
}
|
||||
|
|
|
@ -180,6 +180,7 @@
|
|||
<path start="join_relation_datasource"/>
|
||||
<path start="join_relation_organization"/>
|
||||
<path start="join_relation_project"/>
|
||||
<path start="join_relation_person"/>
|
||||
</fork>
|
||||
|
||||
<action name="join_relation_publication">
|
||||
|
@ -378,6 +379,34 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="join_relation_person">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Join[relation.target = person.id]</name>
|
||||
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
|
||||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCoresForJoining}
|
||||
--executor-memory=${sparkExecutorMemoryForJoining}
|
||||
--driver-memory=${sparkDriverMemoryForJoining}
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemoryForJoining}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=5000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
|
||||
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/person</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/join_partial/person</arg>
|
||||
</spark>
|
||||
<ok to="wait_joins"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait_joins" to="fork_join_all_entities"/>
|
||||
|
||||
<fork name="fork_join_all_entities">
|
||||
|
@ -388,6 +417,7 @@
|
|||
<path start="join_datasource_relations"/>
|
||||
<path start="join_organization_relations"/>
|
||||
<path start="join_project_relations"/>
|
||||
<path start="join_person_relations"/>
|
||||
</fork>
|
||||
|
||||
<action name="join_publication_relations">
|
||||
|
@ -593,6 +623,35 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="join_person_relations">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Join[person.id = relatedEntity.source]</name>
|
||||
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
|
||||
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCoresForJoining}
|
||||
--executor-memory=${sparkExecutorMemoryForJoining}
|
||||
--driver-memory=${sparkDriverMemoryForJoining}
|
||||
--conf spark.executor.memoryOverhead=${sparkExecutorMemoryForJoining}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=5000
|
||||
--conf spark.network.timeout=${sparkNetworkTimeout}
|
||||
</spark-opts>
|
||||
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/person</arg>
|
||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Person</arg>
|
||||
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
|
||||
<arg>--outputPath</arg><arg>${workingDir}/join_entities/person</arg>
|
||||
<arg>--numPartitions</arg><arg>10000</arg>
|
||||
</spark>
|
||||
<ok to="wait_join_phase2"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait_join_phase2" to="create_payloads"/>
|
||||
|
||||
<action name="create_payloads">
|
||||
|
|
Loading…
Reference in New Issue