refactoring wf
This commit is contained in:
parent
9cd27183b6
commit
ed787398b3
|
@ -1,190 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.apache.spark.sql.TypedColumn;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.model.Event;
|
||||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
|
||||
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OaBrokerMainEntityAggregator;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class GenerateEventsApplication {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateEventsApplication.class);
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
GenerateEventsApplication.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_broker_events.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String eventsPath = parser.get("eventsPath");
|
||||
log.info("eventsPath: {}", eventsPath);
|
||||
|
||||
final String isLookupUrl = parser.get("isLookupUrl");
|
||||
log.info("isLookupUrl: {}", isLookupUrl);
|
||||
|
||||
final String dedupConfigProfileId = parser.get("dedupConfProfile");
|
||||
log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
// conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
||||
// conf.registerKryoClasses(BrokerConstants.getModelClasses());
|
||||
|
||||
// TODO UNCOMMENT
|
||||
// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
|
||||
final DedupConfig dedupConfig = null;
|
||||
|
||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||
|
||||
ClusterUtils.removeDir(spark, eventsPath);
|
||||
|
||||
// TODO REMOVE THIS
|
||||
|
||||
expandResultsWithRelations(spark, graphPath, Publication.class)
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(eventsPath);
|
||||
|
||||
// TODO UNCOMMENT THIS
|
||||
// spark
|
||||
// .emptyDataset(Encoders.bean(Event.class))
|
||||
// .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
|
||||
// .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
|
||||
// .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
|
||||
// .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
|
||||
// .write()
|
||||
// .mode(SaveMode.Overwrite)
|
||||
// .option("compression", "gzip")
|
||||
// .json(eventsPath);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private static <SRC extends Result> Dataset<Event> generateEvents(
|
||||
final SparkSession spark,
|
||||
final String graphPath,
|
||||
final Class<SRC> sourceClass,
|
||||
final DedupConfig dedupConfig) {
|
||||
|
||||
final Dataset<OaBrokerMainEntity> results = expandResultsWithRelations(spark, graphPath, sourceClass);
|
||||
|
||||
final Dataset<Relation> mergedRels = ClusterUtils
|
||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
|
||||
|
||||
final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator()
|
||||
.toColumn();
|
||||
|
||||
return results
|
||||
.joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
|
||||
.groupByKey(
|
||||
(MapFunction<Tuple2<OaBrokerMainEntity, Relation>, String>) t -> t._2.getTarget(), Encoders.STRING())
|
||||
.agg(aggr)
|
||||
.map((MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
|
||||
.filter(rg -> rg.getData().size() > 1)
|
||||
.map(
|
||||
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
|
||||
Encoders.bean(EventGroup.class))
|
||||
.flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
|
||||
}
|
||||
|
||||
private static <SRC extends Result> Dataset<OaBrokerMainEntity> expandResultsWithRelations(
|
||||
final SparkSession spark,
|
||||
final String graphPath,
|
||||
final Class<SRC> sourceClass) {
|
||||
|
||||
// final Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> datasets = readPath(
|
||||
// spark, graphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
|
||||
// final Dataset<Software> softwares = readPath(spark, graphPath + "/software", Software.class);
|
||||
// final Dataset<Publication> publications = readPath(spark, graphPath + "/publication", Publication.class);
|
||||
|
||||
final Dataset<OaBrokerMainEntity> r0 = ClusterUtils
|
||||
.readPath(spark, graphPath + "/" + sourceClass.getSimpleName().toLowerCase(), sourceClass)
|
||||
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||
.map(ConversionUtils::oafResultToBrokerResult, Encoders.bean(OaBrokerMainEntity.class));
|
||||
|
||||
// TODO UNCOMMENT THIS
|
||||
// final Dataset<OaBrokerMainEntity> r1 = join(r0, relatedProjects(spark, graphPath));
|
||||
// final Dataset<OaBrokerMainEntity> r2 = join(r1, relatedDataset(spark, graphPath));
|
||||
// final Dataset<OaBrokerMainEntity> r3 = join(r2, relatedPublications(spark, graphPath));
|
||||
// final Dataset<OaBrokerMainEntity> r4 = join(r3, relatedSoftwares(spark, graphPath));
|
||||
|
||||
return r0; // TODO it should be r4
|
||||
}
|
||||
|
||||
private static <T> Dataset<OaBrokerMainEntity> join(final Dataset<OaBrokerMainEntity> sources,
|
||||
final Dataset<T> typedRels) {
|
||||
|
||||
final TypedColumn<Tuple2<OaBrokerMainEntity, T>, OaBrokerMainEntity> aggr = new OaBrokerMainEntityAggregator<T>()
|
||||
.toColumn();
|
||||
|
||||
return sources
|
||||
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
||||
.groupByKey(
|
||||
(MapFunction<Tuple2<OaBrokerMainEntity, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
|
||||
.agg(aggr)
|
||||
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
|
||||
|
||||
}
|
||||
|
||||
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
|
||||
|
||||
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
|
||||
final String conf = isLookUpService
|
||||
.getResourceProfileByQuery(
|
||||
String
|
||||
.format(
|
||||
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
|
||||
profId));
|
||||
|
||||
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
|
||||
dedupConfig.getPace().initModel();
|
||||
dedupConfig.getPace().initTranslationMap();
|
||||
// dedupConfig.getWf().setConfigurationId("???");
|
||||
|
||||
return dedupConfig;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.model.Event;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.EventFinder;
|
||||
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import eu.dnetlib.pace.config.DedupConfig;
|
||||
|
||||
public class GenerateEventsJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateEventsJob.class);
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
GenerateEventsJob.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
final String isLookupUrl = parser.get("isLookupUrl");
|
||||
log.info("isLookupUrl: {}", isLookupUrl);
|
||||
|
||||
final String dedupConfigProfileId = parser.get("dedupConfProfile");
|
||||
log.info("dedupConfigProfileId: {}", dedupConfigProfileId);
|
||||
|
||||
final String eventsPath = workingPath + "/eventsPath";
|
||||
log.info("eventsPath: {}", eventsPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
||||
// TODO UNCOMMENT
|
||||
// final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId);
|
||||
final DedupConfig dedupConfig = null;
|
||||
|
||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||
|
||||
ClusterUtils.removeDir(spark, eventsPath);
|
||||
|
||||
final Dataset<ResultGroup> groups = ClusterUtils
|
||||
.readPath(spark, graphPath + "/relation", ResultGroup.class);
|
||||
|
||||
final Dataset<Event> events = groups
|
||||
.map(
|
||||
(MapFunction<ResultGroup, EventGroup>) g -> EventFinder.generateEvents(g, dedupConfig),
|
||||
Encoders.bean(EventGroup.class))
|
||||
.flatMap(group -> group.getData().iterator(), Encoders.bean(Event.class));
|
||||
|
||||
events.write().mode(SaveMode.Overwrite).json(eventsPath);
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception {
|
||||
|
||||
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
|
||||
final String conf = isLookUpService
|
||||
.getResourceProfileByQuery(
|
||||
String
|
||||
.format(
|
||||
"for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()",
|
||||
profId));
|
||||
|
||||
final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class);
|
||||
dedupConfig.getPace().initModel();
|
||||
dedupConfig.getPace().initTranslationMap();
|
||||
// dedupConfig.getWf().setConfigurationId("???");
|
||||
|
||||
return dedupConfig;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.TypedColumn;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.OaBrokerMainEntityAggregator;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class JoinEntitiesJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(JoinEntitiesJob.class);
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
JoinEntitiesJob.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
final String joinedEntitiesPath = workingPath + "/joinedEntities";
|
||||
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||
|
||||
ClusterUtils.removeDir(spark, joinedEntitiesPath);
|
||||
|
||||
final Dataset<OaBrokerMainEntity> r0 = ClusterUtils
|
||||
.readPath(spark, graphPath + "/simpleEntities", OaBrokerMainEntity.class);
|
||||
|
||||
final Dataset<OaBrokerMainEntity> r1 = join(
|
||||
r0, ClusterUtils.readPath(spark, graphPath + "/relatedProjects", RelatedProject.class));
|
||||
final Dataset<OaBrokerMainEntity> r2 = join(
|
||||
r1, ClusterUtils.readPath(spark, graphPath + "/relatedDatasets", RelatedDataset.class));
|
||||
final Dataset<OaBrokerMainEntity> r3 = join(
|
||||
r2, ClusterUtils.readPath(spark, graphPath + "/relatedPublications", RelatedPublication.class));
|
||||
final Dataset<OaBrokerMainEntity> r4 = join(
|
||||
r3, ClusterUtils.readPath(spark, graphPath + "/relatedSoftwares", RelatedSoftware.class));
|
||||
|
||||
r4.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath);
|
||||
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private static <T> Dataset<OaBrokerMainEntity> join(final Dataset<OaBrokerMainEntity> sources,
|
||||
final Dataset<T> typedRels) {
|
||||
|
||||
final TypedColumn<Tuple2<OaBrokerMainEntity, T>, OaBrokerMainEntity> aggr = new OaBrokerMainEntityAggregator<T>()
|
||||
.toColumn();
|
||||
|
||||
return sources
|
||||
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
||||
.groupByKey(
|
||||
(MapFunction<Tuple2<OaBrokerMainEntity, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
|
||||
.agg(aggr)
|
||||
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
|
||||
package eu.dnetlib.dhp.broker.oa;
|
||||
|
||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SaveMode;
|
||||
import org.apache.spark.sql.TypedColumn;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultAggregator;
|
||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class PrepareGroupsJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(PrepareGroupsJob.class);
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
JoinEntitiesJob.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(Boolean.TRUE);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
final String groupsPath = workingPath + "/groups";
|
||||
log.info("groupsPath: {}", groupsPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||
|
||||
ClusterUtils.removeDir(spark, groupsPath);
|
||||
|
||||
final Dataset<OaBrokerMainEntity> results = ClusterUtils
|
||||
.readPath(spark, graphPath + "/joinedEntities", OaBrokerMainEntity.class);
|
||||
|
||||
final Dataset<Relation> mergedRels = ClusterUtils
|
||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
|
||||
|
||||
final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator()
|
||||
.toColumn();
|
||||
|
||||
final Dataset<ResultGroup> groups = results
|
||||
.joinWith(mergedRels, results.col("openaireId").equalTo(mergedRels.col("source")), "inner")
|
||||
.groupByKey(
|
||||
(MapFunction<Tuple2<OaBrokerMainEntity, Relation>, String>) t -> t._2.getTarget(),
|
||||
Encoders.STRING())
|
||||
.agg(aggr)
|
||||
.map(
|
||||
(MapFunction<Tuple2<String, ResultGroup>, ResultGroup>) t -> t._2, Encoders.bean(ResultGroup.class))
|
||||
.filter(rg -> rg.getData().size() > 1);
|
||||
|
||||
groups
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(groupsPath);
|
||||
|
||||
});
|
||||
}
|
||||
|
||||
}
|
|
@ -19,16 +19,16 @@ import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
|||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
||||
public class GenerateRelatedDatasets {
|
||||
public class PrepareRelatedDatasetsJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateRelatedDatasets.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedDatasetsJob.class);
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
GenerateRelatedDatasets.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
|
||||
PrepareRelatedDatasetsJob.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
|
@ -40,7 +40,10 @@ public class GenerateRelatedDatasets {
|
|||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String relsPath = parser.get("relsPath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
final String relsPath = workingPath + "/relatedDatasets";
|
||||
log.info("relsPath: {}", relsPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
|
@ -23,9 +23,9 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
|
|||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
||||
public class GenerateRelatedProjects {
|
||||
public class PrepareRelatedProjectsJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateRelatedProjects.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedProjectsJob.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
|
@ -33,8 +33,8 @@ public class GenerateRelatedProjects {
|
|||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
GenerateRelatedProjects.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
|
||||
PrepareRelatedProjectsJob.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
|
@ -46,7 +46,10 @@ public class GenerateRelatedProjects {
|
|||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String relsPath = parser.get("relsPath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
final String relsPath = workingPath + "/relatedProjects";
|
||||
log.info("relsPath: {}", relsPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
|
@ -22,9 +22,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
|
|||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
|
||||
public class GenerateRelatedPublications {
|
||||
public class PrepareRelatedPublicationsJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateRelatedPublications.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedPublicationsJob.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
|
@ -32,8 +32,8 @@ public class GenerateRelatedPublications {
|
|||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
GenerateRelatedPublications.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
|
||||
PrepareRelatedPublicationsJob.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
|
@ -45,7 +45,10 @@ public class GenerateRelatedPublications {
|
|||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String relsPath = parser.get("relsPath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
final String relsPath = workingPath + "/relatedPublications";
|
||||
log.info("relsPath: {}", relsPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
|
@ -22,9 +22,9 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
|
|||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
|
||||
public class GenerateRelatedSoftwares {
|
||||
public class PrepareRelatedSoftwaresJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateRelatedSoftwares.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedSoftwaresJob.class);
|
||||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
|
@ -32,8 +32,8 @@ public class GenerateRelatedSoftwares {
|
|||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
GenerateRelatedSoftwares.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_relations.json")));
|
||||
PrepareRelatedSoftwaresJob.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
|
@ -45,7 +45,10 @@ public class GenerateRelatedSoftwares {
|
|||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String relsPath = parser.get("relsPath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
final String relsPath = workingPath + "/relatedSoftwares";
|
||||
log.info("relsPath: {}", relsPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
@ -53,6 +56,7 @@ public class GenerateRelatedSoftwares {
|
|||
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||
|
||||
ClusterUtils.removeDir(spark, relsPath);
|
||||
|
||||
final Dataset<Software> softwares = ClusterUtils.readPath(spark, graphPath + "/software", Software.class);
|
||||
|
||||
final Dataset<Relation> rels = ClusterUtils.readPath(spark, graphPath + "/relation", Relation.class);
|
|
@ -18,19 +18,21 @@ import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
|||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||
import eu.dnetlib.dhp.broker.oa.util.ConversionUtils;
|
||||
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
|
||||
public class GenerateSimpleEntitities {
|
||||
public class PrepareSimpleEntititiesJob {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GenerateSimpleEntitities.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(PrepareSimpleEntititiesJob.class);
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils
|
||||
.toString(
|
||||
GenerateSimpleEntitities.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_simple_entities.json")));
|
||||
PrepareSimpleEntititiesJob.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
final Boolean isSparkSessionManaged = Optional
|
||||
|
@ -42,7 +44,10 @@ public class GenerateSimpleEntitities {
|
|||
final String graphPath = parser.get("graphPath");
|
||||
log.info("graphPath: {}", graphPath);
|
||||
|
||||
final String simpleEntitiesPath = parser.get("simpleEntitiesPath");
|
||||
final String workingPath = parser.get("workingPath");
|
||||
log.info("workingPath: {}", workingPath);
|
||||
|
||||
final String simpleEntitiesPath = workingPath + "/simpleEntities";
|
||||
log.info("simpleEntitiesPath: {}", simpleEntitiesPath);
|
||||
|
||||
final SparkConf conf = new SparkConf();
|
||||
|
@ -51,27 +56,18 @@ public class GenerateSimpleEntitities {
|
|||
|
||||
ClusterUtils.removeDir(spark, simpleEntitiesPath);
|
||||
|
||||
expandResultsWithRelations(spark, graphPath, Publication.class)
|
||||
prepareSimpleEntities(spark, graphPath, Publication.class)
|
||||
.union(prepareSimpleEntities(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class))
|
||||
.union(prepareSimpleEntities(spark, graphPath, Software.class))
|
||||
.union(prepareSimpleEntities(spark, graphPath, OtherResearchProduct.class))
|
||||
.write()
|
||||
.mode(SaveMode.Overwrite)
|
||||
.json(simpleEntitiesPath);
|
||||
|
||||
// TODO UNCOMMENT THIS
|
||||
// spark
|
||||
// .emptyDataset(Encoders.bean(Event.class))
|
||||
// .union(generateEvents(spark, graphPath, Publication.class, dedupConfig))
|
||||
// .union(generateEvents(spark, graphPath, eu.dnetlib.dhp.schema.oaf.Dataset.class, dedupConfig))
|
||||
// .union(generateEvents(spark, graphPath, Software.class, dedupConfig))
|
||||
// .union(generateEvents(spark, graphPath, OtherResearchProduct.class, dedupConfig))
|
||||
// .write()
|
||||
// .mode(SaveMode.Overwrite)
|
||||
// .option("compression", "gzip")
|
||||
// .json(eventsPath);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
private static <SRC extends Result> Dataset<OaBrokerMainEntity> expandResultsWithRelations(
|
||||
private static <SRC extends Result> Dataset<OaBrokerMainEntity> prepareSimpleEntities(
|
||||
final SparkSession spark,
|
||||
final String graphPath,
|
||||
final Class<SRC> sourceClass) {
|
|
@ -14,6 +14,10 @@ public class ClusterUtils {
|
|||
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
public static void createDirIfMissing(final SparkSession spark, final String path) {
|
||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
|
||||
public static void removeDir(final SparkSession spark, final String path) {
|
||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
||||
}
|
||||
|
|
|
@ -7,8 +7,8 @@
|
|||
},
|
||||
{
|
||||
"paramName": "o",
|
||||
"paramLongName": "relsPath",
|
||||
"paramDescription": "the path where the generated relations will be stored",
|
||||
"paramLongName": "workingPath",
|
||||
"paramDescription": "the path where the temporary data will be stored",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -6,8 +6,8 @@
|
|||
<description>the path where the graph is stored</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>eventsOutputPath</name>
|
||||
<description>the path where the the events will be stored</description>
|
||||
<name>workingPath</name>
|
||||
<description>the path where the the generated data will be stored</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>isLookupUrl</name>
|
||||
|
@ -73,18 +73,35 @@
|
|||
</configuration>
|
||||
</global>
|
||||
|
||||
<start to="generate_events"/>
|
||||
<start to="ensure_working_path"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="generate_events">
|
||||
|
||||
<action name="ensure_working_path">
|
||||
<fs>
|
||||
<mkdir path='${workingPath}'/>
|
||||
</fs>
|
||||
<ok to="start_entities_and_rels"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<fork name="start_entities_and_rels">
|
||||
<path start="prepare_simple_entities"/>
|
||||
<path start="prepare_related_datasets"/>
|
||||
<path start="prepare_related_projects"/>
|
||||
<path start="prepare_related_publications"/>
|
||||
<path start="prepare_related_softwares"/>
|
||||
</fork>
|
||||
|
||||
<action name="prepare_simple_entities">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GenerateEvents</name>
|
||||
<class>eu.dnetlib.dhp.broker.oa.GenerateEventsApplication</class>
|
||||
<name>PrepareSimpleEntititiesJob</name>
|
||||
<class>eu.dnetlib.dhp.broker.oa.PrepareSimpleEntititiesJob</class>
|
||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
|
@ -97,7 +114,183 @@
|
|||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||
<arg>--eventsPath</arg><arg>${eventsOutputPath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
</spark>
|
||||
<ok to="wait_entities_and_rels"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="prepare_related_datasets">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PrepareRelatedDatasetsJob</name>
|
||||
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedDatasetsJob</class>
|
||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
</spark>
|
||||
<ok to="wait_entities_and_rels"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="prepare_related_projects">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PrepareRelatedProjectsJob</name>
|
||||
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedProjectsJob</class>
|
||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
</spark>
|
||||
<ok to="wait_entities_and_rels"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="prepare_related_publications">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PrepareRelatedPublicationsJob</name>
|
||||
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedPublicationsJob</class>
|
||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
</spark>
|
||||
<ok to="wait_entities_and_rels"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="prepare_related_softwares">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PrepareRelatedSoftwaresJob</name>
|
||||
<class>eu.dnetlib.dhp.broker.oa.PrepareRelatedSoftwaresJob</class>
|
||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
</spark>
|
||||
<ok to="wait_entities_and_rels"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<join name="wait_entities_and_rels" to="join_entities"/>
|
||||
|
||||
<action name="join_entities">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>JoinEntitiesJob</name>
|
||||
<class>eu.dnetlib.dhp.broker.oa.JoinEntitiesJob</class>
|
||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
</spark>
|
||||
<ok to="prepare_groups"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="prepare_groups">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>PrepareGroupsJob</name>
|
||||
<class>eu.dnetlib.dhp.broker.oa.PrepareGroupsJob</class>
|
||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
</spark>
|
||||
<ok to="generate_events"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="generate_events">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>GenerateEventsJob</name>
|
||||
<class>eu.dnetlib.dhp.broker.oa.GenerateEventsJob</class>
|
||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
</spark-opts>
|
||||
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
|
||||
</spark>
|
||||
|
@ -105,6 +298,9 @@
|
|||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
<end name="End"/>
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
},
|
||||
{
|
||||
"paramName": "o",
|
||||
"paramLongName": "eventsPath",
|
||||
"paramLongName": "workingPath",
|
||||
"paramDescription": "the path where the generated events will be stored",
|
||||
"paramRequired": true
|
||||
},
|
|
@ -1,14 +0,0 @@
|
|||
[
|
||||
{
|
||||
"paramName": "g",
|
||||
"paramLongName": "graphPath",
|
||||
"paramDescription": "the path where there the graph is stored",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "o",
|
||||
"paramLongName": "simpleEntitiesPath",
|
||||
"paramDescription": "the path where the generated simple entities (without relations) will be stored",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
Loading…
Reference in New Issue