fixed incremental indexing

This commit is contained in:
Sandro La Bruzzo 2020-04-14 17:47:36 +02:00
parent 82e8341f50
commit c36239e693
15 changed files with 364 additions and 188 deletions

View File

@ -0,0 +1,15 @@
package eu.dnetlib.dhp.schema.scholexplorer;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class DLIRelation extends Relation {
private String dateOfCollection;
public String getDateOfCollection() {
return dateOfCollection;
}
public void setDateOfCollection(String dateOfCollection) {
this.dateOfCollection = dateOfCollection;
}
}

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.sx.graph;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
import eu.dnetlib.dhp.utils.DHPUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaPairRDD;
@ -49,15 +50,15 @@ public class SparkSXGeneratePidSimlarity {
.equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::")))
.distinct();
JavaRDD<Relation> simRel = datasetSimRel.union(publicationSimRel).map(s -> {
final Relation r = new Relation();
JavaRDD<DLIRelation> simRel = datasetSimRel.union(publicationSimRel).map(s -> {
final DLIRelation r = new DLIRelation();
r.setSource(s._1());
r.setTarget(s._2());
r.setRelType("similar");
return r;
}
);
spark.createDataset(simRel.rdd(), Encoders.bean(Relation.class)).distinct().write()
spark.createDataset(simRel.rdd(), Encoders.bean(DLIRelation.class)).distinct().write()
.mode(SaveMode.Overwrite).save(targetPath+"/pid_simRel");
}
}

View File

@ -7,6 +7,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
import eu.dnetlib.dhp.utils.DHPUtils;
import net.minidev.json.JSONArray;
@ -135,19 +136,19 @@ public class SparkScholexplorerCreateRawGraphJob {
SparkSXGeneratePidSimlarity.generateDataFrame(spark, sc, inputPath.replace("/relation",""),targetPath.replace("/relation","") );
RDD<Relation> rdd = union.mapToPair((PairFunction<String, String, Relation>) f -> {
RDD<DLIRelation> rdd = union.mapToPair((PairFunction<String, String, DLIRelation>) f -> {
final String source = getJPathString(SOURCEJSONPATH, f);
final String target = getJPathString(TARGETJSONPATH, f);
final String reltype = getJPathString(RELJSONPATH, f);
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return new Tuple2<>(DHPUtils.md5(String.format("%s::%s::%s", source.toLowerCase(), reltype.toLowerCase(), target.toLowerCase())), mapper.readValue(f, Relation.class));
return new Tuple2<>(DHPUtils.md5(String.format("%s::%s::%s", source.toLowerCase(), reltype.toLowerCase(), target.toLowerCase())), mapper.readValue(f, DLIRelation.class));
}).reduceByKey((a, b) -> {
a.mergeFrom(b);
return a;
}).map(Tuple2::_2).rdd();
spark.createDataset(rdd, Encoders.bean(Relation.class)).write().mode(SaveMode.Overwrite).save(targetPath);
spark.createDataset(rdd, Encoders.bean(DLIRelation.class)).write().mode(SaveMode.Overwrite).save(targetPath);
Dataset<Relation> rel_ds =spark.read().load(targetPath).as(Encoders.bean(Relation.class));
System.out.println("LOADING PATH :"+targetPath.replace("/relation","")+"/pid_simRel");

View File

@ -2,10 +2,13 @@ package eu.dnetlib.dhp.sx.graph.parser;
import eu.dnetlib.dhp.parser.utility.VtdUtilityParser;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
import eu.dnetlib.dhp.schema.scholexplorer.ProvenaceInfo;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.scholexplorer.relation.RelInfo;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
@ -15,6 +18,7 @@ import javax.xml.stream.XMLStreamReader;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public abstract class AbstractScholexplorerParser {
@ -104,6 +108,74 @@ public abstract class AbstractScholexplorerParser {
return type+ DHPUtils.md5(String.format("%s::%s", pid.toLowerCase().trim(), pidType.toLowerCase().trim()));
}
protected DLIUnknown createUnknownObject(final String pid, final String pidType, final KeyValue cf, final DataInfo di, final String dateOfCollection) {
final DLIUnknown uk = new DLIUnknown();
uk.setId(generateId(pid, pidType, "unknown"));
ProvenaceInfo pi = new ProvenaceInfo();
pi.setId(cf.getKey());
pi.setName(cf.getValue());
pi.setCompletionStatus("incomplete");
uk.setDataInfo(di);
uk.setDlicollectedfrom(Collections.singletonList(pi));
final StructuredProperty sourcePid = new StructuredProperty();
sourcePid.setValue(pid);
final Qualifier pt = new Qualifier();
pt.setClassname(pidType);
pt.setClassid(pidType);
pt.setSchemename("dnet:pid_types");
pt.setSchemeid("dnet:pid_types");
sourcePid.setQualifier(pt);
uk.setPid(Collections.singletonList(sourcePid));
uk.setDateofcollection(dateOfCollection);
return uk;
}
protected void generateRelations(RelationMapper relationMapper, Result parsedObject, List<Oaf> result, DataInfo di, String dateOfCollection, List<VtdUtilityParser.Node> relatedIdentifiers) {
if(relatedIdentifiers!= null) {
result.addAll(relatedIdentifiers.stream()
.flatMap(n -> {
final List<DLIRelation> rels = new ArrayList<>();
DLIRelation r = new DLIRelation();
r.setSource(parsedObject.getId());
final String relatedPid = n.getTextValue();
final String relatedPidType = n.getAttributes().get("relatedIdentifierType");
final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown");
String relationSemantic = n.getAttributes().get("relationType");
String inverseRelation;
final String targetId = generateId(relatedPid, relatedPidType, relatedType);
r.setDateOfCollection(dateOfCollection);
if (relationMapper.containsKey(relationSemantic.toLowerCase()))
{
RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase());
relationSemantic = relInfo.getOriginal();
inverseRelation = relInfo.getInverse();
}
else {
relationSemantic = "Unknown";
inverseRelation = "Unknown";
}
r.setTarget(targetId);
r.setRelType(relationSemantic);
r.setRelClass("datacite");
r.setCollectedFrom(parsedObject.getCollectedfrom());
r.setDataInfo(di);
rels.add(r);
r = new DLIRelation();
r.setDataInfo(di);
r.setSource(targetId);
r.setTarget(parsedObject.getId());
r.setRelType(inverseRelation);
r.setRelClass("datacite");
r.setCollectedFrom(parsedObject.getCollectedfrom());
r.setDateOfCollection(dateOfCollection);
rels.add(r);
if("unknown".equalsIgnoreCase(relatedType))
result.add(createUnknownObject(relatedPid, relatedPidType, parsedObject.getCollectedfrom().get(0), di, dateOfCollection));
return rels.stream();
}).collect(Collectors.toList()));
}
}

View File

@ -42,7 +42,8 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
parsedObject.setOriginalId(Collections.singletonList(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='recordIdentifier']")));
parsedObject.setOriginalObjIdentifier(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='objIdentifier']"));
parsedObject.setDateofcollection(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']"));
String dateOfCollection = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']");
parsedObject.setDateofcollection(dateOfCollection);
final String resolvedDate = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='resolvedDate']");
@ -123,7 +124,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
List<String> descs = VtdUtilityParser.getTextValue(ap, vn, "//*[local-name()='description']");
if (descs != null && descs.size() > 0)
parsedObject.setDescription(descs.stream()
.map(it -> it.length() < 512 ? it : it.substring(0, 512))
.map(it -> it.length() < 10000 ? it : it.substring(0, 10000))
.map(it -> {
final Field<String> d = new Field<>();
d.setValue(it);
@ -137,48 +138,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
Arrays.asList("relatedIdentifierType", "relationType", "entityType", "inverseRelationType"));
if(relatedIdentifiers!= null) {
result.addAll(relatedIdentifiers.stream()
.flatMap(n -> {
final List<Relation> rels = new ArrayList<>();
Relation r = new Relation();
r.setSource(parsedObject.getId());
final String relatedPid = n.getTextValue();
final String relatedPidType = n.getAttributes().get("relatedIdentifierType");
final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown");
String relationSemantic = n.getAttributes().get("relationType");
String inverseRelation = n.getAttributes().get("inverseRelationType");
final String targetId = generateId(relatedPid, relatedPidType, relatedType);
if (relationMapper.containsKey(relationSemantic.toLowerCase()))
{
RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase());
relationSemantic = relInfo.getOriginal();
inverseRelation = relInfo.getInverse();
}
else {
relationSemantic = "Unknown";
inverseRelation = "Unknown";
}
r.setTarget(targetId);
r.setRelType(relationSemantic);
r.setRelClass("datacite");
r.setCollectedFrom(parsedObject.getCollectedfrom());
r.setDataInfo(di);
rels.add(r);
r = new Relation();
r.setDataInfo(di);
r.setSource(targetId);
r.setTarget(parsedObject.getId());
r.setRelType(inverseRelation);
r.setRelClass("datacite");
r.setCollectedFrom(parsedObject.getCollectedfrom());
rels.add(r);
if("unknown".equalsIgnoreCase(relatedType))
result.add(createUnknownObject(relatedPid, relatedPidType, parsedObject.getCollectedfrom().get(0), di));
return rels.stream();
}).collect(Collectors.toList()));
}
generateRelations(relationMapper, parsedObject, result, di, dateOfCollection, relatedIdentifiers);
final List<Node> hostedBy =
@ -199,7 +159,7 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
}
List<StructuredProperty> subjects = extractSubject(VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resource']//*[local-name()='subject']", Arrays.asList("subjectScheme")));
List<StructuredProperty> subjects = extractSubject(VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='resource']//*[local-name()='subject']", Collections.singletonList("subjectScheme")));
parsedObject.setSubject(subjects);
@ -265,24 +225,6 @@ public class DatasetScholexplorerParser extends AbstractScholexplorerParser {
}
private DLIUnknown createUnknownObject(final String pid, final String pidType, final KeyValue cf, final DataInfo di) {
final DLIUnknown uk = new DLIUnknown();
uk.setId(generateId(pid, pidType, "unknown"));
ProvenaceInfo pi = new ProvenaceInfo();
pi.setId(cf.getKey());
pi.setName(cf.getValue());
pi.setCompletionStatus("incomplete");
uk.setDataInfo(di);
uk.setDlicollectedfrom(Collections.singletonList(pi));
final StructuredProperty sourcePid = new StructuredProperty();
sourcePid.setValue(pid);
final Qualifier pt = new Qualifier();
pt.setClassname(pidType);
pt.setClassid(pidType);
pt.setSchemename("dnet:pid_types");
pt.setSchemeid("dnet:pid_types");
sourcePid.setQualifier(pt);
uk.setPid(Collections.singletonList(sourcePid));
return uk;
}
}

View File

@ -38,7 +38,8 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
di.setDeletedbyinference(false);
di.setInvisible(false);
parsedObject.setDateofcollection(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']"));
String dateOfCollection = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='dateOfCollection']");
parsedObject.setDateofcollection(dateOfCollection);
final String resolvedDate = VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='resolvedDate']");
parsedObject.setOriginalId(Collections.singletonList(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='recordIdentifier']")));
@ -118,48 +119,7 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
final List<Node> relatedIdentifiers =
VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='relatedIdentifier']",
Arrays.asList("relatedIdentifierType", "relationType", "entityType", "inverseRelationType"));
if (relatedIdentifiers != null) {
result.addAll(relatedIdentifiers.stream()
.flatMap(n -> {
final List<Relation> rels = new ArrayList<>();
Relation r = new Relation();
r.setSource(parsedObject.getId());
final String relatedPid = n.getTextValue();
final String relatedPidType = n.getAttributes().get("relatedIdentifierType");
final String relatedType = n.getAttributes().getOrDefault("entityType", "unknown");
String relationSemantic = n.getAttributes().get("relationType");
String inverseRelation = "Unknown";
final String targetId = generateId(relatedPid, relatedPidType, relatedType);
if (relationMapper.containsKey(relationSemantic.toLowerCase()))
{
RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase());
relationSemantic = relInfo.getOriginal();
inverseRelation = relInfo.getInverse();
}
else {
relationSemantic = "Unknown";
}
r.setTarget(targetId);
r.setRelType(relationSemantic);
r.setCollectedFrom(parsedObject.getCollectedfrom());
r.setRelClass("datacite");
r.setDataInfo(di);
rels.add(r);
r = new Relation();
r.setDataInfo(di);
r.setSource(targetId);
r.setTarget(parsedObject.getId());
r.setRelType(inverseRelation);
r.setRelClass("datacite");
r.setCollectedFrom(parsedObject.getCollectedfrom());
rels.add(r);
return rels.stream();
}).collect(Collectors.toList()));
}
generateRelations(relationMapper, parsedObject, result, di, dateOfCollection, relatedIdentifiers);
final List<Node> hostedBy =
VtdUtilityParser.getTextValuesWithAttributes(ap, vn, "//*[local-name()='hostedBy']", Arrays.asList("id", "name"));
@ -206,8 +166,8 @@ public class PublicationScholexplorerParser extends AbstractScholexplorerParser
description.setValue(VtdUtilityParser.getSingleValue(ap, vn, "//*[local-name()='description']"));
if (StringUtils.isNotBlank(description.getValue()) && description.getValue().length() > 512) {
description.setValue(description.getValue().substring(0, 512));
if (StringUtils.isNotBlank(description.getValue()) && description.getValue().length() > 10000) {
description.setValue(description.getValue().substring(0, 10000));
}
parsedObject.setDescription(Collections.singletonList(description));

View File

@ -69,6 +69,11 @@
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
</dependencies>

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.provision.scholix;
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@ -20,10 +19,6 @@ public class ScholixResource implements Serializable {
private List<ScholixEntityId> publisher;
private List<ScholixCollectedFrom> collectedFrom;
public static ScholixResource fromSummary(ScholixSummary summary) {
final ScholixResource resource = new ScholixResource();
@ -65,6 +60,7 @@ public class ScholixResource implements Serializable {
}
public List<ScholixIdentifier> getIdentifier() {
return identifier;
}

View File

@ -165,7 +165,7 @@ public class Datacite2Scholix {
return res;
}
protected String generateId(final String pid, final String pidType, final String entityType) {
public static String generateId(final String pid, final String pidType, final String entityType) {
String type;
switch (entityType){
case "publication":

View File

@ -4,18 +4,25 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.provision.scholix.Scholix;
import eu.dnetlib.dhp.provision.scholix.ScholixIdentifier;
import eu.dnetlib.dhp.provision.scholix.ScholixRelationship;
import eu.dnetlib.dhp.provision.scholix.ScholixResource;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
public class SparkResolveScholixTarget {
@ -29,8 +36,6 @@ public class SparkResolveScholixTarget {
final String sourcePath = parser.get("sourcePath");
final String workingDirPath= parser.get("workingDirPath");
final String indexHost= parser.get("indexHost");
try (SparkSession spark = getSession(conf, master)){
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@ -65,7 +70,55 @@ public class SparkResolveScholixTarget {
}, Encoders.bean(ScholixResource.class)).write().mode(SaveMode.Overwrite).save(workingDirPath+"/stepB");
Dataset<ScholixResource> s2 = spark.read().load(workingDirPath+"/stepB").as(Encoders.bean(ScholixResource.class));
s1.joinWith(s2, s1.col("target.identifier.identifier").equalTo(s2.col("identifier.identifier")), "left")
.flatMap((FlatMapFunction<Tuple2<Scholix, ScholixResource>, Scholix>) f ->
{
final List<Scholix> res = new ArrayList<>();
final Scholix s = f._1();
final ScholixResource target = f._2();
if (StringUtils.isNotBlank(s.getIdentifier()))
res.add(s);
else if (target == null) {
ScholixResource currentTarget = s.getTarget();
currentTarget.setObjectType("unknown");
currentTarget.setDnetIdentifier(Datacite2Scholix.generateId(currentTarget.getIdentifier().get(0).getIdentifier(),currentTarget.getIdentifier().get(0).getSchema(), currentTarget.getObjectType()));
s.generateIdentifier();
res.add(s);
final Scholix inverse = new Scholix();
inverse.setTarget(s.getSource());
inverse.setSource(s.getTarget());
inverse.setLinkprovider(s.getLinkprovider());
inverse.setPublicationDate(s.getPublicationDate());
inverse.setPublisher(s.getPublisher());
inverse.setRelationship(new ScholixRelationship(s.getRelationship().getInverse(), s.getRelationship().getSchema(), s.getRelationship().getName()));
inverse.generateIdentifier();
res.add(inverse);
} else
{
target.setIdentifier(target.getIdentifier().stream().map(d -> new ScholixIdentifier(d.getIdentifier().toLowerCase(), d.getSchema().toLowerCase())).collect(Collectors.toList()));
s.setTarget(target);
s.generateIdentifier();
res.add(s);
final Scholix inverse = new Scholix();
inverse.setTarget(s.getSource());
inverse.setSource(s.getTarget());
inverse.setLinkprovider(s.getLinkprovider());
inverse.setPublicationDate(s.getPublicationDate());
inverse.setPublisher(s.getPublisher());
inverse.setRelationship(new ScholixRelationship(s.getRelationship().getInverse(), s.getRelationship().getSchema(), s.getRelationship().getName()));
inverse.generateIdentifier();
res.add(inverse);
}
return res.iterator();
}, Encoders.bean(Scholix.class)).javaRDD().map(s -> new ObjectMapper().writeValueAsString(s)).saveAsTextFile(workingDirPath+"/resolved_json");
}
}

View File

@ -0,0 +1,10 @@
<configuration>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -0,0 +1,68 @@
<workflow-app name="Index graph to ElasticSearch" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingDirPath</name>
<description>the source path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>index</name>
<description>index name</description>
</property>
</parameters>
<start to="indexSummary"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="indexSummary">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index Summary</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/summary</arg>
<arg>--index</arg><arg>${index}_object</arg>
<arg>--idPath</arg><arg>id</arg>
<arg>--type</arg><arg>summary</arg>
</spark>
<ok to="indexScholix"/>
<error to="Kill"/>
</action>
<action name="indexScholix">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index scholix</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/scholix_json</arg>
<arg>--index</arg><arg>${index}_scholix</arg>
<arg>--idPath</arg><arg>identifier</arg>
<arg>--type</arg><arg>scholix</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,10 @@
<configuration>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -0,0 +1,97 @@
<workflow-app name="Keep On Synch datacite" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingDirPath</name>
<description>the source path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>index</name>
<description>index name</description>
</property>
<property>
<name>timestamp</name>
<description>timestamp from incremental harvesting</description>
</property>
</parameters>
<start to="ResetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<fs>
<delete path='${workingDirPath}/synch'/>
<mkdir path='${workingDirPath}/synch'/>
</fs>
<ok to="ImportDataciteUpdate"/>
<error to="Kill"/>
</action>
<action name="ImportDataciteUpdate">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.provision.update.RetrieveUpdateFromDatacite</main-class>
<arg>-t</arg><arg>${workingDirPath}/synch/input_json</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-ts</arg><arg>${timestamp}</arg>
<arg>-ih</arg><arg>ip-90-147-167-25.ct1.garrservices.it</arg>
<arg>-in</arg><arg>datacite</arg>
</java>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="resolveScholix">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Resolve and generate Scholix</name>
<class>eu.dnetlib.dhp.provision.update.SparkResolveScholixTarget</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="32" </spark-opts>
<arg>-m</arg> <arg>yarn-cluster</arg>
<arg>-s</arg><arg>${workingDirPath}/synch/input_json</arg>
<arg>-w</arg><arg>${workingDirPath}/synch</arg>
<arg>-h</arg><arg>ip-90-147-167-25.ct1.garrservices.it</arg>
</spark>
<ok to="indexScholix"/>
<error to="Kill"/>
</action>
<action name="indexScholix">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>index scholix</name>
<class>eu.dnetlib.dhp.provision.SparkIndexCollectionOnES</class>
<jar>dhp-graph-provision-scholexplorer-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT} --conf spark.dynamicAllocation.maxExecutors="8" </spark-opts>
<arg>-mt</arg> <arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDirPath}/synch/resolved_json</arg>
<arg>--index</arg><arg>${index}_scholix</arg>
<arg>--idPath</arg><arg>identifier</arg>
<arg>--type</arg><arg>scholix</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -3,24 +3,18 @@ package eu.dnetlib.dhp.provision;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.provision.scholix.Scholix;
import eu.dnetlib.dhp.provision.scholix.ScholixResource;
import eu.dnetlib.dhp.provision.update.*;
import eu.dnetlib.dhp.provision.update.CrossrefClient;
import eu.dnetlib.dhp.provision.update.Datacite2Scholix;
import eu.dnetlib.dhp.provision.update.DataciteClient;
import eu.dnetlib.scholexplorer.relation.RelationMapper;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
public class DataciteClientTest {
@Test
public void dataciteSCholixTest() throws Exception {
final String json = IOUtils.toString(getClass().getResourceAsStream("datacite.json"));
@ -32,27 +26,6 @@ public class DataciteClientTest {
}
public void testClient() throws Exception {
RetrieveUpdateFromDatacite.main(new String[]{
"-n", "file:///data/new_s2.txt",
"-t", "/data/new_s2.txt",
"-ts", "1585760736",
"-ih", "ip-90-147-167-25.ct1.garrservices.it",
"-in", "datacite",
});
SparkResolveScholixTarget.main(new String[]{
"-s", "file:///data/new_s.txt",
"-m", "local[*]",
"-w", "/data/scholix/provision",
"-h", "ip-90-147-167-25.ct1.garrservices.it",
});
}
public void testResolveDataset() throws Exception {
DataciteClient dc = new DataciteClient("ip-90-147-167-25.ct1.garrservices.it");
ScholixResource datasetByDOI = dc.getDatasetByDOI("10.17182/hepdata.15392.v1/t5");
@ -66,32 +39,5 @@ public class DataciteClientTest {
System.out.println(new ObjectMapper().writeValueAsString(crossrefByDOI));
}
private String getResponse(final String url,final String json ) {
CloseableHttpClient client = HttpClients.createDefault();
try {
HttpPost httpPost = new HttpPost(url);
if (json!= null) {
StringEntity entity = new StringEntity(json);
httpPost.setEntity(entity);
httpPost.setHeader("Accept", "application/json");
httpPost.setHeader("Content-type", "application/json");
}
CloseableHttpResponse response = client.execute(httpPost);
return IOUtils.toString(response.getEntity().getContent());
} catch (Throwable e) {
throw new RuntimeException("Error on executing request ",e);
} finally {
try {
client.close();
} catch (IOException e) {
throw new RuntimeException("Unable to close client ",e);
}
}
}
}