1
0
Fork 0

merge branch with fork master

This commit is contained in:
Miriam Baglioni 2020-06-22 16:25:48 +02:00
commit 250fd1c854
40 changed files with 804 additions and 277 deletions

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-build</artifactId> <artifactId>dhp-build</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>dhp-build-assembly-resources</artifactId> <artifactId>dhp-build-assembly-resources</artifactId>

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-build</artifactId> <artifactId>dhp-build</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>dhp-build-properties-maven-plugin</artifactId> <artifactId>dhp-build-properties-maven-plugin</artifactId>

View File

@ -5,7 +5,7 @@
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-code-style</artifactId> <artifactId>dhp-code-style</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
<packaging>jar</packaging> <packaging>jar</packaging>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId> <artifactId>dhp</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>dhp-build</artifactId> <artifactId>dhp-build</artifactId>
<packaging>pom</packaging> <packaging>pom</packaging>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId> <artifactId>dhp</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
<relativePath>../</relativePath> <relativePath>../</relativePath>
</parent> </parent>

View File

@ -5,7 +5,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId> <artifactId>dhp</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
<relativePath>../</relativePath> <relativePath>../</relativePath>
</parent> </parent>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>dhp-actionmanager</artifactId> <artifactId>dhp-actionmanager</artifactId>

View File

@ -4,7 +4,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<artifactId>dhp-aggregation</artifactId> <artifactId>dhp-aggregation</artifactId>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -1,190 +0,0 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OaBrokerMainEntityAggregator;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import scala.Tuple2;
public class GenerateEventsApplication {
private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
GenerateEventsApplication.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String eventsPath = parser.get("eventsPath");
log.info("eventsPath: {}", eventsPath);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String dedupConfigProfileId = parser.get("dedupConfProfile");
log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
final SparkConf conf = new SparkConf();
// conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
// conf.registerKryoClasses(BrokerConstants.getModelClasses());
// TODO UNCOMMENT
// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
final DedupConfig dedupConfig = null;
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, eventsPath);
// TODO REMOVE THIS
expandResultsWithRelations(spark, graphPath, Publication.class)
.write()
.mode(SaveMode.Overwrite)
.json(eventsPath);
// TODO UNCOMMENT THIS
// spark
// .emptyDataset(Encoders.bean(Event.class))
// .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
// .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
// .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
// .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .json(eventsPath);
});
}
private static <SRC extends Result> Dataset<Event> generateEvents(
final SparkSession spark,
final String graphPath,
final Class<SRC> sourceClass,
final DedupConfig dedupConfig) {
final Dataset<OaBrokerMainEntity> results = expandResultsWithRelations(spark, graphPath, sourceClass);
final Dataset<Relation> mergedRels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator()
.toColumn();
return results
.joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
.groupByKey(
(MapFunction<Tuple2<OaBrokerMainEntity, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
.agg(aggr)
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
.filter(rg -> rg.getData().size() > 1)
.map(
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
Encoders.bean(EventGroup.class))
.flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
}
private static <SRC extends Result> Dataset<OaBrokerMainEntity> expandResultsWithRelations(
final SparkSession spark,
final String graphPath,
final Class<SRC> sourceClass) {
// final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
// spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
// final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
// final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
final Dataset<OaBrokerMainEntity> r0 = ClusterUtils
.readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
.filter(r -> r.getDataInfo().getDeletedbyinference())
.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OaBrokerMainEntity.class));
// TODO UNCOMMENT THIS
// final Dataset<OaBrokerMainEntity> r1 = join(r0, relatedProjects(spark, graphPath));
// final Dataset<OaBrokerMainEntity> r2 = join(r1, relatedDataset(spark, graphPath));
// final Dataset<OaBrokerMainEntity> r3 = join(r2, relatedPublications(spark, graphPath));
// final Dataset<OaBrokerMainEntity> r4 = join(r3, relatedSoftwares(spark, graphPath));
return r0; // TODO it should be r4
}
private static <T> Dataset<OaBrokerMainEntity> join(final Dataset<OaBrokerMainEntity> sources,
final Dataset<T> typedRels) {
final TypedColumn<Tuple2<OaBrokerMainEntity, T>, OaBrokerMainEntity> aggr = new OaBrokerMainEntityAggregator<T>()
.toColumn();
return sources
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey(
(MapFunction<Tuple2<OaBrokerMainEntity, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
}
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String conf = isLookUpService
.getResourceProfileByQuery(
String
.format(
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
profId));
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
dedupConfig.getPace().initModel();
dedupConfig.getPace().initTranslationMap();
// dedupConfig.getWf().setConfigurationId("???");
return dedupConfig;
}
}

