Merge pull request 'broker' (#78) from broker into master

The changes look good to me.
This commit is contained in:
Claudio Atzori 2020-12-15 10:03:45 +01:00
commit 9f1181290e
36 changed files with 512 additions and 129 deletions

View File

@ -32,15 +32,15 @@ public class CheckDuplictedIdsJob {
IOUtils
.toString(
CheckDuplictedIdsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json")));
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/check_duplicates.json")));
parser.parseArgument(args);
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath);
final String countPath = parser.get("workingPath") + "/counts";
final String countPath = parser.get("outputDir") + "/counts";
log.info("countPath: {}", countPath);
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
@ -59,6 +59,7 @@ public class CheckDuplictedIdsJob {
.map(o -> ClusterUtils.incrementAccumulator(o, total), Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(countPath);
;

View File

@ -44,10 +44,10 @@ public class GenerateEventsJob {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String eventsPath = workingPath + "/events";
final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath);
final Set<String> dsIdWhitelist = ClusterUtils.parseParamAsList(parser, "datasourceIdWhitelist");
@ -59,6 +59,9 @@ public class GenerateEventsJob {
final Set<String> dsIdBlacklist = ClusterUtils.parseParamAsList(parser, "datasourceIdBlacklist");
log.info("datasourceIdBlacklist: {}", StringUtils.join(dsIdBlacklist, ","));
final Set<String> topicWhitelist = ClusterUtils.parseParamAsList(parser, "topicWhitelist");
log.info("topicWhitelist: {}", StringUtils.join(topicWhitelist, ","));
final SparkConf conf = new SparkConf();
runWithSparkSession(conf, isSparkSessionManaged, spark -> {
@ -70,12 +73,12 @@ public class GenerateEventsJob {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_events");
final Dataset<ResultGroup> groups = ClusterUtils
.readPath(spark, workingPath + "/duplicates", ResultGroup.class);
.readPath(spark, workingDir + "/duplicates", ResultGroup.class);
final Dataset<Event> dataset = groups
.map(
g -> EventFinder
.generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators),
.generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, topicWhitelist, accumulators),
Encoders
.bean(EventGroup.class))
.flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class));

View File

@ -46,7 +46,7 @@ public class GenerateStatsJob {
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath);
final String dbUrl = parser.get("dbUrl");

View File

@ -46,7 +46,7 @@ public class IndexEventSubsetJob {
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath);
final String index = parser.get("index");
@ -55,6 +55,18 @@ public class IndexEventSubsetJob {
final String indexHost = parser.get("esHost");
log.info("indexHost: {}", indexHost);
final String esBatchWriteRetryCount = parser.get("esBatchWriteRetryCount");
log.info("esBatchWriteRetryCount: {}", esBatchWriteRetryCount);
final String esBatchWriteRetryWait = parser.get("esBatchWriteRetryWait");
log.info("esBatchWriteRetryWait: {}", esBatchWriteRetryWait);
final String esBatchSizeEntries = parser.get("esBatchSizeEntries");
log.info("esBatchSizeEntries: {}", esBatchSizeEntries);
final String esNodesWanOnly = parser.get("esNodesWanOnly");
log.info("esNodesWanOnly: {}", esNodesWanOnly);
final int maxEventsForTopic = NumberUtils.toInt(parser.get("maxEventsForTopic"));
log.info("maxEventsForTopic: {}", maxEventsForTopic);
@ -86,10 +98,10 @@ public class IndexEventSubsetJob {
esCfg.put("es.index.auto.create", "false");
esCfg.put("es.nodes", indexHost);
esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY
esCfg.put("es.batch.write.retry.count", "8");
esCfg.put("es.batch.write.retry.wait", "60s");
esCfg.put("es.batch.size.entries", "200");
esCfg.put("es.nodes.wan.only", "true");
esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount);
esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait);
esCfg.put("es.batch.size.entries", esBatchSizeEntries);
esCfg.put("es.nodes.wan.only", esNodesWanOnly);
log.info("*** Start indexing");
JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);

View File

@ -54,7 +54,7 @@ public class IndexNotificationsJob {
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath);
final String index = parser.get("index");
@ -63,6 +63,18 @@ public class IndexNotificationsJob {
final String indexHost = parser.get("esHost");
log.info("indexHost: {}", indexHost);
final String esBatchWriteRetryCount = parser.get("esBatchWriteRetryCount");
log.info("esBatchWriteRetryCount: {}", esBatchWriteRetryCount);
final String esBatchWriteRetryWait = parser.get("esBatchWriteRetryWait");
log.info("esBatchWriteRetryWait: {}", esBatchWriteRetryWait);
final String esBatchSizeEntries = parser.get("esBatchSizeEntries");
log.info("esBatchSizeEntries: {}", esBatchSizeEntries);
final String esNodesWanOnly = parser.get("esNodesWanOnly");
log.info("esNodesWanOnly: {}", esNodesWanOnly);
final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);
@ -92,10 +104,10 @@ public class IndexNotificationsJob {
esCfg.put("es.index.auto.create", "false");
esCfg.put("es.nodes", indexHost);
esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY
esCfg.put("es.batch.write.retry.count", "8");
esCfg.put("es.batch.write.retry.wait", "60s");
esCfg.put("es.batch.size.entries", "200");
esCfg.put("es.nodes.wan.only", "true");
esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount);
esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait);
esCfg.put("es.batch.size.entries", esBatchSizeEntries);
esCfg.put("es.nodes.wan.only", esNodesWanOnly);
log.info("*** Start indexing");
JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);

View File

@ -36,7 +36,7 @@ public class IndexOnESJob {
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath);
final String index = parser.get("index");
@ -45,6 +45,18 @@ public class IndexOnESJob {
final String indexHost = parser.get("esHost");
log.info("indexHost: {}", indexHost);
final String esBatchWriteRetryCount = parser.get("esBatchWriteRetryCount");
log.info("esBatchWriteRetryCount: {}", esBatchWriteRetryCount);
final String esBatchWriteRetryWait = parser.get("esBatchWriteRetryWait");
log.info("esBatchWriteRetryWait: {}", esBatchWriteRetryWait);
final String esBatchSizeEntries = parser.get("esBatchSizeEntries");
log.info("esBatchSizeEntries: {}", esBatchSizeEntries);
final String esNodesWanOnly = parser.get("esNodesWanOnly");
log.info("esNodesWanOnly: {}", esNodesWanOnly);
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
final JavaRDD<String> inputRdd = ClusterUtils
@ -53,15 +65,13 @@ public class IndexOnESJob {
.javaRDD();
final Map<String, String> esCfg = new HashMap<>();
// esCfg.put("es.nodes", "10.19.65.51, 10.19.65.52, 10.19.65.53, 10.19.65.54");
esCfg.put("es.index.auto.create", "false");
esCfg.put("es.nodes", indexHost);
esCfg.put("es.mapping.id", "eventId"); // THE PRIMARY KEY
esCfg.put("es.batch.write.retry.count", "8");
esCfg.put("es.batch.write.retry.wait", "60s");
esCfg.put("es.batch.size.entries", "200");
esCfg.put("es.nodes.wan.only", "true");
esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount);
esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait);
esCfg.put("es.batch.size.entries", esBatchSizeEntries);
esCfg.put("es.nodes.wan.only", esNodesWanOnly);
JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg);
}

