diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java index 5ace02bbc4..2c7107b70b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/SparkGenerateScholix.java @@ -39,19 +39,14 @@ public class SparkGenerateScholix { final JavaRDD relationToExport = sc.textFile(graphPath + "/relation").filter(ProvisionUtil::isNotDeleted); final JavaPairRDD scholixSummary = sc.textFile(workingDirPath + "/summary").mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i)); - - - PairFunction, String, Scholix> k = - summaryRelation -> - new Tuple2<>( - DHPUtils.getJPathString(targetIDPath,summaryRelation._2()), - Scholix.generateScholixWithSource(summaryRelation._1(), summaryRelation._2())); - scholixSummary.join( relationToExport .mapToPair((PairFunction) i -> new Tuple2<>(DHPUtils.getJPathString(sourceIDPath, i), i))) .map(Tuple2::_2) - .mapToPair(k) + .mapToPair(summaryRelation -> + new Tuple2<>( + DHPUtils.getJPathString(targetIDPath,summaryRelation._2()), + Scholix.generateScholixWithSource(summaryRelation._1(), summaryRelation._2()))) .join(scholixSummary) .map(Tuple2::_2) .map(i -> i._1().addTarget(i._2())) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java index 70467abb69..9ef2be2be0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/Scholix.java @@ -3,10 +3,8 @@ package eu.dnetlib.dhp.provision.scholix; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary; import eu.dnetlib.dhp.schema.oaf.Relation; - +import eu.dnetlib.dhp.utils.DHPUtils; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -32,25 +30,39 @@ public class Scholix implements Serializable { try { ScholixSummary scholixSummary = mapper.readValue(sourceSummaryJson, ScholixSummary.class); - Relation rel = mapper.readValue(sourceSummaryJson, Relation.class); + Relation rel = mapper.readValue(relation, Relation.class); final Scholix s = new Scholix(); if (scholixSummary.getDate() != null) s.setPublicationDate(scholixSummary.getDate().stream().findFirst().orElse(null)); - - s.setLinkprovider(rel.getCollectedFrom().stream().map(cf -> new ScholixEntityId(cf.getValue(), Collections.singletonList( new ScholixIdentifier(cf.getKey(), "dnet_identifier") ))).collect(Collectors.toList())); - - + s.setRelationship(new ScholixRelationship(rel.getRelType(),rel.getRelClass(),null )); + s.setSource(ScholixResource.fromSummary(scholixSummary)); + return s; } catch (Throwable e) { throw new RuntimeException(e); } } + + private void generateIdentifier( ) { + setIdentifier(DHPUtils.md5(String.format("%s::%s::%s",source.getDnetIdentifier(),relationship.getName(), target.getDnetIdentifier()))); + + } + public Scholix addTarget(final String targetSummaryJson) { - return this; + final ObjectMapper mapper = new ObjectMapper(); + + try { + ScholixSummary targetSummary = mapper.readValue(targetSummaryJson, ScholixSummary.class); + setTarget(ScholixResource.fromSummary(targetSummary)); + generateIdentifier(); + return this; + } catch (Throwable e) { + throw new RuntimeException(e); + } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixResource.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixResource.java index 74cb361f64..34becbb906 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixResource.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/provision/scholix/ScholixResource.java @@ -1,26 +1,70 @@ package eu.dnetlib.dhp.provision.scholix; +import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary; + import java.io.Serializable; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; public class ScholixResource implements Serializable { - private ScholixIdentifier identifier ; - private String dnetIdentifier ; - private String objectType ; - private String objectSubType ; - private String title ; - private List creator ; - private String publicationDate ; - private List publisher ; - private List collectedFrom ; + private List identifier; + private String dnetIdentifier; + private String objectType; + private String objectSubType; + private String title; + private List creator; + private String publicationDate; + private List publisher; + private List collectedFrom; - public ScholixIdentifier getIdentifier() { + public static ScholixResource fromSummary(ScholixSummary summary) { + + final ScholixResource resource = new ScholixResource(); + + resource.setDnetIdentifier(summary.getId()); + + resource.setIdentifier(summary.getLocalIdentifier().stream() + .map(i -> + new ScholixIdentifier(i.getId(), i.getType())) + .collect(Collectors.toList())); + + resource.setObjectType(summary.getTypology().toString()); + + resource.setTitle(summary.getTitle().stream().findAny().orElse(null)); + + if (summary.getAuthor() != null) + resource.setCreator(summary.getAuthor().stream() + .map(c -> new ScholixEntityId(c, null)) + .collect(Collectors.toList()) + ); + + if (summary.getDate() != null) + resource.setPublicationDate(summary.getDate().stream().findAny().orElse(null)); + if (summary.getPublisher() != null) + resource.setPublisher(summary.getPublisher().stream() + .map(p -> new ScholixEntityId(p, null)) + .collect(Collectors.toList()) + ); + if (summary.getDatasources() != null) + resource.setCollectedFrom(summary.getDatasources().stream() + .map(d -> + new ScholixCollectedFrom(new ScholixEntityId(d.getDatasourceName(), + Collections.singletonList(new ScholixIdentifier(d.getDatasourceId(), "dnet_identifier")) + ), "collected", d.getCompletionStatus())) + .collect(Collectors.toList())); + return resource; + + } + + public List getIdentifier() { return identifier; } - public ScholixResource setIdentifier(ScholixIdentifier identifier) { + public ScholixResource setIdentifier(List identifier) { this.identifier = identifier; return this; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml index 7e509d7bf2..9120a2e9aa 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/graph/Application/provision/oozie_app/workflow.xml @@ -27,7 +27,7 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -91,10 +91,29 @@ --sourcePath${workingDirPath}/summary --index${index}_object - + + + + + + + ${jobTracker} + ${nameNode} + yarn-cluster + cluster + generate Summary + eu.dnetlib.dhp.provision.SparkGenerateScholix + dhp-graph-provision-${projectVersion}.jar + --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} + -mt yarn-cluster + --workingDirPath${workingDirPath} + --graphPath${graphPath} + + + + + - - \ No newline at end of file