View File

@ -0,0 +1,106 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
public class GenerateEventsJob {
private static final Logger log = LoggerFactory.getLogger(GenerateEventsJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
GenerateEventsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String isLookupUrl = parser.get("isLookupUrl");
log.info("isLookupUrl: {}", isLookupUrl);
final String dedupConfigProfileId = parser.get("dedupConfProfile");
log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
final String eventsPath = workingPath + "/eventsPath";
log.info("eventsPath: {}", eventsPath);
final SparkConf conf = new SparkConf();
// TODO UNCOMMENT
// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
final DedupConfig dedupConfig = null;
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, eventsPath);
final Dataset<ResultGroup> groups = ClusterUtils
.readPath(spark, graphPath + "/relation", ResultGroup.class);
final Dataset<Event> events = groups
.map(
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
Encoders.bean(EventGroup.class))
.flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
events.write().mode(SaveMode.Overwrite).json(eventsPath);
});
}
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final String conf = isLookUpService
.getResourceProfileByQuery(
String
.format(
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
profId));
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
dedupConfig.getPace().initModel();
dedupConfig.getPace().initTranslationMap();
// dedupConfig.getWf().setConfigurationId("???");
return dedupConfig;
}
}

View File

@ -0,0 +1,94 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OaBrokerMainEntityAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
import scala.Tuple2;
public class JoinEntitiesJob {
private static final Logger log = LoggerFactory.getLogger(JoinEntitiesJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
JoinEntitiesJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String joinedEntitiesPath = workingPath + "/joinedEntities";
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, joinedEntitiesPath);
final Dataset<OaBrokerMainEntity> r0 = ClusterUtils
.readPath(spark, graphPath + "/simpleEntities", OaBrokerMainEntity.class);
final Dataset<OaBrokerMainEntity> r1 = join(
r0, ClusterUtils.readPath(spark, graphPath + "/relatedProjects", RelatedProject.class));
final Dataset<OaBrokerMainEntity> r2 = join(
r1, ClusterUtils.readPath(spark, graphPath + "/relatedDatasets", RelatedDataset.class));
final Dataset<OaBrokerMainEntity> r3 = join(
r2, ClusterUtils.readPath(spark, graphPath + "/relatedPublications", RelatedPublication.class));
final Dataset<OaBrokerMainEntity> r4 = join(
r3, ClusterUtils.readPath(spark, graphPath + "/relatedSoftwares", RelatedSoftware.class));
r4.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath);
});
}
private static <T> Dataset<OaBrokerMainEntity> join(final Dataset<OaBrokerMainEntity> sources,
final Dataset<T> typedRels) {
final TypedColumn<Tuple2<OaBrokerMainEntity, T>, OaBrokerMainEntity> aggr = new OaBrokerMainEntityAggregator<T>()
.toColumn();
return sources
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
.groupByKey(
(MapFunction<Tuple2<OaBrokerMainEntity, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
.agg(aggr)
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
}
}

View File

@ -0,0 +1,88 @@
package eu.dnetlib.dhp.broker.oa;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
import eu.dnetlib.dhp.schema.oaf.Relation;
import scala.Tuple2;
public class PrepareGroupsJob {
private static final Logger log = LoggerFactory.getLogger(PrepareGroupsJob.class);
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
JoinEntitiesJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String groupsPath = workingPath + "/groups";
log.info("groupsPath: {}", groupsPath);
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, groupsPath);
final Dataset<OaBrokerMainEntity> results = ClusterUtils
.readPath(spark, graphPath + "/joinedEntities", OaBrokerMainEntity.class);
final Dataset<Relation> mergedRels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator()
.toColumn();
final Dataset<ResultGroup> groups = results
.joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
.groupByKey(
(MapFunction<Tuple2<OaBrokerMainEntity, Relation>, String>) t -> t._2.getTarget(),
Encoders.STRING())
.agg(aggr)
.map(
(MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
.filter(rg -> rg.getData().size() > 1);
groups
.write()
.mode(SaveMode.Overwrite)
.json(groupsPath);
});
}
}

