Merge branch 'master' of code-repo.d4science.org:D-Net/dnet-hadoop

This commit is contained in:
Sandro La Bruzzo 2020-06-24 14:06:50 +02:00
commit 96689a8994
79 changed files with 1875 additions and 583 deletions

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-build</artifactId> <artifactId>dhp-build</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>dhp-build-assembly-resources</artifactId> <artifactId>dhp-build-assembly-resources</artifactId>

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-build</artifactId> <artifactId>dhp-build</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>dhp-build-properties-maven-plugin</artifactId> <artifactId>dhp-build-properties-maven-plugin</artifactId>

View File

@ -5,7 +5,7 @@
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-code-style</artifactId> <artifactId>dhp-code-style</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
<packaging>jar</packaging> <packaging>jar</packaging>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId> <artifactId>dhp</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>dhp-build</artifactId> <artifactId>dhp-build</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId> <artifactId>dhp</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
<relativePath>../</relativePath> <relativePath>../</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId> <artifactId>dhp</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
<relativePath>../</relativePath> <relativePath>../</relativePath>
</parent> </parent>

View File

@ -1,6 +1,10 @@
package eu.dnetlib.dhp.schema.common; package eu.dnetlib.dhp.schema.common;
import java.security.Key;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class ModelConstants { public class ModelConstants {
@ -95,6 +99,9 @@ public class ModelConstants {
SYSIMPORT_CROSSWALK_ENTITYREGISTRY, SYSIMPORT_CROSSWALK_ENTITYREGISTRY, SYSIMPORT_CROSSWALK_ENTITYREGISTRY, SYSIMPORT_CROSSWALK_ENTITYREGISTRY,
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS); DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS);
public static final KeyValue UNKNOWN_REPOSITORY = keyValue(
"10|openaire____::55045bd2a65019fd8e6741a755395c8c", "Unknown Repository");
private static Qualifier qualifier( private static Qualifier qualifier(
final String classid, final String classid,
final String classname, final String classname,
@ -107,4 +114,12 @@ public class ModelConstants {
q.setSchemename(schemename); q.setSchemename(schemename);
return q; return q;
} }
private static KeyValue keyValue(String key, String value) {
KeyValue kv = new KeyValue();
kv.setKey(key);
kv.setValue(value);
kv.setDataInfo(new DataInfo());
return kv;
}
} }

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>dhp-actionmanager</artifactId> <artifactId>dhp-actionmanager</artifactId>

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.promote;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass; import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
import java.io.IOException;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@ -20,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier; import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
@ -134,24 +136,39 @@ public class PromoteActionPayloadForGraphTableJob {
.map( .map(
(MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz), (MapFunction<String, G>) value -> OBJECT_MAPPER.readValue(value, rowClazz),
Encoders.bean(rowClazz)); Encoders.bean(rowClazz));
/*
* return spark .read() .parquet(path) .as(Encoders.bean(rowClazz));
*/
} }
private static <A extends Oaf> Dataset<A> readActionPayload( private static <A extends Oaf> Dataset<A> readActionPayload(
SparkSession spark, String path, Class<A> actionPayloadClazz) { SparkSession spark, String path, Class<A> actionPayloadClazz) {
logger.info("Reading action payload from path: {}", path); logger.info("Reading action payload from path: {}", path);
return spark return spark
.read() .read()
.parquet(path) .parquet(path)
.map((MapFunction<Row, String>) value -> extractPayload(value), Encoders.STRING())
.map( .map(
(MapFunction<Row, A>) value -> OBJECT_MAPPER (MapFunction<String, A>) value -> decodePayload(actionPayloadClazz, value),
.readValue(value.<String> getAs("payload"), actionPayloadClazz),
Encoders.bean(actionPayloadClazz)); Encoders.bean(actionPayloadClazz));
} }
private static String extractPayload(Row value) {
try {
return value.<String> getAs("payload");
} catch (IllegalArgumentException | ClassCastException e) {
logger.error("cannot extract payload from action: {}", value.toString());
throw e;
}
}
private static <A extends Oaf> A decodePayload(Class<A> actionPayloadClazz, String payload) throws IOException {
try {
return OBJECT_MAPPER.readValue(payload, actionPayloadClazz);
} catch (UnrecognizedPropertyException e) {
logger.error("error decoding payload: {}", payload);
throw e;
}
}
private static <G extends Oaf, A extends Oaf> Dataset<G> promoteActionPayloadForGraphTable( private static <G extends Oaf, A extends Oaf> Dataset<G> promoteActionPayloadForGraphTable(
Dataset<G> rowDS, Dataset<G> rowDS,
Dataset<A> actionPayloadDS, Dataset<A> actionPayloadDS,

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>dhp-aggregation</artifactId> <artifactId>dhp-aggregation</artifactId>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
@ -53,7 +53,7 @@
<dependency> <dependency>
<groupId>eu.dnetlib</groupId> <groupId>eu.dnetlib</groupId>
<artifactId>dnet-openaire-broker-common</artifactId> <artifactId>dnet-openaire-broker-common</artifactId>
<version>[3.0.2,4.0.0)</version> <version>[3.0.3,4.0.0)</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -11,7 +11,7 @@ import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils; import org.apache.commons.lang3.time.DateUtils;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
public class EventFactory { public class EventFactory {
@ -49,8 +49,8 @@ public class EventFactory {
private static Map<String, Object> createMapFromResult(final UpdateInfo<?> updateInfo) { private static Map<String, Object> createMapFromResult(final UpdateInfo<?> updateInfo) {
final Map<String, Object> map = new HashMap<>(); final Map<String, Object> map = new HashMap<>();
final OpenaireBrokerResult source = updateInfo.getSource(); final OaBrokerMainEntity source = updateInfo.getSource();
final OpenaireBrokerResult target = updateInfo.getTarget(); final OaBrokerMainEntity target = updateInfo.getTarget();
map.put("target_datasource_id", target.getCollectedFromId()); map.put("target_datasource_id", target.getCollectedFromId());
map.put("target_datasource_name", target.getCollectedFromName()); map.put("target_datasource_name", target.getCollectedFromName());

View File

@ -1,229 +0,0 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OpenaireBrokerResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedEntityFactory;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import scala.Tuple2;
public class GenerateEventsApplication {
private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String eventsPath = parser.get("eventsPath");
log.info("eventsPath: {}", eventsPath);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String dedupConfigProfileId = parser.get("dedupConfProfile");
log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
final SparkConf conf = new SparkConf();
// conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
// conf.registerKryoClasses(BrokerConstants.getModelClasses());
// TODO UNCOMMENT
// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
final DedupConfig dedupConfig = null;
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
removeOutputDir(spark, eventsPath);
// TODO REMOVE THIS
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.cache();
relatedEntities(projects, rels, RelatedProject.class)
.write()
.mode(SaveMode.Overwrite)
.json(eventsPath);
// TODO UNCOMMENT THIS
// spark
// .emptyDataset(Encoders.bean(Event.class))
// .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
// .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
// .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
// .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .json(eventsPath);
});
}
private static void removeOutputDir(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static <SRC extends Result> Dataset<Event> generateEvents(
final SparkSession spark,
final String graphPath,
final Class<SRC> sourceClass,
final DedupConfig dedupConfig) {
final Dataset<OpenaireBrokerResult> results = expandResultsWithRelations(spark, graphPath, sourceClass);
final Dataset<Relation> mergedRels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
final TypedColumn<Tuple2<OpenaireBrokerResult, Relation>, ResultGroup> aggr = new ResultAggregator()
.toColumn();
return results
.joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
.groupByKey(
(MapFunction<Tuple2<OpenaireBrokerResult, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
.agg(aggr)
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
.filter(rg -> rg.getData().size() > 1)
.map(
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
Encoders.bean(EventGroup.class))
.flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
}
private static <SRC extends Result> Dataset<OpenaireBrokerResult> expandResultsWithRelations(
final SparkSession spark,
final String graphPath,
final Class<SRC> sourceClass) {
final Dataset<Project> projects = readPath(spark, graphPath + "/project", Project.class);
// final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
// spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
// final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
// final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
final Dataset<Relation> rels = readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.cache();
final Dataset<OpenaireBrokerResult> r0 = readPath(
spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
.filter(r -> r.getDataInfo().getDeletedbyinference())
.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OpenaireBrokerResult.class));
// TODO UNCOMMENT THIS
final Dataset<OpenaireBrokerResult> r1 = join(r0, rels, relatedEntities(projects, rels, RelatedProject.class));
// final Dataset<OpenaireBrokerResult> r2 = join(r1, rels, relatedEntities(softwares, rels,
// RelatedSoftware.class));
// final Dataset<OpenaireBrokerResult> r3 = join(r2, rels, relatedEntities(datasets, rels,
// RelatedDataset.class));
// final Dataset<OpenaireBrokerResult> r4 = join(r3, rels, relatedEntities(publications, rels,
// RelatedPublication.class));;
return r0; // TODO it should be r4
}
private static <T, RT> Dataset<RT> relatedEntities(final Dataset<T> targets,
final Dataset<Relation> rels,
final Class<RT> clazz) {
return rels
.joinWith(targets, targets.col("id").equalTo(rels.col("target")), "inner")
.map(
t -> RelatedEntityFactory.newRelatedEntity(t._1.getSource(), t._1.getRelType(), t._2, clazz),
Encoders.bean(clazz));
}
private static <T> Dataset<OpenaireBrokerResult> join(final Dataset<OpenaireBrokerResult> sources,
final Dataset<Relation> rels,
final Dataset<T> typedRels) {
final TypedColumn<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult> aggr = new OpenaireBrokerResultAggregator<T>()
.toColumn();
return sources
.joinWith(typedRels, sources.col("openaireId").equalTo(rels.col("source")), "left_outer")
.groupByKey(
(MapFunction<Tuple2<OpenaireBrokerResult, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr)
.map(t -> t._2, Encoders.bean(OpenaireBrokerResult.class));
}
public static <R> Dataset<R> readPath(
final SparkSession spark,
final String inputPath,
final Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String conf = isLookUpService
.getResourceProfileByQuery(
String
.format(
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
profId));
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
dedupConfig.getPace().initModel();
dedupConfig.getPace().initTranslationMap();
// dedupConfig.getWf().setConfigurationId("???");
return dedupConfig;
}
}

View File

@ -0,0 +1,103 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
public class GenerateEventsJob {
private static final Logger log = LoggerFactory.getLogger(GenerateEventsJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
GenerateEventsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String dedupConfigProfileId = parser.get("dedupConfProfile");
log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
final String eventsPath = workingPath + "/events";
log.info("eventsPath: {}", eventsPath);
final SparkConf conf = new SparkConf();
// TODO UNCOMMENT
// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
final DedupConfig dedupConfig = null;
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, eventsPath);
final Dataset<ResultGroup> groups = ClusterUtils
.readPath(spark, workingPath + "/duplicates", ResultGroup.class);
final Dataset<Event> events = groups
.map(
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
Encoders.bean(EventGroup.class))
.flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
events.write().mode(SaveMode.Overwrite).json(eventsPath);
});
}
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String conf = isLookUpService
.getResourceProfileByQuery(
String
.format(
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
profId));
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
dedupConfig.getPace().initModel();
dedupConfig.getPace().initTranslationMap();
// dedupConfig.getWf().setConfigurationId("???");
return dedupConfig;
}
}

View File

@ -0,0 +1,90 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProjectAggregator;
import scala.Tuple2;
public class JoinEntitiesJob {
private static final Logger log = LoggerFactory.getLogger(JoinEntitiesJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
JoinEntitiesJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String joinedEntitiesPath = workingPath + "/joinedEntities";
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, joinedEntitiesPath);
final Dataset<OaBrokerMainEntity> r0 = ClusterUtils
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
final Dataset<OaBrokerMainEntity> r1 = join(
r0, ClusterUtils.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class),
new RelatedProjectAggregator());
// final Dataset<OaBrokerMainEntity> r2 = join(
// r1, ClusterUtils.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class), new
// RelatedDatasetAggregator());
// final Dataset<OaBrokerMainEntity> r3 = join(
// r2, ClusterUtils.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class), new
// RelatedPublicationAggregator());
// final Dataset<OaBrokerMainEntity> r4 = join(
// r3, ClusterUtils.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class), new
// RelatedSoftwareAggregator());
r1.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath);
});
}
private static <T> Dataset<OaBrokerMainEntity> join(final Dataset<OaBrokerMainEntity> sources,
final Dataset<T> typedRels,
final Aggregator<Tuple2<OaBrokerMainEntity, T>, OaBrokerMainEntity, OaBrokerMainEntity> aggr) {
return sources
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey(
(MapFunction<Tuple2<OaBrokerMainEntity, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr.toColumn())
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
}
}

