Added Provision workflow

This commit is contained in:
Sandro La Bruzzo 2020-02-26 10:51:35 +01:00
parent b021b8a2e1
commit 2ef3705b2c
16 changed files with 458 additions and 12 deletions

View File

@ -1,6 +1,6 @@
package eu.dnetlib.dhp.provision;
import eu.dnetlib.dhp.provision.scholix.Typology;
import eu.dnetlib.dhp.provision.scholix.summary.Typology;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.lang3.StringUtils;

View File

@ -0,0 +1,72 @@
package eu.dnetlib.dhp.provision;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.provision.scholix.Scholix;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
/**
 * Spark job that builds Scholix records by joining the graph relations with the
 * previously generated summaries.
 *
 * <p>Relations are read from {@code <graphPath>/relation} and summaries from
 * {@code <workingDirPath>/summary}; the resulting JSON records are written,
 * gzip-compressed, to {@code <workingDirPath>/scholix}.
 */
public class SparkGenerateScholix {

    /** JSON path of the identifier of a summary record. */
    private static final String jsonIDPath = "$.id";
    /** JSON path of the source identifier of a relation. */
    private static final String sourceIDPath = "$.source";
    /** JSON path of the target identifier of a relation. */
    private static final String targetIDPath = "$.target";

    /**
     * Shared JSON serializer. It is referenced as a static field from the Spark
     * closures (never captured), so each JVM that loads this class builds it once
     * instead of once per record.
     */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Entry point of the job.
     *
     * @param args command line arguments; {@code master}, {@code graphPath} and
     *             {@code workingDirPath} are read from them through the argument parser
     * @throws Exception if the parameter definition cannot be loaded or the arguments cannot be parsed
     */
    public static void main(String[] args) throws Exception {
        // NOTE(review): the parameter definition is the one of the summary generation job;
        // presumably shared on purpose since the same three arguments are read - confirm.
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkGenerateScholix.class.getResourceAsStream("/eu/dnetlib/dhp/provision/input_generate_summary_parameters.json")));
        parser.parseArgument(args);

        final SparkSession spark = SparkSession
                .builder()
                // Use this job's own name: the previous value labelled the run as SparkExtractRelationCount.
                .appName(SparkGenerateScholix.class.getSimpleName())
                .master(parser.get("master"))
                .getOrCreate();

        final String graphPath = parser.get("graphPath");
        final String workingDirPath = parser.get("workingDirPath");

        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        // Relations accepted by ProvisionUtil.isNotDeleted, one JSON document per line.
        final JavaRDD<String> relationToExport = sc.textFile(graphPath + "/relation").filter(ProvisionUtil::isNotDeleted);

        // Summaries keyed by their identifier.
        final JavaPairRDD<String, String> scholixSummary = sc.textFile(workingDirPath + "/summary").mapToPair((PairFunction<String, String, String>) i -> new Tuple2<>(DHPUtils.getJPathString(jsonIDPath, i), i));

        // (source summary, relation) -> (relation target id, Scholix carrying the source side)
        PairFunction<Tuple2<String, String>, String, Scholix> k =
                summaryRelation ->
                        new Tuple2<>(
                                DHPUtils.getJPathString(targetIDPath, summaryRelation._2()),
                                Scholix.generateScholixWithSource(summaryRelation._1(), summaryRelation._2()));

        scholixSummary.join(
                relationToExport
                        .mapToPair((PairFunction<String, String, String>) i -> new Tuple2<>(DHPUtils.getJPathString(sourceIDPath, i), i)))
                .map(Tuple2::_2)
                .mapToPair(k)
                // Second join attaches the summary of the relation target.
                .join(scholixSummary)
                .map(Tuple2::_2)
                .map(i -> i._1().addTarget(i._2()))
                // Keep this a lambda: a bound method reference would capture the mapper in the closure.
                .map(s -> MAPPER.writeValueAsString(s))
                .saveAsTextFile(workingDirPath + "/scholix", GzipCodec.class);
    }
}

View File

@ -1,7 +1,7 @@
package eu.dnetlib.dhp.provision;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.provision.scholix.ScholixSummary;
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.compress.GzipCodec;

View File

@ -1,8 +1,6 @@
package eu.dnetlib.dhp.provision;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.provision.scholix.ScholixSummary;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;

View File

@ -0,0 +1,119 @@
package eu.dnetlib.dhp.provision.scholix;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Serializable bean describing a Scholix link: a relationship between a source and a
 * target resource, together with publication date, publishers and link providers.
 *
 * <p>All setters return {@code this} so that calls can be chained.
 */
public class Scholix implements Serializable {

