broker #78

Merged
claudio.atzori merged 10 commits from broker into master 2020-12-15 10:03:46 +01:00
30 changed files with 128 additions and 123 deletions
Showing only changes of commit 2e7df07328 - Show all commits

View File

@ -31,15 +31,15 @@ public class CheckDuplictedIdsJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString(CheckDuplictedIdsJob.class .toString(CheckDuplictedIdsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/check_duplicates.json")));
parser.parseArgument(args); parser.parseArgument(args);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events"; final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath); log.info("eventsPath: {}", eventsPath);
final String countPath = parser.get("workingPath") + "/counts"; final String countPath = parser.get("outputDir") + "/counts";
log.info("countPath: {}", countPath); log.info("countPath: {}", countPath);
final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

View File

@ -33,8 +33,7 @@ public class GenerateEventsJob {
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(GenerateEventsJob.class
GenerateEventsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_events.json")));
parser.parseArgument(args); parser.parseArgument(args);
@ -44,10 +43,10 @@ public class GenerateEventsJob {
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath"); final String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
final String eventsPath = workingPath + "/events"; final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath); log.info("eventsPath: {}", eventsPath);
final Set<String> dsIdWhitelist = ClusterUtils.parseParamAsList(parser, "datasourceIdWhitelist"); final Set<String> dsIdWhitelist = ClusterUtils.parseParamAsList(parser, "datasourceIdWhitelist");
@ -70,13 +69,11 @@ public class GenerateEventsJob {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_events"); final LongAccumulator total = spark.sparkContext().longAccumulator("total_events");
final Dataset<ResultGroup> groups = ClusterUtils final Dataset<ResultGroup> groups = ClusterUtils
.readPath(spark, workingPath + "/duplicates", ResultGroup.class); .readPath(spark, workingDir + "/duplicates", ResultGroup.class);
final Dataset<Event> dataset = groups final Dataset<Event> dataset = groups
.map( .map(g -> EventFinder
g -> EventFinder .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators), Encoders
.generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators),
Encoders
.bean(EventGroup.class)) .bean(EventGroup.class))
.flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)); .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class));

View File

