forked from D-Net/dnet-hadoop
Splitted join wf
This commit is contained in:
parent
e53dd62e87
commit
202f6e62ff
|
@ -11,7 +11,7 @@ import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SaveMode;
|
import org.apache.spark.sql.SaveMode;
|
||||||
import org.apache.spark.sql.expressions.Aggregator;
|
import org.apache.spark.sql.TypedColumn;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -22,15 +22,15 @@ import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProject;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProjectAggregator;
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedProjectAggregator;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
public class JoinEntitiesJob {
|
public class JoinStep1Job {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(JoinEntitiesJob.class);
|
private static final Logger log = LoggerFactory.getLogger(JoinStep1Job.class);
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils
|
IOUtils
|
||||||
.toString(
|
.toString(
|
||||||
JoinEntitiesJob.class
|
JoinStep1Job.class
|
||||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ public class JoinEntitiesJob {
|
||||||
final String workingPath = parser.get("workingPath");
|
final String workingPath = parser.get("workingPath");
|
||||||
log.info("workingPath: {}", workingPath);
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
final String joinedEntitiesPath = workingPath + "/joinedEntities";
|
final String joinedEntitiesPath = workingPath + "/joinedEntities_step1";
|
||||||
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
|
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
|
||||||
|
|
||||||
final SparkConf conf = new SparkConf();
|
final SparkConf conf = new SparkConf();
|
||||||
|
@ -52,39 +52,28 @@ public class JoinEntitiesJob {
|
||||||
|
|
||||||
ClusterUtils.removeDir(spark, joinedEntitiesPath);
|
ClusterUtils.removeDir(spark, joinedEntitiesPath);
|
||||||
|
|
||||||
final Dataset<OaBrokerMainEntity> r0 = ClusterUtils
|
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
|
||||||
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
|
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
|
||||||
|
|
||||||
final Dataset<OaBrokerMainEntity> r1 = join(
|
final Dataset<RelatedProject> typedRels = ClusterUtils
|
||||||
r0, ClusterUtils.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class),
|
.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class);
|
||||||
new RelatedProjectAggregator());
|
|
||||||
// final Dataset<OaBrokerMainEntity> r2 = join(
|
|
||||||
// r1, ClusterUtils.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class), new
|
|
||||||
// RelatedDatasetAggregator());
|
|
||||||
// final Dataset<OaBrokerMainEntity> r3 = join(
|
|
||||||
// r2, ClusterUtils.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class), new
|
|
||||||
// RelatedPublicationAggregator());
|
|
||||||
// final Dataset<OaBrokerMainEntity> r4 = join(
|
|
||||||
// r3, ClusterUtils.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class), new
|
|
||||||
// RelatedSoftwareAggregator());
|
|
||||||
|
|
||||||
r1.write().mode(SaveMode.Overwrite).json(joinedEntitiesPath);
|
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedProject>, OaBrokerMainEntity> aggr = new RelatedProjectAggregator()
|
||||||
|
.toColumn();
|
||||||
|
|
||||||
|
sources
|
||||||
|
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
||||||
|
.groupByKey(
|
||||||
|
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedProject>, String>) t -> t._1.getOpenaireId(),
|
||||||
|
Encoders.STRING())
|
||||||
|
.agg(aggr)
|
||||||
|
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.json(joinedEntitiesPath);
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T> Dataset<OaBrokerMainEntity> join(final Dataset<OaBrokerMainEntity> sources,
|
|
||||||
final Dataset<T> typedRels,
|
|
||||||
final Aggregator<Tuple2<OaBrokerMainEntity, T>, OaBrokerMainEntity, OaBrokerMainEntity> aggr) {
|
|
||||||
|
|
||||||
return sources
|
|
||||||
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
|
||||||
.groupByKey(
|
|
||||||
(MapFunction<Tuple2<OaBrokerMainEntity, T>, String>) t -> t._1.getOpenaireId(), Encoders.STRING())
|
|
||||||
.agg(aggr.toColumn())
|
|
||||||
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,79 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.oa;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SaveMode;
|
||||||
|
import org.apache.spark.sql.TypedColumn;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftware;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedSoftwareAggregator;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
public class JoinStep2Job {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(JoinStep2Job.class);
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws Exception {
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
JoinStep2Job.class
|
||||||
|
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String workingPath = parser.get("workingPath");
|
||||||
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
|
final String joinedEntitiesPath = workingPath + "/joinedEntities_step2";
|
||||||
|
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
|
||||||
|
|
||||||
|
final SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||||
|
|
||||||
|
ClusterUtils.removeDir(spark, joinedEntitiesPath);
|
||||||
|
|
||||||
|
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class);
|
||||||
|
|
||||||
|
final Dataset<RelatedSoftware> typedRels = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class);
|
||||||
|
|
||||||
|
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedSoftware>, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator()
|
||||||
|
.toColumn();
|
||||||
|
|
||||||
|
sources
|
||||||
|
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
||||||
|
.groupByKey(
|
||||||
|
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedSoftware>, String>) t -> t._1.getOpenaireId(),
|
||||||
|
Encoders.STRING())
|
||||||
|
.agg(aggr)
|
||||||
|
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.json(joinedEntitiesPath);
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,79 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.oa;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SaveMode;
|
||||||
|
import org.apache.spark.sql.TypedColumn;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDataset;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedDatasetAggregator;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
public class JoinStep3Job {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(JoinStep3Job.class);
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws Exception {
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
JoinStep3Job.class
|
||||||
|
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String workingPath = parser.get("workingPath");
|
||||||
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
|
final String joinedEntitiesPath = workingPath + "/joinedEntities_step3";
|
||||||
|
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
|
||||||
|
|
||||||
|
final SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||||
|
|
||||||
|
ClusterUtils.removeDir(spark, joinedEntitiesPath);
|
||||||
|
|
||||||
|
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class);
|
||||||
|
|
||||||
|
final Dataset<RelatedDataset> typedRels = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class);
|
||||||
|
|
||||||
|
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDataset>, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator()
|
||||||
|
.toColumn();
|
||||||
|
|
||||||
|
sources
|
||||||
|
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
||||||
|
.groupByKey(
|
||||||
|
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedDataset>, String>) t -> t._1.getOpenaireId(),
|
||||||
|
Encoders.STRING())
|
||||||
|
.agg(aggr)
|
||||||
|
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.json(joinedEntitiesPath);
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,79 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.broker.oa;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.Dataset;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
import org.apache.spark.sql.SaveMode;
|
||||||
|
import org.apache.spark.sql.TypedColumn;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.broker.objects.OaBrokerMainEntity;
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublication;
|
||||||
|
import eu.dnetlib.dhp.broker.oa.util.aggregators.withRels.RelatedPublicationAggregator;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
public class JoinStep4Job {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(JoinStep4Job.class);
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws Exception {
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
JoinStep4Job.class
|
||||||
|
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final Boolean isSparkSessionManaged = Optional
|
||||||
|
.ofNullable(parser.get("isSparkSessionManaged"))
|
||||||
|
.map(Boolean::valueOf)
|
||||||
|
.orElse(Boolean.TRUE);
|
||||||
|
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
|
|
||||||
|
final String workingPath = parser.get("workingPath");
|
||||||
|
log.info("workingPath: {}", workingPath);
|
||||||
|
|
||||||
|
final String joinedEntitiesPath = workingPath + "/joinedEntities_step4";
|
||||||
|
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
|
||||||
|
|
||||||
|
final SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
|
||||||
|
|
||||||
|
ClusterUtils.removeDir(spark, joinedEntitiesPath);
|
||||||
|
|
||||||
|
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class);
|
||||||
|
|
||||||
|
final Dataset<RelatedPublication> typedRels = ClusterUtils
|
||||||
|
.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class);
|
||||||
|
|
||||||
|
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedPublication>, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator()
|
||||||
|
.toColumn();
|
||||||
|
|
||||||
|
sources
|
||||||
|
.joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer")
|
||||||
|
.groupByKey(
|
||||||
|
(MapFunction<Tuple2<OaBrokerMainEntity, RelatedPublication>, String>) t -> t._1.getOpenaireId(),
|
||||||
|
Encoders.STRING())
|
||||||
|
.agg(aggr)
|
||||||
|
.map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class))
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.json(joinedEntitiesPath);
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -32,7 +32,7 @@ public class PrepareGroupsJob {
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils
|
IOUtils
|
||||||
.toString(
|
.toString(
|
||||||
JoinEntitiesJob.class
|
PrepareGroupsJob.class
|
||||||
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ public class PrepareGroupsJob {
|
||||||
ClusterUtils.removeDir(spark, groupsPath);
|
ClusterUtils.removeDir(spark, groupsPath);
|
||||||
|
|
||||||
final Dataset<OaBrokerMainEntity> results = ClusterUtils
|
final Dataset<OaBrokerMainEntity> results = ClusterUtils
|
||||||
.readPath(spark, workingPath + "/joinedEntities", OaBrokerMainEntity.class);
|
.readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class);
|
||||||
|
|
||||||
final Dataset<Relation> mergedRels = ClusterUtils
|
final Dataset<Relation> mergedRels = ClusterUtils
|
||||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||||
|
|
|
@ -61,6 +61,7 @@ public class PrepareRelatedDatasetsJob {
|
||||||
|
|
||||||
final Dataset<Relation> rels = ClusterUtils
|
final Dataset<Relation> rels = ClusterUtils
|
||||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||||
|
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||||
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
|
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
|
||||||
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
|
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
|
||||||
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
||||||
|
|
|
@ -13,8 +13,6 @@ import org.apache.spark.sql.SaveMode;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerProject;
|
import eu.dnetlib.broker.objects.OaBrokerProject;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||||
|
@ -29,8 +27,6 @@ public class PrepareRelatedProjectsJob {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedProjectsJob.class);
|
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedProjectsJob.class);
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils
|
IOUtils
|
||||||
|
@ -67,6 +63,7 @@ public class PrepareRelatedProjectsJob {
|
||||||
|
|
||||||
final Dataset<Relation> rels = ClusterUtils
|
final Dataset<Relation> rels = ClusterUtils
|
||||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||||
|
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||||
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT))
|
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT))
|
||||||
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
|
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
|
||||||
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
||||||
|
|
|
@ -13,8 +13,6 @@ import org.apache.spark.sql.SaveMode;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
|
import eu.dnetlib.broker.objects.OaBrokerRelatedPublication;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
|
||||||
|
@ -28,8 +26,6 @@ public class PrepareRelatedPublicationsJob {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedPublicationsJob.class);
|
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedPublicationsJob.class);
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils
|
IOUtils
|
||||||
|
@ -68,6 +64,7 @@ public class PrepareRelatedPublicationsJob {
|
||||||
|
|
||||||
final Dataset<Relation> rels = ClusterUtils
|
final Dataset<Relation> rels = ClusterUtils
|
||||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||||
|
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||||
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
|
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
|
||||||
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
|
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
|
||||||
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
||||||
|
|
|
@ -13,8 +13,6 @@ import org.apache.spark.sql.SaveMode;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
|
import eu.dnetlib.broker.objects.OaBrokerRelatedSoftware;
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
import eu.dnetlib.dhp.broker.oa.util.BrokerConstants;
|
||||||
|
@ -29,8 +27,6 @@ public class PrepareRelatedSoftwaresJob {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedSoftwaresJob.class);
|
private static final Logger log = LoggerFactory.getLogger(PrepareRelatedSoftwaresJob.class);
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils
|
IOUtils
|
||||||
|
@ -67,6 +63,7 @@ public class PrepareRelatedSoftwaresJob {
|
||||||
|
|
||||||
final Dataset<Relation> rels = ClusterUtils
|
final Dataset<Relation> rels = ClusterUtils
|
||||||
.readPath(spark, graphPath + "/relation", Relation.class)
|
.readPath(spark, graphPath + "/relation", Relation.class)
|
||||||
|
.filter(r -> r.getDataInfo().getDeletedbyinference())
|
||||||
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
|
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
|
||||||
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
|
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))
|
||||||
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
.filter(r -> !ClusterUtils.isDedupRoot(r.getSource()))
|
||||||
|
|
|
@ -216,14 +216,86 @@
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<join name="wait_entities_and_rels" to="join_entities"/>
|
<join name="wait_entities_and_rels" to="join_entities_step1"/>
|
||||||
|
|
||||||
<action name="join_entities">
|
<action name="join_entities_step1">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>JoinEntitiesJob</name>
|
<name>JoinStep1</name>
|
||||||
<class>eu.dnetlib.dhp.broker.oa.JoinEntitiesJob</class>
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep1Job</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="join_entities_step2"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="join_entities_step2">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>JoinStep2</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep2Job</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="join_entities_step3"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="join_entities_step3">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>JoinStep3</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep3Job</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="join_entities_step4"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="join_entities_step4">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>JoinStep4</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep4Job</class>
|
||||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
|
|
@ -73,19 +73,19 @@
|
||||||
</configuration>
|
</configuration>
|
||||||
</global>
|
</global>
|
||||||
|
|
||||||
<start to="join_entities"/>
|
<start to="join_entities_step1"/>
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
|
|
||||||
<action name="join_entities">
|
<action name="join_entities_step1">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>JoinEntitiesJob</name>
|
<name>JoinStep1</name>
|
||||||
<class>eu.dnetlib.dhp.broker.oa.JoinEntitiesJob</class>
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep1Job</class>
|
||||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
@ -100,16 +100,16 @@
|
||||||
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="prepare_groups"/>
|
<ok to="join_entities_step2"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<action name="prepare_groups">
|
<action name="join_entities_step2">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>PrepareGroupsJob</name>
|
<name>JoinStep2</name>
|
||||||
<class>eu.dnetlib.dhp.broker.oa.PrepareGroupsJob</class>
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep2Job</class>
|
||||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
@ -124,16 +124,16 @@
|
||||||
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="generate_events"/>
|
<ok to="join_entities_step3"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
</action>
|
</action>
|
||||||
|
|
||||||
<action name="generate_events">
|
<action name="join_entities_step3">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<master>yarn</master>
|
<master>yarn</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>GenerateEventsJob</name>
|
<name>JoinStep3</name>
|
||||||
<class>eu.dnetlib.dhp.broker.oa.GenerateEventsJob</class>
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep3Job</class>
|
||||||
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
@ -145,9 +145,32 @@
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="join_entities_step4"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="join_entities_step4">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>JoinStep4</name>
|
||||||
|
<class>eu.dnetlib.dhp.broker.oa.JoinStep4Job</class>
|
||||||
|
<jar>dhp-broker-events-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
|
||||||
<arg>--dedupConfProfile</arg><arg>${dedupConfProfId}</arg>
|
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
Loading…
Reference in New Issue