[person] added merge of relations and production of all possible relations (not only authorship but also co-authorship). Note: some of the properties present in the publishers file may be missing (role and corresponding author, since there is no place to put them). Note: the confidence of the affiliation is not present in the properties of the relation.
This commit is contained in:
parent c71ad6aba1
commit d378de4f0d
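For orientation, a minimal sketch of the pairwise expansion the co-authorship production relies on (an assumption drawn from the CoAuthorshipIterator usage in the diff below; the real class lives in eu.dnetlib.dhp.common.person and presumably emits Relation objects between person ids rather than strings):

// Sketch only: every unordered pair of ORCIDs appearing on the same result
// becomes one co-authorship link; here we just print the pairs.
import java.util.Arrays;
import java.util.List;

public class CoAuthorshipSketch {
    public static void main(String[] args) {
        // three authors on one paper -> three co-author pairs (illustrative ORCIDs)
        List<String> orcids = Arrays.asList(
            "0000-0001-0000-0001", "0000-0002-0000-0002", "0000-0003-0000-0003");
        for (int i = 0; i < orcids.size(); i++)
            for (int j = i + 1; j < orcids.size(); j++)
                System.out.println(orcids.get(i) + " <-> " + orcids.get(j));
    }
}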
eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/EnrichExternalDataWithGraphORCID.java
@@ -1,14 +1,13 @@
 package eu.dnetlib.dhp.enrich.relsfrompublisherenricheddata;
 
-import com.azul.tooling.in.Model;
 import eu.dnetlib.dhp.common.author.SparkEnrichWithOrcidAuthors;
+import eu.dnetlib.dhp.common.person.CoAuthorshipIterator;
+import eu.dnetlib.dhp.common.person.Coauthors;
 import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidAuthors;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
 import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
 import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
-import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
 import eu.dnetlib.dhp.utils.DHPUtils;
 import eu.dnetlib.dhp.utils.ORCIDAuthorEnricherResult;
 import eu.dnetlib.dhp.utils.OrcidAuthor;
@@ -57,6 +56,8 @@ public class EnrichExternalDataWithGraphORCID extends SparkEnrichWithOrcidAuthor
     }
 
 
+
+
     private static OrcidAuthors getOrcidAuthorsList(List<Author> authors) {
         OrcidAuthors oas = new OrcidAuthors();
         List<OrcidAuthor> tmp = authors
@@ -102,13 +103,12 @@ public class EnrichExternalDataWithGraphORCID extends SparkEnrichWithOrcidAuthor
 
     @Override
     public void generateGraph(SparkSession spark, String graphPath, String workingDir, String targetPath) {
-        //creates new relations
+        //creates new relations of authorship
         Dataset<Relation> newRelations = spark.read().schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema())
             .parquet(workingDir + "/publication_matched")
             .selectExpr("id as doi", "enriched_author")
             .flatMap((FlatMapFunction<Row, Relation>) this::getRelationsList, Encoders.bean(Relation.class));
 
-
         //redirects new relations versus representatives if any
         Dataset<Row> graph_relations = spark.read().schema(Encoders.bean(Relation.class).schema())
             .json(graphPath + "/relation")
@@ -154,14 +154,24 @@ public class EnrichExternalDataWithGraphORCID extends SparkEnrichWithOrcidAuthor
 
         eauthors.forEach(author -> {
             List<Row> pids = author.getAs("pid");
-            pids.forEach(p -> {
-                if(p.getAs("scheme") == ModelConstants.ORCID)
-                    relationList.add(getRelations(r.getAs("doi"), author.getAs("raw_affiliation_string"), p.getAs("value")));
-            });
+            List<Row> pidList = pids.stream().filter(p -> ModelConstants.ORCID.equalsIgnoreCase(p.getAs("schema")) || ModelConstants.ORCID_PENDING.equalsIgnoreCase(p.getAs("schema"))).collect(Collectors.toList());
+            pidList.forEach(p -> relationList.add(getRelations(r.getAs("doi"), author.getAs("raw_affiliation_string"), p.getAs("value")))
+            );
+            new CoAuthorshipIterator(extractCoAuthors(pidList)).forEachRemaining(relationList::add);
+
         });
         return relationList.iterator();
     }
 
+    private static List<String> extractCoAuthors(List<Row> pidList) {
+        List<String> coauthors = new ArrayList<>();
+        for(Row pid : pidList)
+            coauthors.add(pid.getAs("Value"));
+
+        return coauthors;
+    }
+
     private Relation getRelations(String doi, List<String> rawAffiliationString, String orcid) {
         Relation rel = OafMapperUtils.getRelation("30|orcid_______::" + DHPUtils.md5(orcid), "50|doi_________::" + DHPUtils.md5(doi)
             , ModelConstants.RESULT_PERSON_RELTYPE, ModelConstants.RESULT_PERSON_SUBRELTYPE, ModelConstants.RESULT_PERSON_HASAUTHORED,
@@ -272,9 +282,12 @@ public class EnrichExternalDataWithGraphORCID extends SparkEnrichWithOrcidAuthor
         List<StructuredProperty> pids = new ArrayList<>();
         List<String> affs = new ArrayList<>();
 
-        ((List<Row>) a.getAs("pids")).forEach(pid -> pids.add(getPid(pid)));
+        List<Row> publisherPids = a.getAs("pids");
+        if(Optional.ofNullable(publisherPids).isPresent())
+            publisherPids.forEach(pid -> pids.add(getPid(pid)));
+
+        List<Row> affiliations = a.getAs("affiliations");
         //"`Matchings`: ARRAY<STRUCT<`PID`:STRING, `Value`:STRING, `Confidence`:DOUBLE, `Status`:STRING>>,
-        ((List<Row>) a.getAs("affiliations")).forEach(aff -> {
+        affiliations.forEach(aff -> {
             if(aff.getAs("Status").equals(Boolean.TRUE))
                 affs.add(aff.getAs("PID") + "@@" + aff.getAs("Value") + "@@" + String.valueOf(aff.getAs("Confidence")));
 
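A note on the "@@"-packed affiliation string built in the hunk above (this flat encoding is also why the affiliation confidence does not survive as a typed relation property). A minimal decoding sketch, assuming only the PID/Value/Confidence field order shown above; the packed value is illustrative:

public class AffiliationDecodeSketch {
    public static void main(String[] args) {
        String packed = "ROR@@https://ror.org/012345678@@0.91"; // illustrative value
        String[] parts = packed.split("@@", -1); // limit -1 keeps trailing empty fields
        String pidType = parts[0];                        // e.g. ROR
        String pidValue = parts[1];                       // the affiliation identifier
        double confidence = Double.parseDouble(parts[2]); // matching confidence
        System.out.println(pidType + " / " + pidValue + " / " + confidence);
    }
}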
eu/dnetlib/dhp/enrich/relsfrompublisherenricheddata/SparkCopyEnrichedPublisherAuthors.java (new file)
@@ -0,0 +1,63 @@
+package eu.dnetlib.dhp.enrich.relsfrompublisherenricheddata;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob;
+import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
+import eu.dnetlib.dhp.utils.ORCIDAuthorEnricherResult;
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import static eu.dnetlib.dhp.PropagationConstant.isSparkSessionManaged;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class SparkCopyEnrichedPublisherAuthors implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(SparkCountryPropagationJob.class);
+    public static void main(String[] args) throws Exception {
+
+        String jsonConfiguration = IOUtils
+            .toString(
+                SparkCopyEnrichedPublisherAuthors.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/wf/subworkflows/enrich/publisher/input_propagation_parameter.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        String workingDir = parser.get("workingDir");
+        log.info("workingDir: {}", workingDir);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> copyEnrichedAuthors(spark, workingDir, outputPath));
+    }
+
+
+    private static void copyEnrichedAuthors(SparkSession spark, String workingDir, String persistedPath) {
+        spark.read().schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema())
+            .parquet(workingDir + "/publication_matched")
+            .selectExpr("id as doi", "enriched_author")
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(persistedPath + "/publisherEnrichedAuthors");
+    }
+}
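As a usage note, a minimal sketch (an assumption, not part of the commit) of inspecting what copyEnrichedAuthors persists; Spark infers the schema from the gzip-compressed JSON, and the path is a placeholder:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadEnrichedAuthorsSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("inspectPublisherEnrichedAuthors")
            .master("local[*]") // local inspection only
            .getOrCreate();
        Dataset<Row> enriched = spark.read()
            .json("/tmp/output/publisherEnrichedAuthors"); // placeholder path
        enriched.printSchema();
        enriched.select("doi").show(5, false);
        spark.stop();
    }
}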
eu/dnetlib/dhp/wf/subworkflows/enrich/publisher/input_propagation_parameter.json (new file)
@@ -0,0 +1,20 @@
+[
+    {
+        "paramName": "issm",
+        "paramLongName": "isSparkSessionManaged",
+        "paramDescription": "should be local or yarn",
+        "paramRequired": false
+    },
+    {
+        "paramName": "op",
+        "paramLongName": "outputPath",
+        "paramDescription": "the path of the orcid Table generated by the dump",
+        "paramRequired": true
+    },
+    {
+        "paramName": "wd",
+        "paramLongName": "workingDir",
+        "paramDescription": "the path of the graph we want to apply enrichment",
+        "paramRequired": true
+    }
+]