    /** Shared JSON parser, built once instead of once per generated record. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private String publicationDate;

    private List<ScholixEntityId> publisher;

    private List<ScholixEntityId> linkprovider;

    private ScholixRelationship relationship;

    private ScholixResource source;

    private ScholixResource target;

    private String identifier;

    /**
     * Creates a Scholix record from the summary of the relation source and the relation itself.
     *
     * <p>The publication date is the first date of the summary, when one is present. The link
     * providers are derived from the {@code collectedFrom} entries of the relation: each entry
     * becomes a {@link ScholixEntityId} named after the entry value and identified by the entry
     * key under the {@code dnet_identifier} schema.
     *
     * @param sourceSummaryJson JSON serialization of the {@link ScholixSummary} of the source
     * @param relation          JSON serialization of the {@link Relation}
     * @return the generated record
     * @throws RuntimeException if either JSON document cannot be parsed or processed
     */
    public static Scholix generateScholixWithSource(final String sourceSummaryJson, final String relation) {
        try {
            final ScholixSummary scholixSummary = MAPPER.readValue(sourceSummaryJson, ScholixSummary.class);
            // The relation must be parsed from its own JSON, not from the summary JSON.
            final Relation rel = MAPPER.readValue(relation, Relation.class);

            final Scholix s = new Scholix();
            if (scholixSummary.getDate() != null)
                s.setPublicationDate(scholixSummary.getDate().stream().findFirst().orElse(null));

            // A relation without provenance yields an empty provider list rather than a NullPointerException.
            if (rel.getCollectedFrom() == null) {
                s.setLinkprovider(Collections.emptyList());
            } else {
                s.setLinkprovider(rel.getCollectedFrom().stream().map(cf ->
                        new ScholixEntityId(cf.getValue(), Collections.singletonList(
                                new ScholixIdentifier(cf.getKey(), "dnet_identifier")
                        ))).collect(Collectors.toList()));
            }
            return s;
        } catch (Exception e) {
            throw new RuntimeException("Unable to generate Scholix from source summary and relation", e);
        }
    }

    /**
     * Attaches the target side to this record.
     *
     * <p>Currently a no-op: the given summary is ignored and the record is returned unchanged.
     *
     * @param targetSummaryJson JSON serialization of the summary of the relation target
     * @return this record
     */
    public Scholix addTarget(final String targetSummaryJson) {
        return this;
    }

    public String getPublicationDate() {
        return publicationDate;
    }

    public Scholix setPublicationDate(String publicationDate) {
        this.publicationDate = publicationDate;
        return this;
    }

    public List<ScholixEntityId> getPublisher() {
        return publisher;
    }

    public Scholix setPublisher(List<ScholixEntityId> publisher) {
        this.publisher = publisher;
        return this;
    }

    public List<ScholixEntityId> getLinkprovider() {
        return linkprovider;
    }

    public Scholix setLinkprovider(List<ScholixEntityId> linkprovider) {
        this.linkprovider = linkprovider;
        return this;
    }

    public ScholixRelationship getRelationship() {
        return relationship;
    }

    public Scholix setRelationship(ScholixRelationship relationship) {
        this.relationship = relationship;
        return this;
    }

    public ScholixResource getSource() {
        return source;
    }

    public Scholix setSource(ScholixResource source) {
        this.source = source;
        return this;
    }

    public ScholixResource getTarget() {
        return target;
    }

    public Scholix setTarget(ScholixResource target) {
        this.target = target;
        return this;
    }

    public String getIdentifier() {
        return identifier;
    }

    public Scholix setIdentifier(String identifier) {
        this.identifier = identifier;
        return this;
    }
}

View File

@ -0,0 +1,46 @@
package eu.dnetlib.dhp.provision.scholix;
import java.io.Serializable;
/**
 * Serializable bean grouping a provider entity with its provision mode and
 * completion status. Setters return {@code this} to allow chaining.
 */
public class ScholixCollectedFrom implements Serializable {

    private ScholixEntityId provider;

    private String provisionMode;

    private String completionStatus;

    /** Creates an instance with every property left unset. */
    public ScholixCollectedFrom() {
        // nothing to initialise
    }

    /**
     * Creates a fully populated instance.
     *
     * @param provider         the provider entity
     * @param provisionMode    the provision mode
     * @param completionStatus the completion status
     */
    public ScholixCollectedFrom(ScholixEntityId provider, String provisionMode, String completionStatus) {
        this.provider = provider;
        this.provisionMode = provisionMode;
        this.completionStatus = completionStatus;
    }

    /** @return the provider entity, or {@code null} when unset */
    public ScholixEntityId getProvider() {
        return this.provider;
    }

    /**
     * @param provider the provider entity
     * @return this instance
     */
    public ScholixCollectedFrom setProvider(ScholixEntityId provider) {
        this.provider = provider;
        return this;
    }