View File

@ -0,0 +1,88 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
public class PrepareGroupsJob {
private static final Logger log = LoggerFactory.getLogger(PrepareGroupsJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
JoinEntitiesJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String groupsPath = workingPath + "/duplicates";
log.info("groupsPath: {}", groupsPath);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, groupsPath);
final Dataset<OaBrokerMainEntity> results = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities", OaBrokerMainEntity.class);
final Dataset<Relation> mergedRels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator()
.toColumn();
final Dataset<ResultGroup> groups = results
.joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
.groupByKey(
(MapFunction<Tuple2<OaBrokerMainEntity, Relation>, String>) t -> t._2.getTarget(),
Encoders.STRING())
.agg(aggr)
.map(
(MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
.filter(rg -> rg.getData().size() > 1);
groups
.write()
.mode(SaveMode.Overwrite)
.json(groupsPath);
});
}
}

View File

@ -0,0 +1,84 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class PrepareRelatedDatasetsJob {
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedDatasetsJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
PrepareRelatedDatasetsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String relsPath = workingPath + "/relatedDatasets";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, relsPath);
final Dataset<OaBrokerRelatedDataset> datasets = ClusterUtils
.readPath(spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class)
.filter(d -> !ClusterUtils.isDedupRoot(d.getId()))
.map(ConversionUtils::oafDatasetToBrokerDataset, Encoders.bean(OaBrokerRelatedDataset.class));
final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
rels
.joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> {
final RelatedDataset rel = new RelatedDataset(t._1.getSource(), t._2);
rel.getRelDataset().setRelType(t._1.getRelClass());
return rel;
}, Encoders.bean(RelatedDataset.class))
.write()
.mode(SaveMode.Overwrite)
.json(relsPath);
});
}
}

View File

@ -0,0 +1,85 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.broker.objects.OaBrokerProject;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class PrepareRelatedProjectsJob {
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedProjectsJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
PrepareRelatedProjectsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String relsPath = workingPath + "/relatedProjects";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, relsPath);
final Dataset<OaBrokerProject> projects = ClusterUtils
.readPath(spark, graphPath + "/project", Project.class)
.filter(p -> !ClusterUtils.isDedupRoot(p.getId()))
.map(ConversionUtils::oafProjectToBrokerProject, Encoders.bean(OaBrokerProject.class));
final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT))
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
rels
.joinWith(projects, projects.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> new RelatedProject(t._1.getSource(), t._2), Encoders.bean(RelatedProject.class))
.write()
.mode(SaveMode.Overwrite)
.json(relsPath);
});
}
}

View File

@ -0,0 +1,91 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class PrepareRelatedPublicationsJob {
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedPublicationsJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
PrepareRelatedPublicationsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String relsPath = workingPath + "/relatedPublications";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, relsPath);
final Dataset<OaBrokerRelatedPublication> pubs = ClusterUtils
.readPath(spark, graphPath + "/publication", Publication.class)
.filter(p -> !ClusterUtils.isDedupRoot(p.getId()))
.map(
ConversionUtils::oafPublicationToBrokerPublication,
Encoders.bean(OaBrokerRelatedPublication.class));
final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
rels
.joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> {
final RelatedPublication rel = new RelatedPublication(t._1.getSource(), t._2);
rel.getRelPublication().setRelType(t._1.getRelClass());
return rel;
}, Encoders.bean(RelatedPublication.class))
.write()
.mode(SaveMode.Overwrite)
.json(relsPath);
});
}
}

View File

@ -0,0 +1,86 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
public class PrepareRelatedSoftwaresJob {
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedSoftwaresJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
PrepareRelatedSoftwaresJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String relsPath = workingPath + "/relatedSoftwares";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, relsPath);
final Dataset<OaBrokerRelatedSoftware> softwares = ClusterUtils
.readPath(spark, graphPath + "/software", Software.class)
.filter(sw -> !ClusterUtils.isDedupRoot(sw.getId()))
.map(ConversionUtils::oafSoftwareToBrokerSoftware, Encoders.bean(OaBrokerRelatedSoftware.class));
final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
.filter(r -> !ClusterUtils.isDedupRoot(r.getTarget()));
rels
.joinWith(softwares, softwares.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> new RelatedSoftware(t._1.getSource(), t._2), Encoders.bean(RelatedSoftware.class))
.write()
.mode(SaveMode.Overwrite)
.json(relsPath);
});
}
}

View File

@ -0,0 +1,82 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
public class PrepareSimpleEntititiesJob {
private static final Logger log = LoggerFactory.getLogger(PrepareSimpleEntititiesJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
PrepareSimpleEntititiesJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String simpleEntitiesPath = workingPath + "/simpleEntities";
log.info("simpleEntitiesPath: {}", simpleEntitiesPath);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, simpleEntitiesPath);
prepareSimpleEntities(spark, graphPath, Publication.class)
.union(prepareSimpleEntities(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class))
.union(prepareSimpleEntities(spark, graphPath, Software.class))
.union(prepareSimpleEntities(spark, graphPath, OtherResearchProduct.class))
.write()
.mode(SaveMode.Overwrite)
.json(simpleEntitiesPath);
});
}
private static <SRC extends Result> Dataset<OaBrokerMainEntity> prepareSimpleEntities(
final SparkSession spark,
final String graphPath,
final Class<SRC> sourceClass) {
return ClusterUtils
.readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
.filter(r -> !ClusterUtils.isDedupRoot(r.getId()))
.filter(r -> r.getDataInfo().getDeletedbyinference())
.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OaBrokerMainEntity.class));
}
}

View File