@ -33,8 +33,7 @@ public class GenerateStatsJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(GenerateStatsJob.class
GenerateStatsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/stats_params.json")));
parser.parseArgument(args); parser.parseArgument(args);
@ -46,7 +45,7 @@ public class GenerateStatsJob {
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events"; final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath); log.info("eventsPath: {}", eventsPath);
final String dbUrl = parser.get("dbUrl"); final String dbUrl = parser.get("dbUrl");

View File

@ -39,14 +39,13 @@ public class IndexEventSubsetJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(IndexEventSubsetJob.class
IndexEventSubsetJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json")));
parser.parseArgument(args); parser.parseArgument(args);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events"; final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath); log.info("eventsPath: {}", eventsPath);
final String index = parser.get("index"); final String index = parser.get("index");

View File

@ -47,14 +47,13 @@ public class IndexNotificationsJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(IndexNotificationsJob.class
IndexNotificationsJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_notifications.json")));
parser.parseArgument(args); parser.parseArgument(args);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events"; final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath); log.info("eventsPath: {}", eventsPath);
final String index = parser.get("index"); final String index = parser.get("index");
@ -117,8 +116,7 @@ public class IndexNotificationsJob {
final long date) { final long date) {
final List<Notification> list = subscriptions final List<Notification> list = subscriptions
.stream() .stream()
.filter( .filter(s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic()))
s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic()))
.filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap()))
.map(s -> generateNotification(s, e, date)) .map(s -> generateNotification(s, e, date))
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -149,17 +147,14 @@ public class IndexNotificationsJob {
if (conditions.containsKey("trust") if (conditions.containsKey("trust")
&& !SubscriptionUtils && !SubscriptionUtils
.verifyFloatRange( .verifyFloatRange(map.getTrust(), conditions.get("trust").get(0).getValue(), conditions.get("trust").get(0).getOtherValue())) {
map.getTrust(), conditions.get("trust").get(0).getValue(),
conditions.get("trust").get(0).getOtherValue())) {
return false; return false;
} }
if (conditions.containsKey("targetDateofacceptance") && !conditions if (conditions.containsKey("targetDateofacceptance") && !conditions
.get("targetDateofacceptance") .get("targetDateofacceptance")
.stream() .stream()
.anyMatch( .anyMatch(c -> SubscriptionUtils
c -> SubscriptionUtils
.verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) {
return false; return false;
} }

View File

@ -29,14 +29,13 @@ public class IndexOnESJob {
final ArgumentApplicationParser parser = new ArgumentApplicationParser( final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils IOUtils
.toString( .toString(IndexOnESJob.class
IndexOnESJob.class
.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json"))); .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_es.json")));
parser.parseArgument(args); parser.parseArgument(args);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events"; final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath); log.info("eventsPath: {}", eventsPath);
final String index = parser.get("index"); final String index = parser.get("index");

View File

@ -42,10 +42,10 @@ public class JoinStep0Job {
final String graphPath = parser.get("graphPath"); final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath); log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath"); final String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
final String joinedEntitiesPath = workingPath + "/joinedEntities_step0"; final String joinedEntitiesPath = workingDir + "/joinedEntities_step0";
log.info("joinedEntitiesPath: {}", joinedEntitiesPath); log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
@ -57,10 +57,10 @@ public class JoinStep0Job {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/simpleEntities", OaBrokerMainEntity.class); .readPath(spark, workingDir + "/simpleEntities", OaBrokerMainEntity.class);
final Dataset<RelatedDatasource> typedRels = ClusterUtils final Dataset<RelatedDatasource> typedRels = ClusterUtils
.readPath(spark, workingPath + "/relatedDatasources", RelatedDatasource.class); .readPath(spark, workingDir + "/relatedDatasources", RelatedDatasource.class);
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDatasource>, OaBrokerMainEntity> aggr = new RelatedDatasourceAggregator() final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDatasource>, OaBrokerMainEntity> aggr = new RelatedDatasourceAggregator()
.toColumn(); .toColumn();

View File

@ -40,10 +40,10 @@ public class JoinStep1Job {
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath"); final String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
final String joinedEntitiesPath = workingPath + "/joinedEntities_step1"; final String joinedEntitiesPath = workingDir + "/joinedEntities_step1";
log.info("joinedEntitiesPath: {}", joinedEntitiesPath); log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
@ -55,10 +55,10 @@ public class JoinStep1Job {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step0", OaBrokerMainEntity.class); .readPath(spark, workingDir + "/joinedEntities_step0", OaBrokerMainEntity.class);
final Dataset<RelatedProject> typedRels = ClusterUtils final Dataset<RelatedProject> typedRels = ClusterUtils
.readPath(spark, workingPath + "/relatedProjects", RelatedProject.class); .readPath(spark, workingDir + "/relatedProjects", RelatedProject.class);
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedProject>, OaBrokerMainEntity> aggr = new RelatedProjectAggregator() final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedProject>, OaBrokerMainEntity> aggr = new RelatedProjectAggregator()
.toColumn(); .toColumn();

View File

@ -39,10 +39,10 @@ public class JoinStep2Job {
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath"); final String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
final String joinedEntitiesPath = workingPath + "/joinedEntities_step2"; final String joinedEntitiesPath = workingDir + "/joinedEntities_step2";
log.info("joinedEntitiesPath: {}", joinedEntitiesPath); log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
@ -54,10 +54,10 @@ public class JoinStep2Job {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step1", OaBrokerMainEntity.class); .readPath(spark, workingDir + "/joinedEntities_step1", OaBrokerMainEntity.class);
final Dataset<RelatedSoftware> typedRels = ClusterUtils final Dataset<RelatedSoftware> typedRels = ClusterUtils
.readPath(spark, workingPath + "/relatedSoftwares", RelatedSoftware.class); .readPath(spark, workingDir + "/relatedSoftwares", RelatedSoftware.class);
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedSoftware>, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator() final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedSoftware>, OaBrokerMainEntity> aggr = new RelatedSoftwareAggregator()
.toColumn(); .toColumn();

View File

@ -40,10 +40,10 @@ public class JoinStep3Job {
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath"); final String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
final String joinedEntitiesPath = workingPath + "/joinedEntities_step3"; final String joinedEntitiesPath = workingDir + "/joinedEntities_step3";
log.info("joinedEntitiesPath: {}", joinedEntitiesPath); log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
@ -55,10 +55,10 @@ public class JoinStep3Job {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step2", OaBrokerMainEntity.class); .readPath(spark, workingDir + "/joinedEntities_step2", OaBrokerMainEntity.class);
final Dataset<RelatedDataset> typedRels = ClusterUtils final Dataset<RelatedDataset> typedRels = ClusterUtils
.readPath(spark, workingPath + "/relatedDatasets", RelatedDataset.class); .readPath(spark, workingDir + "/relatedDatasets", RelatedDataset.class);
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDataset>, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator() final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedDataset>, OaBrokerMainEntity> aggr = new RelatedDatasetAggregator()
.toColumn(); .toColumn();

View File

@ -40,10 +40,10 @@ public class JoinStep4Job {
.orElse(Boolean.TRUE); .orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String workingPath = parser.get("workingPath"); final String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
final String joinedEntitiesPath = workingPath + "/joinedEntities_step4"; final String joinedEntitiesPath = workingDir + "/joinedEntities_step4";
log.info("joinedEntitiesPath: {}", joinedEntitiesPath); log.info("joinedEntitiesPath: {}", joinedEntitiesPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
@ -55,10 +55,10 @@ public class JoinStep4Job {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities"); final LongAccumulator total = spark.sparkContext().longAccumulator("total_entities");
final Dataset<OaBrokerMainEntity> sources = ClusterUtils final Dataset<OaBrokerMainEntity> sources = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step3", OaBrokerMainEntity.class); .readPath(spark, workingDir + "/joinedEntities_step3", OaBrokerMainEntity.class);
final Dataset<RelatedPublication> typedRels = ClusterUtils final Dataset<RelatedPublication> typedRels = ClusterUtils
.readPath(spark, workingPath + "/relatedPublications", RelatedPublication.class); .readPath(spark, workingDir + "/relatedPublications", RelatedPublication.class);
final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedPublication>, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator() final TypedColumn<Tuple2<OaBrokerMainEntity, RelatedPublication>, OaBrokerMainEntity> aggr = new RelatedPublicationAggregator()
.toColumn(); .toColumn();

View File

@ -54,10 +54,10 @@ public class PartitionEventsByDsIdJob {
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
final String eventsPath = parser.get("workingPath") + "/events"; final String eventsPath = parser.get("outputDir") + "/events";
log.info("eventsPath: {}", eventsPath); log.info("eventsPath: {}", eventsPath);
final String partitionPath = parser.get("workingPath") + "/eventsByOpendoarId"; final String partitionPath = parser.get("outputDir") + "/eventsByOpendoarId";
log.info("partitionPath: {}", partitionPath); log.info("partitionPath: {}", partitionPath);
final String opendoarIds = parser.get("opendoarIds"); final String opendoarIds = parser.get("opendoarIds");

View File

@ -45,10 +45,10 @@ public class PrepareGroupsJob {
final String graphPath = parser.get("graphPath"); final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath); log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath"); final String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
final String groupsPath = workingPath + "/duplicates"; final String groupsPath = workingDir + "/duplicates";
log.info("groupsPath: {}", groupsPath); log.info("groupsPath: {}", groupsPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();
@ -60,7 +60,7 @@ public class PrepareGroupsJob {
final LongAccumulator total = spark.sparkContext().longAccumulator("total_groups"); final LongAccumulator total = spark.sparkContext().longAccumulator("total_groups");
final Dataset<OaBrokerMainEntity> results = ClusterUtils final Dataset<OaBrokerMainEntity> results = ClusterUtils
.readPath(spark, workingPath + "/joinedEntities_step4", OaBrokerMainEntity.class); .readPath(spark, workingDir + "/joinedEntities_step4", OaBrokerMainEntity.class);
final Dataset<Relation> mergedRels = ClusterUtils final Dataset<Relation> mergedRels = ClusterUtils
.readPath(spark, graphPath + "/relation", Relation.class) .readPath(spark, graphPath + "/relation", Relation.class)

View File

@ -42,10 +42,10 @@ public class PrepareRelatedDatasetsJob {
final String graphPath = parser.get("graphPath"); final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath); log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath"); final String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
final String relsPath = workingPath + "/relatedDatasets"; final String relsPath = workingDir + "/relatedDatasets";
log.info("relsPath: {}", relsPath); log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();

View File

@ -48,10 +48,10 @@ public class PrepareRelatedDatasourcesJob {
final String graphPath = parser.get("graphPath"); final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath); log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath"); final String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
final String relsPath = workingPath + "/relatedDatasources"; final String relsPath = workingDir + "/relatedDatasources";
log.info("relsPath: {}", relsPath); log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();

View File

@ -44,10 +44,10 @@ public class PrepareRelatedProjectsJob {
final String graphPath = parser.get("graphPath"); final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath); log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath"); final String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
final String relsPath = workingPath + "/relatedProjects"; final String relsPath = workingDir + "/relatedProjects";
log.info("relsPath: {}", relsPath); log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();

View File

@ -43,10 +43,10 @@ public class PrepareRelatedPublicationsJob {
final String graphPath = parser.get("graphPath"); final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath); log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath"); final String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
final String relsPath = workingPath + "/relatedPublications"; final String relsPath = workingDir + "/relatedPublications";
log.info("relsPath: {}", relsPath); log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();

View File

@ -44,10 +44,10 @@ public class PrepareRelatedSoftwaresJob {
final String graphPath = parser.get("graphPath"); final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath); log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath"); final String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
final String relsPath = workingPath + "/relatedSoftwares"; final String relsPath = workingDir + "/relatedSoftwares";
log.info("relsPath: {}", relsPath); log.info("relsPath: {}", relsPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();

View File

@ -44,10 +44,10 @@ public class PrepareSimpleEntititiesJob {
final String graphPath = parser.get("graphPath"); final String graphPath = parser.get("graphPath");
log.info("graphPath: {}", graphPath); log.info("graphPath: {}", graphPath);
final String workingPath = parser.get("workingPath"); final String workingDir = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingDir: {}", workingDir);
final String simpleEntitiesPath = workingPath + "/simpleEntities"; final String simpleEntitiesPath = workingDir + "/simpleEntities";
log.info("simpleEntitiesPath: {}", simpleEntitiesPath); log.info("simpleEntitiesPath: {}", simpleEntitiesPath);
final SparkConf conf = new SparkConf(); final SparkConf conf = new SparkConf();

View File

@ -0,0 +1,9 @@
[
{
"paramName": "o",
"paramLongName": "outputDir",
"paramDescription": "the path where the data are stored",
"paramRequired": true
}
]

View File

@ -7,7 +7,7 @@
}, },
{ {
"paramName": "o", "paramName": "o",
"paramLongName": "workingPath", "paramLongName": "workingDir",
"paramDescription": "the path where the temporary data will be stored", "paramDescription": "the path where the temporary data will be stored",
"paramRequired": true "paramRequired": true
} }

View File

@ -6,7 +6,7 @@
<description>the path where the graph is stored</description> <description>the path where the graph is stored</description>
</property> </property>
<property> <property>
<name>workingPath</name> <name>outputDir</name>
<description>the path where the the generated data will be stored</description> <description>the path where the the generated data will be stored</description>
</property> </property>
<property> <property>
@ -119,7 +119,7 @@
<action name="ensure_working_path"> <action name="ensure_working_path">
<fs> <fs>
<mkdir path='${workingPath}'/> <mkdir path='${workingDir}'/>
</fs> </fs>
<ok to="start_entities_and_rels"/> <ok to="start_entities_and_rels"/>
<error to="Kill"/> <error to="Kill"/>
@ -152,7 +152,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
</spark> </spark>
<ok to="wait_entities_and_rels"/> <ok to="wait_entities_and_rels"/>
<error to="Kill"/> <error to="Kill"/>
@ -176,7 +176,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
</spark> </spark>
<ok to="wait_entities_and_rels"/> <ok to="wait_entities_and_rels"/>
<error to="Kill"/> <error to="Kill"/>
@ -201,7 +201,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
</spark> </spark>
<ok to="wait_entities_and_rels"/> <ok to="wait_entities_and_rels"/>
<error to="Kill"/> <error to="Kill"/>
@ -225,7 +225,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
</spark> </spark>
<ok to="wait_entities_and_rels"/> <ok to="wait_entities_and_rels"/>
<error to="Kill"/> <error to="Kill"/>
@ -249,7 +249,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
</spark> </spark>
<ok to="wait_entities_and_rels"/> <ok to="wait_entities_and_rels"/>
<error to="Kill"/> <error to="Kill"/>
@ -273,7 +273,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
</spark> </spark>
<ok to="wait_entities_and_rels"/> <ok to="wait_entities_and_rels"/>
<error to="Kill"/> <error to="Kill"/>
@ -299,7 +299,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
</spark> </spark>
<ok to="join_entities_step1"/> <ok to="join_entities_step1"/>
<error to="Kill"/> <error to="Kill"/>
@ -323,7 +323,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
</spark> </spark>
<ok to="join_entities_step2"/> <ok to="join_entities_step2"/>
<error to="Kill"/> <error to="Kill"/>
@ -347,7 +347,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
</spark> </spark>
<ok to="join_entities_step3"/> <ok to="join_entities_step3"/>
<error to="Kill"/> <error to="Kill"/>
@ -371,7 +371,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
</spark> </spark>
<ok to="join_entities_step4"/> <ok to="join_entities_step4"/>
<error to="Kill"/> <error to="Kill"/>
@ -395,7 +395,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
</spark> </spark>
<ok to="prepare_groups"/> <ok to="prepare_groups"/>
<error to="Kill"/> <error to="Kill"/>
@ -419,7 +419,7 @@
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--graphPath</arg><arg>${graphInputPath}</arg> <arg>--graphPath</arg><arg>${graphInputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
</spark> </spark>
<ok to="generate_events"/> <ok to="generate_events"/>
<error to="Kill"/> <error to="Kill"/>
@ -442,7 +442,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
<arg>--outputDir</arg><arg>${outputDir}</arg>
<arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg> <arg>--datasourceIdWhitelist</arg><arg>${datasourceIdWhitelist}</arg>
<arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg> <arg>--datasourceTypeWhitelist</arg><arg>${datasourceTypeWhitelist}</arg>
<arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg> <arg>--datasourceIdBlacklist</arg><arg>${datasourceIdBlacklist}</arg>
@ -468,7 +469,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--outputDir</arg><arg>${outputDir}</arg>
<arg>--index</arg><arg>${esEventIndexName}</arg> <arg>--index</arg><arg>${esEventIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg> <arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg> <arg>--maxEventsForTopic</arg><arg>${maxIndexedEventsForDsAndTopic}</arg>
@ -495,7 +496,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--outputDir</arg><arg>${outputDir}</arg>
<arg>--index</arg><arg>${esNotificationsIndexName}</arg> <arg>--index</arg><arg>${esNotificationsIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg> <arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg> <arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>
@ -521,7 +522,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--outputDir</arg><arg>${outputDir}</arg>
<arg>--dbUrl</arg><arg>${brokerDbUrl}</arg> <arg>--dbUrl</arg><arg>${brokerDbUrl}</arg>
<arg>--dbUser</arg><arg>${brokerDbUser}</arg> <arg>--dbUser</arg><arg>${brokerDbUser}</arg>
<arg>--dbPassword</arg><arg>${brokerDbPassword}</arg> <arg>--dbPassword</arg><arg>${brokerDbPassword}</arg>

View File

@ -1,7 +1,13 @@
[ [
{
"paramName": "wp",
"paramLongName": "workingDir",
"paramDescription": "the path where the temporary data are stored",
"paramRequired": true
},
{ {
"paramName": "o", "paramName": "o",
"paramLongName": "workingPath", "paramLongName": "outputDir",
"paramDescription": "the path where the generated events will be stored", "paramDescription": "the path where the generated events will be stored",
"paramRequired": true "paramRequired": true
}, },

View File

@ -1,8 +1,8 @@
[ [
{ {
"paramName": "o", "paramName": "o",
"paramLongName": "workingPath", "paramLongName": "outputDir",
"paramDescription": "the workinh path", "paramDescription": "the data path",
"paramRequired": true "paramRequired": true
}, },
{ {

View File

@ -1,8 +1,8 @@
[ [
{ {
"paramName": "o", "paramName": "o",
"paramLongName": "workingPath", "paramLongName": "outputDir",
"paramDescription": "the workinh path", "paramDescription": "the path where the generated data are stored",
"paramRequired": true "paramRequired": true
}, },
{ {

View File

@ -1,8 +1,8 @@
[ [
{ {
"paramName": "o", "paramName": "o",
"paramLongName": "workingPath", "paramLongName": "outputDir",
"paramDescription": "the workinh path", "paramDescription": "the dir that contains the events folder",
"paramRequired": true "paramRequired": true
}, },
{ {

View File

@ -6,8 +6,8 @@
<description>the path where the graph is stored</description> <description>the path where the graph is stored</description>
</property> </property>
<property> <property>
<name>workingPath</name> <name>outputDir</name>
<description>the path where the the generated data will be stored</description> <description>the path where the the generated data are stored</description>
</property> </property>
<property> <property>
<name>datasourceIdWhitelist</name> <name>datasourceIdWhitelist</name>
@ -122,7 +122,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--outputDir</arg><arg>${outputDir}</arg>
<arg>--index</arg><arg>${esNotificationsIndexName}</arg> <arg>--index</arg><arg>${esNotificationsIndexName}</arg>
<arg>--esHost</arg><arg>${esIndexHost}</arg> <arg>--esHost</arg><arg>${esIndexHost}</arg>
<arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg> <arg>--brokerApiBaseUrl</arg><arg>${brokerApiBaseUrl}</arg>

View File

@ -1,8 +1,8 @@
[ [
{ {
"paramName": "o", "paramName": "o",
"paramLongName": "workingPath", "paramLongName": "outputDir",
"paramDescription": "the path where the temporary data will be stored", "paramDescription": "the path where the data will be stored",
"paramRequired": true "paramRequired": true
}, },
{ {

View File

@ -6,7 +6,7 @@
<description>the opendoar IDs whitelist (comma separated)</description> <description>the opendoar IDs whitelist (comma separated)</description>
</property> </property>
<property> <property>
<name>workingPath</name> <name>outputDir</name>
<description>the path where the the generated data will be stored</description> <description>the path where the the generated data will be stored</description>
</property> </property>
<property> <property>
@ -87,7 +87,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840 --conf spark.sql.shuffle.partitions=3840
</spark-opts> </spark-opts>
<arg>--workingPath</arg><arg>${workingPath}</arg> <arg>--workingDir</arg><arg>${workingDir}</arg>
<arg>--opendoarIds</arg><arg>${opendoarIds}</arg> <arg>--opendoarIds</arg><arg>${opendoarIds}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>

View File

@ -1,8 +1,8 @@
[ [
{ {
"paramName": "wp", "paramName": "o",
"paramLongName": "workingPath", "paramLongName": "outputDir",
"paramDescription": "the working path", "paramDescription": "the path where generated data are stored",
"paramRequired": true "paramRequired": true
}, },
{ {