    /** @return the provision mode, or {@code null} when unset */
    public String getProvisionMode() {
        return this.provisionMode;
    }

    /**
     * @param provisionMode the provision mode
     * @return this instance
     */
    public ScholixCollectedFrom setProvisionMode(String provisionMode) {
        this.provisionMode = provisionMode;
        return this;
    }

    /** @return the completion status, or {@code null} when unset */
    public String getCompletionStatus() {
        return this.completionStatus;
    }

    /**
     * @param completionStatus the completion status
     * @return this instance
     */
    public ScholixCollectedFrom setCompletionStatus(String completionStatus) {
        this.completionStatus = completionStatus;
        return this;
    }
}

View File

@ -0,0 +1,35 @@
package eu.dnetlib.dhp.provision.scholix;
import java.io.Serializable;
import java.util.List;
/**
 * Serializable bean holding a name and the list of identifiers associated with it.
 * Setters return {@code this} to allow chaining.
 */
public class ScholixEntityId implements Serializable {

    private String name;

    private List<ScholixIdentifier> identifiers;

    /** Creates an instance with every property left unset. */
    public ScholixEntityId() {
        // nothing to initialise
    }

    /**
     * Creates a fully populated instance.
     *
     * @param name        the entity name
     * @param identifiers the identifiers of the entity; the list is stored as given, not copied
     */
    public ScholixEntityId(String name, List<ScholixIdentifier> identifiers) {
        this.name = name;
        this.identifiers = identifiers;
    }

    /** @return the entity name, or {@code null} when unset */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the entity name
     * @return this instance
     */
    public ScholixEntityId setName(String name) {
        this.name = name;
        return this;
    }

    /** @return the stored identifier list itself, or {@code null} when unset */
    public List<ScholixIdentifier> getIdentifiers() {
        return this.identifiers;
    }

    /**
     * @param identifiers the identifiers of the entity; the list is stored as given, not copied
     * @return this instance
     */
    public ScholixEntityId setIdentifiers(List<ScholixIdentifier> identifiers) {
        this.identifiers = identifiers;
        return this;
    }
}

View File

@ -0,0 +1,34 @@
package eu.dnetlib.dhp.provision.scholix;
import java.io.Serializable;
/**
 * Serializable bean pairing an identifier value with the schema it belongs to.
 * Setters return {@code this} to allow chaining.
 */
public class ScholixIdentifier implements Serializable {

    private String identifier;

    private String schema;

    /** Creates an instance with both properties left unset. */
    public ScholixIdentifier() {
        // nothing to initialise
    }

    /**
     * Creates a fully populated instance.
     *
     * @param identifier the identifier value
     * @param schema     the schema of the identifier
     */
    public ScholixIdentifier(String identifier, String schema) {
        this.identifier = identifier;
        this.schema = schema;
    }

    /** @return the identifier value, or {@code null} when unset */
    public String getIdentifier() {
        return this.identifier;
    }

    /**
     * @param identifier the identifier value
     * @return this instance
     */
    public ScholixIdentifier setIdentifier(String identifier) {
        this.identifier = identifier;
        return this;
    }

    /** @return the schema of the identifier, or {@code null} when unset */
    public String getSchema() {
        return this.schema;
    }

    /**
     * @param schema the schema of the identifier
     * @return this instance
     */
    public ScholixIdentifier setSchema(String schema) {
        this.schema = schema;
        return this;
    }
}

View File

@ -0,0 +1,45 @@
package eu.dnetlib.dhp.provision.scholix;
import java.io.Serializable;
/**
 * Serializable bean describing a relationship through its name, its schema and
 * the name of its inverse. Setters return {@code this} to allow chaining.
 */
public class ScholixRelationship implements Serializable {

    private String name;

    private String schema;

    private String inverse;

    /** Creates an instance with every property left unset. */
    public ScholixRelationship() {
        // nothing to initialise
    }

    /**
     * Creates a fully populated instance.
     *
     * @param name    the relationship name
     * @param schema  the relationship schema
     * @param inverse the inverse of the relationship
     */
    public ScholixRelationship(String name, String schema, String inverse) {
        this.name = name;
        this.schema = schema;
        this.inverse = inverse;
    }

    /** @return the relationship name, or {@code null} when unset */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the relationship name
     * @return this instance
     */
    public ScholixRelationship setName(String name) {
        this.name = name;
        return this;
    }

    /** @return the relationship schema, or {@code null} when unset */
    public String getSchema() {
        return this.schema;
    }

    /**
     * @param schema the relationship schema
     * @return this instance
     */
    public ScholixRelationship setSchema(String schema) {
        this.schema = schema;
        return this;
    }