View File

@ -19,16 +19,16 @@ import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset; import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
public class GenerateRelatedDatasets { public class PrepareRelatedDatasetsJob {
private static final Logger log = LoggerFactory.getLogger(GenerateRelatedDatasets.class); private static final Logger log = LoggerFactory.getLogger(PrepareRelatedDatasetsJob.class);
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
GenerateRelatedDatasets.class PrepareRelatedDatasetsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args); parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional
@ -40,7 +40,10 @@ public class GenerateRelatedDatasets {
final String graphPath = parser.get("graphPath"); final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath); log.info("graphPath: {}", graphPath);
final String relsPath = parser.get("relsPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String relsPath = workingPath + "/relatedDatasets";
log.info("relsPath: {}", relsPath); log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();

View File

@ -23,9 +23,9 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
public class GenerateRelatedProjects { public class PrepareRelatedProjectsJob {
private static final Logger log = LoggerFactory.getLogger(GenerateRelatedProjects.class); private static final Logger log = LoggerFactory.getLogger(PrepareRelatedProjectsJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -33,8 +33,8 @@ public class GenerateRelatedProjects {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
GenerateRelatedProjects.class PrepareRelatedProjectsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args); parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional
@ -46,7 +46,10 @@ public class GenerateRelatedProjects {
final String graphPath = parser.get("graphPath"); final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath); log.info("graphPath: {}", graphPath);
final String relsPath = parser.get("relsPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String relsPath = workingPath + "/relatedProjects";
log.info("relsPath: {}", relsPath); log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();

View File

@ -22,9 +22,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
public class GenerateRelatedPublications { public class PrepareRelatedPublicationsJob {
private static final Logger log = LoggerFactory.getLogger(GenerateRelatedPublications.class); private static final Logger log = LoggerFactory.getLogger(PrepareRelatedPublicationsJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -32,8 +32,8 @@ public class GenerateRelatedPublications {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
GenerateRelatedPublications.class PrepareRelatedPublicationsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args); parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional
@ -45,7 +45,10 @@ public class GenerateRelatedPublications {
final String graphPath = parser.get("graphPath"); final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath); log.info("graphPath: {}", graphPath);
final String relsPath = parser.get("relsPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String relsPath = workingPath + "/relatedPublications";
log.info("relsPath: {}", relsPath); log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();

View File

@ -22,9 +22,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.Software;
public class GenerateRelatedSoftwares { public class PrepareRelatedSoftwaresJob {
private static final Logger log = LoggerFactory.getLogger(GenerateRelatedSoftwares.class); private static final Logger log = LoggerFactory.getLogger(PrepareRelatedSoftwaresJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@ -32,8 +32,8 @@ public class GenerateRelatedSoftwares {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
GenerateRelatedSoftwares.class PrepareRelatedSoftwaresJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args); parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional
@ -45,7 +45,10 @@ public class GenerateRelatedSoftwares {
final String graphPath = parser.get("graphPath"); final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath); log.info("graphPath: {}", graphPath);
final String relsPath = parser.get("relsPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String relsPath = workingPath + "/relatedSoftwares";
log.info("relsPath: {}", relsPath); log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
@ -53,6 +56,7 @@ public class GenerateRelatedSoftwares {
runWithSparkSession(conf, isSparkSessionManaged, spark -> { runWithSparkSession(conf, isSparkSessionManaged, spark -> {
ClusterUtils.removeDir(spark, relsPath); ClusterUtils.removeDir(spark, relsPath);
final Dataset<Software> softwares = ClusterUtils.readPath(spark, graphPath + "/software", Software.class); final Dataset<Software> softwares = ClusterUtils.readPath(spark, graphPath + "/software", Software.class);
final Dataset<Relation> rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class); final Dataset<Relation> rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class);

View File

@ -18,19 +18,21 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils; import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
public class GenerateSimpleEntitities { public class PrepareSimpleEntititiesJob {
private static final Logger log = LoggerFactory.getLogger(GenerateSimpleEntitities.class); private static final Logger log = LoggerFactory.getLogger(PrepareSimpleEntititiesJob.class);
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(
GenerateSimpleEntitities.class PrepareSimpleEntititiesJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
parser.parseArgument(args); parser.parseArgument(args);
final Boolean isSparkSessionManaged = Optional final Boolean isSparkSessionManaged = Optional
@ -42,7 +44,10 @@ public class GenerateSimpleEntitities {
final String graphPath = parser.get("graphPath"); final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath); log.info("graphPath: {}", graphPath);
final String simpleEntitiesPath = parser.get("simpleEntitiesPath"); final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String simpleEntitiesPath = workingPath + "/simpleEntities";
log.info("simpleEntitiesPath: {}", simpleEntitiesPath); log.info("simpleEntitiesPath: {}", simpleEntitiesPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
@ -51,27 +56,18 @@ public class GenerateSimpleEntitities {
ClusterUtils.removeDir(spark, simpleEntitiesPath); ClusterUtils.removeDir(spark, simpleEntitiesPath);
expandResultsWithRelations(spark, graphPath, Publication.class) prepareSimpleEntities(spark, graphPath, Publication.class)
.union(prepareSimpleEntities(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class))
.union(prepareSimpleEntities(spark, graphPath, Software.class))
.union(prepareSimpleEntities(spark, graphPath, OtherResearchProduct.class))
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.json(simpleEntitiesPath); .json(simpleEntitiesPath);
// TODO UNCOMMENT THIS
// spark
// .emptyDataset(Encoders.bean(Event.class))
// .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
// .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
// .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
// .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
// .write()
// .mode(SaveMode.Overwrite)
// .option("compression", "gzip")
// .json(eventsPath);
}); });
} }
private static <SRC extends Result> Dataset<OaBrokerMainEntity> expandResultsWithRelations( private static <SRC extends Result> Dataset<OaBrokerMainEntity> prepareSimpleEntities(
final SparkSession spark, final SparkSession spark,
final String graphPath, final String graphPath,
final Class<SRC> sourceClass) { final Class<SRC> sourceClass) {

View File

@ -14,6 +14,10 @@ public class ClusterUtils {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void createDirIfMissing(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
public static void removeDir(final SparkSession spark, final String path) { public static void removeDir(final SparkSession spark, final String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
} }

View File

@ -7,8 +7,8 @@
}, },
{ {
"paramName": "o", "paramName": "o",
"paramLongName": "relsPath", "paramLongName": "workingPath",
"paramDescription": "the path where the generated relations will be stored", "paramDescription": "the path where the temporary data will be stored",
"paramRequired": true "paramRequired": true
} }
] ]

View File

@ -6,8 +6,8 @@
<description>the path where the graph is stored</description> <description>the path where the graph is stored</description>
</property> </property>
<property> <property>
<name>eventsOutputPath</name> <name>workingPath</name>
<description>the path where the the events will be stored</description> <description>the path where the the generated data will be stored</description>
</property> </property>
<property> <property>
<name>isLookupUrl</name> <name>isLookupUrl</name>
@ -73,18 +73,34 @@
</configuration> </configuration>
</global> </global>
<start to="generate_events"/> <start to="ensure_working_path"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="generate_events"> <action name="ensure_working_path">
<fs>
<mkdir path='${workingPath}'/>
</fs>
<ok to="start_entities_and_rels"/>
<error to="Kill"/>
</action>
<fork name="start_entities_and_rels">
<path start="prepare_simple_entities"/>
<path start="prepare_related_datasets"/>
<path start="prepare_related_projects"/>
<path start="prepare_related_publications"/>
<path start="prepare_related_softwares"/>
</fork>
<action name="prepare_simple_entities">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master> <master>yarn</master>
<mode>cluster</mode> <mode>cluster</mode>
<name>GenerateEvents</name> <name>PrepareSimpleEntititiesJob</name>
<class>eu.dnetlib.dhp.broker.oa.GenerateEventsApplication</class> <class>eu.dnetlib.dhp.broker.oa.PrepareSimpleEntititiesJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar> <jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts> <spark-opts>
--executor-cores=${sparkExecutorCores} --executor-cores=${sparkExecutorCores}
@ -97,14 +113,183 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--eventsPath</arg><arg>${eventsOutputPath}</arg> <arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
</action>
<action name="prepare_related_datasets">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareRelatedDatasetsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedDatasetsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
</action>
<action name="prepare_related_projects">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareRelatedProjectsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedProjectsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
</action>
<action name="prepare_related_publications">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareRelatedPublicationsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedPublicationsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
</action>
<action name="prepare_related_softwares">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareRelatedSoftwaresJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedSoftwaresJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
</action>
<join name="wait_entities_and_rels" to="join_entities"/>
<action name="join_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>JoinEntitiesJob</name>
<class>eu.dnetlib.dhp.broker.oa.JoinEntitiesJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="prepare_groups"/>
<error to="Kill"/>
</action>
<action name="prepare_groups">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareGroupsJob</name>
<class>eu.dnetlib.dhp.broker.oa.PrepareGroupsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="generate_events"/>
<error to="Kill"/>
</action>
<action name="generate_events">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>GenerateEventsJob</name>
<class>eu.dnetlib.dhp.broker.oa.GenerateEventsJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg> <arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<end name="End"/> <end name="End"/>

View File

@ -7,7 +7,7 @@
}, },
{ {
"paramName": "o", "paramName": "o",
"paramLongName": "eventsPath", "paramLongName": "workingPath",
"paramDescription": "the path where the generated events will be stored", "paramDescription": "the path where the generated events will be stored",
"paramRequired": true "paramRequired": true
}, },

View File

@ -1,14 +0,0 @@
[
{
"paramName": "g",
"paramLongName": "graphPath",
"paramDescription": "the path where there the graph is stored",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "simpleEntitiesPath",
"paramDescription": "the path where the generated simple entities (without relations) will be stored",
"paramRequired": true
}
]

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>dhp-dedup-openaire</artifactId> <artifactId>dhp-dedup-openaire</artifactId>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>dhp-stats-update</artifactId> <artifactId>dhp-stats-update</artifactId>

View File

@ -3,7 +3,7 @@
<parent> <parent>
<artifactId>dhp-workflows</artifactId> <artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
</parent> </parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>

View File

@ -7,7 +7,7 @@
<DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/> <DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/>
</HEADER> </HEADER>
<BODY> <BODY>
<WORKFLOW_NAME>Data Provision [OCEAN]</WORKFLOW_NAME> <WORKFLOW_NAME>Graph Construction [OCEAN]</WORKFLOW_NAME>
<WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE> <WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE>
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY> <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
<CONFIGURATION start="manual"> <CONFIGURATION start="manual">

View File

@ -0,0 +1,73 @@
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="5d750977-bec2-47f4-97bb-1b7500e4704e_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/>
</HEADER>
<BODY>
<WORKFLOW_NAME>Graph to HiveDB [OCEAN]</WORKFLOW_NAME>
<WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE>
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
<CONFIGURATION start="manual">
<NODE isStart="true" name="setInputPath" type="SetEnvParameter">
<DESCRIPTION>Set the path containing the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">inputPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string"></PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setHiveDbName" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the RAW graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">hiveDbName</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string"></PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isJoin="true" name="waitConfig">
<DESCRIPTION>wait configurations</DESCRIPTION>
<PARAMETERS/>
<ARCS>
<ARC to="aggregatorGraph"/>
</ARCS>
</NODE>
<NODE name="graph2hive" type="SubmitHadoopJob">
<DESCRIPTION>create the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'inputPath' : 'inputPath',
'hiveDbName' : 'hiveDbName'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/graph/hive/oozie_app'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</CONFIGURATION>
<STATUS>
<LAST_EXECUTION_ID>wf_20200615_163630_609</LAST_EXECUTION_ID>
<LAST_EXECUTION_DATE>2020-06-15T17:08:00+00:00</LAST_EXECUTION_DATE>
<LAST_EXECUTION_STATUS>SUCCESS</LAST_EXECUTION_STATUS>
<LAST_EXECUTION_ERROR/>
</STATUS>
</BODY>
</RESOURCE_PROFILE>

View File

@ -0,0 +1,98 @@
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="8d36cc94-5b82-413c-923f-e7b3953e41ba_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/>
</HEADER>
<BODY>
<WORKFLOW_NAME>Update Solr [OCEAN]</WORKFLOW_NAME>
<WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE>
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
<CONFIGURATION start="manual">
<NODE isStart="true" name="setInputPath" type="SetEnvParameter">
<DESCRIPTION>Set the path containing the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">inputGraphRootPath</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string"></PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setCollection" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the RAW graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">format</PARAM>
<PARAM function="validValues(['TMF', 'DMF'])" managedBy="user" name="parameterValue" required="true" type="string">TMF</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setIsLookUpUrl" type="SetEnvParameter">
<DESCRIPTION>Set the lookup address</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">isLookupUrl</PARAM>
<PARAM managedBy="system" name="parameterValue" required="true" type="string">http://beta.services.openaire.eu:8280/is/services/isLookUp?wsdl</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isJoin="true" name="waitConfig">
<DESCRIPTION>wait configurations</DESCRIPTION>
<PARAMETERS/>
<ARCS>
<ARC to="updateSolr"/>
</ARCS>
</NODE>
<NODE name="updateSolr" type="SubmitHadoopJob">
<DESCRIPTION>create the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'inputGraphRootPath' : 'inputGraphRootPath',
'isLookupUrl' : 'isLookupUrl',
'format' : 'format'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/provision/oozie_app',
'maxRelations' : '100',
'relPartitions' : '3000',
'batchSize' : '2000',
'relationFilter' : 'isAuthorInstitutionOf,produces,hasAmongTopNSimilarDocuments',
'otherDsTypeId' : 'scholarcomminfra,infospace,pubsrepository::mock,entityregistry,entityregistry::projects,entityregistry::repositories,websource',
'resumeFrom' : 'prepare_relations',
'sparkDriverMemoryForJoining' : '3G',
'sparkExecutorMemoryForJoining' : '7G',
'sparkExecutorCoresForJoining' : '4',
'sparkDriverMemoryForIndexing' : '2G',
'sparkExecutorMemoryForIndexing' : '2G',
'sparkExecutorCoresForIndexing' : '64',
'sparkNetworkTimeout' : '600',
'workingDir' : '/tmp/beta_provision/working_dir/update_solr'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</CONFIGURATION>
<STATUS>
<LAST_EXECUTION_ID>wf_20200615_163630_609</LAST_EXECUTION_ID>
<LAST_EXECUTION_DATE>2020-06-15T17:08:00+00:00</LAST_EXECUTION_DATE>
<LAST_EXECUTION_STATUS>SUCCESS</LAST_EXECUTION_STATUS>
<LAST_EXECUTION_ERROR/>
</STATUS>
</BODY>
</RESOURCE_PROFILE>

View File

@ -0,0 +1,74 @@
<RESOURCE_PROFILE>
<HEADER>
<RESOURCE_IDENTIFIER value="65ca9122-f8fe-4aa6-9fb2-bc1e1ffb2dda_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
<RESOURCE_TYPE value="WorkflowDSResourceType"/>
<RESOURCE_KIND value="WorkflowDSResources"/>
<RESOURCE_URI value=""/>
<DATE_OF_CREATION value="2020-06-15T16:35:09+00:00"/>
</HEADER>
<BODY>
<WORKFLOW_NAME>Update Stats [OCEAN]</WORKFLOW_NAME>
<WORKFLOW_TYPE>Data Provision</WORKFLOW_TYPE>
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
<CONFIGURATION start="manual">
<NODE isStart="true" name="setGraphDbName" type="SetEnvParameter">
<DESCRIPTION>Set the path containing the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">openaire_db_name</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string"></PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isStart="true" name="setStatsDbNameCollection" type="SetEnvParameter">
<DESCRIPTION>Set the target path to store the RAW graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="parameterName" required="true" type="string">stats_db_name</PARAM>
<PARAM managedBy="user" name="parameterValue" required="true" type="string"></PARAM>
</PARAMETERS>
<ARCS>
<ARC to="waitConfig"/>
</ARCS>
</NODE>
<NODE isJoin="true" name="waitConfig">
<DESCRIPTION>wait configurations</DESCRIPTION>
<PARAMETERS/>
<ARCS>
<ARC to="updateStatsDB"/>
</ARCS>
</NODE>
<NODE name="updateStatsDB" type="SubmitHadoopJob">
<DESCRIPTION>create the AGGREGATOR graph</DESCRIPTION>
<PARAMETERS>
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
<PARAM managedBy="system" name="cluster" required="true" type="string">IIS</PARAM>
<PARAM managedBy="system" name="envParams" required="true" type="string">
{
'openaire_db_name' : 'openaire_db_name',
'stats_db_name' : 'stats_db_name'
}
</PARAM>
<PARAM managedBy="system" name="params" required="true" type="string">
{
'oozie.wf.application.path' : '/lib/dnet/oa/graph/stats/oozie_app',
'hive_timeout' : '3000'
}
</PARAM>
<PARAM managedBy="system" name="oozieReportActionsCsv" required="true" type="string">build-report</PARAM>
</PARAMETERS>
<ARCS>
<ARC to="success"/>
</ARCS>
</NODE>
</CONFIGURATION>
<STATUS>
<LAST_EXECUTION_ID>wf_20200615_163630_609</LAST_EXECUTION_ID>
<LAST_EXECUTION_DATE>2020-06-15T17:08:00+00:00</LAST_EXECUTION_DATE>
<LAST_EXECUTION_STATUS>SUCCESS</LAST_EXECUTION_STATUS>
<LAST_EXECUTION_ERROR/>
</STATUS>
</BODY>
</RESOURCE_PROFILE>

View File

@ -6,7 +6,7 @@
<parent> <parent>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId> <artifactId>dhp</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
<relativePath>../</relativePath> <relativePath>../</relativePath>
</parent> </parent>

View File

@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>eu.dnetlib.dhp</groupId> <groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp</artifactId> <artifactId>dhp</artifactId>
<version>1.2.3-SNAPSHOT</version> <version>1.2.4-SNAPSHOT</version>
<packaging>pom</packaging> <packaging>pom</packaging>
<licenses> <licenses>