merge with upstream

This commit is contained in:
Miriam Baglioni 2020-05-15 15:53:05 +02:00
commit e26a67c3eb
184 changed files with 2563 additions and 2052 deletions

View File

@ -13,6 +13,7 @@ public class ModelConstants {
public static final String DNET_DATA_CITE_DATE = "dnet:dataCite_date";
public static final String DNET_DATA_CITE_RESOURCE = "dnet:dataCite_resource";
public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
public static final String DNET_COUNTRY_TYPE = "dnet:countries";
public static final String SYSIMPORT_CROSSWALK_REPOSITORY = "sysimport:crosswalk:repository";
public static final String SYSIMPORT_CROSSWALK_ENTITYREGISTRY = "sysimport:crosswalk:entityregistry";
@ -49,6 +50,13 @@ public class ModelConstants {
public static final String HAS_PARTICIPANT = "hasParticipant";
public static final String IS_PARTICIPANT = "isParticipant";
public static final String RESULT_ORGANIZATION = "resultOrganization";
public static final String AFFILIATION = "affiliation";
public static final String IS_AUTHOR_INSTITUTION_OF = "isAuthorInstitutionOf";
public static final String HAS_AUTHOR_INSTITUTION = "hasAuthorInstitution";
public static final String MERGES = "merges";
public static final String UNKNOWN = "UNKNOWN";
public static final String NOT_AVAILABLE = "not available";

View File

@ -1,9 +1,15 @@
package eu.dnetlib.dhp.schema.common;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.*;
@ -378,6 +384,21 @@ public class ModelSupport {
entityMapping.get(EntityType.valueOf(targetType)).name());
}
public static <T extends Oaf> String tableIdentifier(String dbName, String tableName) {
checkArgument(StringUtils.isNotBlank(dbName), "DB name cannot be empty");
checkArgument(StringUtils.isNotBlank(tableName), "table name cannot be empty");
return String.format("%s.%s", dbName, tableName);
}
public static <T extends Oaf> String tableIdentifier(String dbName, Class<T> clazz) {
checkArgument(Objects.nonNull(clazz), "clazz is needed to derive the table name, thus cannot be null");
return tableIdentifier(dbName, clazz.getSimpleName().toLowerCase());
}
public static <T extends Oaf> Function<T, String> idFn() {
return x -> {
if (isSubClass(x, Relation.class)) {

View File

@ -523,7 +523,9 @@ public class ProtoConverter implements Serializable {
}
private static Context mapContext(ResultProtos.Result.Context context) {
if (context == null || StringUtils.isBlank(context.getId())) {
return null;
}
final Context entity = new Context();
entity.setId(context.getId());
entity
@ -537,6 +539,10 @@ public class ProtoConverter implements Serializable {
}
public static KeyValue mapKV(FieldTypeProtos.KeyValue kv) {
if (kv == null || StringUtils.isBlank(kv.getKey()) & StringUtils.isBlank(kv.getValue())) {
return null;
}
final KeyValue keyValue = new KeyValue();
keyValue.setKey(kv.getKey());
keyValue.setValue(kv.getValue());
@ -575,6 +581,10 @@ public class ProtoConverter implements Serializable {
}
public static StructuredProperty mapStructuredProperty(FieldTypeProtos.StructuredProperty sp) {
if (sp == null | StringUtils.isBlank(sp.getValue())) {
return null;
}
final StructuredProperty structuredProperty = new StructuredProperty();
structuredProperty.setValue(sp.getValue());
structuredProperty.setQualifier(mapQualifier(sp.getQualifier()));
@ -611,6 +621,10 @@ public class ProtoConverter implements Serializable {
}
public static Field<String> mapStringField(FieldTypeProtos.StringField s) {
if (s == null || StringUtils.isBlank(s.getValue())) {
return null;
}
final Field<String> stringField = new Field<>();
stringField.setValue(s.getValue());
stringField.setDataInfo(mapDataInfo(s.getDataInfo()));
@ -618,19 +632,16 @@ public class ProtoConverter implements Serializable {
}
public static Field<Boolean> mapBoolField(FieldTypeProtos.BoolField b) {
if (b == null) {
return null;
}
final Field<Boolean> booleanField = new Field<>();
booleanField.setValue(b.getValue());
booleanField.setDataInfo(mapDataInfo(b.getDataInfo()));
return booleanField;
}
public static Field<Integer> mapIntField(FieldTypeProtos.IntField b) {
final Field<Integer> entity = new Field<>();
entity.setValue(b.getValue());
entity.setDataInfo(mapDataInfo(b.getDataInfo()));
return entity;
}
public static Journal mapJournal(FieldTypeProtos.Journal j) {
final Journal journal = new Journal();
journal.setConferencedate(j.getConferencedate());

View File

@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class PrepareMergedRelationJob {
@ -56,6 +57,7 @@ public class PrepareMergedRelationJob {
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
selectMergesRelations(
spark,
inputPath,
@ -73,19 +75,6 @@ public class PrepareMergedRelationJob {
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
// relation.createOrReplaceTempView("relation");
//
// spark
// .sql(
// "Select * from relation " +
// "where relclass = 'merges' " +
// "and datainfo.deletedbyinference = false")
// .as(Encoders.bean(Relation.class))
// .toJSON()
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .text(outputPath);
}
public static org.apache.spark.sql.Dataset<Relation> readRelations(
@ -97,4 +86,9 @@ public class PrepareMergedRelationJob {
(MapFunction<String, Relation>) value -> OBJECT_MAPPER.readValue(value, Relation.class),
Encoders.bean(Relation.class));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

View File

@ -65,8 +65,7 @@ public class ReadBlacklistFromDB implements Closeable {
}
}
public void execute(final String sql, final Function<ResultSet, List<Relation>> producer)
throws Exception {
public void execute(final String sql, final Function<ResultSet, List<Relation>> producer) throws Exception {
final Consumer<ResultSet> consumer = rs -> producer.apply(rs).forEach(r -> writeRelation(r));

View File

@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
@ -62,6 +63,7 @@ public class SparkRemoveBlacklistedRelationJob {
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
removeBlacklistedRelations(
spark,
blacklistPath,
@ -69,7 +71,6 @@ public class SparkRemoveBlacklistedRelationJob {
outputPath,
mergesPath);
});
}
private static void removeBlacklistedRelations(SparkSession spark, String blacklistPath, String inputPath,
@ -84,7 +85,7 @@ public class SparkRemoveBlacklistedRelationJob {
.joinWith(
mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")),
"left_outer")
.map(c -> {
.map((MapFunction<Tuple2<Relation, Relation>, Relation>) c -> {
Optional
.ofNullable(c._2())
.ifPresent(mr -> c._1().setSource(mr.getSource()));
@ -95,7 +96,7 @@ public class SparkRemoveBlacklistedRelationJob {
.joinWith(
mergesRelation, dedupSource.col("target").equalTo(mergesRelation.col("target")),
"left_outer")
.map(c -> {
.map((MapFunction<Tuple2<Relation, Relation>, Relation>) c -> {
Optional
.ofNullable(c._2())
.ifPresent(mr -> c._1().setTarget(mr.getSource()));
@ -107,7 +108,6 @@ public class SparkRemoveBlacklistedRelationJob {
.mode(SaveMode.Overwrite)
.json(blacklistPath + "/deduped");
inputRelation
.joinWith(
dedupBL, (inputRelation
@ -118,7 +118,7 @@ public class SparkRemoveBlacklistedRelationJob {
.col("target")
.equalTo(dedupBL.col("target")))),
"left_outer")
.map(c -> {
.map((MapFunction<Tuple2<Relation, Relation>, Relation>) c -> {
Relation ir = c._1();
Optional<Relation> obl = Optional.ofNullable(c._2());
if (obl.isPresent()) {
@ -127,17 +127,14 @@ public class SparkRemoveBlacklistedRelationJob {
}
}
return ir;
}, Encoders.bean(Relation.class))
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
public static org.apache.spark.sql.Dataset<Relation> readRelations(
SparkSession spark, String inputPath) {
return spark
@ -148,4 +145,8 @@ public class SparkRemoveBlacklistedRelationJob {
Encoders.bean(Relation.class));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}

View File

@ -1,5 +1,5 @@
<workflow-app name="blacklisting" xmlns="uri:oozie:workflow:0.5">
<parameters>
<workflow-app name="blacklist_relations" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>postgresURL</name>
<description>the url of the postgress server to query</description>
@ -18,28 +18,121 @@
</property>
<property>
<name>outputPath</name>
<description>the path were to store the graph without the blacklisted relations</description>
<description>the graph output path</description>
</property>
</parameters>
<start to="reset-outputpath"/>
</parameters>
<kill name="Kill">
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reset_outputpath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
</kill>
<action name="reset-outputpath">
<action name="reset_outputpath">
<fs>
<delete path="${outputPath}"/>
<mkdir path="${outputPath}"/>
</fs>
<ok to="read_blacklist"/>
<ok to="copy_entities"/>
<error to="Kill"/>
</action>
<fork name="copy_entities">
<path start="copy_publication"/>
<path start="copy_dataset"/>
<path start="copy_orp"/>
<path start="copy_software"/>
<path start="copy_datasource"/>
<path start="copy_project"/>
<path start="copy_organization"/>
</fork>
<action name="copy_publication">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/publication</arg>
<arg>${nameNode}/${outputPath}/publication</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_dataset">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/dataset</arg>
<arg>${nameNode}/${outputPath}/dataset</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_orp">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/otherresearchproduct</arg>
<arg>${nameNode}/${outputPath}/otherresearchproduct</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_software">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/software</arg>
<arg>${nameNode}/${outputPath}/software</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/organization</arg>
<arg>${nameNode}/${outputPath}/organization</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_project">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/project</arg>
<arg>${nameNode}/${outputPath}/project</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<action name="copy_datasource">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<arg>${nameNode}/${sourcePath}/datasource</arg>
<arg>${nameNode}/${outputPath}/datasource</arg>
</distcp>
<ok to="wait"/>
<error to="Kill"/>
</action>
<join name="wait" to="read_blacklist"/>
<action name="read_blacklist">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.dhp.blacklist.ReadBlacklistFromDB</main-class>
<arg>--hdfsPath</arg><arg>${workingDir}/blacklist</arg>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
@ -66,6 +159,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/mergesRelation</arg>
@ -90,6 +184,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
<arg>--outputPath</arg><arg>${outputPath}/relation</arg>
@ -99,5 +194,7 @@
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
<end name="End"/>
</workflow-app>

View File

@ -29,31 +29,32 @@ public class EventFactory {
"yyyy-MM-dd"
};
public static Event newBrokerEvent(final Result source, final Result target, final UpdateInfo<?> updateInfo) {
public static Event newBrokerEvent(final UpdateInfo<?> updateInfo) {
final long now = new Date().getTime();
final Event res = new Event();
final Map<String, Object> map = createMapFromResult(target, source, updateInfo);
final Map<String, Object> map = createMapFromResult(updateInfo);
final String payload = createPayload(target, updateInfo);
final String payload = createPayload(updateInfo);
final String eventId = calculateEventId(
updateInfo.getTopic(), target.getOriginalId().get(0), updateInfo.getHighlightValueAsString());
updateInfo.getTopicPath(), updateInfo.getTarget().getOriginalId().get(0),
updateInfo.getHighlightValueAsString());
res.setEventId(eventId);
res.setProducerId(PRODUCER_ID);
res.setPayload(payload);
res.setMap(map);
res.setTopic(updateInfo.getTopic());
res.setTopic(updateInfo.getTopicPath());
res.setCreationDate(now);
res.setExpiryDate(calculateExpiryDate(now));
res.setInstantMessage(false);
return res;
}
private static String createPayload(final Result result, final UpdateInfo<?> updateInfo) {
private static String createPayload(final UpdateInfo<?> updateInfo) {
final OpenAireEventPayload payload = new OpenAireEventPayload();
// TODO
@ -62,32 +63,34 @@ public class EventFactory {
return payload.toJSON();
}
private static Map<String, Object> createMapFromResult(final Result oaf, final Result source,
final UpdateInfo<?> updateInfo) {
private static Map<String, Object> createMapFromResult(final UpdateInfo<?> updateInfo) {
final Map<String, Object> map = new HashMap<>();
final List<KeyValue> collectedFrom = oaf.getCollectedfrom();
final Result source = updateInfo.getSource();
final Result target = updateInfo.getTarget();
final List<KeyValue> collectedFrom = target.getCollectedfrom();
if (collectedFrom.size() == 1) {
map.put("target_datasource_id", collectedFrom.get(0).getKey());
map.put("target_datasource_name", collectedFrom.get(0).getValue());
}
final List<String> ids = oaf.getOriginalId();
final List<String> ids = target.getOriginalId();
if (ids.size() > 0) {
map.put("target_publication_id", ids.get(0));
}
final List<StructuredProperty> titles = oaf.getTitle();
final List<StructuredProperty> titles = target.getTitle();
if (titles.size() > 0) {
map.put("target_publication_title", titles.get(0));
}
final long date = parseDateTolong(oaf.getDateofacceptance().getValue());
final long date = parseDateTolong(target.getDateofacceptance().getValue());
if (date > 0) {
map.put("target_dateofacceptance", date);
}
final List<StructuredProperty> subjects = oaf.getSubject();
final List<StructuredProperty> subjects = target.getSubject();
if (subjects.size() > 0) {
map
.put(
@ -95,7 +98,7 @@ public class EventFactory {
subjects.stream().map(StructuredProperty::getValue).collect(Collectors.toList()));
}
final List<Author> authors = oaf.getAuthor();
final List<Author> authors = target.getAuthor();
if (authors.size() > 0) {
map
.put(

View File

@ -0,0 +1,52 @@
package eu.dnetlib.dhp.broker.model;
public enum Topic {
// ENRICHMENT MISSING
ENRICH_MISSING_OA_VERSION("ENRICH/MISSING/OPENACCESS_VERSION"), ENRICH_MISSING_ABSTRACT(
"ENRICH/MISSING/ABSTRACT"), ENRICH_MISSING_PUBLICATION_DATE(
"ENRICH/MISSING/PUBLICATION_DATE"), ENRICH_MISSING_PID(
"ENRICH/MISSING/PID"), ENRICH_MISSING_PROJECT("ENRICH/MISSING/PROJECT"), ENRICH_MISSING_SOFTWARE(
"ENRICH/MISSING/SOFTWARE"), ENRICH_MISSING_SUBJECT_MESHEUROPMC(
"ENRICH/MISSING/SUBJECT/MESHEUROPMC"), ENRICH_MISSING_SUBJECT_ARXIV(
"ENRICH/MISSING/SUBJECT/ARXIV"), ENRICH_MISSING_SUBJECT_JEL(
"ENRICH/MISSING/SUBJECT/JEL"), ENRICH_MISSING_SUBJECT_DDC(
"ENRICH/MISSING/SUBJECT/DDC"), ENRICH_MISSING_SUBJECT_ACM(
"ENRICH/MISSING/SUBJECT/ACM"), ENRICH_MISSING_SUBJECT_RVK(
"ENRICH/MISSING/SUBJECT/RVK"), ENRICH_MISSING_AUTHOR_ORCID(
"ENRICH/MISSING/AUTHOR/ORCID"),
// ENRICHMENT MORE
ENRICH_MORE_PID("ENRICH/MORE/PID"), ENRICH_MORE_OA_VERSION("ENRICH/MORE/OPENACCESS_VERSION"), ENRICH_MORE_ABSTRACT(
"ENRICH/MORE/ABSTRACT"), ENRICH_MORE_PUBLICATION_DATE("ENRICH/MORE/PUBLICATION_DATE"), ENRICH_MORE_PROJECT(
"ENRICH/MORE/PROJECT"), ENRICH_MORE_SUBJECT_MESHEUROPMC(
"ENRICH/MORE/SUBJECT/MESHEUROPMC"), ENRICH_MORE_SUBJECT_ARXIV(
"ENRICH/MORE/SUBJECT/ARXIV"), ENRICH_MORE_SUBJECT_JEL(
"ENRICH/MORE/SUBJECT/JEL"), ENRICH_MORE_SUBJECT_DDC(
"ENRICH/MORE/SUBJECT/DDC"), ENRICH_MORE_SUBJECT_ACM(
"ENRICH/MORE/SUBJECT/ACM"), ENRICH_MORE_SUBJECT_RVK("ENRICH/MORE/SUBJECT/RVK"),
// ADDITION
ADD_BY_PROJECT("ADD/BY_PROJECT");
Topic(final String path) {
this.path = path;
}
protected String path;
public String getPath() {
return this.path;
}
public static Topic fromPath(final String path) {
for (final Topic t : Topic.values()) {
if (t.getPath().equals(path)) {
return t;
}
}
return null;
}
}

View File

@ -14,21 +14,20 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.model.EventFactory;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingAbstract;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingAuthorOrcid;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingOpenAccess;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingPid;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingProject;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingPublicationDate;
import eu.dnetlib.dhp.broker.oa.util.EnrichMissingSubject;
import eu.dnetlib.dhp.broker.oa.util.EnrichMoreOpenAccess;
import eu.dnetlib.dhp.broker.oa.util.EnrichMorePid;
import eu.dnetlib.dhp.broker.oa.util.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingAbstract;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingAuthorOrcid;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingOpenAccess;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingPid;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingProject;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingPublicationDate;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMissingSubject;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMoreOpenAccess;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMorePid;
import eu.dnetlib.dhp.broker.oa.matchers.EnrichMoreSubject;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
@ -37,7 +36,16 @@ public class GenerateEventsApplication {
private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final UpdateMatcher<?> enrichMissingAbstract = new EnrichMissingAbstract();
private static final UpdateMatcher<?> enrichMissingAuthorOrcid = new EnrichMissingAuthorOrcid();
private static final UpdateMatcher<?> enrichMissingOpenAccess = new EnrichMissingOpenAccess();
private static final UpdateMatcher<?> enrichMissingPid = new EnrichMissingPid();
private static final UpdateMatcher<?> enrichMissingProject = new EnrichMissingProject();
private static final UpdateMatcher<?> enrichMissingPublicationDate = new EnrichMissingPublicationDate();
private static final UpdateMatcher<?> enrichMissingSubject = new EnrichMissingSubject();
private static final UpdateMatcher<?> enrichMoreOpenAccess = new EnrichMoreOpenAccess();
private static final UpdateMatcher<?> enrichMorePid = new EnrichMorePid();
private static final UpdateMatcher<?> enrichMoreSubject = new EnrichMoreSubject();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
@ -76,37 +84,22 @@ public class GenerateEventsApplication {
}
private List<Event> generateEvents(final Result... children) {
final List<Event> list = new ArrayList<>();
for (final Result source : children) {
for (final Result target : children) {
if (source != target) {
list
.addAll(
findUpdates(source, target)
.stream()
.map(info -> EventFactory.newBrokerEvent(source, target, info))
.collect(Collectors.toList()));
}
}
}
return list;
}
private List<UpdateInfo<?>> findUpdates(final Result source, final Result target) {
final List<UpdateInfo<?>> list = new ArrayList<>();
list.addAll(EnrichMissingAbstract.findUpdates(source, target));
list.addAll(EnrichMissingAuthorOrcid.findUpdates(source, target));
list.addAll(EnrichMissingOpenAccess.findUpdates(source, target));
list.addAll(EnrichMissingPid.findUpdates(source, target));
list.addAll(EnrichMissingProject.findUpdates(source, target));
list.addAll(EnrichMissingPublicationDate.findUpdates(source, target));
list.addAll(EnrichMissingSubject.findUpdates(source, target));
list.addAll(EnrichMoreOpenAccess.findUpdates(source, target));
list.addAll(EnrichMorePid.findUpdates(source, target));
list.addAll(EnrichMoreSubject.findUpdates(source, target));
return list;
for (final Result target : children) {
list.addAll(enrichMissingAbstract.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingAuthorOrcid.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingOpenAccess.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingPid.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingProject.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingPublicationDate.searchUpdatesForRecord(target, children));
list.addAll(enrichMissingSubject.searchUpdatesForRecord(target, children));
list.addAll(enrichMoreOpenAccess.searchUpdatesForRecord(target, children));
list.addAll(enrichMorePid.searchUpdatesForRecord(target, children));
list.addAll(enrichMoreSubject.searchUpdatesForRecord(target, children));
}
return list.stream().map(EventFactory::newBrokerEvent).collect(Collectors.toList());
}
}

View File

@ -0,0 +1,36 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingAbstract extends UpdateMatcher<String> {
public EnrichMissingAbstract() {
super(false);
}
@Override
protected List<UpdateInfo<String>> findUpdates(final Result source, final Result target) {
if (isMissing(target.getDescription()) && !isMissing(source.getDescription())) {
return Arrays.asList(generateUpdateInfo(source.getDescription().get(0).getValue(), source, target));
}
return new ArrayList<>();
}
@Override
public UpdateInfo<String> generateUpdateInfo(final String highlightValue, final Result source,
final Result target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_ABSTRACT,
highlightValue, source, target,
(p, s) -> p.getAbstracts().add(s),
s -> s);
}
}

View File

@ -0,0 +1,34 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingAuthorOrcid extends UpdateMatcher<Pair<String, String>> {
public EnrichMissingAuthorOrcid() {
super(true);
}
@Override
protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target) {
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
return Arrays.asList();
}
@Override
public UpdateInfo<Pair<String, String>> generateUpdateInfo(final Pair<String, String> highlightValue,
final Result source, final Result target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_AUTHOR_ORCID,
highlightValue, source, target,
(p, pair) -> p.getCreators().add(pair.getLeft() + " - ORCID: " + pair.getRight()),
pair -> pair.getLeft() + "::" + pair.getRight());
}
}

View File

@ -0,0 +1,55 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingOpenAccess extends UpdateMatcher<Instance> {
public EnrichMissingOpenAccess() {
super(true);
}
@Override
protected List<UpdateInfo<Instance>> findUpdates(final Result source, final Result target) {
final long count = target
.getInstance()
.stream()
.map(i -> i.getAccessright().getClassid())
.filter(right -> right.equals(BrokerConstants.OPEN_ACCESS))
.count();
if (count > 0) {
return Arrays.asList();
}
return source
.getInstance()
.stream()
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
.map(ConversionUtils::oafInstanceToBrokerInstances)
.flatMap(s -> s)
.map(i -> generateUpdateInfo(i, source, target))
.collect(Collectors.toList());
}
@Override
public UpdateInfo<Instance> generateUpdateInfo(final Instance highlightValue,
final Result source,
final Result target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_OA_VERSION,
highlightValue, source, target,
(p, i) -> p.getInstances().add(i),
Instance::getUrl);
}
}

View File

@ -0,0 +1,45 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.Pid;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingPid extends UpdateMatcher<Pid> {
public EnrichMissingPid() {
super(true);
}
@Override
protected List<UpdateInfo<Pid>> findUpdates(final Result source, final Result target) {
final long count = target.getPid().size();
if (count > 0) {
return Arrays.asList();
}
return source
.getPid()
.stream()
.map(ConversionUtils::oafPidToBrokerPid)
.map(i -> generateUpdateInfo(i, source, target))
.collect(Collectors.toList());
}
@Override
public UpdateInfo<Pid> generateUpdateInfo(final Pid highlightValue, final Result source, final Result target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_PID,
highlightValue, source, target,
(p, pid) -> p.getPids().add(pid),
pid -> pid.getType() + "::" + pid.getValue());
}
}

View File

@ -0,0 +1,35 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.Project;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingProject extends UpdateMatcher<Project> {
public EnrichMissingProject() {
super(true);
}
@Override
protected List<UpdateInfo<Project>> findUpdates(final Result source, final Result target) {
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
return Arrays.asList();
}
@Override
public UpdateInfo<Project> generateUpdateInfo(final Project highlightValue,
final Result source,
final Result target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_PROJECT,
highlightValue, source, target,
(p, prj) -> p.getProjects().add(prj),
prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode());
}
}

View File

@ -0,0 +1,33 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingPublicationDate extends UpdateMatcher<String> {
public EnrichMissingPublicationDate() {
super(false);
}
@Override
protected List<UpdateInfo<String>> findUpdates(final Result source, final Result target) {
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
return Arrays.asList();
}
@Override
public UpdateInfo<String> generateUpdateInfo(final String highlightValue, final Result source,
final Result target) {
return new UpdateInfo<>(
Topic.ENRICH_MISSING_PUBLICATION_DATE,
highlightValue, source, target,
(p, date) -> p.setPublicationdate(date),
s -> s);
}
}

View File

@ -0,0 +1,53 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class EnrichMissingSubject extends UpdateMatcher<Pair<String, String>> {
public EnrichMissingSubject() {
super(true);
}
@Override
protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target) {
final Set<String> existingTypes = target
.getSubject()
.stream()
.map(StructuredProperty::getQualifier)
.map(Qualifier::getClassid)
.collect(Collectors.toSet());
return source
.getPid()
.stream()
.filter(pid -> !existingTypes.contains(pid.getQualifier().getClassid()))
.map(ConversionUtils::oafSubjectToPair)
.map(i -> generateUpdateInfo(i, source, target))
.collect(Collectors.toList());
}
@Override
public UpdateInfo<Pair<String, String>> generateUpdateInfo(final Pair<String, String> highlightValue,
final Result source,
final Result target) {
return new UpdateInfo<>(
Topic.fromPath("ENRICH/MISSING/SUBJECT/" + highlightValue.getLeft()),
highlightValue, source, target,
(p, pair) -> p.getSubjects().add(pair.getRight()),
pair -> pair.getLeft() + "::" + pair.getRight());
}
}

View File

@ -0,0 +1,53 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMoreOpenAccess extends UpdateMatcher<Instance> {
public EnrichMoreOpenAccess() {
super(true);
}
@Override
protected List<UpdateInfo<Instance>> findUpdates(final Result source, final Result target) {
final Set<String> urls = target
.getInstance()
.stream()
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
.map(i -> i.getUrl())
.flatMap(List::stream)
.collect(Collectors.toSet());
return source
.getInstance()
.stream()
.filter(i -> i.getAccessright().getClassid().equals(BrokerConstants.OPEN_ACCESS))
.map(ConversionUtils::oafInstanceToBrokerInstances)
.flatMap(s -> s)
.filter(i -> !urls.contains(i.getUrl()))
.map(i -> generateUpdateInfo(i, source, target))
.collect(Collectors.toList());
}
@Override
public UpdateInfo<Instance> generateUpdateInfo(final Instance highlightValue,
final Result source,
final Result target) {
return new UpdateInfo<>(
Topic.ENRICH_MORE_OA_VERSION,
highlightValue, source, target,
(p, i) -> p.getInstances().add(i),
Instance::getUrl);
}
}

View File

@ -0,0 +1,46 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.Pid;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMorePid extends UpdateMatcher<Pid> {
public EnrichMorePid() {
super(true);
}
@Override
protected List<UpdateInfo<Pid>> findUpdates(final Result source, final Result target) {
final Set<String> existingPids = target
.getPid()
.stream()
.map(pid -> pid.getQualifier().getClassid() + "::" + pid.getValue())
.collect(Collectors.toSet());
return source
.getPid()
.stream()
.filter(pid -> !existingPids.contains(pid.getQualifier().getClassid() + "::" + pid.getValue()))
.map(ConversionUtils::oafPidToBrokerPid)
.map(i -> generateUpdateInfo(i, source, target))
.collect(Collectors.toList());
}
@Override
public UpdateInfo<Pid> generateUpdateInfo(final Pid highlightValue, final Result source, final Result target) {
return new UpdateInfo<>(
Topic.ENRICH_MORE_PID,
highlightValue, source, target,
(p, pid) -> p.getPids().add(pid),
pid -> pid.getType() + "::" + pid.getValue());
}
}

View File

@ -0,0 +1,50 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMoreSubject extends UpdateMatcher<Pair<String, String>> {
public EnrichMoreSubject() {
super(true);
}
@Override
protected List<UpdateInfo<Pair<String, String>>> findUpdates(final Result source, final Result target) {
final Set<String> existingSubjects = target
.getSubject()
.stream()
.map(pid -> pid.getQualifier().getClassid() + "::" + pid.getValue())
.collect(Collectors.toSet());
return source
.getPid()
.stream()
.filter(pid -> !existingSubjects.contains(pid.getQualifier().getClassid() + "::" + pid.getValue()))
.map(ConversionUtils::oafSubjectToPair)
.map(i -> generateUpdateInfo(i, source, target))
.collect(Collectors.toList());
}
@Override
public UpdateInfo<Pair<String, String>> generateUpdateInfo(final Pair<String, String> highlightValue,
final Result source,
final Result target) {
return new UpdateInfo<>(
Topic.fromPath("ENRICH/MORE/SUBJECT/" + highlightValue.getLeft()),
highlightValue, source, target,
(p, pair) -> p.getSubjects().add(pair.getRight()),
pair -> pair.getLeft() + "::" + pair.getRight());
}
}

View File

@ -0,0 +1,64 @@
package eu.dnetlib.dhp.broker.oa.matchers;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Result;
public abstract class UpdateMatcher<T> {
private final boolean multipleUpdate;
public UpdateMatcher(final boolean multipleUpdate) {
this.multipleUpdate = multipleUpdate;
}
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final Result res, final Result... others) {
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
for (final Result source : others) {
if (source != res) {
for (final UpdateInfo<T> info : findUpdates(source, res)) {
final String s = DigestUtils.md5Hex(info.getHighlightValueAsString());
if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) {
} else {
infoMap.put(s, info);
}
}
}
}
final Collection<UpdateInfo<T>> values = infoMap.values();
if (values.isEmpty() || multipleUpdate) {
return values;
} else {
final UpdateInfo<T> v = values
.stream()
.sorted((o1, o2) -> Float.compare(o1.getTrust(), o2.getTrust()))
.findFirst()
.get();
return Arrays.asList(v);
}
}
protected abstract List<UpdateInfo<T>> findUpdates(Result source, Result target);
protected abstract UpdateInfo<T> generateUpdateInfo(final T highlightValue, final Result source,
final Result target);
protected static boolean isMissing(final List<Field<String>> list) {
return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0).getValue());
}
}

View File

@ -0,0 +1,7 @@
package eu.dnetlib.dhp.broker.oa.util;
public class BrokerConstants {
public final static String OPEN_ACCESS = "OPEN";
}

View File

@ -0,0 +1,36 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.broker.objects.Pid;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
public class ConversionUtils {
public static Stream<Instance> oafInstanceToBrokerInstances(final eu.dnetlib.dhp.schema.oaf.Instance i) {
return i.getUrl().stream().map(url -> {
final Instance r = new Instance();
r.setUrl(url);
r.setInstancetype(i.getInstancetype().getClassid());
r.setLicense(BrokerConstants.OPEN_ACCESS);
r.setHostedby(i.getHostedby().getValue());
return r;
});
}
public static Pid oafPidToBrokerPid(final StructuredProperty sp) {
final Pid pid = new Pid();
pid.setValue(sp.getValue());
pid.setType(sp.getQualifier().getClassid());
return pid;
}
public static final Pair<String, String> oafSubjectToPair(final StructuredProperty sp) {
return Pair.of(sp.getQualifier().getClassid(), sp.getValue());
}
}

View File

@ -1,31 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingAbstract extends UpdateInfo<String> {
public static List<EnrichMissingAbstract> findUpdates(final Result source, final Result target) {
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
return Arrays.asList();
}
private EnrichMissingAbstract(final String highlightValue, final float trust) {
super("ENRICH/MISSING/ABSTRACT", highlightValue, trust);
}
@Override
public void compileHighlight(final OpenAireEventPayload payload) {
payload.getHighlight().getAbstracts().add(getHighlightValue());
}
@Override
public String getHighlightValueAsString() {
return getHighlightValue();
}
}

View File

@ -1,31 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingAuthorOrcid extends UpdateInfo<String> {
public static List<EnrichMissingAuthorOrcid> findUpdates(final Result source, final Result target) {
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
return Arrays.asList();
}
private EnrichMissingAuthorOrcid(final String highlightValue, final float trust) {
super("ENRICH/MISSING/AUTHOR/ORCID", highlightValue, trust);
}
@Override
public void compileHighlight(final OpenAireEventPayload payload) {
// TODO
}
@Override
public String getHighlightValueAsString() {
return getHighlightValue();
}
}

View File

@ -1,32 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingOpenAccess extends UpdateInfo<Instance> {
public static List<EnrichMissingOpenAccess> findUpdates(final Result source, final Result target) {
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
return Arrays.asList();
}
private EnrichMissingOpenAccess(final Instance highlightValue, final float trust) {
super("ENRICH/MISSING/OPENACCESS_VERSION", highlightValue, trust);
}
@Override
public void compileHighlight(final OpenAireEventPayload payload) {
payload.getHighlight().getInstances().add(getHighlightValue());
}
@Override
public String getHighlightValueAsString() {
return getHighlightValue().getUrl();
}
}

View File

@ -1,32 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.broker.objects.Pid;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingPid extends UpdateInfo<Pid> {
public static List<EnrichMissingPid> findUpdates(final Result source, final Result target) {
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
return Arrays.asList();
}
private EnrichMissingPid(final Pid highlightValue, final float trust) {
super("ENRICH/MISSING/PID", highlightValue, trust);
}
@Override
public void compileHighlight(final OpenAireEventPayload payload) {
payload.getHighlight().getPids().add(getHighlightValue());
}
@Override
public String getHighlightValueAsString() {
return getHighlightValue().getType() + "::" + getHighlightValue().getValue();
}
}

View File

@ -1,33 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.broker.objects.Project;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingProject extends UpdateInfo<Project> {
public static List<EnrichMissingProject> findUpdates(final Result source, final Result target) {
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
return Arrays.asList();
}
private EnrichMissingProject(final Project highlightValue, final float trust) {
super("ENRICH/MISSING/PROJECT", highlightValue, trust);
}
@Override
public void compileHighlight(final OpenAireEventPayload payload) {
payload.getHighlight().getProjects().add(getHighlightValue());
}
@Override
public String getHighlightValueAsString() {
return getHighlightValue().getFunder() + "::" + getHighlightValue().getFundingProgram()
+ getHighlightValue().getCode();
}
}

View File

@ -1,31 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingPublicationDate extends UpdateInfo<String> {
public static List<EnrichMissingPublicationDate> findUpdates(final Result source, final Result target) {
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
return Arrays.asList();
}
private EnrichMissingPublicationDate(final String highlightValue, final float trust) {
super("ENRICH/MISSING/PUBLICATION_DATE", highlightValue, trust);
}
@Override
public void compileHighlight(final OpenAireEventPayload payload) {
payload.getHighlight().setPublicationdate(getHighlightValue());
}
@Override
public String getHighlightValueAsString() {
return getHighlightValue();
}
}

View File

@ -1,36 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMissingSubject extends UpdateInfo<String> {
public static List<EnrichMissingSubject> findUpdates(final Result source, final Result target) {
// MESHEUROPMC
// ARXIV
// JEL
// DDC
// ACM
return Arrays.asList();
}
private EnrichMissingSubject(final String subjectClassification, final String highlightValue, final float trust) {
super("ENRICH/MISSING/SUBJECT/" + subjectClassification, highlightValue, trust);
}
@Override
public void compileHighlight(final OpenAireEventPayload payload) {
payload.getHighlight().getSubjects().add(getHighlightValue());
}
@Override
public String getHighlightValueAsString() {
return getHighlightValue();
}
}

View File

@ -1,32 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.Instance;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMoreOpenAccess extends UpdateInfo<Instance> {
public static List<EnrichMoreOpenAccess> findUpdates(final Result source, final Result target) {
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
return Arrays.asList();
}
private EnrichMoreOpenAccess(final Instance highlightValue, final float trust) {
super("ENRICH/MORE/OPENACCESS_VERSION", highlightValue, trust);
}
@Override
public void compileHighlight(final OpenAireEventPayload payload) {
payload.getHighlight().getInstances().add(getHighlightValue());
}
@Override
public String getHighlightValueAsString() {
return getHighlightValue().getUrl();
}
}

View File

@ -1,32 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.broker.objects.Pid;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMorePid extends UpdateInfo<Pid> {
public static List<EnrichMorePid> findUpdates(final Result source, final Result target) {
// return Arrays.asList(new EnrichMissingAbstract("xxxxxxx", 0.9f));
return Arrays.asList();
}
private EnrichMorePid(final Pid highlightValue, final float trust) {
super("ENRICH/MORE/PID", highlightValue, trust);
}
@Override
public void compileHighlight(final OpenAireEventPayload payload) {
payload.getHighlight().getPids().add(getHighlightValue());
}
@Override
public String getHighlightValueAsString() {
return getHighlightValue().getType() + "::" + getHighlightValue().getValue();
}
}

View File

@ -1,36 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.Arrays;
import java.util.List;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.dhp.schema.oaf.Result;
public class EnrichMoreSubject extends UpdateInfo<String> {
public static List<EnrichMoreSubject> findUpdates(final Result source, final Result target) {
// MESHEUROPMC
// ARXIV
// JEL
// DDC
// ACM
return Arrays.asList();
}
private EnrichMoreSubject(final String subjectClassification, final String highlightValue, final float trust) {
super("ENRICH/MORE/SUBJECT/" + subjectClassification, highlightValue, trust);
}
@Override
public void compileHighlight(final OpenAireEventPayload payload) {
payload.getHighlight().getSubjects().add(getHighlightValue());
}
@Override
public String getHighlightValueAsString() {
return getHighlightValue();
}
}

View File

@ -1,36 +1,77 @@
package eu.dnetlib.dhp.broker.oa.util;
import java.util.function.BiConsumer;
import java.util.function.Function;
import eu.dnetlib.broker.objects.OpenAireEventPayload;
import eu.dnetlib.broker.objects.Publication;
import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.schema.oaf.Result;
public abstract class UpdateInfo<T> {
public final class UpdateInfo<T> {
private final String topic;
private final Topic topic;
private final T highlightValue;
private final Result source;
private final Result target;
private final BiConsumer<Publication, T> compileHighlight;
private final Function<T, String> highlightToString;
private final float trust;
protected UpdateInfo(final String topic, final T highlightValue, final float trust) {
public UpdateInfo(final Topic topic, final T highlightValue, final Result source, final Result target,
final BiConsumer<Publication, T> compileHighlight,
final Function<T, String> highlightToString) {
this.topic = topic;
this.highlightValue = highlightValue;
this.trust = trust;
this.source = source;
this.target = target;
this.compileHighlight = compileHighlight;
this.highlightToString = highlightToString;
this.trust = calculateTrust(source, target);
}
public T getHighlightValue() {
return highlightValue;
}
public Result getSource() {
return source;
}
public Result getTarget() {
return target;
}
private float calculateTrust(final Result source, final Result target) {
// TODO
return 0.9f;
}
protected Topic getTopic() {
return topic;
}
public String getTopicPath() {
return topic.getPath();
}
public float getTrust() {
return trust;
}
public String getTopic() {
return topic;
public void compileHighlight(final OpenAireEventPayload payload) {
compileHighlight.accept(payload.getHighlight(), getHighlightValue());
}
abstract public void compileHighlight(OpenAireEventPayload payload);
abstract public String getHighlightValueAsString();
public String getHighlightValueAsString() {
return highlightToString.apply(getHighlightValue());
}
}

View File

@ -1,7 +0,0 @@
#sandboxName when not provided explicitly will be generated
sandboxName=${sandboxName}
sandboxDir=/user/${dhp.hadoop.frontend.user.name}/${sandboxName}
workingDir=${sandboxDir}/working_dir
oozie.wf.application.path = ${nameNode}${sandboxDir}/${oozieAppDir}
oozieTopWfApplicationPath = ${oozie.wf.application.path}

View File

@ -1,166 +0,0 @@
package eu.dnetlib.dhp;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.DocumentException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import com.google.gson.Gson;
import eu.dnetlib.dhp.community.CommunityConfiguration;
import eu.dnetlib.dhp.community.CommunityConfigurationFactory;
import eu.dnetlib.dhp.community.Constraint;
import eu.dnetlib.dhp.community.SelectionConstraints;
import eu.dnetlib.dhp.selectioncriteria.VerbResolver;
/** Created by miriam on 03/08/2018. */
public class CommunityConfigurationFactoryTest {
private final VerbResolver resolver = new VerbResolver();
@Test
public void parseTest() throws DocumentException, IOException {
String xml = IOUtils
.toString(
getClass()
.getResourceAsStream(
"/eu/dnetlib/dhp/communityconfiguration/community_configuration.xml"));
final CommunityConfiguration cc = CommunityConfigurationFactory.newInstance(xml);
Assertions.assertEquals(5, cc.size());
cc
.getCommunityList()
.forEach(c -> Assertions.assertTrue(StringUtils.isNoneBlank(c.getId())));
}
@Test
public void applyVerb()
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException,
InstantiationException {
Constraint sc = new Constraint();
sc.setVerb("not_contains");
sc.setField("contributor");
sc.setValue("DARIAH");
sc.setSelection(resolver.getSelectionCriteria(sc.getVerb(), sc.getValue()));
String metadata = "This work has been partially supported by DARIAH-EU infrastructure";
Assertions.assertFalse(sc.verifyCriteria(metadata));
}
@Test
public void loadSelCriteriaTest() throws DocumentException, IOException {
String xml = IOUtils
.toString(
getClass()
.getResourceAsStream(
"/eu/dnetlib/dhp/communityconfiguration/community_configuration_selcrit.xml"));
final CommunityConfiguration cc = CommunityConfigurationFactory.newInstance(xml);
Map<String, List<String>> param = new HashMap<>();
param.put("author", new ArrayList<>(Collections.singletonList("Pippo Pippi")));
param
.put(
"description",
new ArrayList<>(
Collections
.singletonList(
"This work has been partially supported by DARIAH-EU infrastructure")));
param
.put(
"contributor",
new ArrayList<>(
Collections
.singletonList(
"Pallino ha aiutato a scrivere il paper. Pallino lavora per DARIAH")));
List<String> comm = cc
.getCommunityForDatasource(
"openaire____::1cfdb2e14977f31a98e0118283401f32", param);
Assertions.assertEquals(1, comm.size());
Assertions.assertEquals("dariah", comm.get(0));
}
@Test
public void test4() throws DocumentException, IOException {
final CommunityConfiguration cc = CommunityConfigurationFactory
.fromJson(
IOUtils
.toString(
getClass()
.getResourceAsStream(
"/eu/dnetlib/dhp/communityconfiguration/community_configuration_selcrit.json")));
cc.toString();
}
@Test
public void test5() throws IOException, DocumentException {
// final CommunityConfiguration cc =
// CommunityConfigurationFactory.newInstance(IOUtils.toString(getClass().getResourceAsStream("test.xml")));
final CommunityConfiguration cc = CommunityConfigurationFactory
.fromJson(
IOUtils
.toString(
getClass()
.getResourceAsStream(
"/eu/dnetlib/dhp/communityconfiguration/community_configuration.json")));
System.out.println(cc.toJson());
}
@Test
public void test6() {
String json = "{\"criteria\":[{\"constraint\":[{\"verb\":\"contains\",\"field\":\"contributor\",\"value\":\"DARIAH\"}]}]}";
String step1 = "{\"verb\":\"contains\",\"field\":\"contributor\",\"value\":\"DARIAH\"}";
Constraint c = new Gson().fromJson(step1, Constraint.class);
//
// String step2 =
// "{\"constraint\":[{\"verb\":\"contains\",\"field\":\"contributor\",\"value\":\"DARIAH\"}]}";
//
// ConstraintEncapsulator ce = new
// Gson().fromJson(step2,ConstraintEncapsulator.class);
//
//
// String step3 =
// "{\"ce\":{\"constraint\":[{\"verb\":\"contains\",\"field\":\"contributor\",\"value\":\"DARIAH\"}]}}";
//
// Constraints cons = new Gson().fromJson(step3,Constraints.class);
//
// String step4 =
// "{\"criteria\":[{\"ce\":{\"constraint\":[{\"verb\":\"contains\",\"field\":\"contributor\",\"value\":\"DARIAH\"}]}}]}";
//
// ConstraintsList cl = new Gson().fromJson(step4,ConstraintsList.class);
//
// String step5 =
// "{\"cl\":{\"criteria\":[{\"ce\":{\"constraint\":[{\"verb\":\"contains\",\"field\":\"contributor\",\"value\":\"DARIAH\"}]}}]}}";
SelectionConstraints sl = new Gson().fromJson(json, SelectionConstraints.class);
}
@Test
public void test7() throws IOException {
final CommunityConfiguration cc = CommunityConfigurationFactory
.fromJson(
IOUtils
.toString(
getClass()
.getResourceAsStream(
"/eu/dnetlib/dhp/communityconfiguration/tagging_conf.json")));
System.out.println(cc.toJson());
}
@Test
public void temporaneo() throws Exception {
String xml = IOUtils
.toString(
getClass()
.getResourceAsStream(
"/eu/dnetlib/dhp/communityconfiguration/tagging_conf.xml"));
final CommunityConfiguration cc = CommunityConfigurationFactory.newInstance(xml);
System.out.println(cc.toJson());
}
}

View File

@ -9,7 +9,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-bulktag</artifactId>
<artifactId>dhp-enrichment</artifactId>
<dependencies>
<dependency>
@ -31,6 +31,12 @@
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
@ -43,12 +49,16 @@
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
</dependency>
<dependency>
<groupId>io.github.classgraph</groupId>
<artifactId>classgraph</artifactId>
<version>4.8.71</version>
</dependency>
</dependencies>
</project>

View File

@ -15,6 +15,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
public class PropagationConstant {
@ -24,10 +26,6 @@ public class PropagationConstant {
public static final String TRUE = "true";
public static final String DNET_COUNTRY_SCHEMA = "dnet:countries";
public static final String DNET_SCHEMA_NAME = "dnet:provenanceActions";
public static final String DNET_SCHEMA_ID = "dnet:provenanceActions";
public static final String PROPAGATION_COUNTRY_INSTREPO_CLASS_ID = "country:instrepos";
public static final String PROPAGATION_COUNTRY_INSTREPO_CLASS_NAME = "Propagation of country to result collected from datasources of type institutional repositories";
@ -46,22 +44,6 @@ public class PropagationConstant {
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_ID = "authorpid:result";
public static final String PROPAGATION_ORCID_TO_RESULT_FROM_SEM_REL_CLASS_NAME = "Propagation of authors pid to result through semantic relations";
public static final String RELATION_DATASOURCE_ORGANIZATION_REL_CLASS = "isProvidedBy";
public static final String RELATION_RESULTORGANIZATION_REL_TYPE = "resultOrganization";
public static final String RELATION_RESULTORGANIZATION_SUBREL_TYPE = "affiliation";
public static final String RELATION_ORGANIZATION_RESULT_REL_CLASS = "isAuthorInstitutionOf";
public static final String RELATION_RESULT_ORGANIZATION_REL_CLASS = "hasAuthorInstitution";
public static final String RELATION_RESULTRESULT_REL_TYPE = "resultResult";
public static final String RELATION_RESULTPROJECT_REL_TYPE = "resultProject";
public static final String RELATION_RESULTPROJECT_SUBREL_TYPE = "outcome";
public static final String RELATION_RESULT_PROJECT_REL_CLASS = "isProducedBy";
public static final String RELATION_PROJECT_RESULT_REL_CLASS = "produces";
public static final String RELATION_REPRESENTATIVERESULT_RESULT_CLASS = "merges";
public static final String PROPAGATION_AUTHOR_PID = "ORCID";
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -76,8 +58,8 @@ public class PropagationConstant {
Country nc = new Country();
nc.setClassid(classid);
nc.setClassname(classname);
nc.setSchemename(DNET_COUNTRY_SCHEMA);
nc.setSchemeid(DNET_COUNTRY_SCHEMA);
nc.setSchemename(ModelConstants.DNET_COUNTRY_TYPE);
nc.setSchemeid(ModelConstants.DNET_COUNTRY_TYPE);
nc
.setDataInfo(
getDataInfo(
@ -102,8 +84,8 @@ public class PropagationConstant {
Qualifier pa = new Qualifier();
pa.setClassid(inference_class_id);
pa.setClassname(inference_class_name);
pa.setSchemeid(DNET_SCHEMA_ID);
pa.setSchemename(DNET_SCHEMA_NAME);
pa.setSchemeid(ModelConstants.DNET_PID_TYPES);
pa.setSchemename(ModelConstants.DNET_PID_TYPES);
return pa;
}

View File

@ -1,6 +1,7 @@
package eu.dnetlib.dhp.bulktag;
import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
@ -19,8 +20,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.community.*;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.bulktag.community.*;
import eu.dnetlib.dhp.schema.oaf.Result;
public class SparkBulkTagJob {
@ -84,6 +85,7 @@ public class SparkBulkTagJob {
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
execBulkTag(spark, inputPath, outputPath, protoMappingParams, resultClazz, cc);
});
}

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.community;
package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable;
import java.util.ArrayList;
@ -17,7 +17,7 @@ public class Community implements Serializable {
private String id;
private List<String> subjects = new ArrayList<>();
private List<Datasource> datasources = new ArrayList<>();
private List<Provider> providers = new ArrayList<>();
private List<ZenodoCommunity> zenodoCommunities = new ArrayList<>();
public String toJson() {
@ -27,7 +27,7 @@ public class Community implements Serializable {
public boolean isValid() {
return !getSubjects().isEmpty()
|| !getDatasources().isEmpty()
|| !getProviders().isEmpty()
|| !getZenodoCommunities().isEmpty();
}
@ -47,12 +47,12 @@ public class Community implements Serializable {
this.subjects = subjects;
}
public List<Datasource> getDatasources() {
return datasources;
public List<Provider> getProviders() {
return providers;
}
public void setDatasources(List<Datasource> datasources) {
this.datasources = datasources;
public void setProviders(List<Provider> providers) {
this.providers = providers;
}
public List<ZenodoCommunity> getZenodoCommunities() {

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.community;
package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable;
import java.util.ArrayList;
@ -16,8 +16,8 @@ import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import eu.dnetlib.dhp.selectioncriteria.InterfaceAdapter;
import eu.dnetlib.dhp.selectioncriteria.Selection;
import eu.dnetlib.dhp.bulktag.criteria.InterfaceAdapter;
import eu.dnetlib.dhp.bulktag.criteria.Selection;
/** Created by miriam on 02/08/2018. */
public class CommunityConfiguration implements Serializable {
@ -84,7 +84,7 @@ public class CommunityConfiguration implements Serializable {
add(sbj.toLowerCase().trim(), p, subjectMap);
}
// get datasources
for (Datasource d : c.getDatasources()) {
for (Provider d : c.getProviders()) {
add(d.getOpenaireId(), new Pair<>(id, d.getSelectionConstraints()), datasourceMap);
}

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.community;
package eu.dnetlib.dhp.bulktag.community;
import java.io.StringReader;
import java.util.ArrayList;
@ -19,10 +19,10 @@ import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import eu.dnetlib.dhp.selectioncriteria.InterfaceAdapter;
import eu.dnetlib.dhp.selectioncriteria.Selection;
import eu.dnetlib.dhp.selectioncriteria.VerbResolver;
import eu.dnetlib.dhp.selectioncriteria.VerbResolverFactory;
import eu.dnetlib.dhp.bulktag.criteria.InterfaceAdapter;
import eu.dnetlib.dhp.bulktag.criteria.Selection;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolverFactory;
/** Created by miriam on 03/08/2018. */
public class CommunityConfigurationFactory {
@ -77,7 +77,7 @@ public class CommunityConfigurationFactory {
log.info(String.format("community id: %s", c.getId()));
c.setSubjects(parseSubjects(node));
c.setDatasources(parseDatasources(node));
c.setProviders(parseDatasources(node));
c.setZenodoCommunities(parseZenodoCommunities(node));
return c;
}
@ -96,17 +96,17 @@ public class CommunityConfigurationFactory {
return subjects;
}
private static List<Datasource> parseDatasources(final Node node) {
private static List<Provider> parseDatasources(final Node node) {
final List<Node> list = node.selectNodes("./datasources/datasource");
final List<Datasource> datasourceList = new ArrayList<>();
final List<Provider> providerList = new ArrayList<>();
for (Node n : list) {
Datasource d = new Datasource();
Provider d = new Provider();
d.setOpenaireId(n.selectSingleNode("./openaireId").getText());
d.setSelCriteria(n.selectSingleNode("./selcriteria"), resolver);
datasourceList.add(d);
providerList.add(d);
}
log.info("size of the datasource list " + datasourceList.size());
return datasourceList;
log.info("size of the datasource list " + providerList.size());
return providerList;
}
private static List<ZenodoCommunity> parseZenodoCommunities(final Node node) {

View File

@ -1,11 +1,11 @@
package eu.dnetlib.dhp.community;
package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import eu.dnetlib.dhp.selectioncriteria.Selection;
import eu.dnetlib.dhp.selectioncriteria.VerbResolver;
import eu.dnetlib.dhp.bulktag.criteria.Selection;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
public class Constraint implements Serializable {
private String verb;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.community;
package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
@ -14,7 +14,7 @@ import org.apache.commons.logging.LogFactory;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import eu.dnetlib.dhp.selectioncriteria.VerbResolver;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
/** Created by miriam on 02/08/2018. */
public class Constraints implements Serializable {

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.community;
package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.community;
package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable;
import java.util.HashMap;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.community;
package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable;
@ -9,11 +9,11 @@ import org.dom4j.Node;
import com.google.gson.Gson;
import eu.dnetlib.dhp.selectioncriteria.VerbResolver;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
/** Created by miriam on 01/08/2018. */
public class Datasource implements Serializable {
private static final Log log = LogFactory.getLog(Datasource.class);
public class Provider implements Serializable {
private static final Log log = LogFactory.getLog(Provider.class);
private String openaireId;

View File

@ -1,7 +1,8 @@
package eu.dnetlib.dhp.community;
package eu.dnetlib.dhp.bulktag.community;
import static eu.dnetlib.dhp.community.TagginConstants.*;
import static eu.dnetlib.dhp.bulktag.community.TaggingConstants.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import java.io.Serializable;
import java.util.*;
@ -239,8 +240,8 @@ public class ResultTagger implements Serializable {
Qualifier pa = new Qualifier();
pa.setClassid(inference_class_id);
pa.setClassname(inference_class_name);
pa.setSchemeid(DNET_SCHEMA_ID);
pa.setSchemename(DNET_SCHEMA_NAME);
pa.setSchemeid(DNET_PROVENANCE_ACTIONS);
pa.setSchemename(DNET_PROVENANCE_ACTIONS);
return pa;
}
}

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.community;
package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable;
import java.lang.reflect.Type;
@ -10,7 +10,7 @@ import java.util.Map;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import eu.dnetlib.dhp.selectioncriteria.VerbResolver;
import eu.dnetlib.dhp.bulktag.criteria.VerbResolver;
public class SelectionConstraints implements Serializable {
private List<Constraints> criteria;

View File

@ -1,20 +1,14 @@
package eu.dnetlib.dhp.community;
package eu.dnetlib.dhp.bulktag.community;
public class TagginConstants {
public class TaggingConstants {
public static final String BULKTAG_DATA_INFO_TYPE = "bulktagging";
public static final String DNET_SCHEMA_NAME = "dnet:provenanceActions";
public static final String DNET_SCHEMA_ID = "dnet:provenanceActions";
public static final String CLASS_ID_SUBJECT = "community:subject";
public static final String CLASS_ID_DATASOURCE = "community:datasource";
public static final String CLASS_ID_CZENODO = "community:zenodocommunity";
public static final String SCHEMA_ID = "dnet:provenanceActions";
public static final String COUNTER_GROUP = "Bulk Tagging";
public static final String ZENODO_COMMUNITY_INDICATOR = "zenodo.org/communities/";
public static final String CLASS_NAME_BULKTAG_SUBJECT = "Bulktagging for Community - Subject";

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.community;
package eu.dnetlib.dhp.bulktag.community;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.selectioncriteria;
package eu.dnetlib.dhp.bulktag.criteria;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.selectioncriteria;
package eu.dnetlib.dhp.bulktag.criteria;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.selectioncriteria;
package eu.dnetlib.dhp.bulktag.criteria;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.selectioncriteria;
package eu.dnetlib.dhp.bulktag.criteria;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.selectioncriteria;
package eu.dnetlib.dhp.bulktag.criteria;
import java.lang.reflect.Type;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.selectioncriteria;
package eu.dnetlib.dhp.bulktag.criteria;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.selectioncriteria;
package eu.dnetlib.dhp.bulktag.criteria;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.selectioncriteria;
package eu.dnetlib.dhp.bulktag.criteria;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.selectioncriteria;
package eu.dnetlib.dhp.bulktag.criteria;
import java.io.Serializable;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.selectioncriteria;
package eu.dnetlib.dhp.bulktag.criteria;
public interface Selection {

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.selectioncriteria;
package eu.dnetlib.dhp.bulktag.criteria;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.selectioncriteria;
package eu.dnetlib.dhp.bulktag.criteria;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
@ -22,12 +22,12 @@ public class VerbResolver implements Serializable {
.verbose() // If you want to enable logging to stderr
.enableAllInfo() // Scan classes, methods, fields, annotations
.whitelistPackages(
"eu.dnetlib.dhp.selectioncriteria") // Scan com.xyz and subpackages
"eu.dnetlib.dhp.bulktag.criteria") // Scan com.xyz and subpackages
.scan()) { // Perform the scan and return a ScanResult
ClassInfoList routeClassInfoList = scanResult
.getClassesWithAnnotation(
"eu.dnetlib.dhp.selectioncriteria.VerbClass");
"eu.dnetlib.dhp.bulktag.criteria.VerbClass");
this.map = routeClassInfoList
.stream()

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.selectioncriteria;
package eu.dnetlib.dhp.bulktag.criteria;
public class VerbResolverFactory {

View File

@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
/**
@ -100,7 +101,7 @@ public class PrepareDatasourceCountryAssociation {
+ "JOIN ( SELECT source, target "
+ " FROM relation "
+ " WHERE relclass = '"
+ RELATION_DATASOURCE_ORGANIZATION_REL_CLASS
+ ModelConstants.IS_PROVIDED_BY
+ "' "
+ " AND datainfo.deletedbyinference = false ) rel "
+ "ON d.id = rel.source "

View File

@ -69,13 +69,16 @@ public class SparkCountryPropagationJob {
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> execPropagation(
spark -> {
removeOutputDir(spark, outputPath);
execPropagation(
spark,
sourcePath,
preparedInfoPath,
outputPath,
resultClazz,
saveGraph));
saveGraph);
});
}
private static <R extends Result> void execPropagation(

View File

@ -74,9 +74,7 @@ public class PrepareResultOrcidAssociationStep1 {
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
prepareInfo(
spark, inputRelationPath, inputResultPath, outputResultPath, resultClazz, allowedsemrel);
});
@ -97,22 +95,22 @@ public class PrepareResultOrcidAssociationStep1 {
Dataset<R> result = readPath(spark, inputResultPath, resultClazz);
result.createOrReplaceTempView("result");
String query = " select target resultId, author authorList"
+ " from (select id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author "
+ " from ( "
+ " select id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid "
+ " from result "
+ " lateral view explode (author) a as MyT "
+ " lateral view explode (MyT.pid) p as MyP "
+ " where MyP.qualifier.classid = 'ORCID') tmp "
+ " group by id) r_t "
+ " join ("
+ " select source, target "
+ " from relation "
+ " where datainfo.deletedbyinference = false "
String query = "SELECT target resultId, author authorList"
+ " FROM (SELECT id, collect_set(named_struct('name', name, 'surname', surname, 'fullname', fullname, 'orcid', orcid)) author "
+ " FROM ( "
+ " SELECT DISTINCT id, MyT.fullname, MyT.name, MyT.surname, MyP.value orcid "
+ " FROM result "
+ " LATERAL VIEW EXPLODE (author) a AS MyT "
+ " LATERAL VIEW EXPLODE (MyT.pid) p AS MyP "
+ " WHERE MyP.qualifier.classid = 'ORCID') tmp "
+ " GROUP BY id) r_t "
+ " JOIN ("
+ " SELECT source, target "
+ " FROM relation "
+ " WHERE datainfo.deletedbyinference = false "
+ getConstraintList(" relclass = '", allowedsemrel)
+ ") rel_rel "
+ " on source = id";
+ " ) rel_rel "
+ " ON source = id";
spark
.sql(query)
.as(Encoders.bean(ResultOrcidList.class))

View File

@ -50,9 +50,7 @@ public class PrepareResultOrcidAssociationStep2 {
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
mergeInfo(spark, inputPath, outputPath);
});
}

View File

@ -70,11 +70,10 @@ public class SparkOrcidToResultFromSemRelJob {
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
if (saveGraph)
if (saveGraph) {
execPropagation(spark, possibleUpdates, inputPath, outputPath, resultClazz);
}
});
}
@ -132,16 +131,16 @@ public class SparkOrcidToResultFromSemRelJob {
private static boolean enrichAuthor(AutoritativeAuthor autoritative_author, Author author) {
boolean toaddpid = false;
if (StringUtils.isNoneEmpty(autoritative_author.getSurname())) {
if (StringUtils.isNoneEmpty(author.getSurname())) {
if (StringUtils.isNotEmpty(autoritative_author.getSurname())) {
if (StringUtils.isNotEmpty(author.getSurname())) {
if (autoritative_author
.getSurname()
.trim()
.equalsIgnoreCase(author.getSurname().trim())) {
// have the same surname. Check the name
if (StringUtils.isNoneEmpty(autoritative_author.getName())) {
if (StringUtils.isNoneEmpty(author.getName())) {
if (StringUtils.isNotEmpty(autoritative_author.getName())) {
if (StringUtils.isNotEmpty(author.getName())) {
if (autoritative_author
.getName()
.trim()
@ -150,6 +149,7 @@ public class SparkOrcidToResultFromSemRelJob {
}
// they could be differently written (i.e. only the initials of the name
// in one of the two
else {
if (autoritative_author
.getName()
.trim()
@ -162,6 +162,7 @@ public class SparkOrcidToResultFromSemRelJob {
}
}
}
}
if (toaddpid) {
StructuredProperty p = new StructuredProperty();
p.setValue(autoritative_author.getOrcid());

View File

@ -21,6 +21,7 @@ import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class PrepareProjectResultsAssociation {
@ -60,6 +61,8 @@ public class PrepareProjectResultsAssociation {
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, potentialUpdatePath);
removeOutputDir(spark, alreadyLinkedPath);
prepareResultProjProjectResults(
spark,
inputPath,
@ -83,7 +86,7 @@ public class PrepareProjectResultsAssociation {
+ " FROM relation "
+ " WHERE datainfo.deletedbyinference = false "
+ " AND relClass = '"
+ RELATION_RESULT_PROJECT_REL_CLASS
+ ModelConstants.IS_PRODUCED_BY
+ "'";
Dataset<Row> resproj_relation = spark.sql(resproj_relation_query);

View File

@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
@ -122,9 +123,9 @@ public class SparkResultToProjectThroughSemRelJob {
getRelation(
resId,
projectId,
RELATION_RESULT_PROJECT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE,
ModelConstants.IS_PRODUCED_BY,
ModelConstants.RESULT_PROJECT,
ModelConstants.OUTCOME,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));
@ -133,9 +134,9 @@ public class SparkResultToProjectThroughSemRelJob {
getRelation(
projectId,
resId,
RELATION_PROJECT_RESULT_REL_CLASS,
RELATION_RESULTPROJECT_REL_TYPE,
RELATION_RESULTPROJECT_SUBREL_TYPE,
ModelConstants.PRODUCES,
ModelConstants.RESULT_PROJECT,
ModelConstants.OUTCOME,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_ID,
PROPAGATION_RELATION_RESULT_PROJECT_SEM_REL_CLASS_NAME));

View File

@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class PrepareResultCommunitySet {
@ -55,9 +56,7 @@ public class PrepareResultCommunitySet {
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
prepareInfo(spark, inputPath, outputPath, organizationMap);
});
}
@ -76,13 +75,13 @@ public class PrepareResultCommunitySet {
+ " FROM relation "
+ " WHERE datainfo.deletedbyinference = false "
+ " AND relClass = '"
+ RELATION_RESULT_ORGANIZATION_REL_CLASS
+ ModelConstants.HAS_AUTHOR_INSTITUTION
+ "') result_organization "
+ "LEFT JOIN (SELECT source, collect_set(target) org_set "
+ " FROM relation "
+ " WHERE datainfo.deletedbyinference = false "
+ " AND relClass = '"
+ RELATION_REPRESENTATIVERESULT_RESULT_CLASS
+ ModelConstants.MERGES
+ "' "
+ " GROUP BY source) organization_organization "
+ "ON result_organization.target = organization_organization.source ";

View File

@ -68,11 +68,10 @@ public class SparkResultToCommunityFromOrganizationJob {
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
if (saveGraph)
if (saveGraph) {
execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath);
}
});
}

View File

@ -17,6 +17,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
@ -58,30 +59,15 @@ public class PrepareResultInstRepoAssociation {
isSparkSessionManaged,
spark -> {
readNeededResources(spark, inputPath);
removeOutputDir(spark, datasourceOrganizationPath);
prepareDatasourceOrganization(spark, datasourceOrganizationPath);
removeOutputDir(spark, alreadyLinkedPath);
prepareAlreadyLinkedAssociation(spark, alreadyLinkedPath);
});
}
private static void prepareAlreadyLinkedAssociation(
SparkSession spark, String alreadyLinkedPath) {
String query = "Select source resultId, collect_set(target) organizationSet "
+ "from relation "
+ "where datainfo.deletedbyinference = false "
+ "and relClass = '"
+ RELATION_RESULT_ORGANIZATION_REL_CLASS
+ "' "
+ "group by source";
spark
.sql(query)
.as(Encoders.bean(ResultOrganizationSet.class))
// TODO retry to stick with datasets
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
}
private static void readNeededResources(SparkSession spark, String inputPath) {
Dataset<Datasource> datasource = readPath(spark, inputPath + "/datasource", Datasource.class);
datasource.createOrReplaceTempView("datasource");
@ -106,7 +92,7 @@ public class PrepareResultInstRepoAssociation {
+ "JOIN ( SELECT source, target "
+ "FROM relation "
+ "WHERE relclass = '"
+ RELATION_DATASOURCE_ORGANIZATION_REL_CLASS
+ ModelConstants.IS_PROVIDED_BY
+ "' "
+ "AND datainfo.deletedbyinference = false ) rel "
+ "ON d.id = rel.source ";
@ -119,4 +105,24 @@ public class PrepareResultInstRepoAssociation {
.option("compression", "gzip")
.json(datasourceOrganizationPath);
}
private static void prepareAlreadyLinkedAssociation(
SparkSession spark, String alreadyLinkedPath) {
String query = "Select source resultId, collect_set(target) organizationSet "
+ "from relation "
+ "where datainfo.deletedbyinference = false "
+ "and relClass = '"
+ ModelConstants.HAS_AUTHOR_INSTITUTION
+ "' "
+ "group by source";
spark
.sql(query)
.as(Encoders.bean(ResultOrganizationSet.class))
// TODO retry to stick with datasets
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
}
}

View File

@ -19,6 +19,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
@ -83,10 +84,8 @@ public class SparkResultToOrganizationFromIstRepoJob {
conf,
isSparkSessionManaged,
spark -> {
if (isTest(parser)) {
removeOutputDir(spark, outputPath);
}
if (saveGraph)
if (saveGraph) {
execPropagation(
spark,
datasourceorganization,
@ -94,6 +93,7 @@ public class SparkResultToOrganizationFromIstRepoJob {
inputPath,
outputPath,
resultClazz);
}
});
}
@ -151,9 +151,9 @@ public class SparkResultToOrganizationFromIstRepoJob {
getRelation(
orgId,
resultId,
RELATION_ORGANIZATION_RESULT_REL_CLASS,
RELATION_RESULTORGANIZATION_REL_TYPE,
RELATION_RESULTORGANIZATION_SUBREL_TYPE,
ModelConstants.IS_AUTHOR_INSTITUTION_OF,
ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID,
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));
@ -162,9 +162,9 @@ public class SparkResultToOrganizationFromIstRepoJob {
getRelation(
resultId,
orgId,
RELATION_RESULT_ORGANIZATION_REL_CLASS,
RELATION_RESULTORGANIZATION_REL_TYPE,
RELATION_RESULTORGANIZATION_SUBREL_TYPE,
ModelConstants.HAS_AUTHOR_INSTITUTION,
ModelConstants.RESULT_ORGANIZATION,
ModelConstants.AFFILIATION,
PROPAGATION_DATA_INFO_TYPE,
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_ID,
PROPAGATION_RELATION_RESULT_ORGANIZATION_INST_REPO_CLASS_NAME));

View File

@ -18,6 +18,17 @@
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reset_outputpath"/>
<kill name="Kill">
@ -42,8 +53,6 @@
<action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/relation</arg>
<arg>${nameNode}/${outputPath}/relation</arg>
</distcp>
@ -53,8 +62,6 @@
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/organization</arg>
<arg>${nameNode}/${outputPath}/organization</arg>
</distcp>
@ -64,8 +71,6 @@
<action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/project</arg>
<arg>${nameNode}/${outputPath}/project</arg>
</distcp>
@ -75,8 +80,6 @@
<action name="copy_datasources">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/datasource</arg>
<arg>${nameNode}/${outputPath}/datasource</arg>
</distcp>
@ -95,13 +98,11 @@
<action name="join_bulktag_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-publication</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-bulktag-${projectVersion}.jar</jar>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
@ -124,13 +125,11 @@
<action name="join_bulktag_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-dataset</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-bulktag-${projectVersion}.jar</jar>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
@ -153,13 +152,11 @@
<action name="join_bulktag_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-orp</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-bulktag-${projectVersion}.jar</jar>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}
@ -182,13 +179,11 @@
<action name="join_bulktag_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>bulkTagging-software</name>
<class>eu.dnetlib.dhp.bulktag.SparkBulkTagJob</class>
<jar>dhp-bulktag-${projectVersion}.jar</jar>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--num-executors=${sparkExecutorNumber}
--executor-memory=${sparkExecutorMemory}

View File

@ -19,6 +19,17 @@
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="reset_outputpath"/>
<kill name="Kill">
@ -43,8 +54,6 @@
<action name="copy_relation">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/relation</arg>
<arg>${nameNode}/${outputPath}/relation</arg>
</distcp>
@ -54,18 +63,15 @@
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/organization</arg>
<arg>${nameNode}/${outputPath}/organization</arg>
</distcp>
<ok to="copy_wait"/>
<error to="Kill"/>
</action>
<action name="copy_projects">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/project</arg>
<arg>${nameNode}/${outputPath}/project</arg>
</distcp>
@ -75,8 +81,6 @@
<action name="copy_datasources">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>${nameNode}/${sourcePath}/datasource</arg>
<arg>${nameNode}/${outputPath}/datasource</arg>
</distcp>
@ -92,7 +96,7 @@
<mode>cluster</mode>
<name>PrepareDatasourceCountryAssociation</name>
<class>eu.dnetlib.dhp.countrypropagation.PrepareDatasourceCountryAssociation</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
@ -126,7 +130,7 @@
<mode>cluster</mode>
<name>prepareResultCountry-Publication</name>
<class>eu.dnetlib.dhp.countrypropagation.PrepareResultCountrySet</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
@ -156,7 +160,7 @@
<mode>cluster</mode>
<name>prepareResultCountry-Dataset</name>
<class>eu.dnetlib.dhp.countrypropagation.PrepareResultCountrySet</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
@ -186,7 +190,7 @@
<mode>cluster</mode>
<name>prepareResultCountry-ORP</name>
<class>eu.dnetlib.dhp.countrypropagation.PrepareResultCountrySet</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
@ -216,7 +220,7 @@
<mode>cluster</mode>
<name>prepareResultCountry-Software</name>
<class>eu.dnetlib.dhp.countrypropagation.PrepareResultCountrySet</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
@ -255,7 +259,7 @@
<mode>cluster</mode>
<name>countryPropagationForPublications</name>
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
@ -285,7 +289,7 @@
<mode>cluster</mode>
<name>countryPropagationForDataset</name>
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
@ -315,7 +319,7 @@
<mode>cluster</mode>
<name>countryPropagationForORP</name>
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
@ -345,7 +349,7 @@
<mode>cluster</mode>
<name>countryPropagationForSoftware</name>
<class>eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob</class>
<jar>dhp-propagation-${projectVersion}.jar</jar>
<jar>dhp-enrichment-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}

Some files were not shown because too many files have changed in this diff Show More