View File

@ -42,10 +42,10 @@ public class JoinStep0Job {
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String joinedEntitiesPath = workingPath + "/joinedEntities_step0";
final String joinedEntitiesPath = workingDir + "/joinedEntities_step0";
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf();
@ -57,10 +57,10 @@ public class JoinStep0Job {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class);
.readPath(spark, workingDir + "/simpleEntities", OaBrokerMainEntity.class);
final Dataset<RelatedDatasource> typedRels = ClusterUtils
.readPath(spark, workingPath + "/relatedDatasources", RelatedDatasource.class);
.readPath(spark, workingDir + "/relatedDatasources", RelatedDatasource.class);
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDatasource>, OaBrokerMainEntity> aggr = new RelatedDatasourceAggregator()
.toColumn();

View File

@ -40,10 +40,10 @@ public class JoinStep1Job {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String joinedEntitiesPath = workingPath + "/joinedEntities_step1";
final String joinedEntitiesPath = workingDir + "/joinedEntities_step1";
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf();
@ -55,10 +55,10 @@ public class JoinStep1Job {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step0", OaBrokerMainEntity.class);
.readPath(spark, workingDir + "/joinedEntities_step0", OaBrokerMainEntity.class);
final Dataset<RelatedProject> typedRels = ClusterUtils
.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class);
.readPath(spark, workingDir + "/relatedProjects", RelatedProject.class);
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedProject>, OaBrokerMainEntity> aggr = new RelatedProjectAggregator()
.toColumn();

View File

@ -39,10 +39,10 @@ public class JoinStep2Job {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String joinedEntitiesPath = workingPath + "/joinedEntities_step2";
final String joinedEntitiesPath = workingDir + "/joinedEntities_step2";
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf();
@ -54,10 +54,10 @@ public class JoinStep2Job {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class);
.readPath(spark, workingDir + "/joinedEntities_step1", OaBrokerMainEntity.class);
final Dataset<RelatedSoftware> typedRels = ClusterUtils
.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class);
.readPath(spark, workingDir + "/relatedSoftwares", RelatedSoftware.class);
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedSoftware>, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator()
.toColumn();

View File

@ -40,10 +40,10 @@ public class JoinStep3Job {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String joinedEntitiesPath = workingPath + "/joinedEntities_step3";
final String joinedEntitiesPath = workingDir + "/joinedEntities_step3";
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf();
@ -55,10 +55,10 @@ public class JoinStep3Job {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class);
.readPath(spark, workingDir + "/joinedEntities_step2", OaBrokerMainEntity.class);
final Dataset<RelatedDataset> typedRels = ClusterUtils
.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class);
.readPath(spark, workingDir + "/relatedDatasets", RelatedDataset.class);
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDataset>, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator()
.toColumn();

View File

@ -40,10 +40,10 @@ public class JoinStep4Job {
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String joinedEntitiesPath = workingPath + "/joinedEntities_step4";
final String joinedEntitiesPath = workingDir + "/joinedEntities_step4";
log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf();
@ -55,10 +55,10 @@ public class JoinStep4Job {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class);
.readPath(spark, workingDir + "/joinedEntities_step3", OaBrokerMainEntity.class);
final Dataset<RelatedPublication> typedRels = ClusterUtils
.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class);
.readPath(spark, workingDir + "/relatedPublications", RelatedPublication.class);
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedPublication>, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator()
.toColumn();

View File

@ -55,10 +55,10 @@ public class PartitionEventsByDsIdJob {
final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events";
final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath);
final String partitionPath = parser.get("workingPath") + "/eventsByOpendoarId";
final String partitionPath = parser.get("outputDir") + "/eventsByOpendoarId";
log.info("partitionPath: {}", partitionPath);
final String opendoarIds = parser.get("opendoarIds");
@ -91,6 +91,7 @@ public class PartitionEventsByDsIdJob {
.write()
.partitionBy("group")
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(partitionPath);
});

View File

@ -45,10 +45,10 @@ public class PrepareGroupsJob {
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String groupsPath = workingPath + "/duplicates";
final String groupsPath = workingDir + "/duplicates";
log.info("groupsPath: {}", groupsPath);
final SparkConf conf = new SparkConf();
@ -60,10 +60,10 @@ public class PrepareGroupsJob {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_groups");
final Dataset<OaBrokerMainEntity> results = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class);
.readPath(spark, workingDir + "/joinedEntities_step4", OaBrokerMainEntity.class);
final Dataset<Relation> mergedRels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
.loadRelations(graphPath, spark)
.filter(r -> r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS));
final TypedColumn<Tuple2<OaBrokerMainEntity, Relation>, ResultGroup> aggr = new ResultAggregator()

View File

@ -42,10 +42,10 @@ public class PrepareRelatedDatasetsJob {
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String relsPath = workingPath + "/relatedDatasets";
final String relsPath = workingDir + "/relatedDatasets";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
@ -62,7 +62,7 @@ public class PrepareRelatedDatasetsJob {
.map(ConversionUtils::oafDatasetToBrokerDataset, Encoders.bean(OaBrokerRelatedDataset.class));
final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
.loadRelations(graphPath, spark)
.filter(r -> r.getDataInfo().getDeletedbyinference())
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
@ -72,7 +72,8 @@ public class PrepareRelatedDatasetsJob {
final Dataset<RelatedDataset> dataset = rels
.joinWith(datasets, datasets.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> {
final RelatedDataset rel = new RelatedDataset(t._1.getSource(), t._2);
final RelatedDataset rel = new RelatedDataset(t._1.getSource(),
t._2);
rel.getRelDataset().setRelType(t._1.getRelClass());
return rel;
}, Encoders.bean(RelatedDataset.class));

View File

@ -48,10 +48,10 @@ public class PrepareRelatedDatasourcesJob {
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String relsPath = workingPath + "/relatedDatasources";
final String relsPath = workingDir + "/relatedDatasources";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();

View File

@ -44,10 +44,10 @@ public class PrepareRelatedProjectsJob {
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String relsPath = workingPath + "/relatedProjects";
final String relsPath = workingDir + "/relatedProjects";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
@ -64,7 +64,7 @@ public class PrepareRelatedProjectsJob {
.map(ConversionUtils::oafProjectToBrokerProject, Encoders.bean(OaBrokerProject.class));
final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
.loadRelations(graphPath, spark)
.filter(r -> r.getDataInfo().getDeletedbyinference())
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_PROJECT))
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))

View File

@ -43,10 +43,10 @@ public class PrepareRelatedPublicationsJob {
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String relsPath = workingPath + "/relatedPublications";
final String relsPath = workingDir + "/relatedPublications";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
@ -65,7 +65,7 @@ public class PrepareRelatedPublicationsJob {
Encoders.bean(OaBrokerRelatedPublication.class));
final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
.loadRelations(graphPath, spark)
.filter(r -> r.getDataInfo().getDeletedbyinference())
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
.filter(r -> ClusterUtils.isValidResultResultClass(r.getRelClass()))
@ -75,7 +75,8 @@ public class PrepareRelatedPublicationsJob {
final Dataset<RelatedPublication> dataset = rels
.joinWith(pubs, pubs.col("openaireId").equalTo(rels.col("target")), "inner")
.map(t -> {
final RelatedPublication rel = new RelatedPublication(t._1.getSource(), t._2);
final RelatedPublication rel = new RelatedPublication(
t._1.getSource(), t._2);
rel.getRelPublication().setRelType(t._1.getRelClass());
return rel;
}, Encoders.bean(RelatedPublication.class));

View File

@ -44,10 +44,10 @@ public class PrepareRelatedSoftwaresJob {
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String relsPath = workingPath + "/relatedSoftwares";
final String relsPath = workingDir + "/relatedSoftwares";
log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf();
@ -64,7 +64,7 @@ public class PrepareRelatedSoftwaresJob {
.map(ConversionUtils::oafSoftwareToBrokerSoftware, Encoders.bean(OaBrokerRelatedSoftware.class));
final Dataset<Relation> rels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class)
.loadRelations(graphPath, spark)
.filter(r -> r.getDataInfo().getDeletedbyinference())
.filter(r -> r.getRelType().equals(ModelConstants.RESULT_RESULT))
.filter(r -> !r.getRelClass().equals(BrokerConstants.IS_MERGED_IN_CLASS))

View File

@ -44,10 +44,10 @@ public class PrepareSimpleEntititiesJob {
final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath");
log.info("workingPath: {}", workingPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
final String simpleEntitiesPath = workingPath + "/simpleEntities";
final String simpleEntitiesPath = workingDir + "/simpleEntities";
log.info("simpleEntitiesPath: {}", simpleEntitiesPath);
final SparkConf conf = new SparkConf();

View File

@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class ClusterUtils {
@ -30,6 +31,16 @@ public class ClusterUtils {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
/**
 * Reads the relations stored under {@code graphPath + "/relation"} and passes the source and
 * target identifier of each relation through {@link ConversionUtils#cleanOpenaireId(String)}.
 *
 * @param graphPath base path of the graph; {@code "/relation"} is appended to it
 * @param spark the Spark session used to read the data
 * @return the relations with rewritten source and target identifiers
 */
public static Dataset<Relation> loadRelations(final String graphPath, final SparkSession spark) {
	final String relationPath = graphPath + "/relation";
	final Dataset<Relation> rawRelations = ClusterUtils.readPath(spark, relationPath, Relation.class);

	return rawRelations.map(rel -> {
		// The incoming bean is updated in place and handed back.
		final String cleanedSource = ConversionUtils.cleanOpenaireId(rel.getSource());
		rel.setSource(cleanedSource);
		final String cleanedTarget = ConversionUtils.cleanOpenaireId(rel.getTarget());
		rel.setTarget(cleanedTarget);
		return rel;
	}, Encoders.bean(Relation.class));
}
public static <R> Dataset<R> readPath(
final SparkSession spark,
final String inputPath,
@ -67,6 +78,7 @@ public class ClusterUtils {
.map(o -> ClusterUtils.incrementAccumulator(o, acc), Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(path);
}

View File

@ -74,7 +74,7 @@ public class ConversionUtils {
}
final OaBrokerRelatedDataset res = new OaBrokerRelatedDataset();
res.setOpenaireId(d.getId());
res.setOpenaireId(cleanOpenaireId(d.getId()));
res.setOriginalId(first(d.getOriginalId()));
res.setTitle(structPropValue(d.getTitle()));
res.setPids(mappedList(d.getPid(), ConversionUtils::oafPidToBrokerPid));
@ -89,7 +89,7 @@ public class ConversionUtils {
}
final OaBrokerRelatedPublication res = new OaBrokerRelatedPublication();
res.setOpenaireId(p.getId());
res.setOpenaireId(cleanOpenaireId(p.getId()));
res.setOriginalId(first(p.getOriginalId()));
res.setTitle(structPropValue(p.getTitle()));
res.setPids(mappedList(p.getPid(), ConversionUtils::oafPidToBrokerPid));
@ -106,7 +106,7 @@ public class ConversionUtils {
final OaBrokerMainEntity res = new OaBrokerMainEntity();
res.setOpenaireId(result.getId());
res.setOpenaireId(cleanOpenaireId(result.getId()));
res.setOriginalId(first(result.getOriginalId()));
res.setTypology(classId(result.getResulttype()));
res.setTitles(structPropList(result.getTitle()));
@ -129,6 +129,10 @@ public class ConversionUtils {
return res;
}
/**
 * Removes everything up to and including the first {@code '|'} from an identifier.
 *
 * <p>An identifier without a {@code '|'} is returned unchanged. A {@code null} identifier
 * yields {@code null} instead of a {@link NullPointerException}, so that entities or
 * relations lacking an id do not abort the calling conversion.
 *
 * @param id the identifier to clean, may be {@code null}
 * @return the part of {@code id} after its first {@code '|'}, {@code id} itself when it
 *         contains no {@code '|'}, or {@code null} when {@code id} is {@code null}
 */
public static String cleanOpenaireId(final String id) {
	if (id == null) {
		return null;
	}
	final int separatorIndex = id.indexOf('|');
	return separatorIndex >= 0 ? id.substring(separatorIndex + 1) : id;
}
private static OaBrokerAuthor oafAuthorToBrokerAuthor(final Author author) {
if (author == null) {
return null;
@ -188,7 +192,7 @@ public class ConversionUtils {
}
final OaBrokerProject res = new OaBrokerProject();
res.setOpenaireId(p.getId());
res.setOpenaireId(cleanOpenaireId(p.getId()));
res.setTitle(fieldValue(p.getTitle()));
res.setAcronym(fieldValue(p.getAcronym()));
res.setCode(fieldValue(p.getCode()));
@ -214,7 +218,7 @@ public class ConversionUtils {
}
final OaBrokerRelatedSoftware res = new OaBrokerRelatedSoftware();
res.setOpenaireId(sw.getId());
res.setOpenaireId(cleanOpenaireId(sw.getId()));
res.setName(structPropValue(sw.getTitle()));
res.setDescription(fieldValue(sw.getDescription()));
res.setRepository(fieldValue(sw.getCodeRepositoryUrl()));
@ -230,7 +234,7 @@ public class ConversionUtils {
final OaBrokerRelatedDatasource res = new OaBrokerRelatedDatasource();
res.setName(StringUtils.defaultIfBlank(fieldValue(ds.getOfficialname()), fieldValue(ds.getEnglishname())));
res.setOpenaireId(ds.getId());
res.setOpenaireId(cleanOpenaireId(ds.getId()));
res.setType(classId(ds.getDatasourcetype()));
return res;
}

View File

@ -59,9 +59,18 @@ public class DatasourceRelationsAccumulator implements Serializable {
final DatasourceRelationsAccumulator res = new DatasourceRelationsAccumulator();
collectedFromSet
.stream()
.map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.COLLECTED_FROM_REL))
.map(
s -> new Tuple3<>(ConversionUtils.cleanOpenaireId(r.getId()), ConversionUtils.cleanOpenaireId(s),
BrokerConstants.COLLECTED_FROM_REL))
.forEach(res::addTuple);
hostedBySet.stream().map(s -> new Tuple3<>(r.getId(), s, BrokerConstants.HOSTED_BY_REL)).forEach(res::addTuple);
hostedBySet
.stream()
.map(
s -> new Tuple3<>(ConversionUtils.cleanOpenaireId(r.getId()), ConversionUtils.cleanOpenaireId(s),
BrokerConstants.HOSTED_BY_REL))
.forEach(res::addTuple);
return res;
}

View File

@ -76,6 +76,7 @@ public class EventFinder {
final Set<String> dsIdWhitelist,
final Set<String> dsIdBlacklist,
final Set<String> dsTypeWhitelist,
final Set<String> topicWhitelist,
final Map<String, LongAccumulator> accumulators) {
final List<UpdateInfo<?>> list = new ArrayList<>();
@ -84,7 +85,13 @@ public class EventFinder {
for (final OaBrokerRelatedDatasource targetDs : target.getDatasources()) {
if (verifyTarget(targetDs, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) {
for (final UpdateMatcher<?> matcher : matchers) {
list.addAll(matcher.searchUpdatesForRecord(target, targetDs, results.getData(), accumulators));
for (final UpdateInfo<?> info : matcher
.searchUpdatesForRecord(target, targetDs, results.getData(), accumulators)) {
if (topicWhitelist == null || topicWhitelist.isEmpty()
|| topicWhitelist.contains(info.getTopic().getPath())) {
list.add(info);
}
}
}
}
}

View File

@ -0,0 +1,9 @@
[
{
"paramName": "o",
"paramLongName": "outputDir",
"paramDescription": "the path where the data are stored",
"paramRequired": true
}
]

View File

@ -7,7 +7,7 @@
},
{
"paramName": "o",
"paramLongName": "workingPath",
"paramLongName": "workingDir",
"paramDescription": "the path where the temporary data will be stored",
"paramRequired": true
}

View File

@ -6,7 +6,7 @@
<description>the path where the graph is stored</description>
</property>
<property>
<name>workingPath</name>
<name>outputDir</name>
<description>the path where the generated data will be stored</description>
</property>
<property>
@ -24,6 +24,11 @@
<value>-</value>
<description>a black list (comma separated, - for empty list) of datasource ids</description>
</property>
<property>
<name>topicWhitelist</name>
<value>*</value>
<description>a white list (comma separated, * for all) of topics</description>
</property>
<property>
<name>esEventIndexName</name>
<description>the elasticsearch index name for events</description>
@ -36,6 +41,26 @@
<name>esIndexHost</name>
<description>the elasticsearch host</description>
</property>
<property>
<name>esBatchWriteRetryCount</name>
<value>8</value>
<description>an ES configuration property</description>
</property>
<property>
<name>esBatchWriteRetryWait</name>
<value>60s</value>
<description>an ES configuration property</description>
</property>
<property>
<name>esBatchSizeEntries</name>
<value>200</value>
<description>an ES configuration property</description>
</property>
<property>
<name>esNodesWanOnly</name>
<value>true</value>
<description>an ES configuration property</description>
</property>
<property>
<name>maxIndexedEventsForDsAndTopic</name>
<description>the max number of events for each couple (ds/topic)</description>
@ -111,15 +136,15 @@
</configuration>
</global>
<start to="ensure_working_path"/>
<start to="ensure_output_dir"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ensure_working_path">
<action name="ensure_output_dir">
<fs>
<mkdir path='${workingPath}'/>
<mkdir path='${outputDir}'/>
</fs>
<ok to="start_entities_and_rels"/>
<error to="Kill"/>
@ -152,7 +177,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
@ -176,7 +201,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
@ -201,7 +226,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
@ -225,7 +250,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
@ -249,7 +274,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
@ -273,7 +298,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
</spark>
<ok to="wait_entities_and_rels"/>
<error to="Kill"/>
@ -299,7 +324,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
</spark>
<ok to="join_entities_step1"/>
<error to="Kill"/>
@ -323,7 +348,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
</spark>
<ok to="join_entities_step2"/>
<error to="Kill"/>
@ -347,7 +372,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
</spark>
<ok to="join_entities_step3"/>
<error to="Kill"/>
@ -371,7 +396,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
</spark>
<ok to="join_entities_step4"/>
<error to="Kill"/>
@ -395,7 +420,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
</spark>
<ok to="prepare_groups"/>
<error to="Kill"/>
@ -419,7 +444,7 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
</spark>
<ok to="generate_events"/>
<error to="Kill"/>
@ -442,10 +467,12 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
<arg>--outputDir</arg><arg>${outputDir}</arg>
<arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg>
<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
<arg>--topicWhitelist</arg><arg>${topicWhitelist}</arg>
</spark>
<ok to="index_event_subset"/>
<error to="Kill"/>
@ -468,9 +495,13 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--outputDir</arg><arg>${outputDir}</arg>
<arg>--index</arg><arg>${esEventIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--esBatchWriteRetryCount</arg><arg>${esBatchWriteRetryCount}</arg>
<arg>--esBatchWriteRetryWait</arg><arg>${esBatchWriteRetryWait}</arg>
<arg>--esBatchSizeEntries</arg><arg>${esBatchSizeEntries}</arg>
<arg>--esNodesWanOnly</arg><arg>${esNodesWanOnly}</arg>
<arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
</spark>
@ -495,9 +526,13 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--outputDir</arg><arg>${outputDir}</arg>
<arg>--index</arg><arg>${esNotificationsIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--esBatchWriteRetryCount</arg><arg>${esBatchWriteRetryCount}</arg>
<arg>--esBatchWriteRetryWait</arg><arg>${esBatchWriteRetryWait}</arg>
<arg>--esBatchSizeEntries</arg><arg>${esBatchSizeEntries}</arg>
<arg>--esNodesWanOnly</arg><arg>${esNodesWanOnly}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
</spark>
<ok to="stats"/>
@ -521,7 +556,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--outputDir</arg><arg>${outputDir}</arg>
<arg>--dbUrl</arg><arg>${brokerDbUrl}</arg>
<arg>--dbUser</arg><arg>${brokerDbUser}</arg>
<arg>--dbPassword</arg><arg>${brokerDbPassword}</arg>

View File

@ -1,7 +1,13 @@
[
{
"paramName": "wp",
"paramLongName": "workingDir",
"paramDescription": "the path where the temporary data are stored",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "workingPath",
"paramLongName": "outputDir",
"paramDescription": "the path where the generated events will be stored",
"paramRequired": true
},
@ -22,5 +28,11 @@
"paramLongName": "datasourceIdBlacklist",
"paramDescription": "a black list (comma separated, - for empty list) of datasource ids",
"paramRequired": true
},
{
"paramName": "topicWhitelist",
"paramLongName": "topicWhitelist",
"paramDescription": "a white list (comma separated, * for all) of topics",
"paramRequired": true
}
]

View File

@ -1,8 +1,8 @@
[
{
"paramName": "o",
"paramLongName": "workingPath",
"paramDescription": "the workinh path",
"paramLongName": "outputDir",
"paramDescription": "the data path",
"paramRequired": true
},
{
@ -16,5 +16,29 @@
"paramLongName": "esHost",
"paramDescription": "the ES host",
"paramRequired": true
},
{
"paramName": "esBatchWriteRetryCount",
"paramLongName": "esBatchWriteRetryCount",
"paramDescription": "an ES configuration property",
"paramRequired": true
},
{
"paramName": "esBatchWriteRetryWait",
"paramLongName": "esBatchWriteRetryWait",
"paramDescription": "an ES configuration property",
"paramRequired": true
},
{
"paramName": "esBatchSizeEntries",
"paramLongName": "esBatchSizeEntries",
"paramDescription": "an ES configuration property",
"paramRequired": true
},
{
"paramName": "esNodesWanOnly",
"paramLongName": "esNodesWanOnly",
"paramDescription": "an ES configuration property",
"paramRequired": true
}
]

View File

@ -1,8 +1,8 @@
[
{
"paramName": "o",
"paramLongName": "workingPath",
"paramDescription": "the workinh path",
"paramLongName": "outputDir",
"paramDescription": "the path where the generated data are stored",
"paramRequired": true
},
{
@ -16,7 +16,31 @@
"paramLongName": "esHost",
"paramDescription": "the ES host",
"paramRequired": true
},
{
"paramName": "esBatchWriteRetryCount",
"paramLongName": "esBatchWriteRetryCount",
"paramDescription": "an ES configuration property",
"paramRequired": true
},
{
"paramName": "esBatchWriteRetryWait",
"paramLongName": "esBatchWriteRetryWait",
"paramDescription": "an ES configuration property",
"paramRequired": true
},
{
"paramName": "esBatchSizeEntries",
"paramLongName": "esBatchSizeEntries",
"paramDescription": "an ES configuration property",
"paramRequired": true
},
{
"paramName": "esNodesWanOnly",
"paramLongName": "esNodesWanOnly",
"paramDescription": "an ES configuration property",
"paramRequired": true
},
{
"paramName": "n",
"paramLongName": "maxEventsForTopic",

View File

@ -1,8 +1,8 @@
[
{
"paramName": "o",
"paramLongName": "workingPath",
"paramDescription": "the workinh path",
"paramLongName": "outputDir",
"paramDescription": "the dir that contains the events folder",
"paramRequired": true
},
{
@ -17,6 +17,30 @@
"paramDescription": "the ES host",
"paramRequired": true
},
{
"paramName": "esBatchWriteRetryCount",
"paramLongName": "esBatchWriteRetryCount",
"paramDescription": "an ES configuration property",
"paramRequired": true
},
{
"paramName": "esBatchWriteRetryWait",
"paramLongName": "esBatchWriteRetryWait",
"paramDescription": "an ES configuration property",
"paramRequired": true
},
{
"paramName": "esBatchSizeEntries",
"paramLongName": "esBatchSizeEntries",
"paramDescription": "an ES configuration property",
"paramRequired": true
},
{
"paramName": "esNodesWanOnly",
"paramLongName": "esNodesWanOnly",
"paramDescription": "an ES configuration property",
"paramRequired": true
},
{
"paramName": "broker",
"paramLongName": "brokerApiBaseUrl",

View File

@ -6,8 +6,8 @@
<description>the path where the graph is stored</description>
</property>
<property>
<name>workingPath</name>
<description>the path where the the generated data will be stored</description>
<name>outputDir</name>
<description>the path where the generated data are stored</description>
</property>
<property>
<name>datasourceIdWhitelist</name>
@ -36,6 +36,26 @@
<name>esIndexHost</name>
<description>the elasticsearch host</description>
</property>
<property>
<name>esBatchWriteRetryCount</name>
<value>8</value>
<description>an ES configuration property</description>
</property>
<property>
<name>esBatchWriteRetryWait</name>
<value>60s</value>
<description>an ES configuration property</description>
</property>
<property>
<name>esBatchSizeEntries</name>
<value>200</value>
<description>an ES configuration property</description>
</property>
<property>
<name>esNodesWanOnly</name>
<value>true</value>
<description>an ES configuration property</description>
</property>
<property>
<name>maxIndexedEventsForDsAndTopic</name>
<description>the max number of events for each couple (ds/topic)</description>
@ -122,9 +142,13 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--outputDir</arg><arg>${outputDir}</arg>
<arg>--index</arg><arg>${esNotificationsIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--esBatchWriteRetryCount</arg><arg>${esBatchWriteRetryCount}</arg>
<arg>--esBatchWriteRetryWait</arg><arg>${esBatchWriteRetryWait}</arg>
<arg>--esBatchSizeEntries</arg><arg>${esBatchSizeEntries}</arg>
<arg>--esNodesWanOnly</arg><arg>${esNodesWanOnly}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
</spark>
<ok to="End"/>

View File

@ -1,8 +1,8 @@
[
{
"paramName": "o",
"paramLongName": "workingPath",
"paramDescription": "the path where the temporary data will be stored",
"paramLongName": "outputDir",
"paramDescription": "the path where the data will be stored",
"paramRequired": true
},
{

View File

@ -6,7 +6,7 @@
<description>the opendoar IDs whitelist (comma separated)</description>
</property>
<property>
<name>workingPath</name>
<name>outputDir</name>
<description>the path where the generated data will be stored</description>
</property>
<property>
@ -87,7 +87,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--workingDir</arg><arg>${workingDir}</arg>
<arg>--opendoarIds</arg><arg>${opendoarIds}</arg>
</spark>
<ok to="End"/>

View File

@ -0,0 +1,18 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
</configuration>

View File

@ -0,0 +1,140 @@
<workflow-app name="reindex_events" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>outputDir</name>
<description>the path where the generated data will be stored</description>
</property>
<property>
<name>esEventIndexName</name>
<description>the elasticsearch index name for events</description>
</property>
<property>
<name>esIndexHost</name>
<description>the elasticsearch host</description>
</property>
<property>
<name>esBatchWriteRetryCount</name>
<value>8</value>
<description>an ES configuration property</description>
</property>
<property>
<name>esBatchWriteRetryWait</name>
<value>60s</value>
<description>an ES configuration property</description>
</property>
<property>
<name>esBatchSizeEntries</name>
<value>200</value>
<description>an ES configuration property</description>
</property>
<property>
<name>esNodesWanOnly</name>
<value>true</value>
<description>an ES configuration property</description>
</property>
<property>
<name>maxIndexedEventsForDsAndTopic</name>
<description>the max number of events for each couple (ds/topic)</description>
</property>
<property>
<name>brokerApiBaseUrl</name>
<description>the url of the broker service api</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>oozieActionShareLibForSpark2</name>
<description>oozie action sharelib for spark 2.*</description>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
<description>spark 2.* extra listeners classname</description>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
<description>spark 2.* sql query execution listeners classname</description>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<description>spark 2.* yarn history server address</description>
</property>
<property>
<name>spark2EventLogDir</name>
<description>spark 2.* event log dir location</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapreduce.job.queuename</name>
<value>${queueName}</value>
</property>
<property>
<name>oozie.launcher.mapred.job.queue.name</name>
<value>${oozieLauncherQueueName}</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="index_event_subset"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="index_event_subset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>IndexEventSubsetOnESJob</name>
<class>eu.dnetlib.dhp.broker.oa.IndexEventSubsetJob</class>
<jar>dhp-broker-events-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.dynamicAllocation.maxExecutors="8"
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--outputDir</arg><arg>${outputDir}</arg>
<arg>--index</arg><arg>${esEventIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--esBatchWriteRetryCount</arg><arg>${esBatchWriteRetryCount}</arg>
<arg>--esBatchWriteRetryWait</arg><arg>${esBatchWriteRetryWait}</arg>
<arg>--esBatchSizeEntries</arg><arg>${esBatchSizeEntries}</arg>
<arg>--esNodesWanOnly</arg><arg>${esNodesWanOnly}</arg>
<arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -1,8 +1,8 @@
[
{
"paramName": "wp",
"paramLongName": "workingPath",
"paramDescription": "the working path",
"paramName": "o",
"paramLongName": "outputDir",
"paramDescription": "the path where generated data are stored",
"paramRequired": true
},
{