@ -12,7 +12,7 @@ import java.util.function.Function;
import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo;
import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.config.DedupConfig;
@ -21,11 +21,11 @@ public abstract class UpdateMatcher<T> {
private final boolean multipleUpdate; private final boolean multipleUpdate;
private final Function<T, Topic> topicFunction; private final Function<T, Topic> topicFunction;
private final BiConsumer<OpenaireBrokerResult, T> compileHighlightFunction; private final BiConsumer<OaBrokerMainEntity, T> compileHighlightFunction;
private final Function<T, String> highlightToStringFunction; private final Function<T, String> highlightToStringFunction;
public UpdateMatcher(final boolean multipleUpdate, final Function<T, Topic> topicFunction, public UpdateMatcher(final boolean multipleUpdate, final Function<T, Topic> topicFunction,
final BiConsumer<OpenaireBrokerResult, T> compileHighlightFunction, final BiConsumer<OaBrokerMainEntity, T> compileHighlightFunction,
final Function<T, String> highlightToStringFunction) { final Function<T, String> highlightToStringFunction) {
this.multipleUpdate = multipleUpdate; this.multipleUpdate = multipleUpdate;
this.topicFunction = topicFunction; this.topicFunction = topicFunction;
@ -33,13 +33,13 @@ public abstract class UpdateMatcher<T> {
this.highlightToStringFunction = highlightToStringFunction; this.highlightToStringFunction = highlightToStringFunction;
} }
public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OpenaireBrokerResult res, public Collection<UpdateInfo<T>> searchUpdatesForRecord(final OaBrokerMainEntity res,
final Collection<OpenaireBrokerResult> others, final Collection<OaBrokerMainEntity> others,
final DedupConfig dedupConfig) { final DedupConfig dedupConfig) {
final Map<String, UpdateInfo<T>> infoMap = new HashMap<>(); final Map<String, UpdateInfo<T>> infoMap = new HashMap<>();
for (final OpenaireBrokerResult source : others) { for (final OaBrokerMainEntity source : others) {
if (source != res) { if (source != res) {
for (final T hl : findDifferences(source, res)) { for (final T hl : findDifferences(source, res)) {
final Topic topic = getTopicFunction().apply(hl); final Topic topic = getTopicFunction().apply(hl);
@ -68,7 +68,7 @@ public abstract class UpdateMatcher<T> {
} }
} }
protected abstract List<T> findDifferences(OpenaireBrokerResult source, OpenaireBrokerResult target); protected abstract List<T> findDifferences(OaBrokerMainEntity source, OaBrokerMainEntity target);
protected static boolean isMissing(final List<String> list) { protected static boolean isMissing(final List<String> list) {
return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0)); return list == null || list.isEmpty() || StringUtils.isBlank(list.get(0));
@ -86,7 +86,7 @@ public abstract class UpdateMatcher<T> {
return topicFunction; return topicFunction;
} }
public BiConsumer<OpenaireBrokerResult, T> getCompileHighlightFunction() { public BiConsumer<OaBrokerMainEntity, T> getCompileHighlightFunction() {
return compileHighlightFunction; return compileHighlightFunction;
} }

View File

@ -5,13 +5,12 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.Dataset; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public abstract class AbstractEnrichMissingDataset public abstract class AbstractEnrichMissingDataset extends UpdateMatcher<OaBrokerRelatedDataset> {
extends UpdateMatcher<Dataset> {
public AbstractEnrichMissingDataset(final Topic topic) { public AbstractEnrichMissingDataset(final Topic topic) {
super(true, super(true,
@ -23,14 +22,14 @@ public abstract class AbstractEnrichMissingDataset
protected abstract boolean filterByType(String relType); protected abstract boolean filterByType(String relType);
@Override @Override
protected final List<Dataset> findDifferences(final OpenaireBrokerResult source, protected final List<OaBrokerRelatedDataset> findDifferences(final OaBrokerMainEntity source,
final OpenaireBrokerResult target) { final OaBrokerMainEntity target) {
final Set<String> existingDatasets = target final Set<String> existingDatasets = target
.getDatasets() .getDatasets()
.stream() .stream()
.filter(rel -> filterByType(rel.getRelType())) .filter(rel -> filterByType(rel.getRelType()))
.map(Dataset::getOriginalId) .map(OaBrokerRelatedDataset::getOriginalId)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
return source return source

View File

@ -4,12 +4,12 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedProjects;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.Project; import eu.dnetlib.broker.objects.OaBrokerProject;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMissingProject extends UpdateMatcher<Project> { public class EnrichMissingProject extends UpdateMatcher<OaBrokerProject> {
public EnrichMissingProject() { public EnrichMissingProject() {
super(true, super(true,
@ -19,7 +19,7 @@ public class EnrichMissingProject extends UpdateMatcher<Project> {
} }
@Override @Override
protected List<Project> findDifferences(final OpenaireBrokerResult source, final OpenaireBrokerResult target) { protected List<OaBrokerProject> findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) {
if (target.getProjects().isEmpty()) { if (target.getProjects().isEmpty()) {
return source.getProjects(); return source.getProjects();
} else { } else {

View File

@ -5,12 +5,12 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.Project; import eu.dnetlib.broker.objects.OaBrokerProject;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMoreProject extends UpdateMatcher<Project> { public class EnrichMoreProject extends UpdateMatcher<OaBrokerProject> {
public EnrichMoreProject() { public EnrichMoreProject() {
super(true, super(true,
@ -19,13 +19,13 @@ public class EnrichMoreProject extends UpdateMatcher<Project> {
prj -> projectAsString(prj)); prj -> projectAsString(prj));
} }
private static String projectAsString(final Project prj) { private static String projectAsString(final OaBrokerProject prj) {
return prj.getFunder() + "::" + prj.getFundingProgram() + "::" + prj.getCode(); return prj.getFunder() + "::" + prj.getFundingProgram() + "::" + prj.getCode();
} }
@Override @Override
protected List<eu.dnetlib.broker.objects.Project> findDifferences(final OpenaireBrokerResult source, protected List<OaBrokerProject> findDifferences(final OaBrokerMainEntity source,
final OpenaireBrokerResult target) { final OaBrokerMainEntity target) {
final Set<String> existingProjects = target final Set<String> existingProjects = target
.getProjects() .getProjects()

View File

@ -5,12 +5,12 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.Publication; import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<Publication> { public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<OaBrokerRelatedPublication> {
public AbstractEnrichMissingPublication(final Topic topic) { public AbstractEnrichMissingPublication(final Topic topic) {
super(true, super(true,
@ -23,15 +23,15 @@ public abstract class AbstractEnrichMissingPublication extends UpdateMatcher<Pub
protected abstract boolean filterByType(String relType); protected abstract boolean filterByType(String relType);
@Override @Override
protected final List<eu.dnetlib.broker.objects.Publication> findDifferences( protected final List<OaBrokerRelatedPublication> findDifferences(
final OpenaireBrokerResult source, final OaBrokerMainEntity source,
final OpenaireBrokerResult target) { final OaBrokerMainEntity target) {
final Set<String> existingPublications = target final Set<String> existingPublications = target
.getPublications() .getPublications()
.stream() .stream()
.filter(rel -> filterByType(rel.getRelType())) .filter(rel -> filterByType(rel.getRelType()))
.map(Publication::getOriginalId) .map(OaBrokerRelatedPublication::getOriginalId)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
return source return source

View File

@ -4,12 +4,13 @@ package eu.dnetlib.dhp.broker.oa.matchers.relatedSoftware;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMissingSoftware public class EnrichMissingSoftware
extends UpdateMatcher<eu.dnetlib.broker.objects.Software> { extends UpdateMatcher<OaBrokerRelatedSoftware> {
public EnrichMissingSoftware() { public EnrichMissingSoftware() {
super(true, super(true,
@ -19,9 +20,9 @@ public class EnrichMissingSoftware
} }
@Override @Override
protected List<eu.dnetlib.broker.objects.Software> findDifferences( protected List<OaBrokerRelatedSoftware> findDifferences(
final OpenaireBrokerResult source, final OaBrokerMainEntity source,
final OpenaireBrokerResult target) { final OaBrokerMainEntity target) {
if (target.getSoftwares().isEmpty()) { if (target.getSoftwares().isEmpty()) {
return source.getSoftwares(); return source.getSoftwares();

View File

@ -5,12 +5,12 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.Software; import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMoreSoftware extends UpdateMatcher<Software> { public class EnrichMoreSoftware extends UpdateMatcher<OaBrokerRelatedSoftware> {
public EnrichMoreSoftware() { public EnrichMoreSoftware() {
super(true, super(true,
@ -20,14 +20,14 @@ public class EnrichMoreSoftware extends UpdateMatcher<Software> {
} }
@Override @Override
protected List<eu.dnetlib.broker.objects.Software> findDifferences( protected List<OaBrokerRelatedSoftware> findDifferences(
final OpenaireBrokerResult source, final OaBrokerMainEntity source,
final OpenaireBrokerResult target) { final OaBrokerMainEntity target) {
final Set<String> existingSoftwares = source final Set<String> existingSoftwares = source
.getSoftwares() .getSoftwares()
.stream() .stream()
.map(Software::getName) .map(OaBrokerRelatedSoftware::getName)
.collect(Collectors.toSet()); .collect(Collectors.toSet());
return target return target

View File

@ -5,7 +5,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
@ -19,7 +19,7 @@ public class EnrichMissingAbstract extends UpdateMatcher<String> {
} }
@Override @Override
protected List<String> findDifferences(final OpenaireBrokerResult source, final OpenaireBrokerResult target) { protected List<String> findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) {
if (isMissing(target.getAbstracts()) && !isMissing(source.getAbstracts())) { if (isMissing(target.getAbstracts()) && !isMissing(source.getAbstracts())) {
return Arrays.asList(source.getAbstracts().get(0)); return Arrays.asList(source.getAbstracts().get(0));
} else { } else {

View File

@ -7,12 +7,12 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.broker.objects.Author; import eu.dnetlib.broker.objects.OaBrokerAuthor;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMissingAuthorOrcid extends UpdateMatcher<Author> { public class EnrichMissingAuthorOrcid extends UpdateMatcher<OaBrokerAuthor> {
public EnrichMissingAuthorOrcid() { public EnrichMissingAuthorOrcid() {
super(true, super(true,
@ -22,13 +22,13 @@ public class EnrichMissingAuthorOrcid extends UpdateMatcher<Author> {
} }
@Override @Override
protected List<Author> findDifferences(final OpenaireBrokerResult source, protected List<OaBrokerAuthor> findDifferences(final OaBrokerMainEntity source,
final OpenaireBrokerResult target) { final OaBrokerMainEntity target) {
final Set<String> existingOrcids = target final Set<String> existingOrcids = target
.getCreators() .getCreators()
.stream() .stream()
.map(Author::getOrcid) .map(OaBrokerAuthor::getOrcid)
.filter(StringUtils::isNotBlank) .filter(StringUtils::isNotBlank)
.collect(Collectors.toSet()); .collect(Collectors.toSet());

View File

@ -5,28 +5,28 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.Instance; import eu.dnetlib.broker.objects.OaBrokerInstance;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
public class EnrichMissingOpenAccess extends UpdateMatcher<Instance> { public class EnrichMissingOpenAccess extends UpdateMatcher<OaBrokerInstance> {
public EnrichMissingOpenAccess() { public EnrichMissingOpenAccess() {
super(true, super(true,
i -> Topic.ENRICH_MISSING_OA_VERSION, i -> Topic.ENRICH_MISSING_OA_VERSION,
(p, i) -> p.getInstances().add(i), (p, i) -> p.getInstances().add(i),
Instance::getUrl); OaBrokerInstance::getUrl);
} }
@Override @Override
protected List<Instance> findDifferences(final OpenaireBrokerResult source, protected List<OaBrokerInstance> findDifferences(final OaBrokerMainEntity source,
final OpenaireBrokerResult target) { final OaBrokerMainEntity target) {
final long count = target final long count = target
.getInstances() .getInstances()
.stream() .stream()
.map(Instance::getLicense) .map(OaBrokerInstance::getLicense)
.filter(right -> right.equals(BrokerConstants.OPEN_ACCESS)) .filter(right -> right.equals(BrokerConstants.OPEN_ACCESS))
.count(); .count();

View File

@ -5,12 +5,12 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.TypedValue; import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMissingPid extends UpdateMatcher<TypedValue> { public class EnrichMissingPid extends UpdateMatcher<OaBrokerTypedValue> {
public EnrichMissingPid() { public EnrichMissingPid() {
super(true, super(true,
@ -20,8 +20,8 @@ public class EnrichMissingPid extends UpdateMatcher<TypedValue> {
} }
@Override @Override
protected List<TypedValue> findDifferences(final OpenaireBrokerResult source, protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
final OpenaireBrokerResult target) { final OaBrokerMainEntity target) {
final long count = target.getPids().size(); final long count = target.getPids().size();
if (count > 0) { if (count > 0) {

View File

@ -5,7 +5,7 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
@ -19,8 +19,8 @@ public class EnrichMissingPublicationDate extends UpdateMatcher<String> {
} }
@Override @Override
protected List<String> findDifferences(final OpenaireBrokerResult source, protected List<String> findDifferences(final OaBrokerMainEntity source,
final OpenaireBrokerResult target) { final OaBrokerMainEntity target) {
if (isMissing(target.getPublicationdate()) && !isMissing(source.getPublicationdate())) { if (isMissing(target.getPublicationdate()) && !isMissing(source.getPublicationdate())) {
return Arrays.asList(source.getPublicationdate()); return Arrays.asList(source.getPublicationdate());

View File

@ -5,12 +5,12 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.TypedValue; import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMissingSubject extends UpdateMatcher<TypedValue> { public class EnrichMissingSubject extends UpdateMatcher<OaBrokerTypedValue> {
public EnrichMissingSubject() { public EnrichMissingSubject() {
super(true, super(true,
@ -20,8 +20,8 @@ public class EnrichMissingSubject extends UpdateMatcher<TypedValue> {
} }
@Override @Override
protected List<TypedValue> findDifferences(final OpenaireBrokerResult source, protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
final OpenaireBrokerResult target) { final OaBrokerMainEntity target) {
final Set<String> existingSubject = target final Set<String> existingSubject = target
.getSubjects() .getSubjects()
.stream() .stream()
@ -35,7 +35,7 @@ public class EnrichMissingSubject extends UpdateMatcher<TypedValue> {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
private static String subjectAsString(final TypedValue s) { private static String subjectAsString(final OaBrokerTypedValue s) {
return s.getType() + "::" + s.getValue(); return s.getType() + "::" + s.getValue();
} }

View File

@ -5,24 +5,24 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.Instance; import eu.dnetlib.broker.objects.OaBrokerInstance;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants; import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
public class EnrichMoreOpenAccess extends UpdateMatcher<Instance> { public class EnrichMoreOpenAccess extends UpdateMatcher<OaBrokerInstance> {
public EnrichMoreOpenAccess() { public EnrichMoreOpenAccess() {
super(true, super(true,
i -> Topic.ENRICH_MORE_OA_VERSION, i -> Topic.ENRICH_MORE_OA_VERSION,
(p, i) -> p.getInstances().add(i), (p, i) -> p.getInstances().add(i),
Instance::getUrl); OaBrokerInstance::getUrl);
} }
@Override @Override
protected List<Instance> findDifferences(final OpenaireBrokerResult source, protected List<OaBrokerInstance> findDifferences(final OaBrokerMainEntity source,
final OpenaireBrokerResult target) { final OaBrokerMainEntity target) {
final Set<String> urls = target final Set<String> urls = target
.getInstances() .getInstances()
.stream() .stream()

View File

@ -5,12 +5,12 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.TypedValue; import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMorePid extends UpdateMatcher<TypedValue> { public class EnrichMorePid extends UpdateMatcher<OaBrokerTypedValue> {
public EnrichMorePid() { public EnrichMorePid() {
super(true, super(true,
@ -20,8 +20,8 @@ public class EnrichMorePid extends UpdateMatcher<TypedValue> {
} }
@Override @Override
protected List<TypedValue> findDifferences(final OpenaireBrokerResult source, protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
final OpenaireBrokerResult target) { final OaBrokerMainEntity target) {
final Set<String> existingPids = target final Set<String> existingPids = target
.getPids() .getPids()
.stream() .stream()
@ -35,7 +35,7 @@ public class EnrichMorePid extends UpdateMatcher<TypedValue> {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
private static String pidAsString(final TypedValue pid) { private static String pidAsString(final OaBrokerTypedValue pid) {
return pid.getType() + "::" + pid.getValue(); return pid.getType() + "::" + pid.getValue();
} }
} }

View File

@ -5,12 +5,12 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.TypedValue; import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
public class EnrichMoreSubject extends UpdateMatcher<TypedValue> { public class EnrichMoreSubject extends UpdateMatcher<OaBrokerTypedValue> {
public EnrichMoreSubject() { public EnrichMoreSubject() {
super(true, super(true,
@ -20,8 +20,8 @@ public class EnrichMoreSubject extends UpdateMatcher<TypedValue> {
} }
@Override @Override
protected List<TypedValue> findDifferences(final OpenaireBrokerResult source, protected List<OaBrokerTypedValue> findDifferences(final OaBrokerMainEntity source,
final OpenaireBrokerResult target) { final OaBrokerMainEntity target) {
final Set<String> existingSubjects = target final Set<String> existingSubjects = target
.getSubjects() .getSubjects()
.stream() .stream()
@ -35,7 +35,7 @@ public class EnrichMoreSubject extends UpdateMatcher<TypedValue> {
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
private static String subjectAsString(final TypedValue s) { private static String subjectAsString(final OaBrokerTypedValue s) {
return s.getType() + "::" + s.getValue(); return s.getType() + "::" + s.getValue();
} }
} }

View File

@ -0,0 +1,47 @@
package eu.dnetlib.dhp.broker.oa.util;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
public class ClusterUtils {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void createDirIfMissing(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
public static void removeDir(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
public static <R> Dataset<R> readPath(
final SparkSession spark,
final String inputPath,
final Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
public static boolean isDedupRoot(final String id) {
return id.contains("dedup_wf_");
}
public static final boolean isValidResultResultClass(final String s) {
return s.equals("isReferencedBy")
|| s.equals("isRelatedTo")
|| s.equals("references")
|| s.equals("isSupplementedBy")
|| s.equals("isSupplementedTo");
}
}

View File

@ -15,8 +15,16 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Function; import com.google.common.base.Function;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerAuthor;
import eu.dnetlib.broker.objects.TypedValue; import eu.dnetlib.broker.objects.OaBrokerExternalReference;
import eu.dnetlib.broker.objects.OaBrokerInstance;
import eu.dnetlib.broker.objects.OaBrokerJournal;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.OaBrokerProject;
import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
import eu.dnetlib.broker.objects.OaBrokerTypedValue;
import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.ExternalReference; import eu.dnetlib.dhp.schema.oaf.ExternalReference;
@ -35,13 +43,13 @@ public class ConversionUtils {
private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class); private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class);
public static List<eu.dnetlib.broker.objects.Instance> oafInstanceToBrokerInstances(final Instance i) { public static List<OaBrokerInstance> oafInstanceToBrokerInstances(final Instance i) {
if (i == null) { if (i == null) {
return new ArrayList<>(); return new ArrayList<>();
} }
return mappedList(i.getUrl(), url -> { return mappedList(i.getUrl(), url -> {
final eu.dnetlib.broker.objects.Instance res = new eu.dnetlib.broker.objects.Instance(); final OaBrokerInstance res = new OaBrokerInstance();
res.setUrl(url); res.setUrl(url);
res.setInstancetype(classId(i.getInstancetype())); res.setInstancetype(classId(i.getInstancetype()));
res.setLicense(BrokerConstants.OPEN_ACCESS); res.setLicense(BrokerConstants.OPEN_ACCESS);
@ -50,20 +58,21 @@ public class ConversionUtils {
}); });
} }
public static TypedValue oafPidToBrokerPid(final StructuredProperty sp) { public static OaBrokerTypedValue oafPidToBrokerPid(final StructuredProperty sp) {
return oafStructPropToBrokerTypedValue(sp); return oafStructPropToBrokerTypedValue(sp);
} }
public static TypedValue oafStructPropToBrokerTypedValue(final StructuredProperty sp) { public static OaBrokerTypedValue oafStructPropToBrokerTypedValue(final StructuredProperty sp) {
return sp != null ? new TypedValue(classId(sp.getQualifier()), sp.getValue()) : null; return sp != null ? new OaBrokerTypedValue(classId(sp.getQualifier()), sp.getValue()) : null;
} }
public static final eu.dnetlib.broker.objects.Dataset oafDatasetToBrokerDataset(final Dataset d) { public static final OaBrokerRelatedDataset oafDatasetToBrokerDataset(final Dataset d) {
if (d == null) { if (d == null) {
return null; return null;
} }
final eu.dnetlib.broker.objects.Dataset res = new eu.dnetlib.broker.objects.Dataset(); final OaBrokerRelatedDataset res = new OaBrokerRelatedDataset();
res.setOpenaireId(d.getId());
res.setOriginalId(first(d.getOriginalId())); res.setOriginalId(first(d.getOriginalId()));
res.setTitle(structPropValue(d.getTitle())); res.setTitle(structPropValue(d.getTitle()));
res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid)); res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid));
@ -72,12 +81,13 @@ public class ConversionUtils {
return res; return res;
} }
public static eu.dnetlib.broker.objects.Publication oafPublicationToBrokerPublication(final Publication p) { public static OaBrokerRelatedPublication oafPublicationToBrokerPublication(final Publication p) {
if (p == null) { if (p == null) {
return null; return null;
} }
final eu.dnetlib.broker.objects.Publication res = new eu.dnetlib.broker.objects.Publication(); final OaBrokerRelatedPublication res = new OaBrokerRelatedPublication();
res.setOpenaireId(p.getId());
res.setOriginalId(first(p.getOriginalId())); res.setOriginalId(first(p.getOriginalId()));
res.setTitle(structPropValue(p.getTitle())); res.setTitle(structPropValue(p.getTitle()));
res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid)); res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid));
@ -87,12 +97,12 @@ public class ConversionUtils {
return res; return res;
} }
public static final OpenaireBrokerResult oafResultToBrokerResult(final Result result) { public static final OaBrokerMainEntity oafResultToBrokerResult(final Result result) {
if (result == null) { if (result == null) {
return null; return null;
} }
final OpenaireBrokerResult res = new OpenaireBrokerResult(); final OaBrokerMainEntity res = new OaBrokerMainEntity();
res.setOpenaireId(result.getId()); res.setOpenaireId(result.getId());
res.setOriginalId(first(result.getOriginalId())); res.setOriginalId(first(result.getOriginalId()));
@ -118,7 +128,7 @@ public class ConversionUtils {
return res; return res;
} }
private static eu.dnetlib.broker.objects.Author oafAuthorToBrokerAuthor(final Author author) { private static OaBrokerAuthor oafAuthorToBrokerAuthor(final Author author) {
if (author == null) { if (author == null) {
return null; return null;
} }
@ -135,15 +145,15 @@ public class ConversionUtils {
.findFirst() .findFirst()
.orElse(null) : null; .orElse(null) : null;
return new eu.dnetlib.broker.objects.Author(author.getFullname(), pids); return new OaBrokerAuthor(author.getFullname(), pids);
} }
private static eu.dnetlib.broker.objects.Journal oafJournalToBrokerJournal(final Journal journal) { private static OaBrokerJournal oafJournalToBrokerJournal(final Journal journal) {
if (journal == null) { if (journal == null) {
return null; return null;
} }
final eu.dnetlib.broker.objects.Journal res = new eu.dnetlib.broker.objects.Journal(); final OaBrokerJournal res = new OaBrokerJournal();
res.setName(journal.getName()); res.setName(journal.getName());
res.setIssn(journal.getIssnPrinted()); res.setIssn(journal.getIssnPrinted());
res.setEissn(journal.getIssnOnline()); res.setEissn(journal.getIssnOnline());
@ -152,12 +162,12 @@ public class ConversionUtils {
return res; return res;
} }
private static eu.dnetlib.broker.objects.ExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) { private static OaBrokerExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) {
if (ref == null) { if (ref == null) {
return null; return null;
} }
final eu.dnetlib.broker.objects.ExternalReference res = new eu.dnetlib.broker.objects.ExternalReference(); final OaBrokerExternalReference res = new OaBrokerExternalReference();
res.setRefidentifier(ref.getRefidentifier()); res.setRefidentifier(ref.getRefidentifier());
res.setSitename(ref.getSitename()); res.setSitename(ref.getSitename());
res.setType(classId(ref.getQualifier())); res.setType(classId(ref.getQualifier()));
@ -165,12 +175,13 @@ public class ConversionUtils {
return res; return res;
} }
public static final eu.dnetlib.broker.objects.Project oafProjectToBrokerProject(final Project p) { public static final OaBrokerProject oafProjectToBrokerProject(final Project p) {
if (p == null) { if (p == null) {
return null; return null;
} }
final eu.dnetlib.broker.objects.Project res = new eu.dnetlib.broker.objects.Project(); final OaBrokerProject res = new OaBrokerProject();
res.setOpenaireId(p.getId());
res.setTitle(fieldValue(p.getTitle())); res.setTitle(fieldValue(p.getTitle()));
res.setAcronym(fieldValue(p.getAcronym())); res.setAcronym(fieldValue(p.getAcronym()));
res.setCode(fieldValue(p.getCode())); res.setCode(fieldValue(p.getCode()));
@ -190,12 +201,13 @@ public class ConversionUtils {
return res; return res;
} }
public static final eu.dnetlib.broker.objects.Software oafSoftwareToBrokerSoftware(final Software sw) { public static final OaBrokerRelatedSoftware oafSoftwareToBrokerSoftware(final Software sw) {
if (sw == null) { if (sw == null) {
return null; return null;
} }
final eu.dnetlib.broker.objects.Software res = new eu.dnetlib.broker.objects.Software(); final OaBrokerRelatedSoftware res = new OaBrokerRelatedSoftware();
res.setOpenaireId(sw.getId());
res.setName(structPropValue(sw.getTitle())); res.setName(structPropValue(sw.getTitle()));
res.setDescription(fieldValue(sw.getDescription())); res.setDescription(fieldValue(sw.getDescription()));
res.setRepository(fieldValue(sw.getCodeRepositoryUrl())); res.setRepository(fieldValue(sw.getCodeRepositoryUrl()));
@ -247,7 +259,7 @@ public class ConversionUtils {
: new ArrayList<>(); : new ArrayList<>();
} }
private static List<TypedValue> structPropTypedList(final List<StructuredProperty> list) { private static List<OaBrokerTypedValue> structPropTypedList(final List<StructuredProperty> list) {
if (list == null) { if (list == null) {
return new ArrayList<>(); return new ArrayList<>();
} }

View File

@ -4,7 +4,7 @@ package eu.dnetlib.dhp.broker.oa.util;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.broker.model.EventFactory; import eu.dnetlib.dhp.broker.model.EventFactory;
import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher;
import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy; import eu.dnetlib.dhp.broker.oa.matchers.relatedDatasets.EnrichMissingDatasetIsReferencedBy;
@ -68,7 +68,7 @@ public class EventFinder {
public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) { public static EventGroup generateEvents(final ResultGroup results, final DedupConfig dedupConfig) {
final List<UpdateInfo<?>> list = new ArrayList<>(); final List<UpdateInfo<?>> list = new ArrayList<>();
for (final OpenaireBrokerResult target : results.getData()) { for (final OaBrokerMainEntity target : results.getData()) {
for (final UpdateMatcher<?> matcher : matchers) { for (final UpdateMatcher<?> matcher : matchers) {
list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig)); list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig));
} }

View File

@ -9,10 +9,10 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.broker.objects.Instance; import eu.dnetlib.broker.objects.OaBrokerEventPayload;
import eu.dnetlib.broker.objects.OpenAireEventPayload; import eu.dnetlib.broker.objects.OaBrokerInstance;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.broker.objects.Provenance; import eu.dnetlib.broker.objects.OaBrokerProvenance;
import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.model.Topic;
import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument; import eu.dnetlib.pace.model.MapDocument;
@ -25,11 +25,11 @@ public final class UpdateInfo<T> {
private final T highlightValue; private final T highlightValue;
private final OpenaireBrokerResult source; private final OaBrokerMainEntity source;
private final OpenaireBrokerResult target; private final OaBrokerMainEntity target;
private final BiConsumer<OpenaireBrokerResult, T> compileHighlight; private final BiConsumer<OaBrokerMainEntity, T> compileHighlight;
private final Function<T, String> highlightToString; private final Function<T, String> highlightToString;
@ -37,9 +37,9 @@ public final class UpdateInfo<T> {
private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class); private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class);
public UpdateInfo(final Topic topic, final T highlightValue, final OpenaireBrokerResult source, public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source,
final OpenaireBrokerResult target, final OaBrokerMainEntity target,
final BiConsumer<OpenaireBrokerResult, T> compileHighlight, final BiConsumer<OaBrokerMainEntity, T> compileHighlight,
final Function<T, String> highlightToString, final Function<T, String> highlightToString,
final DedupConfig dedupConfig) { final DedupConfig dedupConfig) {
this.topic = topic; this.topic = topic;
@ -55,17 +55,17 @@ public final class UpdateInfo<T> {
return highlightValue; return highlightValue;
} }
public OpenaireBrokerResult getSource() { public OaBrokerMainEntity getSource() {
return source; return source;
} }
public OpenaireBrokerResult getTarget() { public OaBrokerMainEntity getTarget() {
return target; return target;
} }
private float calculateTrust(final DedupConfig dedupConfig, private float calculateTrust(final DedupConfig dedupConfig,
final OpenaireBrokerResult r1, final OaBrokerMainEntity r1,
final OpenaireBrokerResult r2) { final OaBrokerMainEntity r2) {
if (dedupConfig == null) { if (dedupConfig == null) {
return BrokerConstants.MIN_TRUST; return BrokerConstants.MIN_TRUST;
@ -104,11 +104,11 @@ public final class UpdateInfo<T> {
return highlightToString.apply(getHighlightValue()); return highlightToString.apply(getHighlightValue());
} }
public OpenAireEventPayload asBrokerPayload() { public OaBrokerEventPayload asBrokerPayload() {
compileHighlight.accept(target, getHighlightValue()); compileHighlight.accept(target, getHighlightValue());
final OpenaireBrokerResult hl = new OpenaireBrokerResult(); final OaBrokerMainEntity hl = new OaBrokerMainEntity();
compileHighlight.accept(hl, getHighlightValue()); compileHighlight.accept(hl, getHighlightValue());
final String provId = getSource().getOriginalId(); final String provId = getSource().getOriginalId();
@ -117,14 +117,14 @@ public final class UpdateInfo<T> {
final String provUrl = getSource() final String provUrl = getSource()
.getInstances() .getInstances()
.stream() .stream()
.map(Instance::getUrl) .map(OaBrokerInstance::getUrl)
.findFirst() .findFirst()
.orElse(null); .orElse(null);
; ;
final Provenance provenance = new Provenance(provId, provRepo, provUrl); final OaBrokerProvenance provenance = new OaBrokerProvenance(provId, provRepo, provUrl);
final OpenAireEventPayload res = new OpenAireEventPayload(); final OaBrokerEventPayload res = new OaBrokerEventPayload();
res.setResult(target); res.setResult(target);
res.setHighlight(hl); res.setHighlight(hl);
res.setTrust(trust); res.setTrust(trust);

View File

@ -5,11 +5,11 @@ import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator; import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2; import scala.Tuple2;
public class ResultAggregator extends Aggregator<Tuple2<OpenaireBrokerResult, Relation>, ResultGroup, ResultGroup> { public class ResultAggregator extends Aggregator<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup, ResultGroup> {
/** /**
* *
@ -22,7 +22,7 @@ public class ResultAggregator extends Aggregator<Tuple2<OpenaireBrokerResult, Re
} }
@Override @Override
public ResultGroup reduce(final ResultGroup group, final Tuple2<OpenaireBrokerResult, Relation> t) { public ResultGroup reduce(final ResultGroup group, final Tuple2<OaBrokerMainEntity, Relation> t) {
group.getData().add(t._1); group.getData().add(t._1);
return group; return group;
} }

View File

@ -5,7 +5,7 @@ import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import eu.dnetlib.broker.objects.OpenaireBrokerResult; import eu.dnetlib.broker.objects.OaBrokerMainEntity;
public class ResultGroup implements Serializable { public class ResultGroup implements Serializable {
@ -14,13 +14,13 @@ public class ResultGroup implements Serializable {
*/ */
private static final long serialVersionUID = -3360828477088669296L; private static final long serialVersionUID = -3360828477088669296L;
private List<OpenaireBrokerResult> data = new ArrayList<>(); private List<OaBrokerMainEntity> data = new ArrayList<>();
public List<OpenaireBrokerResult> getData() { public List<OaBrokerMainEntity> getData() {
return data; return data;
} }
public void setData(final List<OpenaireBrokerResult> data) { public void setData(final List<OaBrokerMainEntity> data) {
this.data = data; this.data = data;
} }

View File

@ -1,69 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OpenaireBrokerResult;
import scala.Tuple2;
public class OpenaireBrokerResultAggregator<T>
extends Aggregator<Tuple2<OpenaireBrokerResult, T>, OpenaireBrokerResult, OpenaireBrokerResult> {
/**
*
*/
private static final long serialVersionUID = -3687878788861013488L;
@Override
public OpenaireBrokerResult zero() {
return new OpenaireBrokerResult();
}
@Override
public OpenaireBrokerResult finish(final OpenaireBrokerResult g) {
return g;
}
@Override
public OpenaireBrokerResult reduce(final OpenaireBrokerResult g, final Tuple2<OpenaireBrokerResult, T> t) {
if (g.getOriginalId() == null) {
return t._1;
} else if (t._2 instanceof RelatedSoftware) {
g.getSoftwares().add(((RelatedSoftware) t._2).getRelSoftware());
} else if (t._2 instanceof RelatedDataset) {
g.getDatasets().add(((RelatedDataset) t._2).getRelDataset());
} else if (t._2 instanceof RelatedPublication) {
g.getPublications().add(((RelatedPublication) t._2).getRelPublication());
} else if (t._2 instanceof RelatedProject) {
g.getProjects().add(((RelatedProject) t._2).getRelProject());
}
return g;
}
@Override
public OpenaireBrokerResult merge(final OpenaireBrokerResult g1, final OpenaireBrokerResult g2) {
if (g1.getOriginalId() != null) {
g1.getSoftwares().addAll(g2.getSoftwares());
g1.getDatasets().addAll(g2.getDatasets());
g1.getPublications().addAll(g2.getPublications());
g1.getProjects().addAll(g2.getProjects());
return g1;
} else {
return g2;
}
}
@Override
public Encoder<OpenaireBrokerResult> bufferEncoder() {
return Encoders.bean(OpenaireBrokerResult.class);
}
@Override
public Encoder<OpenaireBrokerResult> outputEncoder() {
return Encoders.bean(OpenaireBrokerResult.class);
}
}

View File

@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import java.io.Serializable; import java.io.Serializable;
import eu.dnetlib.broker.objects.Dataset; import eu.dnetlib.broker.objects.OaBrokerRelatedDataset;
public class RelatedDataset implements Serializable { public class RelatedDataset implements Serializable {
@ -11,16 +11,15 @@ public class RelatedDataset implements Serializable {
* *
*/ */
private static final long serialVersionUID = 774487705184038324L; private static final long serialVersionUID = 774487705184038324L;
private String source; private String source;
private String relType; private OaBrokerRelatedDataset relDataset;
private Dataset relDataset;
public RelatedDataset() { public RelatedDataset() {
} }
public RelatedDataset(final String source, final String relType, final Dataset relDataset) { public RelatedDataset(final String source, final OaBrokerRelatedDataset relDataset) {
this.source = source; this.source = source;
this.relType = relType;
this.relDataset = relDataset; this.relDataset = relDataset;
} }
@ -32,19 +31,11 @@ public class RelatedDataset implements Serializable {
this.source = source; this.source = source;
} }
public String getRelType() { public OaBrokerRelatedDataset getRelDataset() {
return relType;
}
public void setRelType(final String relType) {
this.relType = relType;
}
public Dataset getRelDataset() {
return relDataset; return relDataset;
} }
public void setRelDataset(final Dataset relDataset) { public void setRelDataset(final OaBrokerRelatedDataset relDataset) {
this.relDataset = relDataset; this.relDataset = relDataset;
} }

View File

@ -0,0 +1,60 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import scala.Tuple2;
public class RelatedDatasetAggregator
extends Aggregator<Tuple2<OaBrokerMainEntity, RelatedDataset>, OaBrokerMainEntity, OaBrokerMainEntity> {
/**
*
*/
private static final long serialVersionUID = 6969761680131482557L;
@Override
public OaBrokerMainEntity zero() {
return new OaBrokerMainEntity();
}
@Override
public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
return g;
}
@Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedDataset> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
if (t._2 != null) {
res.getDatasets().add(t._2.getRelDataset());
}
return res;
}
@Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOpenaireId())) {
g1.getDatasets().addAll(g2.getDatasets());
return g1;
} else {
return g2;
}
}
@Override
public Encoder<OaBrokerMainEntity> bufferEncoder() {
return Encoders.bean(OaBrokerMainEntity.class);
}
@Override
public Encoder<OaBrokerMainEntity> outputEncoder() {
return Encoders.bean(OaBrokerMainEntity.class);
}
}

View File

@ -1,34 +0,0 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Software;
public class RelatedEntityFactory {
@SuppressWarnings("unchecked")
public static <RT, T> RT newRelatedEntity(final String sourceId,
final String relType,
final T target,
final Class<RT> clazz) {
if (clazz == RelatedProject.class) {
return (RT) new RelatedProject(sourceId, relType,
ConversionUtils.oafProjectToBrokerProject((Project) target));
} else if (clazz == RelatedSoftware.class) {
return (RT) new RelatedSoftware(sourceId, relType,
ConversionUtils.oafSoftwareToBrokerSoftware((Software) target));
} else if (clazz == RelatedDataset.class) {
return (RT) new RelatedDataset(sourceId, relType,
ConversionUtils.oafDatasetToBrokerDataset((Dataset) target));
} else if (clazz == RelatedPublication.class) {
return (RT) new RelatedPublication(sourceId, relType,
ConversionUtils.oafPublicationToBrokerPublication((Publication) target));
} else {
return null;
}
}
}

View File

@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import java.io.Serializable; import java.io.Serializable;
import eu.dnetlib.broker.objects.Project; import eu.dnetlib.broker.objects.OaBrokerProject;
public class RelatedProject implements Serializable { public class RelatedProject implements Serializable {
@ -13,15 +13,13 @@ public class RelatedProject implements Serializable {
private static final long serialVersionUID = 4941437626549329870L; private static final long serialVersionUID = 4941437626549329870L;
private String source; private String source;
private String relType; private OaBrokerProject relProject;
private Project relProject;
public RelatedProject() { public RelatedProject() {
} }
public RelatedProject(final String source, final String relType, final Project relProject) { public RelatedProject(final String source, final OaBrokerProject relProject) {
this.source = source; this.source = source;
this.relType = relType;
this.relProject = relProject; this.relProject = relProject;
} }
@ -33,19 +31,11 @@ public class RelatedProject implements Serializable {
this.source = source; this.source = source;
} }
public String getRelType() { public OaBrokerProject getRelProject() {
return relType;
}
public void setRelType(final String relType) {
this.relType = relType;
}
public Project getRelProject() {
return relProject; return relProject;
} }
public void setRelProject(final Project relProject) { public void setRelProject(final OaBrokerProject relProject) {
this.relProject = relProject; this.relProject = relProject;
} }

View File

@ -0,0 +1,60 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import scala.Tuple2;
public class RelatedProjectAggregator
extends Aggregator<Tuple2<OaBrokerMainEntity, RelatedProject>, OaBrokerMainEntity, OaBrokerMainEntity> {
/**
*
*/
private static final long serialVersionUID = 8559808519152275763L;
@Override
public OaBrokerMainEntity zero() {
return new OaBrokerMainEntity();
}
@Override
public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
return g;
}
@Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedProject> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
if (t._2 != null) {
res.getProjects().add(t._2.getRelProject());
}
return res;
}
@Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOpenaireId())) {
g1.getProjects().addAll(g2.getProjects());
return g1;
} else {
return g2;
}
}
@Override
public Encoder<OaBrokerMainEntity> bufferEncoder() {
return Encoders.bean(OaBrokerMainEntity.class);
}
@Override
public Encoder<OaBrokerMainEntity> outputEncoder() {
return Encoders.bean(OaBrokerMainEntity.class);
}
}

View File

@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import java.io.Serializable; import java.io.Serializable;
import eu.dnetlib.broker.objects.Publication; import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
public class RelatedPublication implements Serializable { public class RelatedPublication implements Serializable {
@ -13,15 +13,13 @@ public class RelatedPublication implements Serializable {
private static final long serialVersionUID = 9021609640411395128L; private static final long serialVersionUID = 9021609640411395128L;
private String source; private String source;
private String relType; private OaBrokerRelatedPublication relPublication;
private Publication relPublication;
public RelatedPublication() { public RelatedPublication() {
} }
public RelatedPublication(final String source, final String relType, final Publication relPublication) { public RelatedPublication(final String source, final OaBrokerRelatedPublication relPublication) {
this.source = source; this.source = source;
this.relType = relType;
this.relPublication = relPublication; this.relPublication = relPublication;
} }
@ -33,19 +31,11 @@ public class RelatedPublication implements Serializable {
this.source = source; this.source = source;
} }
public String getRelType() { public OaBrokerRelatedPublication getRelPublication() {
return relType;
}
public void setRelType(final String relType) {
this.relType = relType;
}
public Publication getRelPublication() {
return relPublication; return relPublication;
} }
public void setRelPublication(final Publication relPublication) { public void setRelPublication(final OaBrokerRelatedPublication relPublication) {
this.relPublication = relPublication; this.relPublication = relPublication;
} }

View File

@ -0,0 +1,61 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import scala.Tuple2;
public class RelatedPublicationAggregator
extends Aggregator<Tuple2<OaBrokerMainEntity, RelatedPublication>, OaBrokerMainEntity, OaBrokerMainEntity> {
/**
*
*/
private static final long serialVersionUID = 4656934981558135919L;
@Override
public OaBrokerMainEntity zero() {
return new OaBrokerMainEntity();
}
@Override
public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
return g;
}
@Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g,
final Tuple2<OaBrokerMainEntity, RelatedPublication> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
if (t._2 != null) {
res.getPublications().add(t._2.getRelPublication());
}
return res;
}
@Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOpenaireId())) {
g1.getPublications().addAll(g2.getPublications());
return g1;
} else {
return g2;
}
}
@Override
public Encoder<OaBrokerMainEntity> bufferEncoder() {
return Encoders.bean(OaBrokerMainEntity.class);
}
@Override
public Encoder<OaBrokerMainEntity> outputEncoder() {
return Encoders.bean(OaBrokerMainEntity.class);
}
}

View File

@ -3,7 +3,7 @@ package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import java.io.Serializable; import java.io.Serializable;
import eu.dnetlib.broker.objects.Software; import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
public class RelatedSoftware implements Serializable { public class RelatedSoftware implements Serializable {
@ -11,16 +11,15 @@ public class RelatedSoftware implements Serializable {
* *
*/ */
private static final long serialVersionUID = 7573383356943300157L; private static final long serialVersionUID = 7573383356943300157L;
private String source; private String source;
private String relType; private OaBrokerRelatedSoftware relSoftware;
private Software relSoftware;
public RelatedSoftware() { public RelatedSoftware() {
} }
public RelatedSoftware(final String source, final String relType, final Software relSoftware) { public RelatedSoftware(final String source, final OaBrokerRelatedSoftware relSoftware) {
this.source = source; this.source = source;
this.relType = relType;
this.relSoftware = relSoftware; this.relSoftware = relSoftware;
} }
@ -32,19 +31,11 @@ public class RelatedSoftware implements Serializable {
this.source = source; this.source = source;
} }
public String getRelType() { public OaBrokerRelatedSoftware getRelSoftware() {
return relType;
}
public void setRelType(final String relType) {
this.relType = relType;
}
public Software getRelSoftware() {
return relSoftware; return relSoftware;
} }
public void setRelSoftware(final Software relSoftware) { public void setRelSoftware(final OaBrokerRelatedSoftware relSoftware) {
this.relSoftware = relSoftware; this.relSoftware = relSoftware;
} }

View File

@ -0,0 +1,60 @@
package eu.dnetlib.dhp.broker.oa.util.aggregators.withRels;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import scala.Tuple2;
public class RelatedSoftwareAggregator
extends Aggregator<Tuple2<OaBrokerMainEntity, RelatedSoftware>, OaBrokerMainEntity, OaBrokerMainEntity> {
/**
*
*/
private static final long serialVersionUID = -8987959389106443702L;
@Override
public OaBrokerMainEntity zero() {
return new OaBrokerMainEntity();
}
@Override
public OaBrokerMainEntity finish(final OaBrokerMainEntity g) {
return g;
}
@Override
public OaBrokerMainEntity reduce(final OaBrokerMainEntity g, final Tuple2<OaBrokerMainEntity, RelatedSoftware> t) {
final OaBrokerMainEntity res = StringUtils.isNotBlank(g.getOpenaireId()) ? g : t._1;
if (t._2 != null) {
res.getSoftwares().add(t._2.getRelSoftware());
}
return res;
}
@Override
public OaBrokerMainEntity merge(final OaBrokerMainEntity g1, final OaBrokerMainEntity g2) {
if (StringUtils.isNotBlank(g1.getOpenaireId())) {
g1.getSoftwares().addAll(g2.getSoftwares());
return g1;
} else {
return g2;
}
}
@Override
public Encoder<OaBrokerMainEntity> bufferEncoder() {
return Encoders.bean(OaBrokerMainEntity.class);
}
@Override
public Encoder<OaBrokerMainEntity> outputEncoder() {
return Encoders.bean(OaBrokerMainEntity.class);
}
}

View File

@ -0,0 +1,14 @@
[
{
"paramName": "g",
"paramLongName": "graphPath",
"paramDescription": "the path where there the graph is stored",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "workingPath",
"paramDescription": "the path where the temporary data will be stored",
"paramRequired": true
}
]

View File

@ -6,8 +6,8 @@
<description>the path where the graph is stored</description> <description>the path where the graph is stored</description>
</property> </property>
<property> <property>
<name>eventsOutputPath</name> <name>workingPath</name>
<description>the path where the the events will be stored</description> <description>the path where the the generated data will be stored</description>
</property> </property>
<property> <property>
<name>isLookupUrl</name> <name>isLookupUrl</name>
@ -73,18 +73,34 @@
</configuration> </configuration>
</global> </global>
<start to="generate_events"/> <start to="ensure_working_path"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="generate_events"> <action name="ensure_working_path">
<fs>
<mkdir path='${workingPath}'/>
</fs>
<ok to="start_entities_and_rels"/>
<error to="Kill"/>
</action>
<fork name="start_entities_and_rels">
<path start="prepare_simple_entities"/>
<path start="prepare_related_softwares"/>
<path start="prepare_related_datasets"/>
<path start="prepare_related_projects"/>
<path start="prepare_related_publications"/>
</fork>
<action name="prepare_simple_entities">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>GenerateEvents</name> <name>PrepareSimpleEntititiesJob</name>
<class>eu.dnetlib.dhp.broker.oa.GenerateEventsApplication</class> <class>eu.dnetlib.dhp.broker.oa.PrepareSimpleEntititiesJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar> <jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -97,7 +113,177 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--eventsPath</arg><arg>${eventsOutputPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
</action>
<action name="prepare_related_datasets">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareRelatedDatasetsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedDatasetsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
</action>
<action name="prepare_related_projects">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareRelatedProjectsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedProjectsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
</action>
<action name="prepare_related_publications">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareRelatedPublicationsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedPublicationsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
</action>
<action name="prepare_related_softwares">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareRelatedSoftwaresJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedSoftwaresJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
</action>
<join name="wait_entities_and_rels" to="join_entities"/>
<action name="join_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinEntitiesJob</name>
<class>eu.dnetlib.dhp.broker.oa.JoinEntitiesJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="prepare_groups"/>
<error to="Kill"/>
</action>
<action name="prepare_groups">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareGroupsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareGroupsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="generate_events"/>
<error to="Kill"/>
</action>
<action name="generate_events">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEventsJob</name>
<class>eu.dnetlib.dhp.broker.oa.GenerateEventsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg> <arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
</spark> </spark>
@ -105,7 +291,6 @@
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -1,13 +1,7 @@
[ [
{
"paramName": "g",
"paramLongName": "graphPath",
"paramDescription": "the path where there the graph is stored",
"paramRequired": true
},
{ {
"paramName": "o", "paramName": "o",
"paramLongName": "eventsPath", "paramLongName": "workingPath",
"paramDescription": "the path where the generated events will be stored", "paramDescription": "the path where the generated events will be stored",
"paramRequired": true "paramRequired": true
}, },

View File

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -0,0 +1,158 @@
<workflow-app name="create broker events" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>graphInputPath</name>
<description>the path where the graph is stored</description>
</property>
<property>
<name>workingPath</name>
<description>the path where the the generated data will be stored</description>
</property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>dedupConfProfId</name>
<description>the id of a valid Dedup Configuration Profile</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="join_entities"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="join_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinEntitiesJob</name>
<class>eu.dnetlib.dhp.broker.oa.JoinEntitiesJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="prepare_groups"/>
<error to="Kill"/>
</action>
<action name="prepare_groups">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareGroupsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareGroupsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="generate_events"/>
<error to="Kill"/>
</action>
<action name="generate_events">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEventsJob</name>
<class>eu.dnetlib.dhp.broker.oa.GenerateEventsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>dhp-dedup-openaire</artifactId> <artifactId>dhp-dedup-openaire</artifactId>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -144,6 +144,9 @@ public class CleanGraphSparkJob {
if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) { if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) {
i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
} }
if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
}
} }
} }

View File

@ -111,6 +111,7 @@ public class MappersTest {
assertNotNull(i.getAccessright()); assertNotNull(i.getAccessright());
assertEquals("OPEN", i.getAccessright().getClassid()); assertEquals("OPEN", i.getAccessright().getClassid());
}); });
assertEquals("0001", p.getInstance().get(0).getRefereed().getClassid());
assertNotNull(p.getBestaccessright()); assertNotNull(p.getBestaccessright());
assertEquals("OPEN", p.getBestaccessright().getClassid()); assertEquals("OPEN", p.getBestaccessright().getClassid());
@ -217,6 +218,7 @@ public class MappersTest {
assertNotNull(i.getAccessright()); assertNotNull(i.getAccessright());
assertEquals("OPEN", i.getAccessright().getClassid()); assertEquals("OPEN", i.getAccessright().getClassid());
}); });
assertEquals("0001", d.getInstance().get(0).getRefereed().getClassid());
assertValidId(r1.getSource()); assertValidId(r1.getSource());
assertValidId(r1.getTarget()); assertValidId(r1.getTarget());

View File

@ -57,6 +57,7 @@
<oaf:identifier identifierType="doi">10.3897/oneeco.2.e13718</oaf:identifier> <oaf:identifier identifierType="doi">10.3897/oneeco.2.e13718</oaf:identifier>
<oaf:fulltext>https://oneecosystem.pensoft.net/article/13718/</oaf:fulltext> <oaf:fulltext>https://oneecosystem.pensoft.net/article/13718/</oaf:fulltext>
<oaf:journal eissn="2367-8194" issn="">One Ecosystem</oaf:journal> <oaf:journal eissn="2367-8194" issn="">One Ecosystem</oaf:journal>
<oaf:refereed>0001</oaf:refereed>
</metadata> </metadata>
<about xmlns:oai="http://www.openarchives.org/OAI/2.0/"> <about xmlns:oai="http://www.openarchives.org/OAI/2.0/">
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd"> <provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">

View File

@ -90,6 +90,7 @@
<oaf:hostedBy id="re3data_____::r3d100010468" name="Zenodo"/> <oaf:hostedBy id="re3data_____::r3d100010468" name="Zenodo"/>
<oaf:projectid>corda_______::226852</oaf:projectid> <oaf:projectid>corda_______::226852</oaf:projectid>
<oaf:collectedFrom id="re3data_____::r3d100010468" name="Zenodo"/> <oaf:collectedFrom id="re3data_____::r3d100010468" name="Zenodo"/>
<oaf:refereed>0001</oaf:refereed>s
</metadata> </metadata>
<about xmlns:dc="http://purl.org/dc/elements/1.1/" <about xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:dri="http://www.driver-repository.eu/namespace/dri" xmlns:dri="http://www.driver-repository.eu/namespace/dri"

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>dhp-stats-update</artifactId> <artifactId>dhp-stats-update</artifactId>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -7,7 +7,7 @@
<DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/> <DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/>
</HEADER> </HEADER>
<BODY> <BODY>
<WORKFLOW_NAME>Data Provision [OCEAN]</WORKFLOW_NAME> <WORKFLOW_NAME>Graph Construction [OCEAN]</WORKFLOW_NAME>
<WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE> <WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE>
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY> <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
<CONFIGURATION start="manual"> <CONFIGURATION start="manual">

View File

@ -0,0 +1,73 @@
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="5d750977-bec2-47f4-97bb-1b7500e4704e_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/>
</HEADER>
<BODY>
<WORKFLOW_NAME>Graph to HiveDB [OCEAN]</WORKFLOW_NAME>
<WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE>
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
<CONFIGURATION start="manual">
<NODE isStart="true" name="setInputPath" type="SetEnvParameter">
<DESCRIPTION>Set the path containing the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">inputPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string"></PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setHiveDbName" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the RAW graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">hiveDbName</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string"></PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isJoin="true" name="waitConfig">
<DESCRIPTION>wait configurations</DESCRIPTION>
<PARAMETERS/>
<ARCS>
<ARC to="aggregatorGraph"/>
</ARCS>
</NODE>
<NODE name="graph2hive" type="SubmitHadoopJob">
<DESCRIPTION>create the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'inputPath' : 'inputPath',
'hiveDbName' : 'hiveDbName'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/graph/hive/oozie_app'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</CONFIGURATION>
<STATUS>
<LAST_EXECUTION_ID>wf_20200615_163630_609</LAST_EXECUTION_ID>
<LAST_EXECUTION_DATE>2020-06-15T17:08:00+00:00</LAST_EXECUTION_DATE>
<LAST_EXECUTION_STATUS>SUCCESS</LAST_EXECUTION_STATUS>
<LAST_EXECUTION_ERROR/>
</STATUS>
</BODY>
</RESOURCE_PROFILE>

View File

@ -0,0 +1,98 @@
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="8d36cc94-5b82-413c-923f-e7b3953e41ba_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/>
</HEADER>
<BODY>
<WORKFLOW_NAME>Update Solr [OCEAN]</WORKFLOW_NAME>
<WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE>
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
<CONFIGURATION start="manual">
<NODE isStart="true" name="setInputPath" type="SetEnvParameter">
<DESCRIPTION>Set the path containing the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">inputGraphRootPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string"></PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setCollection" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the RAW graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">format</PARAM>
<PARAM function="validValues(['TMF', 'DMF'])" managedBy="user" name="parameterValue" required="true" type="string">TMF</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setIsLookUpUrl" type="SetEnvParameter">
<DESCRIPTION>Set the lookup address</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">isLookupUrl</PARAM>
<PARAM managedBy="system" name="parameterValue" required="true" type="string">http://beta.services.openaire.eu:8280/is/services/isLookUp?wsdl</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isJoin="true" name="waitConfig">
<DESCRIPTION>wait configurations</DESCRIPTION>
<PARAMETERS/>
<ARCS>
<ARC to="updateSolr"/>
</ARCS>
</NODE>
<NODE name="updateSolr" type="SubmitHadoopJob">
<DESCRIPTION>create the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'inputGraphRootPath' : 'inputGraphRootPath',
'isLookupUrl' : 'isLookupUrl',
'format' : 'format'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/provision/oozie_app',
'maxRelations' : '100',
'relPartitions' : '3000',
'batchSize' : '2000',
'relationFilter' : 'isAuthorInstitutionOf,produces,hasAmongTopNSimilarDocuments',
'otherDsTypeId' : 'scholarcomminfra,infospace,pubsrepository::mock,entityregistry,entityregistry::projects,entityregistry::repositories,websource',
'resumeFrom' : 'prepare_relations',
'sparkDriverMemoryForJoining' : '3G',
'sparkExecutorMemoryForJoining' : '7G',
'sparkExecutorCoresForJoining' : '4',
'sparkDriverMemoryForIndexing' : '2G',
'sparkExecutorMemoryForIndexing' : '2G',
'sparkExecutorCoresForIndexing' : '64',
'sparkNetworkTimeout' : '600',
'workingDir' : '/tmp/beta_provision/working_dir/update_solr'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</CONFIGURATION>
<STATUS>
<LAST_EXECUTION_ID>wf_20200615_163630_609</LAST_EXECUTION_ID>
<LAST_EXECUTION_DATE>2020-06-15T17:08:00+00:00</LAST_EXECUTION_DATE>
<LAST_EXECUTION_STATUS>SUCCESS</LAST_EXECUTION_STATUS>
<LAST_EXECUTION_ERROR/>
</STATUS>
</BODY>
</RESOURCE_PROFILE>

View File

@ -0,0 +1,74 @@
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="65ca9122-f8fe-4aa6-9fb2-bc1e1ffb2dda_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/>
</HEADER>
<BODY>
<WORKFLOW_NAME>Update Stats [OCEAN]</WORKFLOW_NAME>
<WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE>
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
<CONFIGURATION start="manual">
<NODE isStart="true" name="setGraphDbName" type="SetEnvParameter">
<DESCRIPTION>Set the path containing the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">openaire_db_name</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string"></PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setStatsDbNameCollection" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the RAW graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">stats_db_name</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string"></PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isJoin="true" name="waitConfig">
<DESCRIPTION>wait configurations</DESCRIPTION>
<PARAMETERS/>
<ARCS>
<ARC to="updateStatsDB"/>
</ARCS>
</NODE>
<NODE name="updateStatsDB" type="SubmitHadoopJob">
<DESCRIPTION>create the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'openaire_db_name' : 'openaire_db_name',
'stats_db_name' : 'stats_db_name'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/graph/stats/oozie_app',
'hive_timeout' : '3000'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</CONFIGURATION>
<STATUS>
<LAST_EXECUTION_ID>wf_20200615_163630_609</LAST_EXECUTION_ID>
<LAST_EXECUTION_DATE>2020-06-15T17:08:00+00:00</LAST_EXECUTION_DATE>
<LAST_EXECUTION_STATUS>SUCCESS</LAST_EXECUTION_STATUS>
<LAST_EXECUTION_ERROR/>
</STATUS>
</BODY>
</RESOURCE_PROFILE>

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId> <artifactId>dhp</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
<relativePath>../</relativePath> <relativePath>../</relativePath>
</parent> </parent>

View File

@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId> <artifactId>dhp</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<licenses> <licenses>