forked from D-Net/dnet-hadoop
Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop
This commit is contained in:
commit
96ce124b59
|
@ -2,7 +2,6 @@
|
||||||
package eu.dnetlib.dhp.broker.model;
|
package eu.dnetlib.dhp.broker.model;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public class Event implements Serializable {
|
public class Event implements Serializable {
|
||||||
|
|
||||||
|
@ -25,7 +24,7 @@ public class Event implements Serializable {
|
||||||
|
|
||||||
private boolean instantMessage;
|
private boolean instantMessage;
|
||||||
|
|
||||||
private Map<String, Object> map;
|
private MappedFields map;
|
||||||
|
|
||||||
public Event() {
|
public Event() {
|
||||||
}
|
}
|
||||||
|
@ -33,7 +32,7 @@ public class Event implements Serializable {
|
||||||
public Event(final String producerId, final String eventId, final String topic, final String payload,
|
public Event(final String producerId, final String eventId, final String topic, final String payload,
|
||||||
final Long creationDate, final Long expiryDate,
|
final Long creationDate, final Long expiryDate,
|
||||||
final boolean instantMessage,
|
final boolean instantMessage,
|
||||||
final Map<String, Object> map) {
|
final MappedFields map) {
|
||||||
this.producerId = producerId;
|
this.producerId = producerId;
|
||||||
this.eventId = eventId;
|
this.eventId = eventId;
|
||||||
this.topic = topic;
|
this.topic = topic;
|
||||||
|
@ -100,11 +99,11 @@ public class Event implements Serializable {
|
||||||
this.instantMessage = instantMessage;
|
this.instantMessage = instantMessage;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Map<String, Object> getMap() {
|
public MappedFields getMap() {
|
||||||
return this.map;
|
return this.map;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMap(final Map<String, Object> map) {
|
public void setMap(final MappedFields map) {
|
||||||
this.map = map;
|
this.map = map;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,9 +3,8 @@ package eu.dnetlib.dhp.broker.model;
|
||||||
|
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.codec.digest.DigestUtils;
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
@ -30,10 +29,10 @@ public class EventFactory {
|
||||||
|
|
||||||
final Event res = new Event();
|
final Event res = new Event();
|
||||||
|
|
||||||
final Map<String, Object> map = createMapFromResult(updateInfo);
|
final MappedFields map = createMapFromResult(updateInfo);
|
||||||
|
|
||||||
final String eventId = calculateEventId(
|
final String eventId = calculateEventId(
|
||||||
updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId(), updateInfo.getHighlightValueAsString());
|
updateInfo.getTopicPath(), updateInfo.getTarget().getOpenaireId(), updateInfo.getHighlightValueAsString());
|
||||||
|
|
||||||
res.setEventId(eventId);
|
res.setEventId(eventId);
|
||||||
res.setProducerId(PRODUCER_ID);
|
res.setProducerId(PRODUCER_ID);
|
||||||
|
@ -46,35 +45,35 @@ public class EventFactory {
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map<String, Object> createMapFromResult(final UpdateInfo<?> updateInfo) {
|
private static MappedFields createMapFromResult(final UpdateInfo<?> updateInfo) {
|
||||||
final Map<String, Object> map = new HashMap<>();
|
final MappedFields map = new MappedFields();
|
||||||
|
|
||||||
final OaBrokerMainEntity source = updateInfo.getSource();
|
final OaBrokerMainEntity source = updateInfo.getSource();
|
||||||
final OaBrokerMainEntity target = updateInfo.getTarget();
|
final OaBrokerMainEntity target = updateInfo.getTarget();
|
||||||
|
|
||||||
map.put("target_datasource_id", target.getCollectedFromId());
|
map.setTargetDatasourceId(target.getCollectedFromId());
|
||||||
map.put("target_datasource_name", target.getCollectedFromName());
|
map.setTargetDatasourceName(target.getCollectedFromName());
|
||||||
|
|
||||||
map.put("target_publication_id", target.getOriginalId());
|
map.setTargetResultId(target.getOpenaireId());
|
||||||
|
|
||||||
final List<String> titles = target.getTitles();
|
final List<String> titles = target.getTitles();
|
||||||
if (titles.size() > 0) {
|
if (titles.size() > 0) {
|
||||||
map.put("target_publication_title", titles.get(0));
|
map.setTargetResultTitle(titles.get(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
final long date = parseDateTolong(target.getPublicationdate());
|
final long date = parseDateTolong(target.getPublicationdate());
|
||||||
if (date > 0) {
|
if (date > 0) {
|
||||||
map.put("target_dateofacceptance", date);
|
map.setTargetDateofacceptance(date);
|
||||||
}
|
}
|
||||||
|
|
||||||
map.put("target_publication_subject_list", target.getSubjects());
|
map.setTargetSubjects(target.getSubjects().stream().map(s -> s.getValue()).collect(Collectors.toList()));
|
||||||
map.put("target_publication_author_list", target.getCreators());
|
map.setTargetAuthors(target.getCreators().stream().map(a -> a.getFullname()).collect(Collectors.toList()));
|
||||||
|
|
||||||
// PROVENANCE INFO
|
// PROVENANCE INFO
|
||||||
map.put("trust", updateInfo.getTrust());
|
map.setTrust(updateInfo.getTrust());
|
||||||
map.put("provenance_datasource_id", source.getCollectedFromId());
|
map.setProvenanceDatasourceId(source.getCollectedFromId());
|
||||||
map.put("provenance_datasource_name", source.getCollectedFromName());
|
map.setProvenanceDatasourceName(source.getCollectedFromName());
|
||||||
map.put("provenance_publication_id_list", source.getOriginalId());
|
map.setProvenanceResultId(source.getOpenaireId());
|
||||||
|
|
||||||
return map;
|
return map;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,114 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.model;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class MappedFields implements Serializable {
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
private static final long serialVersionUID = -7999704113195802008L;
|
||||||
|
|
||||||
|
private String targetDatasourceId;
|
||||||
|
private String targetDatasourceName;
|
||||||
|
private String targetResultId;
|
||||||
|
private String targetResultTitle;
|
||||||
|
private long targetDateofacceptance;
|
||||||
|
private List<String> targetSubjects;
|
||||||
|
private List<String> targetAuthors;
|
||||||
|
private float trust;
|
||||||
|
private String provenanceDatasourceId;
|
||||||
|
private String provenanceDatasourceName;
|
||||||
|
private String provenanceResultId;
|
||||||
|
|
||||||
|
public String getTargetDatasourceId() {
|
||||||
|
return targetDatasourceId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTargetDatasourceId(final String targetDatasourceId) {
|
||||||
|
this.targetDatasourceId = targetDatasourceId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTargetDatasourceName() {
|
||||||
|
return targetDatasourceName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTargetDatasourceName(final String targetDatasourceName) {
|
||||||
|
this.targetDatasourceName = targetDatasourceName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTargetResultId() {
|
||||||
|
return targetResultId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTargetResultId(final String targetResultId) {
|
||||||
|
this.targetResultId = targetResultId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTargetResultTitle() {
|
||||||
|
return targetResultTitle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTargetResultTitle(final String targetResultTitle) {
|
||||||
|
this.targetResultTitle = targetResultTitle;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTargetDateofacceptance() {
|
||||||
|
return targetDateofacceptance;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTargetDateofacceptance(final long targetDateofacceptance) {
|
||||||
|
this.targetDateofacceptance = targetDateofacceptance;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getTargetSubjects() {
|
||||||
|
return targetSubjects;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTargetSubjects(final List<String> targetSubjects) {
|
||||||
|
this.targetSubjects = targetSubjects;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getTargetAuthors() {
|
||||||
|
return targetAuthors;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTargetAuthors(final List<String> targetAuthors) {
|
||||||
|
this.targetAuthors = targetAuthors;
|
||||||
|
}
|
||||||
|
|
||||||
|
public float getTrust() {
|
||||||
|
return trust;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTrust(final float trust) {
|
||||||
|
this.trust = trust;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getProvenanceDatasourceId() {
|
||||||
|
return provenanceDatasourceId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProvenanceDatasourceId(final String provenanceDatasourceId) {
|
||||||
|
this.provenanceDatasourceId = provenanceDatasourceId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getProvenanceDatasourceName() {
|
||||||
|
return provenanceDatasourceName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProvenanceDatasourceName(final String provenanceDatasourceName) {
|
||||||
|
this.provenanceDatasourceName = provenanceDatasourceName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getProvenanceResultId() {
|
||||||
|
return provenanceResultId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setProvenanceResultId(final String provenanceResultId) {
|
||||||
|
this.provenanceResultId = provenanceResultId;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -11,7 +11,7 @@ import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SaveMode;
|
import org.apache.spark.sql.SaveMode;
|
||||||
import org.apache.spark.sql.expressions.Aggregator;
|
import org.apache.spark.sql.TypedColumn;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -22,15 +22,15 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProjectAggregator;
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProjectAggregator;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
public class JoinEntitiesJob {
|
public class JoinStep1Job {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(JoinEntitiesJob.class);
|
private static final Logger log = LoggerFactory.getLogger(JoinStep1Job.class);
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils
|
IOUtils
|
||||||
.toString(
|
.toString(
|
||||||
JoinEntitiesJob.class
|
JoinStep1Job.class
|
||||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ public class JoinEntitiesJob {
|
||||||
final String workingPath = parser.get("workingPath");
|
final String workingPath = parser.get("workingPath");
|
||||||
log.info("workingPath: {}", workingPath);
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
final String joinedEntitiesPath = workingPath + "/joinedEntities";
|
final String joinedEntitiesPath = workingPath + "/joinedEntities_step1";
|
||||||
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
|
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
|
||||||
|
|
||||||
final SparkConf conf = new SparkConf();
|
final SparkConf conf = new SparkConf();
|
||||||
|
@ -52,39 +52,28 @@ public class JoinEntitiesJob {
|
||||||
|
|
||||||
ClusterUtils.removeDir(spark, joinedEntitiesPath);
|
ClusterUtils.removeDir(spark, joinedEntitiesPath);
|
||||||
|
|
||||||
final Dataset<OaBrokerMainEntity> r0 = ClusterUtils
|
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
|
||||||
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
|
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
|
||||||
|
|
||||||
final Dataset<OaBrokerMainEntity> r1 = join(
|
final Dataset<RelatedProject> typedRels = ClusterUtils
|
||||||
r0, ClusterUtils.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class),
|
.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class);
|
||||||
new RelatedProjectAggregator());
|
|
||||||
// final Dataset<OaBrokerMainEntity> r2 = join(
|
|
||||||
// r1, ClusterUtils.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class), new
|
|
||||||
// RelatedDatasetAggregator());
|
|
||||||
// final Dataset<OaBrokerMainEntity> r3 = join(
|
|
||||||
// r2, ClusterUtils.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class), new
|
|
||||||
// RelatedPublicationAggregator());
|
|
||||||
// final Dataset<OaBrokerMainEntity> r4 = join(
|
|
||||||
// r3, ClusterUtils.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class), new
|
|
||||||
// RelatedSoftwareAggregator());
|
|
||||||
|
|
||||||
r1.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath);
|
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedProject>, OaBrokerMainEntity> aggr = new RelatedProjectAggregator()
|
||||||
|
.toColumn();
|
||||||
|
|
||||||
|
sources
|
||||||
|
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
||||||
|
.groupByKey(
|
||||||
|
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedProject>, String>) t -> t._1.getOpenaireId(),
|
||||||
|
Encoders.STRING())
|
||||||
|
.agg(aggr)
|
||||||
|
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.json(joinedEntitiesPath);
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T> Dataset<OaBrokerMainEntity> join(final Dataset<OaBrokerMainEntity> sources,
|
|
||||||
final Dataset<T> typedRels,
|
|
||||||
final Aggregator<Tuple2<OaBrokerMainEntity, T>, OaBrokerMainEntity, OaBrokerMainEntity> aggr) {
|
|
||||||
|
|
||||||
return sources
|
|
||||||
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
|
||||||
.groupByKey(
|
|
||||||
(MapFunction<Tuple2<OaBrokerMainEntity, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
|
|
||||||
.agg(aggr.toColumn())
|
|
||||||
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,79 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.oa;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SaveMode;
|
||||||
|
import org.apache.spark.sql.TypedColumn;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftwareAggregator;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
public class JoinStep2Job {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(JoinStep2Job.class);
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws Exception {
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
JoinStep2Job.class
|
||||||
|
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String workingPath = parser.get("workingPath");
|
||||||
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
|
final String joinedEntitiesPath = workingPath + "/joinedEntities_step2";
|
||||||
|
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
|
||||||
|
|
||||||
|
final SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||||
|
|
||||||
|
ClusterUtils.removeDir(spark, joinedEntitiesPath);
|
||||||
|
|
||||||
|
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class);
|
||||||
|
|
||||||
|
final Dataset<RelatedSoftware> typedRels = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class);
|
||||||
|
|
||||||
|
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedSoftware>, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator()
|
||||||
|
.toColumn();
|
||||||
|
|
||||||
|
sources
|
||||||
|
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
||||||
|
.groupByKey(
|
||||||
|
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedSoftware>, String>) t -> t._1.getOpenaireId(),
|
||||||
|
Encoders.STRING())
|
||||||
|
.agg(aggr)
|
||||||
|
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.json(joinedEntitiesPath);
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,79 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.oa;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SaveMode;
|
||||||
|
import org.apache.spark.sql.TypedColumn;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasetAggregator;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
public class JoinStep3Job {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(JoinStep3Job.class);
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws Exception {
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
JoinStep3Job.class
|
||||||
|
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String workingPath = parser.get("workingPath");
|
||||||
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
|
final String joinedEntitiesPath = workingPath + "/joinedEntities_step3";
|
||||||
|
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
|
||||||
|
|
||||||
|
final SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||||
|
|
||||||
|
ClusterUtils.removeDir(spark, joinedEntitiesPath);
|
||||||
|
|
||||||
|
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class);
|
||||||
|
|
||||||
|
final Dataset<RelatedDataset> typedRels = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class);
|
||||||
|
|
||||||
|
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDataset>, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator()
|
||||||
|
.toColumn();
|
||||||
|
|
||||||
|
sources
|
||||||
|
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
||||||
|
.groupByKey(
|
||||||
|
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedDataset>, String>) t -> t._1.getOpenaireId(),
|
||||||
|
Encoders.STRING())
|
||||||
|
.agg(aggr)
|
||||||
|
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.json(joinedEntitiesPath);
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,79 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.oa;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SaveMode;
|
||||||
|
import org.apache.spark.sql.TypedColumn;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublicationAggregator;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
public class JoinStep4Job {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(JoinStep4Job.class);
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws Exception {
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
JoinStep4Job.class
|
||||||
|
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String workingPath = parser.get("workingPath");
|
||||||
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
|
final String joinedEntitiesPath = workingPath + "/joinedEntities_step4";
|
||||||
|
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
|
||||||
|
|
||||||
|
final SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||||
|
|
||||||
|
ClusterUtils.removeDir(spark, joinedEntitiesPath);
|
||||||
|
|
||||||
|
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class);
|
||||||
|
|
||||||
|
final Dataset<RelatedPublication> typedRels = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class);
|
||||||
|
|
||||||
|
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedPublication>, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator()
|
||||||
|
.toColumn();
|
||||||
|
|
||||||
|
sources
|
||||||
|
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
||||||
|
.groupByKey(
|
||||||
|
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedPublication>, String>) t -> t._1.getOpenaireId(),
|
||||||
|
Encoders.STRING())
|
||||||
|
.agg(aggr)
|
||||||
|
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.json(joinedEntitiesPath);
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -32,7 +32,7 @@ public class PrepareGroupsJob {
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils
|
IOUtils
|
||||||
.toString(
|
.toString(
|
||||||
JoinEntitiesJob.class
|
PrepareGroupsJob.class
|
||||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ public class PrepareGroupsJob {
|
||||||
ClusterUtils.removeDir(spark, groupsPath);
|
ClusterUtils.removeDir(spark, groupsPath);
|
||||||
|
|
||||||
final Dataset<OaBrokerMainEntity> results = ClusterUtils
|
final Dataset<OaBrokerMainEntity> results = ClusterUtils
|
||||||
.readPath(spark, workingPath + "/joinedEntities", OaBrokerMainEntity.class);
|
.readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class);
|
||||||
|
|
||||||
final Dataset<Relation> mergedRels = ClusterUtils
|
final Dataset<Relation> mergedRels = ClusterUtils
|
||||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||||
|
|
|
@ -61,6 +61,7 @@ public class PrepareRelatedDatasetsJob {
|
||||||
|
|
||||||
final Dataset<Relation> rels = ClusterUtils
|
final Dataset<Relation> rels = ClusterUtils
|
||||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||||
|
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||||
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
|
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
|
||||||
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
|
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
|
||||||
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
||||||
|
|
|
@ -13,8 +13,6 @@ import org.apache.spark.sql.SaveMode;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerProject;
|
import eu.dnetlib.broker.objects.OaBrokerProject;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||||
|
@ -29,8 +27,6 @@ public class PrepareRelatedProjectsJob {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedProjectsJob.class);
|
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedProjectsJob.class);
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils
|
IOUtils
|
||||||
|
@ -67,6 +63,7 @@ public class PrepareRelatedProjectsJob {
|
||||||
|
|
||||||
final Dataset<Relation> rels = ClusterUtils
|
final Dataset<Relation> rels = ClusterUtils
|
||||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||||
|
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||||
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT))
|
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT))
|
||||||
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
|
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
|
||||||
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
||||||
|
|
|
@ -13,8 +13,6 @@ import org.apache.spark.sql.SaveMode;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
|
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||||
|
@ -28,8 +26,6 @@ public class PrepareRelatedPublicationsJob {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedPublicationsJob.class);
|
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedPublicationsJob.class);
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils
|
IOUtils
|
||||||
|
@ -68,6 +64,7 @@ public class PrepareRelatedPublicationsJob {
|
||||||
|
|
||||||
final Dataset<Relation> rels = ClusterUtils
|
final Dataset<Relation> rels = ClusterUtils
|
||||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||||
|
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||||
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
|
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
|
||||||
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
|
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
|
||||||
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
||||||
|
|
|
@ -13,8 +13,6 @@ import org.apache.spark.sql.SaveMode;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
|
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||||
|
@ -29,8 +27,6 @@ public class PrepareRelatedSoftwaresJob {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedSoftwaresJob.class);
|
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedSoftwaresJob.class);
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils
|
IOUtils
|
||||||
|
@ -67,6 +63,7 @@ public class PrepareRelatedSoftwaresJob {
|
||||||
|
|
||||||
final Dataset<Relation> rels = ClusterUtils
|
final Dataset<Relation> rels = ClusterUtils
|
||||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||||
|
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||||
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
|
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
|
||||||
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
|
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
|
||||||
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
||||||
|
|
|
@ -43,16 +43,19 @@ public abstract class UpdateMatcher<T> {
|
||||||
if (source != res) {
|
if (source != res) {
|
||||||
for (final T hl : findDifferences(source, res)) {
|
for (final T hl : findDifferences(source, res)) {
|
||||||
final Topic topic = getTopicFunction().apply(hl);
|
final Topic topic = getTopicFunction().apply(hl);
|
||||||
final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, res, getCompileHighlightFunction(),
|
if (topic != null) {
|
||||||
|
final UpdateInfo<T> info = new UpdateInfo<>(topic, hl, source, res,
|
||||||
|
getCompileHighlightFunction(),
|
||||||
getHighlightToStringFunction(), dedupConfig);
|
getHighlightToStringFunction(), dedupConfig);
|
||||||
|
|
||||||
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
|
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
|
||||||
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
|
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
|
||||||
} else {
|
|
||||||
infoMap.put(s, info);
|
infoMap.put(s, info);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
final Collection<UpdateInfo<T>> values = infoMap.values();
|
final Collection<UpdateInfo<T>> values = infoMap.values();
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ public abstract class AbstractEnrichMissingDataset extends UpdateMatcher<OaBroke
|
||||||
super(true,
|
super(true,
|
||||||
rel -> topic,
|
rel -> topic,
|
||||||
(p, rel) -> p.getDatasets().add(rel),
|
(p, rel) -> p.getDatasets().add(rel),
|
||||||
rel -> rel.getOriginalId());
|
rel -> rel.getOpenaireId());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract boolean filterByType(String relType);
|
protected abstract boolean filterByType(String relType);
|
||||||
|
@ -29,14 +29,14 @@ public abstract class AbstractEnrichMissingDataset extends UpdateMatcher<OaBroke
|
||||||
.getDatasets()
|
.getDatasets()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(rel -> filterByType(rel.getRelType()))
|
.filter(rel -> filterByType(rel.getRelType()))
|
||||||
.map(OaBrokerRelatedDataset::getOriginalId)
|
.map(OaBrokerRelatedDataset::getOpenaireId)
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
return source
|
return source
|
||||||
.getDatasets()
|
.getDatasets()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(rel -> filterByType(rel.getRelType()))
|
.filter(rel -> filterByType(rel.getRelType()))
|
||||||
.filter(d -> !existingDatasets.contains(d.getOriginalId()))
|
.filter(d -> !existingDatasets.contains(d.getOpenaireId()))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<OaB
|
||||||
super(true,
|
super(true,
|
||||||
rel -> topic,
|
rel -> topic,
|
||||||
(p, rel) -> p.getPublications().add(rel),
|
(p, rel) -> p.getPublications().add(rel),
|
||||||
rel -> rel.getOriginalId());
|
rel -> rel.getOpenaireId());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,14 +31,14 @@ public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<OaB
|
||||||
.getPublications()
|
.getPublications()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(rel -> filterByType(rel.getRelType()))
|
.filter(rel -> filterByType(rel.getRelType()))
|
||||||
.map(OaBrokerRelatedPublication::getOriginalId)
|
.map(OaBrokerRelatedPublication::getOpenaireId)
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
|
|
||||||
return source
|
return source
|
||||||
.getPublications()
|
.getPublications()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(rel -> filterByType(rel.getRelType()))
|
.filter(rel -> filterByType(rel.getRelType()))
|
||||||
.filter(p -> !existingPublications.contains(p.getOriginalId()))
|
.filter(p -> !existingPublications.contains(p.getOpenaireId()))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ public class EnrichMissingSoftware
|
||||||
super(true,
|
super(true,
|
||||||
s -> Topic.ENRICH_MISSING_SOFTWARE,
|
s -> Topic.ENRICH_MISSING_SOFTWARE,
|
||||||
(p, s) -> p.getSoftwares().add(s),
|
(p, s) -> p.getSoftwares().add(s),
|
||||||
s -> s.getName());
|
s -> s.getOpenaireId());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -16,7 +16,7 @@ public class EnrichMoreSoftware extends UpdateMatcher<OaBrokerRelatedSoftware> {
|
||||||
super(true,
|
super(true,
|
||||||
s -> Topic.ENRICH_MORE_SOFTWARE,
|
s -> Topic.ENRICH_MORE_SOFTWARE,
|
||||||
(p, s) -> p.getSoftwares().add(s),
|
(p, s) -> p.getSoftwares().add(s),
|
||||||
s -> s.getName());
|
s -> s.getOpenaireId());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -22,6 +22,7 @@ public class EnrichMoreSubject extends UpdateMatcher<OaBrokerTypedValue> {
|
||||||
@Override
|
@Override
|
||||||
protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
|
protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
|
||||||
final OaBrokerMainEntity target) {
|
final OaBrokerMainEntity target) {
|
||||||
|
|
||||||
final Set<String> existingSubjects = target
|
final Set<String> existingSubjects = target
|
||||||
.getSubjects()
|
.getSubjects()
|
||||||
.stream()
|
.stream()
|
||||||
|
|
|
@ -17,6 +17,8 @@ public class BrokerConstants {
|
||||||
public static final float MIN_TRUST = 0.25f;
|
public static final float MIN_TRUST = 0.25f;
|
||||||
public static final float MAX_TRUST = 1.00f;
|
public static final float MAX_TRUST = 1.00f;
|
||||||
|
|
||||||
|
public static final int MAX_NUMBER_OF_RELS = 20;
|
||||||
|
|
||||||
public static Class<?>[] getModelClasses() {
|
public static Class<?>[] getModelClasses() {
|
||||||
final Set<Class<?>> list = new HashSet<>();
|
final Set<Class<?>> list = new HashSet<>();
|
||||||
list.addAll(Arrays.asList(ModelSupport.getOafModelClasses()));
|
list.addAll(Arrays.asList(ModelSupport.getOafModelClasses()));
|
||||||
|
|
|
@ -7,29 +7,7 @@ import java.util.List;
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
import eu.dnetlib.dhp.broker.model.EventFactory;
|
import eu.dnetlib.dhp.broker.model.EventFactory;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsRelatedTo;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedBy;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsSupplementedTo;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetReferences;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMissingProject;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedProjects.EnrichMoreProject;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsReferencedBy;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsRelatedTo;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedBy;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationIsSupplementedTo;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedPublications.EnrichMissingPublicationReferences;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMissingSoftware;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware.EnrichMoreSoftware;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract;
|
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAbstract;
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingAuthorOrcid;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingOpenAccess;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPid;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingPublicationDate;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMissingSubject;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject;
|
|
||||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
|
||||||
import eu.dnetlib.pace.config.DedupConfig;
|
import eu.dnetlib.pace.config.DedupConfig;
|
||||||
|
|
||||||
|
@ -38,31 +16,31 @@ public class EventFinder {
|
||||||
private static List<UpdateMatcher<?>> matchers = new ArrayList<>();
|
private static List<UpdateMatcher<?>> matchers = new ArrayList<>();
|
||||||
static {
|
static {
|
||||||
matchers.add(new EnrichMissingAbstract());
|
matchers.add(new EnrichMissingAbstract());
|
||||||
matchers.add(new EnrichMissingAuthorOrcid());
|
// matchers.add(new EnrichMissingAuthorOrcid());
|
||||||
matchers.add(new EnrichMissingOpenAccess());
|
// matchers.add(new EnrichMissingOpenAccess());
|
||||||
matchers.add(new EnrichMissingPid());
|
// matchers.add(new EnrichMissingPid());
|
||||||
matchers.add(new EnrichMissingPublicationDate());
|
// matchers.add(new EnrichMissingPublicationDate());
|
||||||
matchers.add(new EnrichMissingSubject());
|
// matchers.add(new EnrichMissingSubject());
|
||||||
matchers.add(new EnrichMoreOpenAccess());
|
// matchers.add(new EnrichMoreOpenAccess());
|
||||||
matchers.add(new EnrichMorePid());
|
// matchers.add(new EnrichMorePid());
|
||||||
matchers.add(new EnrichMoreSubject());
|
// matchers.add(new EnrichMoreSubject());
|
||||||
|
|
||||||
// Advanced matchers
|
// // Advanced matchers
|
||||||
matchers.add(new EnrichMissingProject());
|
// matchers.add(new EnrichMissingProject());
|
||||||
matchers.add(new EnrichMoreProject());
|
// matchers.add(new EnrichMoreProject());
|
||||||
matchers.add(new EnrichMissingSoftware());
|
// matchers.add(new EnrichMissingSoftware());
|
||||||
matchers.add(new EnrichMoreSoftware());
|
// matchers.add(new EnrichMoreSoftware());
|
||||||
matchers.add(new EnrichMissingPublicationIsRelatedTo());
|
// matchers.add(new EnrichMissingPublicationIsRelatedTo());
|
||||||
matchers.add(new EnrichMissingPublicationIsReferencedBy());
|
// matchers.add(new EnrichMissingPublicationIsReferencedBy());
|
||||||
matchers.add(new EnrichMissingPublicationReferences());
|
// matchers.add(new EnrichMissingPublicationReferences());
|
||||||
matchers.add(new EnrichMissingPublicationIsSupplementedTo());
|
// matchers.add(new EnrichMissingPublicationIsSupplementedTo());
|
||||||
matchers.add(new EnrichMissingPublicationIsSupplementedBy());
|
// matchers.add(new EnrichMissingPublicationIsSupplementedBy());
|
||||||
matchers.add(new EnrichMissingDatasetIsRelatedTo());
|
// matchers.add(new EnrichMissingDatasetIsRelatedTo());
|
||||||
matchers.add(new EnrichMissingDatasetIsReferencedBy());
|
// matchers.add(new EnrichMissingDatasetIsReferencedBy());
|
||||||
matchers.add(new EnrichMissingDatasetReferences());
|
// matchers.add(new EnrichMissingDatasetReferences());
|
||||||
matchers.add(new EnrichMissingDatasetIsSupplementedTo());
|
// matchers.add(new EnrichMissingDatasetIsSupplementedTo());
|
||||||
matchers.add(new EnrichMissingDatasetIsSupplementedBy());
|
// matchers.add(new EnrichMissingDatasetIsSupplementedBy());
|
||||||
matchers.add(new EnrichMissingAbstract());
|
// matchers.add(new EnrichMissingAbstract());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) {
|
public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) {
|
||||||
|
|
|
@ -14,12 +14,16 @@ public class EventGroup implements Serializable {
|
||||||
*/
|
*/
|
||||||
private static final long serialVersionUID = 765977943803533130L;
|
private static final long serialVersionUID = 765977943803533130L;
|
||||||
|
|
||||||
private final List<Event> data = new ArrayList<>();
|
private List<Event> data = new ArrayList<>();
|
||||||
|
|
||||||
public List<Event> getData() {
|
public List<Event> getData() {
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setData(final List<Event> data) {
|
||||||
|
this.data = data;
|
||||||
|
}
|
||||||
|
|
||||||
public EventGroup addElement(final Event elem) {
|
public EventGroup addElement(final Event elem) {
|
||||||
data.add(elem);
|
data.add(elem);
|
||||||
return this;
|
return this;
|
||||||
|
|
|
@ -111,7 +111,7 @@ public final class UpdateInfo<T> {
|
||||||
final OaBrokerMainEntity hl = new OaBrokerMainEntity();
|
final OaBrokerMainEntity hl = new OaBrokerMainEntity();
|
||||||
compileHighlight.accept(hl, getHighlightValue());
|
compileHighlight.accept(hl, getHighlightValue());
|
||||||
|
|
||||||
final String provId = getSource().getOriginalId();
|
final String provId = getSource().getOpenaireId();
|
||||||
final String provRepo = getSource().getCollectedFromName();
|
final String provRepo = getSource().getCollectedFromName();
|
||||||
|
|
||||||
final String provUrl = getSource()
|
final String provUrl = getSource()
|
||||||
|
|
|
@ -7,6 +7,7 @@ import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.expressions.Aggregator;
|
import org.apache.spark.sql.expressions.Aggregator;
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
public class RelatedDatasetAggregator
|
public class RelatedDatasetAggregator
|
||||||
|
@ -30,7 +31,7 @@ public class RelatedDatasetAggregator
|
||||||
@Override
|
@Override
|
||||||
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedDataset> t) {
|
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedDataset> t) {
|
||||||
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
|
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
|
||||||
if (t._2 != null) {
|
if (t._2 != null && res.getDatasets().size() < BrokerConstants.MAX_NUMBER_OF_RELS) {
|
||||||
res.getDatasets().add(t._2.getRelDataset());
|
res.getDatasets().add(t._2.getRelDataset());
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
|
@ -40,7 +41,14 @@ public class RelatedDatasetAggregator
|
||||||
@Override
|
@Override
|
||||||
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
|
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
|
||||||
if (StringUtils.isNotBlank(g1.getOpenaireId())) {
|
if (StringUtils.isNotBlank(g1.getOpenaireId())) {
|
||||||
|
final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getDatasets().size();
|
||||||
|
if (availables > 0) {
|
||||||
|
if (g2.getDatasets().size() <= availables) {
|
||||||
g1.getDatasets().addAll(g2.getDatasets());
|
g1.getDatasets().addAll(g2.getDatasets());
|
||||||
|
} else {
|
||||||
|
g1.getDatasets().addAll(g2.getDatasets().subList(0, availables));
|
||||||
|
}
|
||||||
|
}
|
||||||
return g1;
|
return g1;
|
||||||
} else {
|
} else {
|
||||||
return g2;
|
return g2;
|
||||||
|
|
|
@ -7,6 +7,7 @@ import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.expressions.Aggregator;
|
import org.apache.spark.sql.expressions.Aggregator;
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
public class RelatedProjectAggregator
|
public class RelatedProjectAggregator
|
||||||
|
@ -30,7 +31,7 @@ public class RelatedProjectAggregator
|
||||||
@Override
|
@Override
|
||||||
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedProject> t) {
|
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedProject> t) {
|
||||||
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
|
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
|
||||||
if (t._2 != null) {
|
if (t._2 != null && res.getProjects().size() < BrokerConstants.MAX_NUMBER_OF_RELS) {
|
||||||
res.getProjects().add(t._2.getRelProject());
|
res.getProjects().add(t._2.getRelProject());
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
|
@ -40,7 +41,14 @@ public class RelatedProjectAggregator
|
||||||
@Override
|
@Override
|
||||||
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
|
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
|
||||||
if (StringUtils.isNotBlank(g1.getOpenaireId())) {
|
if (StringUtils.isNotBlank(g1.getOpenaireId())) {
|
||||||
|
final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getProjects().size();
|
||||||
|
if (availables > 0) {
|
||||||
|
if (g2.getProjects().size() <= availables) {
|
||||||
g1.getProjects().addAll(g2.getProjects());
|
g1.getProjects().addAll(g2.getProjects());
|
||||||
|
} else {
|
||||||
|
g1.getProjects().addAll(g2.getProjects().subList(0, availables));
|
||||||
|
}
|
||||||
|
}
|
||||||
return g1;
|
return g1;
|
||||||
} else {
|
} else {
|
||||||
return g2;
|
return g2;
|
||||||
|
|
|
@ -7,6 +7,7 @@ import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.expressions.Aggregator;
|
import org.apache.spark.sql.expressions.Aggregator;
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
public class RelatedPublicationAggregator
|
public class RelatedPublicationAggregator
|
||||||
|
@ -31,7 +32,7 @@ public class RelatedPublicationAggregator
|
||||||
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
|
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
|
||||||
final Tuple2<OaBrokerMainEntity, RelatedPublication> t) {
|
final Tuple2<OaBrokerMainEntity, RelatedPublication> t) {
|
||||||
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
|
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
|
||||||
if (t._2 != null) {
|
if (t._2 != null && res.getPublications().size() < BrokerConstants.MAX_NUMBER_OF_RELS) {
|
||||||
res.getPublications().add(t._2.getRelPublication());
|
res.getPublications().add(t._2.getRelPublication());
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
|
@ -41,8 +42,16 @@ public class RelatedPublicationAggregator
|
||||||
@Override
|
@Override
|
||||||
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
|
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
|
||||||
if (StringUtils.isNotBlank(g1.getOpenaireId())) {
|
if (StringUtils.isNotBlank(g1.getOpenaireId())) {
|
||||||
|
final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getPublications().size();
|
||||||
|
if (availables > 0) {
|
||||||
|
if (g2.getPublications().size() <= availables) {
|
||||||
g1.getPublications().addAll(g2.getPublications());
|
g1.getPublications().addAll(g2.getPublications());
|
||||||
|
} else {
|
||||||
|
g1.getPublications().addAll(g2.getPublications().subList(0, availables));
|
||||||
|
}
|
||||||
|
}
|
||||||
return g1;
|
return g1;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
return g2;
|
return g2;
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.expressions.Aggregator;
|
import org.apache.spark.sql.expressions.Aggregator;
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
public class RelatedSoftwareAggregator
|
public class RelatedSoftwareAggregator
|
||||||
|
@ -30,7 +31,7 @@ public class RelatedSoftwareAggregator
|
||||||
@Override
|
@Override
|
||||||
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedSoftware> t) {
|
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedSoftware> t) {
|
||||||
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
|
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
|
||||||
if (t._2 != null) {
|
if (t._2 != null && res.getSoftwares().size() < BrokerConstants.MAX_NUMBER_OF_RELS) {
|
||||||
res.getSoftwares().add(t._2.getRelSoftware());
|
res.getSoftwares().add(t._2.getRelSoftware());
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
|
@ -40,7 +41,14 @@ public class RelatedSoftwareAggregator
|
||||||
@Override
|
@Override
|
||||||
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
|
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
|
||||||
if (StringUtils.isNotBlank(g1.getOpenaireId())) {
|
if (StringUtils.isNotBlank(g1.getOpenaireId())) {
|
||||||
|
final int availables = BrokerConstants.MAX_NUMBER_OF_RELS - g1.getSoftwares().size();
|
||||||
|
if (availables > 0) {
|
||||||
|
if (g2.getSoftwares().size() <= availables) {
|
||||||
g1.getSoftwares().addAll(g2.getSoftwares());
|
g1.getSoftwares().addAll(g2.getSoftwares());
|
||||||
|
} else {
|
||||||
|
g1.getSoftwares().addAll(g2.getSoftwares().subList(0, availables));
|
||||||
|
}
|
||||||
|
}
|
||||||
return g1;
|
return g1;
|
||||||
} else {
|
} else {
|
||||||
return g2;
|
return g2;
|
||||||
|
|
|
@ -216,14 +216,86 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<join name="wait_entities_and_rels" to="join_entities"/>
|
<join name="wait_entities_and_rels" to="join_entities_step1"/>
|
||||||
|
|
||||||
<action name="join_entities">
|
<action name="join_entities_step1">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>JoinEntitiesJob</name>
|
<name>JoinStep1</name>
|
||||||
<class>eu.dnetlib.dhp.broker.oa.JoinEntitiesJob</class>
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep1Job</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="join_entities_step2"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="join_entities_step2">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>JoinStep2</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep2Job</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="join_entities_step3"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="join_entities_step3">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>JoinStep3</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep3Job</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="join_entities_step4"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="join_entities_step4">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>JoinStep4</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep4Job</class>
|
||||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
|
|
@ -73,61 +73,13 @@
|
||||||
</configuration>
|
</configuration>
|
||||||
</global>
|
</global>
|
||||||
|
|
||||||
<start to="join_entities"/>
|
<start to="generate_events"/>
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
|
|
||||||
<action name="join_entities">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>JoinEntitiesJob</name>
|
|
||||||
<class>eu.dnetlib.dhp.broker.oa.JoinEntitiesJob</class>
|
|
||||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="prepare_groups"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="prepare_groups">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>PrepareGroupsJob</name>
|
|
||||||
<class>eu.dnetlib.dhp.broker.oa.PrepareGroupsJob</class>
|
|
||||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="generate_events"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="generate_events">
|
<action name="generate_events">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
|
|
|
@ -271,6 +271,26 @@ object DoiBoostMappingUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def createSP(value: String, classId: String,className:String, schemeId: String, schemeName:String): StructuredProperty = {
|
||||||
|
val sp = new StructuredProperty
|
||||||
|
sp.setQualifier(createQualifier(classId,className, schemeId, schemeName))
|
||||||
|
sp.setValue(value)
|
||||||
|
sp
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def createSP(value: String, classId: String,className:String, schemeId: String, schemeName:String, dataInfo: DataInfo): StructuredProperty = {
|
||||||
|
val sp = new StructuredProperty
|
||||||
|
sp.setQualifier(createQualifier(classId,className, schemeId, schemeName))
|
||||||
|
sp.setValue(value)
|
||||||
|
sp.setDataInfo(dataInfo)
|
||||||
|
sp
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
def createSP(value: String, classId: String, schemeId: String): StructuredProperty = {
|
def createSP(value: String, classId: String, schemeId: String): StructuredProperty = {
|
||||||
val sp = new StructuredProperty
|
val sp = new StructuredProperty
|
||||||
sp.setQualifier(createQualifier(classId, schemeId))
|
sp.setQualifier(createQualifier(classId, schemeId))
|
||||||
|
@ -279,6 +299,8 @@ object DoiBoostMappingUtil {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = {
|
def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = {
|
||||||
val sp = new StructuredProperty
|
val sp = new StructuredProperty
|
||||||
sp.setQualifier(createQualifier(classId, schemeId))
|
sp.setQualifier(createQualifier(classId, schemeId))
|
||||||
|
|
|
@ -129,16 +129,16 @@ case object ConversionUtil {
|
||||||
val fieldOfStudy = item._2
|
val fieldOfStudy = item._2
|
||||||
if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) {
|
if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) {
|
||||||
val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => {
|
val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => {
|
||||||
val s1 = createSP(s.DisplayName, "keyword", "dnet:subject_classification_typologies")
|
val s1 = createSP(s.DisplayName, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies")
|
||||||
val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString)
|
val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString)
|
||||||
var resList: List[StructuredProperty] = List(s1)
|
var resList: List[StructuredProperty] = List(s1)
|
||||||
if (s.MainType.isDefined) {
|
if (s.MainType.isDefined) {
|
||||||
val maintp = s.MainType.get
|
val maintp = s.MainType.get
|
||||||
val s2 = createSP(s.MainType.get, "keyword", "dnet:subject_classification_typologies")
|
val s2 = createSP(s.MainType.get, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies")
|
||||||
s2.setDataInfo(di)
|
s2.setDataInfo(di)
|
||||||
resList = resList ::: List(s2)
|
resList = resList ::: List(s2)
|
||||||
if (maintp.contains(".")) {
|
if (maintp.contains(".")) {
|
||||||
val s3 = createSP(maintp.split("\\.").head, "keyword", "dnet:subject_classification_typologies")
|
val s3 = createSP(maintp.split("\\.").head, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies")
|
||||||
s3.setDataInfo(di)
|
s3.setDataInfo(di)
|
||||||
resList = resList ::: List(s3)
|
resList = resList ::: List(s3)
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,7 @@ object SparkPreProcessMAG {
|
||||||
val distinctPaper: Dataset[MagPapers] = spark.createDataset(result)
|
val distinctPaper: Dataset[MagPapers] = spark.createDataset(result)
|
||||||
distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct")
|
distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct")
|
||||||
|
|
||||||
logger.info("Phase 6) Enrich Publication with description")
|
logger.info("Phase 0) Enrich Publication with description")
|
||||||
val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
|
val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
|
||||||
pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
|
pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
<start to="GenerateActionSet"/>
|
<start to="CreateDOIBoost"/>
|
||||||
|
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
|
|
|
@ -18,6 +18,9 @@ class CrossrefMappingTest {
|
||||||
val mapper = new ObjectMapper()
|
val mapper = new ObjectMapper()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testFunderRelationshipsMapping(): Unit = {
|
def testFunderRelationshipsMapping(): Unit = {
|
||||||
val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString
|
val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString
|
||||||
|
@ -58,6 +61,27 @@ class CrossrefMappingTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
def testOrcidID() :Unit = {
|
||||||
|
val json = Source.fromInputStream(getClass.getResourceAsStream("orcid_data.json")).mkString
|
||||||
|
|
||||||
|
|
||||||
|
assertNotNull(json)
|
||||||
|
assertFalse(json.isEmpty);
|
||||||
|
|
||||||
|
val resultList: List[Oaf] = Crossref2Oaf.convert(json)
|
||||||
|
|
||||||
|
assertTrue(resultList.nonEmpty)
|
||||||
|
|
||||||
|
val items = resultList.filter(p => p.isInstanceOf[Result])
|
||||||
|
|
||||||
|
|
||||||
|
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
|
||||||
|
items.foreach(p => println(mapper.writeValueAsString(p)))
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testEmptyTitle() :Unit = {
|
def testEmptyTitle() :Unit = {
|
||||||
val json = Source.fromInputStream(getClass.getResourceAsStream("empty_title.json")).mkString
|
val json = Source.fromInputStream(getClass.getResourceAsStream("empty_title.json")).mkString
|
||||||
|
|
|
@ -0,0 +1,271 @@
|
||||||
|
{
|
||||||
|
"DOI":"10.1016/j.carbpol.2020.115930",
|
||||||
|
"issued":{
|
||||||
|
"date-parts":[
|
||||||
|
[
|
||||||
|
2020,
|
||||||
|
4
|
||||||
|
]
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"published-print":{
|
||||||
|
"date-parts":[
|
||||||
|
[
|
||||||
|
2020,
|
||||||
|
4
|
||||||
|
]
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"prefix":"10.1016",
|
||||||
|
"subject":[
|
||||||
|
"Organic Chemistry",
|
||||||
|
"Materials Chemistry",
|
||||||
|
"Polymers and Plastics"
|
||||||
|
],
|
||||||
|
"author":[
|
||||||
|
{
|
||||||
|
"affiliation":[
|
||||||
|
|
||||||
|
],
|
||||||
|
"given":"Lei",
|
||||||
|
"family":"Fang",
|
||||||
|
"sequence":"first"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"affiliation":[
|
||||||
|
|
||||||
|
],
|
||||||
|
"given":"Hua",
|
||||||
|
"family":"Lin",
|
||||||
|
"sequence":"additional"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"affiliation":[
|
||||||
|
|
||||||
|
],
|
||||||
|
"given":"Zhenfeng",
|
||||||
|
"family":"Wu",
|
||||||
|
"sequence":"additional"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"affiliation":[
|
||||||
|
|
||||||
|
],
|
||||||
|
"given":"Zhen",
|
||||||
|
"family":"Wang",
|
||||||
|
"sequence":"additional"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"affiliation":[
|
||||||
|
|
||||||
|
],
|
||||||
|
"given":"Xinxin",
|
||||||
|
"family":"Fan",
|
||||||
|
"sequence":"additional"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"affiliation":[
|
||||||
|
|
||||||
|
],
|
||||||
|
"given":"Ziting",
|
||||||
|
"family":"Cheng",
|
||||||
|
"sequence":"additional"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"affiliation":[
|
||||||
|
|
||||||
|
],
|
||||||
|
"given":"Xiaoya",
|
||||||
|
"family":"Hou",
|
||||||
|
"sequence":"additional"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"authenticated-orcid":false,
|
||||||
|
"given":"Daquan",
|
||||||
|
"family":"Chen",
|
||||||
|
"sequence":"additional",
|
||||||
|
"affiliation":[
|
||||||
|
|
||||||
|
],
|
||||||
|
"ORCID":"http://orcid.org/0000-0002-6796-0204"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"reference-count":41,
|
||||||
|
"ISSN":[
|
||||||
|
"0144-8617"
|
||||||
|
],
|
||||||
|
"assertion":[
|
||||||
|
{
|
||||||
|
"name":"publisher",
|
||||||
|
"value":"Elsevier",
|
||||||
|
"label":"This article is maintained by"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"articletitle",
|
||||||
|
"value":"In vitro/vivo evaluation of novel mitochondrial targeting charge-reversal polysaccharide-based antitumor nanoparticle",
|
||||||
|
"label":"Article Title"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"journaltitle",
|
||||||
|
"value":"Carbohydrate Polymers",
|
||||||
|
"label":"Journal Title"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"articlelink",
|
||||||
|
"value":"https://doi.org/10.1016/j.carbpol.2020.115930",
|
||||||
|
"label":"CrossRef DOI link to publisher maintained version"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"content_type",
|
||||||
|
"value":"article",
|
||||||
|
"label":"Content Type"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"copyright",
|
||||||
|
"value":"\\u00a9 2020 Elsevier Ltd. All rights reserved.",
|
||||||
|
"label":"Copyright"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"member":"78",
|
||||||
|
"source":"Crossref",
|
||||||
|
"score":1.0,
|
||||||
|
"deposited":{
|
||||||
|
"timestamp":1584590965000,
|
||||||
|
"date-time":"2020-03-19T04:09:25Z",
|
||||||
|
"date-parts":[
|
||||||
|
[
|
||||||
|
2020,
|
||||||
|
3,
|
||||||
|
19
|
||||||
|
]
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"indexed":{
|
||||||
|
"timestamp":1584592912467,
|
||||||
|
"date-time":"2020-03-19T04:41:52Z",
|
||||||
|
"date-parts":[
|
||||||
|
[
|
||||||
|
2020,
|
||||||
|
3,
|
||||||
|
19
|
||||||
|
]
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"type":"journal-article",
|
||||||
|
"URL":"http://dx.doi.org/10.1016/j.carbpol.2020.115930",
|
||||||
|
"is-referenced-by-count":0,
|
||||||
|
"volume":"234",
|
||||||
|
"issn-type":[
|
||||||
|
{
|
||||||
|
"type":"print",
|
||||||
|
"value":"0144-8617"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"link":[
|
||||||
|
{
|
||||||
|
"URL":"https://api.elsevier.com/content/article/PII:S0144861720301041?httpAccept=text/xml",
|
||||||
|
"intended-application":"text-mining",
|
||||||
|
"content-version":"vor",
|
||||||
|
"content-type":"text/xml"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"URL":"https://api.elsevier.com/content/article/PII:S0144861720301041?httpAccept=text/plain",
|
||||||
|
"intended-application":"text-mining",
|
||||||
|
"content-version":"vor",
|
||||||
|
"content-type":"text/plain"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"update-policy":"http://dx.doi.org/10.1016/elsevier_cm_policy",
|
||||||
|
"references-count":41,
|
||||||
|
"short-container-title":[
|
||||||
|
"Carbohydrate Polymers"
|
||||||
|
],
|
||||||
|
"publisher":"Elsevier BV",
|
||||||
|
"content-domain":{
|
||||||
|
"domain":[
|
||||||
|
"elsevier.com",
|
||||||
|
"sciencedirect.com"
|
||||||
|
],
|
||||||
|
"crossmark-restriction":true
|
||||||
|
},
|
||||||
|
"language":"en",
|
||||||
|
"license":[
|
||||||
|
{
|
||||||
|
"URL":"https://www.elsevier.com/tdm/userlicense/1.0/",
|
||||||
|
"start":{
|
||||||
|
"timestamp":1585699200000,
|
||||||
|
"date-time":"2020-04-01T00:00:00Z",
|
||||||
|
"date-parts":[
|
||||||
|
[
|
||||||
|
2020,
|
||||||
|
4,
|
||||||
|
1
|
||||||
|
]
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"content-version":"tdm",
|
||||||
|
"delay-in-days":0
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"created":{
|
||||||
|
"timestamp":1581759678000,
|
||||||
|
"date-time":"2020-02-15T09:41:18Z",
|
||||||
|
"date-parts":[
|
||||||
|
[
|
||||||
|
2020,
|
||||||
|
2,
|
||||||
|
15
|
||||||
|
]
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"title":[
|
||||||
|
"In vitro/vivo evaluation of novel mitochondrial targeting charge-reversal polysaccharide-based antitumor nanoparticle"
|
||||||
|
],
|
||||||
|
"alternative-id":[
|
||||||
|
"S0144861720301041"
|
||||||
|
],
|
||||||
|
"container-title":[
|
||||||
|
"Carbohydrate Polymers"
|
||||||
|
],
|
||||||
|
"funder":[
|
||||||
|
{
|
||||||
|
"doi-asserted-by":"publisher",
|
||||||
|
"DOI":"10.13039/501100007129",
|
||||||
|
"name":"Natural Science Foundation of Shandong Province",
|
||||||
|
"award":[
|
||||||
|
"ZR2019ZD24",
|
||||||
|
"ZR2019YQ30"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"doi-asserted-by":"publisher",
|
||||||
|
"DOI":"10.13039/100010449",
|
||||||
|
"name":"Ministry of Education, Libya",
|
||||||
|
"award":[
|
||||||
|
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"doi-asserted-by":"publisher",
|
||||||
|
"DOI":"10.13039/501100012249",
|
||||||
|
"name":"Jiangxi University of Traditional Chinese Medicine",
|
||||||
|
"award":[
|
||||||
|
"TCM-0906"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"Taishan Scholar Program",
|
||||||
|
"award":[
|
||||||
|
"qnts20161035"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"Open fund project of Key Laboratory of Modern Preparation of TCM",
|
||||||
|
"award":[
|
||||||
|
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"page":"115930",
|
||||||
|
"article-number":"115930"
|
||||||
|
}
|
|
@ -147,9 +147,23 @@ public class CleanGraphSparkJob {
|
||||||
if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
|
if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
|
||||||
i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
|
i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
|
||||||
}
|
}
|
||||||
|
if (Objects.isNull(i.getRefereed())) {
|
||||||
|
i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (Objects.nonNull(r.getAuthor())) {
|
||||||
|
boolean nullRank = r
|
||||||
|
.getAuthor()
|
||||||
|
.stream()
|
||||||
|
.anyMatch(a -> Objects.isNull(a.getRank()));
|
||||||
|
if (nullRank) {
|
||||||
|
int i = 1;
|
||||||
|
for (Author author : r.getAuthor()) {
|
||||||
|
author.setRank(i++);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (value instanceof Publication) {
|
if (value instanceof Publication) {
|
||||||
|
|
||||||
} else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
|
} else if (value instanceof eu.dnetlib.dhp.schema.oaf.Dataset) {
|
||||||
|
|
|
@ -50,8 +50,6 @@ import org.apache.commons.lang3.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.common.DbClient;
|
import eu.dnetlib.dhp.common.DbClient;
|
||||||
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
|
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
|
||||||
|
@ -106,6 +104,9 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
|
||||||
final String dbPassword = parser.get("postgresPassword");
|
final String dbPassword = parser.get("postgresPassword");
|
||||||
log.info("postgresPassword: xxx");
|
log.info("postgresPassword: xxx");
|
||||||
|
|
||||||
|
final String dbSchema = parser.get("dbschema");
|
||||||
|
log.info("dbSchema {}: " + dbSchema);
|
||||||
|
|
||||||
final String isLookupUrl = parser.get("isLookupUrl");
|
final String isLookupUrl = parser.get("isLookupUrl");
|
||||||
log.info("isLookupUrl: {}", isLookupUrl);
|
log.info("isLookupUrl: {}", isLookupUrl);
|
||||||
|
|
||||||
|
@ -125,7 +126,11 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
|
||||||
smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
|
smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
|
||||||
|
|
||||||
log.info("Processing projects...");
|
log.info("Processing projects...");
|
||||||
|
if (dbSchema.equalsIgnoreCase("beta")) {
|
||||||
smdbe.execute("queryProjects.sql", smdbe::processProject);
|
smdbe.execute("queryProjects.sql", smdbe::processProject);
|
||||||
|
} else {
|
||||||
|
smdbe.execute("queryProjects_production.sql", smdbe::processProject);
|
||||||
|
}
|
||||||
|
|
||||||
log.info("Processing orgs...");
|
log.info("Processing orgs...");
|
||||||
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);
|
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);
|
||||||
|
|
|
@ -9,7 +9,15 @@ import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Journal;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.OriginDescription;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||||
import eu.dnetlib.dhp.utils.DHPUtils;
|
import eu.dnetlib.dhp.utils.DHPUtils;
|
||||||
|
|
||||||
public class OafMapperUtils {
|
public class OafMapperUtils {
|
||||||
|
@ -89,7 +97,9 @@ public class OafMapperUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static StructuredProperty structuredProperty(
|
public static StructuredProperty structuredProperty(
|
||||||
final String value, final Qualifier qualifier, final DataInfo dataInfo) {
|
final String value,
|
||||||
|
final Qualifier qualifier,
|
||||||
|
final DataInfo dataInfo) {
|
||||||
if (value == null) {
|
if (value == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -192,8 +202,12 @@ public class OafMapperUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String createOpenaireId(
|
public static String createOpenaireId(
|
||||||
final int prefix, final String originalId, final boolean to_md5) {
|
final int prefix,
|
||||||
if (to_md5) {
|
final String originalId,
|
||||||
|
final boolean to_md5) {
|
||||||
|
if (StringUtils.isBlank(originalId)) {
|
||||||
|
return null;
|
||||||
|
} else if (to_md5) {
|
||||||
final String nsPrefix = StringUtils.substringBefore(originalId, "::");
|
final String nsPrefix = StringUtils.substringBefore(originalId, "::");
|
||||||
final String rest = StringUtils.substringAfter(originalId, "::");
|
final String rest = StringUtils.substringAfter(originalId, "::");
|
||||||
return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest));
|
return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest));
|
||||||
|
@ -203,7 +217,9 @@ public class OafMapperUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String createOpenaireId(
|
public static String createOpenaireId(
|
||||||
final String type, final String originalId, final boolean to_md5) {
|
final String type,
|
||||||
|
final String originalId,
|
||||||
|
final boolean to_md5) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case "datasource":
|
case "datasource":
|
||||||
return createOpenaireId(10, originalId, to_md5);
|
return createOpenaireId(10, originalId, to_md5);
|
||||||
|
|
|
@ -34,5 +34,11 @@
|
||||||
"paramLongName": "isLookupUrl",
|
"paramLongName": "isLookupUrl",
|
||||||
"paramDescription": "the url of the ISLookupService",
|
"paramDescription": "the url of the ISLookupService",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "dbschema",
|
||||||
|
"paramLongName": "dbschema",
|
||||||
|
"paramDescription": "the database schema according to the D-Net infrastructure (beta or production)",
|
||||||
|
"paramRequired": true
|
||||||
}
|
}
|
||||||
]
|
]
|
|
@ -25,6 +25,11 @@
|
||||||
<property>
|
<property>
|
||||||
<name>postgresPassword</name>
|
<name>postgresPassword</name>
|
||||||
<description>the password postgres</description>
|
<description>the password postgres</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>dbSchema</name>
|
||||||
|
<value>beta</value>
|
||||||
|
<description>the database schema according to the D-Net infrastructure (beta or production)</description>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>mongoURL</name>
|
<name>mongoURL</name>
|
||||||
|
@ -125,6 +130,7 @@
|
||||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
<arg>--action</arg><arg>claims</arg>
|
<arg>--action</arg><arg>claims</arg>
|
||||||
|
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="ImportODF_claims"/>
|
<ok to="ImportODF_claims"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
@ -175,6 +181,7 @@
|
||||||
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
|
||||||
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||||
|
<arg>--dbschema</arg><arg>${dbSchema}</arg>
|
||||||
</java>
|
</java>
|
||||||
<ok to="ImportODF"/>
|
<ok to="ImportODF"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -25,9 +25,7 @@ import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
|
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
|
||||||
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
|
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
|
||||||
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
|
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
|
||||||
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
|
|
||||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
@ -109,11 +107,12 @@ public class CreateRelatedEntitiesJob_phase1 {
|
||||||
Class<E> clazz,
|
Class<E> clazz,
|
||||||
String outputPath) {
|
String outputPath) {
|
||||||
|
|
||||||
Dataset<Tuple2<String, SortableRelation>> relsByTarget = readPathRelation(spark, inputRelationsPath)
|
Dataset<Tuple2<String, Relation>> relsByTarget = readPathRelation(spark, inputRelationsPath)
|
||||||
.filter("dataInfo.deletedbyinference == false")
|
.filter("dataInfo.deletedbyinference == false")
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<SortableRelation, Tuple2<String, SortableRelation>>) r -> new Tuple2<>(r.getTarget(), r),
|
(MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(r.getTarget(),
|
||||||
Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class)))
|
r),
|
||||||
|
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
|
||||||
.cache();
|
.cache();
|
||||||
|
|
||||||
Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
|
Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
|
||||||
|
@ -129,7 +128,7 @@ public class CreateRelatedEntitiesJob_phase1 {
|
||||||
relsByTarget
|
relsByTarget
|
||||||
.joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
|
.joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, RelatedEntity>>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper(
|
(MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, RelatedEntity>>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper(
|
||||||
t._1()._2(), t._2()._2()),
|
t._1()._2(), t._2()._2()),
|
||||||
Encoders.kryo(RelatedEntityWrapper.class))
|
Encoders.kryo(RelatedEntityWrapper.class))
|
||||||
.write()
|
.write()
|
||||||
|
@ -232,11 +231,11 @@ public class CreateRelatedEntitiesJob_phase1 {
|
||||||
* @param relationPath
|
* @param relationPath
|
||||||
* @return the Dataset<SortableRelation> containing all the relationships
|
* @return the Dataset<SortableRelation> containing all the relationships
|
||||||
*/
|
*/
|
||||||
private static Dataset<SortableRelation> readPathRelation(
|
private static Dataset<Relation> readPathRelation(
|
||||||
SparkSession spark, final String relationPath) {
|
SparkSession spark, final String relationPath) {
|
||||||
|
|
||||||
log.info("Reading relations from: {}", relationPath);
|
log.info("Reading relations from: {}", relationPath);
|
||||||
return spark.read().load(relationPath).as(Encoders.bean(SortableRelation.class));
|
return spark.read().load(relationPath).as(Encoders.bean(Relation.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void removeOutputDir(SparkSession spark, String path) {
|
private static void removeOutputDir(SparkSession spark, String path) {
|
||||||
|
|
|
@ -3,35 +3,31 @@ package eu.dnetlib.dhp.oa.provision;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.HashSet;
|
||||||
import java.util.function.Function;
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.api.java.function.*;
|
|
||||||
import org.apache.spark.rdd.RDD;
|
import org.apache.spark.rdd.RDD;
|
||||||
import org.apache.spark.sql.Dataset;
|
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SaveMode;
|
import org.apache.spark.sql.SaveMode;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.clearspring.analytics.util.Lists;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Splitter;
|
import com.google.common.base.Splitter;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Iterators;
|
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport;
|
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
import eu.dnetlib.dhp.common.HdfsSupport;
|
||||||
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
|
import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
|
||||||
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
|
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
|
||||||
import scala.Function1;
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -133,22 +129,35 @@ public class PrepareRelationsJob {
|
||||||
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations,
|
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations,
|
||||||
int relPartitions) {
|
int relPartitions) {
|
||||||
|
|
||||||
RDD<SortableRelation> cappedRels = readPathRelationRDD(spark, inputRelationsPath)
|
|
||||||
.repartition(relPartitions)
|
|
||||||
.filter(rel -> !rel.getDataInfo().getDeletedbyinference())
|
|
||||||
.filter(rel -> !relationFilter.contains(rel.getRelClass()))
|
|
||||||
// group by SOURCE and apply limit
|
// group by SOURCE and apply limit
|
||||||
.mapToPair(rel -> new Tuple2<>(rel.getSource(), rel))
|
RDD<Relation> bySource = readPathRelationRDD(spark, inputRelationsPath)
|
||||||
.groupByKey(new RelationPartitioner(relPartitions))
|
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
|
||||||
.flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator())
|
.filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
|
||||||
|
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r))
|
||||||
|
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
|
||||||
|
.groupBy(Tuple2::_1)
|
||||||
|
.map(Tuple2::_2)
|
||||||
|
.map(t -> Iterables.limit(t, maxRelations))
|
||||||
|
.flatMap(Iterable::iterator)
|
||||||
|
.map(Tuple2::_2)
|
||||||
|
.rdd();
|
||||||
|
|
||||||
// group by TARGET and apply limit
|
// group by TARGET and apply limit
|
||||||
.mapToPair(rel -> new Tuple2<>(rel.getTarget(), rel))
|
RDD<Relation> byTarget = readPathRelationRDD(spark, inputRelationsPath)
|
||||||
.groupByKey(new RelationPartitioner(relPartitions))
|
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
|
||||||
.flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator())
|
.filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
|
||||||
|
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getTarget()), r))
|
||||||
|
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
|
||||||
|
.groupBy(Tuple2::_1)
|
||||||
|
.map(Tuple2::_2)
|
||||||
|
.map(t -> Iterables.limit(t, maxRelations))
|
||||||
|
.flatMap(Iterable::iterator)
|
||||||
|
.map(Tuple2::_2)
|
||||||
.rdd();
|
.rdd();
|
||||||
|
|
||||||
spark
|
spark
|
||||||
.createDataset(cappedRels, Encoders.bean(SortableRelation.class))
|
.createDataset(bySource.union(byTarget), Encoders.bean(Relation.class))
|
||||||
|
.repartition(relPartitions)
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.parquet(outputPath);
|
.parquet(outputPath);
|
||||||
|
@ -162,10 +171,10 @@ public class PrepareRelationsJob {
|
||||||
* @param inputPath
|
* @param inputPath
|
||||||
* @return the JavaRDD<SortableRelation> containing all the relationships
|
* @return the JavaRDD<SortableRelation> containing all the relationships
|
||||||
*/
|
*/
|
||||||
private static JavaRDD<SortableRelation> readPathRelationRDD(
|
private static JavaRDD<Relation> readPathRelationRDD(
|
||||||
SparkSession spark, final String inputPath) {
|
SparkSession spark, final String inputPath) {
|
||||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||||
return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, SortableRelation.class));
|
return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void removeOutputDir(SparkSession spark, String path) {
|
private static void removeOutputDir(SparkSession spark, String path) {
|
||||||
|
|
|
@ -19,7 +19,7 @@ public class ProvisionModelSupport {
|
||||||
RelatedEntityWrapper.class,
|
RelatedEntityWrapper.class,
|
||||||
JoinedEntity.class,
|
JoinedEntity.class,
|
||||||
RelatedEntity.class,
|
RelatedEntity.class,
|
||||||
SortableRelation.class));
|
SortableRelationKey.class));
|
||||||
return modelClasses.toArray(new Class[] {});
|
return modelClasses.toArray(new Class[] {});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,28 +5,30 @@ import java.io.Serializable;
|
||||||
|
|
||||||
import com.google.common.base.Objects;
|
import com.google.common.base.Objects;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
public class RelatedEntityWrapper implements Serializable {
|
public class RelatedEntityWrapper implements Serializable {
|
||||||
|
|
||||||
private SortableRelation relation;
|
private Relation relation;
|
||||||
private RelatedEntity target;
|
private RelatedEntity target;
|
||||||
|
|
||||||
public RelatedEntityWrapper() {
|
public RelatedEntityWrapper() {
|
||||||
}
|
}
|
||||||
|
|
||||||
public RelatedEntityWrapper(SortableRelation relation, RelatedEntity target) {
|
public RelatedEntityWrapper(Relation relation, RelatedEntity target) {
|
||||||
this(null, relation, target);
|
this(null, relation, target);
|
||||||
}
|
}
|
||||||
|
|
||||||
public RelatedEntityWrapper(TypedRow entity, SortableRelation relation, RelatedEntity target) {
|
public RelatedEntityWrapper(TypedRow entity, Relation relation, RelatedEntity target) {
|
||||||
this.relation = relation;
|
this.relation = relation;
|
||||||
this.target = target;
|
this.target = target;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SortableRelation getRelation() {
|
public Relation getRelation() {
|
||||||
return relation;
|
return relation;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setRelation(SortableRelation relation) {
|
public void setRelation(Relation relation) {
|
||||||
this.relation = relation;
|
this.relation = relation;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,38 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.provision.model;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import com.google.common.collect.ComparisonChain;
|
|
||||||
import com.google.common.collect.Maps;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
|
||||||
|
|
||||||
public class SortableRelation extends Relation implements Comparable<Relation>, Serializable {
|
|
||||||
|
|
||||||
private static final Map<String, Integer> weights = Maps.newHashMap();
|
|
||||||
|
|
||||||
static {
|
|
||||||
weights.put("outcome", 0);
|
|
||||||
weights.put("supplement", 1);
|
|
||||||
weights.put("affiliation", 2);
|
|
||||||
weights.put("relationship", 3);
|
|
||||||
weights.put("publicationDataset", 4);
|
|
||||||
weights.put("similarity", 5);
|
|
||||||
|
|
||||||
weights.put("provision", 6);
|
|
||||||
weights.put("participation", 7);
|
|
||||||
weights.put("dedup", 8);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int compareTo(Relation o) {
|
|
||||||
return ComparisonChain
|
|
||||||
.start()
|
|
||||||
.compare(weights.get(getSubRelType()), weights.get(o.getSubRelType()))
|
|
||||||
.compare(getSource(), o.getSource())
|
|
||||||
.compare(getTarget(), o.getTarget())
|
|
||||||
.result();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,90 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.provision.model;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import com.google.common.base.Objects;
|
||||||
|
import com.google.common.collect.ComparisonChain;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
|
public class SortableRelationKey implements Comparable<SortableRelationKey>, Serializable {
|
||||||
|
|
||||||
|
private static final Map<String, Integer> weights = Maps.newHashMap();
|
||||||
|
|
||||||
|
static {
|
||||||
|
weights.put("outcome", 0);
|
||||||
|
weights.put("supplement", 1);
|
||||||
|
weights.put("review", 2);
|
||||||
|
weights.put("citation", 3);
|
||||||
|
weights.put("affiliation", 4);
|
||||||
|
weights.put("relationship", 5);
|
||||||
|
weights.put("publicationDataset", 6);
|
||||||
|
weights.put("similarity", 7);
|
||||||
|
|
||||||
|
weights.put("provision", 8);
|
||||||
|
weights.put("participation", 9);
|
||||||
|
weights.put("dedup", 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 3232323;
|
||||||
|
|
||||||
|
private String groupingKey;
|
||||||
|
|
||||||
|
private String subRelType;
|
||||||
|
|
||||||
|
public static SortableRelationKey create(Relation r, String groupingKey) {
|
||||||
|
SortableRelationKey sr = new SortableRelationKey();
|
||||||
|
sr.setGroupingKey(groupingKey);
|
||||||
|
sr.setSubRelType(r.getSubRelType());
|
||||||
|
return sr;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o)
|
||||||
|
return true;
|
||||||
|
if (o == null || getClass() != o.getClass())
|
||||||
|
return false;
|
||||||
|
SortableRelationKey that = (SortableRelationKey) o;
|
||||||
|
return getGroupingKey().equals(that.getGroupingKey());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return Objects.hashCode(getGroupingKey());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(SortableRelationKey o) {
|
||||||
|
return ComparisonChain
|
||||||
|
.start()
|
||||||
|
.compare(getGroupingKey(), o.getGroupingKey())
|
||||||
|
.compare(getWeight(this), getWeight(o))
|
||||||
|
.result();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Integer getWeight(SortableRelationKey o) {
|
||||||
|
return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSubRelType() {
|
||||||
|
return subRelType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSubRelType(String subRelType) {
|
||||||
|
this.subRelType = subRelType;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getGroupingKey() {
|
||||||
|
return groupingKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setGroupingKey(String groupingKey) {
|
||||||
|
this.groupingKey = groupingKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -4,12 +4,16 @@ package eu.dnetlib.dhp.oa.provision.utils;
|
||||||
import org.apache.spark.Partitioner;
|
import org.apache.spark.Partitioner;
|
||||||
import org.apache.spark.util.Utils;
|
import org.apache.spark.util.Utils;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to
|
* Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to
|
||||||
* sort relations sharing the same source id by the ordering defined in SortableRelationKey.
|
* sort relations sharing the same source id by the ordering defined in SortableRelationKey.
|
||||||
*/
|
*/
|
||||||
public class RelationPartitioner extends Partitioner {
|
public class RelationPartitioner extends Partitioner {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 343434456L;
|
||||||
|
|
||||||
private final int numPartitions;
|
private final int numPartitions;
|
||||||
|
|
||||||
public RelationPartitioner(int numPartitions) {
|
public RelationPartitioner(int numPartitions) {
|
||||||
|
@ -23,8 +27,18 @@ public class RelationPartitioner extends Partitioner {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getPartition(Object key) {
|
public int getPartition(Object key) {
|
||||||
String partitionKey = (String) key;
|
SortableRelationKey partitionKey = (SortableRelationKey) key;
|
||||||
return Utils.nonNegativeMod(partitionKey.hashCode(), numPartitions());
|
return Utils.nonNegativeMod(partitionKey.getGroupingKey().hashCode(), numPartitions());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (obj instanceof RelationPartitioner) {
|
||||||
|
RelationPartitioner p = (RelationPartitioner) obj;
|
||||||
|
if (p.numPartitions() == numPartitions())
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -276,7 +276,7 @@ public class XmlRecordFactory implements Serializable {
|
||||||
pidType,
|
pidType,
|
||||||
pidValue
|
pidValue
|
||||||
.toLowerCase()
|
.toLowerCase()
|
||||||
.replaceAll("orcid", "")));
|
.replaceAll("^.*orcid\\.org\\/", "")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.provision;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
|
public class SortableRelationKeyTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void doTesSorting() throws IOException {
|
||||||
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
final String json = IOUtils.toString(this.getClass().getResourceAsStream("relations.json"));
|
||||||
|
final List<Relation> relations = mapper.readValue(json, new TypeReference<List<Relation>>() {
|
||||||
|
});
|
||||||
|
|
||||||
|
relations
|
||||||
|
.stream()
|
||||||
|
.map(r -> SortableRelationKey.create(r, r.getSource()))
|
||||||
|
.sorted()
|
||||||
|
.forEach(
|
||||||
|
|
||||||
|
it -> {
|
||||||
|
try {
|
||||||
|
System.out.println(mapper.writeValueAsString(it));
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,90 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"collectedfrom": [],
|
||||||
|
"dataInfo": {
|
||||||
|
"deletedbyinference": false,
|
||||||
|
"inferred": false,
|
||||||
|
"invisible": false,
|
||||||
|
"provenanceaction": {
|
||||||
|
"classid": "sysimport:crosswalk:entityregistry",
|
||||||
|
"classname": "Harvested",
|
||||||
|
"schemeid": "dnet:provenanceActions",
|
||||||
|
"schemename": "dnet:provenanceActions"
|
||||||
|
},
|
||||||
|
"trust": "0.9"
|
||||||
|
},
|
||||||
|
"lastupdatetimestamp": 1592688952862,
|
||||||
|
"properties": [],
|
||||||
|
"relClass": "hasAuthorInstitution",
|
||||||
|
"relType": "resultOrganization",
|
||||||
|
"source": "1",
|
||||||
|
"subRelType": "affiliation",
|
||||||
|
"target": "2"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"collectedfrom": [],
|
||||||
|
"dataInfo": {
|
||||||
|
"deletedbyinference": false,
|
||||||
|
"inferred": false,
|
||||||
|
"invisible": false,
|
||||||
|
"provenanceaction": {
|
||||||
|
"classid": "sysimport:crosswalk:entityregistry",
|
||||||
|
"classname": "Harvested",
|
||||||
|
"schemeid": "dnet:provenanceActions",
|
||||||
|
"schemename": "dnet:provenanceActions"
|
||||||
|
},
|
||||||
|
"trust": "0.9"
|
||||||
|
},
|
||||||
|
"lastupdatetimestamp": 1592688952862,
|
||||||
|
"properties": [],
|
||||||
|
"relClass": "isAuthorInstitutionOf",
|
||||||
|
"relType": "resultOrganization",
|
||||||
|
"source": "2",
|
||||||
|
"subRelType": "affiliation",
|
||||||
|
"target": "1"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"collectedfrom": [],
|
||||||
|
"dataInfo": {
|
||||||
|
"deletedbyinference": false,
|
||||||
|
"inferred": false,
|
||||||
|
"invisible": false,
|
||||||
|
"provenanceaction": {
|
||||||
|
"classid": "sysimport:crosswalk:entityregistry",
|
||||||
|
"classname": "Harvested",
|
||||||
|
"schemeid": "dnet:provenanceActions",
|
||||||
|
"schemename": "dnet:provenanceActions"
|
||||||
|
},
|
||||||
|
"trust": "0.9"
|
||||||
|
},
|
||||||
|
"lastupdatetimestamp": 1592688952862,
|
||||||
|
"properties": [],
|
||||||
|
"relClass": "isProducedBy",
|
||||||
|
"relType": "resultProject",
|
||||||
|
"source": "1",
|
||||||
|
"subRelType": "outcome",
|
||||||
|
"target": "2"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"collectedfrom": [],
|
||||||
|
"dataInfo": {
|
||||||
|
"deletedbyinference": false,
|
||||||
|
"inferred": false,
|
||||||
|
"invisible": false,
|
||||||
|
"provenanceaction": {
|
||||||
|
"classid": "sysimport:crosswalk:entityregistry",
|
||||||
|
"classname": "Harvested",
|
||||||
|
"schemeid": "dnet:provenanceActions",
|
||||||
|
"schemename": "dnet:provenanceActions"
|
||||||
|
},
|
||||||
|
"trust": "0.9"
|
||||||
|
},
|
||||||
|
"lastupdatetimestamp": 1592688952862,
|
||||||
|
"properties": [],
|
||||||
|
"relClass": "produces",
|
||||||
|
"relType": "resultProject",
|
||||||
|
"source": "2",
|
||||||
|
"subRelType": "outcome",
|
||||||
|
"target": "1"
|
||||||
|
}
|
||||||
|
]
|
Loading…
Reference in New Issue