From 9e3e93c6b6a919878293adbfe27406be7abd5677 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 24 Sep 2020 10:39:16 +0200 Subject: [PATCH 1/7] setting the correct issn type in the datasource.journal element --- .../dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java | 2 +- .../test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_openapc.xml | 0 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_openapc.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index 87c935d835..1e7b56ee9c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -577,7 +577,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2].trim() : null; if (issn != null || eissn != null || lissn != null) { - return journal(name, issn, eissn, eissn, null, null, null, null, null, null, null, info); + return journal(name, issn, eissn, lissn, null, null, null, null, null, null, null, info); } } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_openapc.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_openapc.xml new file mode 100644 index 0000000000..e69de29bb2 From 9a7e72d528d093f2ff338df32c9e7f99f87158e1 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 24 Sep 2020 10:42:47 +0200 Subject: [PATCH 2/7] using concat_ws to join textual columns from PSQL. When using || to perform the concatenation, Null columns makes the operation result to be Null --- .../resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql index 43b0f8f4b1..d6eae3b555 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql @@ -85,7 +85,7 @@ SELECT dc.officialname AS collectedfromname, d.typology||'@@@dnet:datasource_typologies' AS datasourcetype, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, - d.issn || ' @@@ ' || d.eissn || ' @@@ ' || d.lissn AS journal + concat_ws(' @@@ ', issn, lissn, eissn) AS journal FROM dsm_datasources d From 42f55395c80dca6c4cfb09c554b7afe402415ee0 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 24 Sep 2020 12:09:58 +0200 Subject: [PATCH 3/7] fixed order of the ISSNs returned by the SQL query --- .../resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql index d6eae3b555..3033b9f879 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql @@ -85,7 +85,7 @@ SELECT dc.officialname AS collectedfromname, d.typology||'@@@dnet:datasource_typologies' AS datasourcetype, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, - concat_ws(' @@@ ', issn, lissn, eissn) AS journal + concat_ws(' @@@ ', issn, eissn, lissn) AS journal FROM dsm_datasources d From fb22f4d70b08ca7fbd3b1572f36ee9addba1b00a Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 24 Sep 2020 12:10:59 +0200 Subject: [PATCH 4/7] included values for projects fundedamount and totalcost fields in the mapping tests. Swapped expected and actual values in junit test assertions --- .../raw/MigrateDbEntitiesApplicationTest.java | 62 ++++++++++++------- .../graph/raw/projects_resultset_entry.json | 4 +- 2 files changed, 40 insertions(+), 26 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java index 22fcb36c98..e8059f5069 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java @@ -73,12 +73,16 @@ public class MigrateDbEntitiesApplicationTest { final Datasource ds = (Datasource) list.get(0); assertValidId(ds.getId()); assertValidId(ds.getCollectedfrom().get(0).getKey()); - assertEquals(ds.getOfficialname().getValue(), getValueAsString("officialname", fields)); - assertEquals(ds.getEnglishname().getValue(), getValueAsString("englishname", fields)); - assertEquals(ds.getContactemail().getValue(), getValueAsString("contactemail", fields)); - assertEquals(ds.getWebsiteurl().getValue(), getValueAsString("websiteurl", fields)); - assertEquals(ds.getNamespaceprefix().getValue(), getValueAsString("namespaceprefix", fields)); - assertEquals(ds.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields)); + assertEquals(getValueAsString("officialname", fields), ds.getOfficialname().getValue()); + assertEquals(getValueAsString("englishname", fields), ds.getEnglishname().getValue()); + assertEquals(getValueAsString("contactemail", fields), ds.getContactemail().getValue()); + assertEquals(getValueAsString("websiteurl", fields), ds.getWebsiteurl().getValue()); + assertEquals(getValueAsString("namespaceprefix", fields), ds.getNamespaceprefix().getValue()); + assertEquals(getValueAsString("collectedfromname", fields), ds.getCollectedfrom().get(0).getValue()); + assertEquals(getValueAsString("officialname", fields), ds.getJournal().getName()); + assertEquals("2579-5449", ds.getJournal().getIssnPrinted()); + assertEquals("2597-6540", ds.getJournal().getIssnOnline()); + assertEquals(null, ds.getJournal().getIssnLinking()); } @Test @@ -92,9 +96,11 @@ public class MigrateDbEntitiesApplicationTest { final Project p = (Project) list.get(0); assertValidId(p.getId()); assertValidId(p.getCollectedfrom().get(0).getKey()); - assertEquals(p.getAcronym().getValue(), getValueAsString("acronym", fields)); - assertEquals(p.getTitle().getValue(), getValueAsString("title", fields)); - assertEquals(p.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields)); + assertEquals(getValueAsString("acronym", fields), p.getAcronym().getValue()); + assertEquals(getValueAsString("title", fields), p.getTitle().getValue()); + assertEquals(getValueAsString("collectedfromname", fields), p.getCollectedfrom().get(0).getValue()); + assertEquals(getValueAsFloat("fundedamount", fields), p.getFundedamount()); + assertEquals(getValueAsFloat("totalcost", fields), p.getTotalcost()); } @Test @@ -110,14 +116,14 @@ public class MigrateDbEntitiesApplicationTest { final Organization o = (Organization) list.get(0); assertValidId(o.getId()); assertValidId(o.getCollectedfrom().get(0).getKey()); - assertEquals(o.getLegalshortname().getValue(), getValueAsString("legalshortname", fields)); - assertEquals(o.getLegalname().getValue(), getValueAsString("legalname", fields)); - assertEquals(o.getWebsiteurl().getValue(), getValueAsString("websiteurl", fields)); - assertEquals(o.getCountry().getClassid(), getValueAsString("country", fields).split("@@@")[0]); - assertEquals(o.getCountry().getClassname(), getValueAsString("country", fields).split("@@@")[0]); - assertEquals(o.getCountry().getSchemeid(), getValueAsString("country", fields).split("@@@")[1]); - assertEquals(o.getCountry().getSchemename(), getValueAsString("country", fields).split("@@@")[1]); - assertEquals(o.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields)); + assertEquals(getValueAsString("legalshortname", fields), o.getLegalshortname().getValue()); + assertEquals(getValueAsString("legalname", fields), o.getLegalname().getValue()); + assertEquals(getValueAsString("websiteurl", fields), o.getWebsiteurl().getValue()); + assertEquals(getValueAsString("country", fields).split("@@@")[0], o.getCountry().getClassid()); + assertEquals(getValueAsString("country", fields).split("@@@")[0], o.getCountry().getClassname()); + assertEquals(getValueAsString("country", fields).split("@@@")[1], o.getCountry().getSchemeid()); + assertEquals(getValueAsString("country", fields).split("@@@")[1], o.getCountry().getSchemename()); + assertEquals(getValueAsString("collectedfromname", fields), o.getCollectedfrom().get(0).getValue()); } @Test @@ -322,14 +328,22 @@ public class MigrateDbEntitiesApplicationTest { } private String getValueAsString(final String name, final List fields) { + return getValueAs(name, fields); + } + + private Float getValueAsFloat(final String name, final List fields) { + return new Float(getValueAs(name, fields).toString()); + } + + private T getValueAs(final String name, final List fields) { return fields - .stream() - .filter(f -> f.getField().equals(name)) - .map(TypedField::getValue) - .filter(Objects::nonNull) - .map(o -> o.toString()) - .findFirst() - .get(); + .stream() + .filter(f -> f.getField().equals(name)) + .map(TypedField::getValue) + .filter(Objects::nonNull) + .map(o -> (T) o) + .findFirst() + .get(); } } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json index d6109cac1f..a25215ca36 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json @@ -142,12 +142,12 @@ { "field": "totalcost", "type": "double", - "value": null + "value": 157846 }, { "field": "fundedamount", "type": "double", - "value": null + "value": 157846 }, { "field": "collectedfromid", From 27df1cea6d5c658c77156f0324a5b0f4e9189dbb Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 24 Sep 2020 12:16:00 +0200 Subject: [PATCH 5/7] code formatting --- .../raw/MigrateDbEntitiesApplicationTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java index e8059f5069..011cc18e6e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java @@ -337,13 +337,13 @@ public class MigrateDbEntitiesApplicationTest { private T getValueAs(final String name, final List fields) { return fields - .stream() - .filter(f -> f.getField().equals(name)) - .map(TypedField::getValue) - .filter(Objects::nonNull) - .map(o -> (T) o) - .findFirst() - .get(); + .stream() + .filter(f -> f.getField().equals(name)) + .map(TypedField::getValue) + .filter(Objects::nonNull) + .map(o -> (T) o) + .findFirst() + .get(); } } From 044d3a021418e8b53cc15a1c8dd3b93bd89b0816 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 24 Sep 2020 13:48:58 +0200 Subject: [PATCH 6/7] fixed query used to load datasources in the Graph --- .../eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql index 3033b9f879..7ca672835b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql @@ -3,7 +3,7 @@ SELECT d.id || array_agg(distinct di.pid) AS identities, d.officialname AS officialname, d.englishname AS englishname, - d.contactemail AS contactemail, + d.contactemail AS contactemail, CASE WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['openaire-cris_1.1']) THEN @@ -85,7 +85,7 @@ SELECT dc.officialname AS collectedfromname, d.typology||'@@@dnet:datasource_typologies' AS datasourcetype, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, - concat_ws(' @@@ ', issn, eissn, lissn) AS journal + concat_ws(' @@@ ', d.issn, d.eissn, d.lissn) AS journal FROM dsm_datasources d From c96598aaa42df419d735e9fe29aa8ba60393c6c4 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 25 Sep 2020 09:02:58 +0200 Subject: [PATCH 7/7] opendoar partition --- .../model/ShortEventMessageWithGroupId.java | 25 +++++++++++++++++++ .../dhp/broker/oa/GenerateStatsJob.java | 2 ++ .../broker/oa/PartitionEventsByDsIdJob.java | 23 ++++++++--------- .../oa/generate_all/oozie_app/workflow.xml | 17 ++++++++++++- .../broker/oa/partial/oozie_app/workflow.xml | 13 ++++------ 5 files changed, 58 insertions(+), 22 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ShortEventMessageWithGroupId.java diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ShortEventMessageWithGroupId.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ShortEventMessageWithGroupId.java new file mode 100644 index 0000000000..1a1e9764bb --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ShortEventMessageWithGroupId.java @@ -0,0 +1,25 @@ + +package eu.dnetlib.dhp.broker.model; + +import java.io.Serializable; + +import eu.dnetlib.broker.api.ShortEventMessage; + +public class ShortEventMessageWithGroupId extends ShortEventMessage implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 4704889388757626630L; + + private String group; + + public String getGroup() { + return group; + } + + public void setGroup(final String group) { + this.group = group; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java index 8a9009f324..d5c53ea360 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -14,6 +14,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.TypedColumn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +75,7 @@ public class GenerateStatsJob { .agg(aggr) .map(t -> t._2, Encoders.bean(DatasourceStats.class)) .write() + .mode(SaveMode.Overwrite) .jdbc(dbUrl, "oa_datasource_stats_temp", connectionProperties); log.info("*** updateStats"); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java index 0748624f77..da2c5bb78a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -20,12 +20,11 @@ import org.slf4j.LoggerFactory; import com.google.gson.Gson; -import eu.dnetlib.broker.api.ShortEventMessage; import eu.dnetlib.broker.objects.OaBrokerEventPayload; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.model.ShortEventMessageWithGroupId; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; -import scala.Tuple2; public class PartitionEventsByDsIdJob { @@ -61,13 +60,11 @@ public class PartitionEventsByDsIdJob { .readPath(spark, eventsPath, Event.class) .filter(e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId())) .filter(e -> e.getMap().getTargetDatasourceId().contains(OPENDOAR_NSPREFIX)) - .map( - e -> new Tuple2<>( - StringUtils.substringAfter(e.getMap().getTargetDatasourceId(), OPENDOAR_NSPREFIX), - messageFromNotification(e)), - Encoders.tuple(Encoders.STRING(), Encoders.bean(ShortEventMessage.class))) + .limit(10000) + .map(e -> messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class)) + .coalesce(1) .write() - .partitionBy("_1") + .partitionBy("group") .mode(SaveMode.Overwrite) .json(partitionPath); @@ -77,7 +74,6 @@ public class PartitionEventsByDsIdJob { } private static void renameSubDirs(final String path) throws IOException { - final String prefix = "_1="; final FileSystem fs = FileSystem.get(new Configuration()); log.info("** Renaming subdirs of " + path); @@ -85,8 +81,8 @@ public class PartitionEventsByDsIdJob { if (fileStatus.isDirectory()) { final Path oldPath = fileStatus.getPath(); final String oldName = oldPath.getName(); - if (oldName.startsWith(prefix)) { - final Path newPath = new Path(path + "/" + StringUtils.substringAfter(oldName, prefix)); + if (oldName.contains("=")) { + final Path newPath = new Path(path + "/" + StringUtils.substringAfter(oldName, "=")); log.info(" * " + oldPath.getName() + " -> " + newPath.getName()); fs.rename(oldPath, newPath); } @@ -94,18 +90,19 @@ public class PartitionEventsByDsIdJob { } } - private static ShortEventMessage messageFromNotification(final Event e) { + private static ShortEventMessageWithGroupId messageFromNotification(final Event e) { final Gson gson = new Gson(); final OaBrokerEventPayload payload = gson.fromJson(e.getPayload(), OaBrokerEventPayload.class); - final ShortEventMessage res = new ShortEventMessage(); + final ShortEventMessageWithGroupId res = new ShortEventMessageWithGroupId(); res.setOriginalId(payload.getResult().getOriginalId()); res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null)); res.setTopic(e.getTopic()); res.setTrust(payload.getTrust()); res.generateMessageFromObject(payload.getHighlight()); + res.setGroup(StringUtils.substringAfter(e.getMap().getTargetDatasourceId(), OPENDOAR_NSPREFIX)); return res; } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 4184b71bd9..14e33b0916 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -44,6 +44,18 @@ brokerApiBaseUrl the url of the broker service api + + brokerDbUrl + the url of the broker database + + + brokerDbUser + the user of the broker database + + + brokerDbPassword + the password of the broker database + sparkDriverMemory memory for driver process @@ -509,8 +521,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --graphPath${graphInputPath} --workingPath${workingPath} + --dbUrl${brokerDbUrl} + --dbUser${brokerDbUser} + --dbPassword${brokerDbPassword} + --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index 407b9f42f7..8bae626f17 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -111,18 +111,18 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - GenerateStatsJob - eu.dnetlib.dhp.broker.oa.GenerateStatsJob + PartitionEventsByDsIdJob + eu.dnetlib.dhp.broker.oa.PartitionEventsByDsIdJob dhp-broker-events-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -134,11 +134,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --graphPath${graphInputPath} --workingPath${workingPath} - --dbUrl${brokerDbUrl} - --dbUser${brokerDbUser} - --dbPassword${brokerDbPassword} - --brokerApiBaseUrl${brokerApiBaseUrl}