    /** @return the inverse of the relationship, or {@code null} when unset */
    public String getInverse() {
        return this.inverse;
    }

    /**
     * @param inverse the inverse of the relationship
     * @return this instance
     */
    public ScholixRelationship setInverse(String inverse) {
        this.inverse = inverse;
        return this;
    }
}

View File

@ -0,0 +1,99 @@
package eu.dnetlib.dhp.provision.scholix;
import java.io.Serializable;
import java.util.List;
/**
 * Serializable bean describing one resource of a Scholix link: its identifiers,
 * typing, title, creators, publication date, publishers and provenance.
 *
 * <p>Every property is unset ({@code null}) until assigned; setters return
 * {@code this} to allow chaining, and list arguments are stored as given, not copied.
 */
public class ScholixResource implements Serializable {

    private ScholixIdentifier identifier;

    private String dnetIdentifier;

    private String objectType;

    private String objectSubType;

    private String title;

    private List<ScholixEntityId> creator;

    private String publicationDate;

    private List<ScholixEntityId> publisher;

    private List<ScholixCollectedFrom> collectedFrom;

    /** @return the identifier of the resource */
    public ScholixIdentifier getIdentifier() {
        return this.identifier;
    }

    /**
     * @param identifier the identifier of the resource
     * @return this instance
     */
    public ScholixResource setIdentifier(ScholixIdentifier identifier) {
        this.identifier = identifier;
        return this;
    }

    /** @return the value of the {@code dnetIdentifier} property */
    public String getDnetIdentifier() {
        return this.dnetIdentifier;
    }

    /**
     * @param dnetIdentifier the new value of the {@code dnetIdentifier} property
     * @return this instance
     */
    public ScholixResource setDnetIdentifier(String dnetIdentifier) {
        this.dnetIdentifier = dnetIdentifier;
        return this;
    }

    /** @return the object type */
    public String getObjectType() {
        return this.objectType;
    }

    /**
     * @param objectType the object type
     * @return this instance
     */
    public ScholixResource setObjectType(String objectType) {
        this.objectType = objectType;
        return this;
    }

    /** @return the object sub-type */
    public String getObjectSubType() {
        return this.objectSubType;
    }

    /**
     * @param objectSubType the object sub-type
     * @return this instance
     */
    public ScholixResource setObjectSubType(String objectSubType) {
        this.objectSubType = objectSubType;
        return this;
    }

    /** @return the title */
    public String getTitle() {
        return this.title;
    }

    /**
     * @param title the title
     * @return this instance
     */
    public ScholixResource setTitle(String title) {
        this.title = title;
        return this;
    }

    /** @return the stored creator list */
    public List<ScholixEntityId> getCreator() {
        return this.creator;
    }

    /**
     * @param creator the creators
     * @return this instance
     */
    public ScholixResource setCreator(List<ScholixEntityId> creator) {
        this.creator = creator;
        return this;
    }

    /** @return the publication date */
    public String getPublicationDate() {
        return this.publicationDate;
    }

    /**
     * @param publicationDate the publication date
     * @return this instance
     */
    public ScholixResource setPublicationDate(String publicationDate) {
        this.publicationDate = publicationDate;
        return this;
    }

    /** @return the stored publisher list */
    public List<ScholixEntityId> getPublisher() {
        return this.publisher;
    }

    /**
     * @param publisher the publishers
     * @return this instance
     */
    public ScholixResource setPublisher(List<ScholixEntityId> publisher) {
        this.publisher = publisher;
        return this;
    }

    /** @return the stored provenance list */
    public List<ScholixCollectedFrom> getCollectedFrom() {
        return this.collectedFrom;
    }

    /**
     * @param collectedFrom the provenance entries
     * @return this instance
     */
    public ScholixResource setCollectedFrom(List<ScholixCollectedFrom> collectedFrom) {
        this.collectedFrom = collectedFrom;
        return this;
    }
}

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.provision.scholix;
package eu.dnetlib.dhp.provision.scholix.summary;
import java.io.Serializable;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.provision.scholix;
package eu.dnetlib.dhp.provision.scholix.summary;
import java.io.Serializable;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.provision.scholix;
package eu.dnetlib.dhp.provision.scholix.summary;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.provision.scholix;
package eu.dnetlib.dhp.provision.scholix.summary;
import java.io.Serializable;

View File

@ -1,4 +1,4 @@
package eu.dnetlib.dhp.provision.scholix;
package eu.dnetlib.dhp.provision.scholix.summary;
import java.io.Serializable;

View File

@ -1,9 +1,7 @@
package eu.dnetlib.dhp.provision;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.provision.scholix.ScholixSummary;
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;
import org.apache.commons.io.IOUtils;
import org.junit.Ignore;
import org.junit.Test;