diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ShortEventMessageWithGroupId.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ShortEventMessageWithGroupId.java new file mode 100644 index 0000000000..1a1e9764bb --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/model/ShortEventMessageWithGroupId.java @@ -0,0 +1,25 @@ + +package eu.dnetlib.dhp.broker.model; + +import java.io.Serializable; + +import eu.dnetlib.broker.api.ShortEventMessage; + +public class ShortEventMessageWithGroupId extends ShortEventMessage implements Serializable { + + /** + * + */ + private static final long serialVersionUID = 4704889388757626630L; + + private String group; + + public String getGroup() { + return group; + } + + public void setGroup(final String group) { + this.group = group; + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java index 8a9009f324..d5c53ea360 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -14,6 +14,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.TypedColumn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +75,7 @@ public class GenerateStatsJob { .agg(aggr) .map(t -> t._2, Encoders.bean(DatasourceStats.class)) .write() + .mode(SaveMode.Overwrite) .jdbc(dbUrl, "oa_datasource_stats_temp", connectionProperties); log.info("*** updateStats"); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java index 0748624f77..da2c5bb78a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/PartitionEventsByDsIdJob.java @@ -20,12 +20,11 @@ import org.slf4j.LoggerFactory; import com.google.gson.Gson; -import eu.dnetlib.broker.api.ShortEventMessage; import eu.dnetlib.broker.objects.OaBrokerEventPayload; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.model.ShortEventMessageWithGroupId; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; -import scala.Tuple2; public class PartitionEventsByDsIdJob { @@ -61,13 +60,11 @@ public class PartitionEventsByDsIdJob { .readPath(spark, eventsPath, Event.class) .filter(e -> StringUtils.isNotBlank(e.getMap().getTargetDatasourceId())) .filter(e -> e.getMap().getTargetDatasourceId().contains(OPENDOAR_NSPREFIX)) - .map( - e -> new Tuple2<>( - StringUtils.substringAfter(e.getMap().getTargetDatasourceId(), OPENDOAR_NSPREFIX), - messageFromNotification(e)), - Encoders.tuple(Encoders.STRING(), Encoders.bean(ShortEventMessage.class))) + .limit(10000) + .map(e -> messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class)) + .coalesce(1) .write() - .partitionBy("_1") + .partitionBy("group") .mode(SaveMode.Overwrite) .json(partitionPath); @@ -77,7 +74,6 @@ public class PartitionEventsByDsIdJob { } private static void renameSubDirs(final String path) throws IOException { - final String prefix = "_1="; final FileSystem fs = FileSystem.get(new Configuration()); log.info("** Renaming subdirs of " + path); @@ -85,8 +81,8 @@ public class PartitionEventsByDsIdJob { if (fileStatus.isDirectory()) { final Path oldPath = fileStatus.getPath(); final String oldName = oldPath.getName(); - if (oldName.startsWith(prefix)) { - final Path newPath = new Path(path + "/" + StringUtils.substringAfter(oldName, prefix)); + if (oldName.contains("=")) { + final Path newPath = new Path(path + "/" + StringUtils.substringAfter(oldName, "=")); log.info(" * " + oldPath.getName() + " -> " + newPath.getName()); fs.rename(oldPath, newPath); } @@ -94,18 +90,19 @@ public class PartitionEventsByDsIdJob { } } - private static ShortEventMessage messageFromNotification(final Event e) { + private static ShortEventMessageWithGroupId messageFromNotification(final Event e) { final Gson gson = new Gson(); final OaBrokerEventPayload payload = gson.fromJson(e.getPayload(), OaBrokerEventPayload.class); - final ShortEventMessage res = new ShortEventMessage(); + final ShortEventMessageWithGroupId res = new ShortEventMessageWithGroupId(); res.setOriginalId(payload.getResult().getOriginalId()); res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null)); res.setTopic(e.getTopic()); res.setTrust(payload.getTrust()); res.generateMessageFromObject(payload.getHighlight()); + res.setGroup(StringUtils.substringAfter(e.getMap().getTargetDatasourceId(), OPENDOAR_NSPREFIX)); return res; } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index 4184b71bd9..14e33b0916 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -44,6 +44,18 @@ brokerApiBaseUrl the url of the broker service api + + brokerDbUrl + the url of the broker database + + + brokerDbUser + the user of the broker database + + + brokerDbPassword + the password of the broker database + sparkDriverMemory memory for driver process @@ -509,8 +521,11 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - --graphPath${graphInputPath} --workingPath${workingPath} + --dbUrl${brokerDbUrl} + --dbUser${brokerDbUser} + --dbPassword${brokerDbPassword} + --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index 407b9f42f7..8bae626f17 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -111,18 +111,18 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - GenerateStatsJob - eu.dnetlib.dhp.broker.oa.GenerateStatsJob + PartitionEventsByDsIdJob + eu.dnetlib.dhp.broker.oa.PartitionEventsByDsIdJob dhp-broker-events-${projectVersion}.jar --executor-cores=${sparkExecutorCores} @@ -134,11 +134,8 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 + --graphPath${graphInputPath} --workingPath${workingPath} - --dbUrl${brokerDbUrl} - --dbUser${brokerDbUser} - --dbPassword${brokerDbPassword} - --brokerApiBaseUrl${brokerApiBaseUrl} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index 87c935d835..1e7b56ee9c 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -577,7 +577,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i final String lissn = StringUtils.isNotBlank(arr[2]) ? arr[2].trim() : null; if (issn != null || eissn != null || lissn != null) { - return journal(name, issn, eissn, eissn, null, null, null, null, null, null, null, info); + return journal(name, issn, eissn, lissn, null, null, null, null, null, null, null, info); } } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql index 43b0f8f4b1..7ca672835b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/sql/queryDatasources.sql @@ -3,7 +3,7 @@ SELECT d.id || array_agg(distinct di.pid) AS identities, d.officialname AS officialname, d.englishname AS englishname, - d.contactemail AS contactemail, + d.contactemail AS contactemail, CASE WHEN (array_agg(DISTINCT COALESCE (a.compatibility_override, a.compatibility):: TEXT) @> ARRAY ['openaire-cris_1.1']) THEN @@ -85,7 +85,7 @@ SELECT dc.officialname AS collectedfromname, d.typology||'@@@dnet:datasource_typologies' AS datasourcetype, 'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction, - d.issn || ' @@@ ' || d.eissn || ' @@@ ' || d.lissn AS journal + concat_ws(' @@@ ', d.issn, d.eissn, d.lissn) AS journal FROM dsm_datasources d diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java index 22fcb36c98..011cc18e6e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java @@ -73,12 +73,16 @@ public class MigrateDbEntitiesApplicationTest { final Datasource ds = (Datasource) list.get(0); assertValidId(ds.getId()); assertValidId(ds.getCollectedfrom().get(0).getKey()); - assertEquals(ds.getOfficialname().getValue(), getValueAsString("officialname", fields)); - assertEquals(ds.getEnglishname().getValue(), getValueAsString("englishname", fields)); - assertEquals(ds.getContactemail().getValue(), getValueAsString("contactemail", fields)); - assertEquals(ds.getWebsiteurl().getValue(), getValueAsString("websiteurl", fields)); - assertEquals(ds.getNamespaceprefix().getValue(), getValueAsString("namespaceprefix", fields)); - assertEquals(ds.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields)); + assertEquals(getValueAsString("officialname", fields), ds.getOfficialname().getValue()); + assertEquals(getValueAsString("englishname", fields), ds.getEnglishname().getValue()); + assertEquals(getValueAsString("contactemail", fields), ds.getContactemail().getValue()); + assertEquals(getValueAsString("websiteurl", fields), ds.getWebsiteurl().getValue()); + assertEquals(getValueAsString("namespaceprefix", fields), ds.getNamespaceprefix().getValue()); + assertEquals(getValueAsString("collectedfromname", fields), ds.getCollectedfrom().get(0).getValue()); + assertEquals(getValueAsString("officialname", fields), ds.getJournal().getName()); + assertEquals("2579-5449", ds.getJournal().getIssnPrinted()); + assertEquals("2597-6540", ds.getJournal().getIssnOnline()); + assertEquals(null, ds.getJournal().getIssnLinking()); } @Test @@ -92,9 +96,11 @@ public class MigrateDbEntitiesApplicationTest { final Project p = (Project) list.get(0); assertValidId(p.getId()); assertValidId(p.getCollectedfrom().get(0).getKey()); - assertEquals(p.getAcronym().getValue(), getValueAsString("acronym", fields)); - assertEquals(p.getTitle().getValue(), getValueAsString("title", fields)); - assertEquals(p.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields)); + assertEquals(getValueAsString("acronym", fields), p.getAcronym().getValue()); + assertEquals(getValueAsString("title", fields), p.getTitle().getValue()); + assertEquals(getValueAsString("collectedfromname", fields), p.getCollectedfrom().get(0).getValue()); + assertEquals(getValueAsFloat("fundedamount", fields), p.getFundedamount()); + assertEquals(getValueAsFloat("totalcost", fields), p.getTotalcost()); } @Test @@ -110,14 +116,14 @@ public class MigrateDbEntitiesApplicationTest { final Organization o = (Organization) list.get(0); assertValidId(o.getId()); assertValidId(o.getCollectedfrom().get(0).getKey()); - assertEquals(o.getLegalshortname().getValue(), getValueAsString("legalshortname", fields)); - assertEquals(o.getLegalname().getValue(), getValueAsString("legalname", fields)); - assertEquals(o.getWebsiteurl().getValue(), getValueAsString("websiteurl", fields)); - assertEquals(o.getCountry().getClassid(), getValueAsString("country", fields).split("@@@")[0]); - assertEquals(o.getCountry().getClassname(), getValueAsString("country", fields).split("@@@")[0]); - assertEquals(o.getCountry().getSchemeid(), getValueAsString("country", fields).split("@@@")[1]); - assertEquals(o.getCountry().getSchemename(), getValueAsString("country", fields).split("@@@")[1]); - assertEquals(o.getCollectedfrom().get(0).getValue(), getValueAsString("collectedfromname", fields)); + assertEquals(getValueAsString("legalshortname", fields), o.getLegalshortname().getValue()); + assertEquals(getValueAsString("legalname", fields), o.getLegalname().getValue()); + assertEquals(getValueAsString("websiteurl", fields), o.getWebsiteurl().getValue()); + assertEquals(getValueAsString("country", fields).split("@@@")[0], o.getCountry().getClassid()); + assertEquals(getValueAsString("country", fields).split("@@@")[0], o.getCountry().getClassname()); + assertEquals(getValueAsString("country", fields).split("@@@")[1], o.getCountry().getSchemeid()); + assertEquals(getValueAsString("country", fields).split("@@@")[1], o.getCountry().getSchemename()); + assertEquals(getValueAsString("collectedfromname", fields), o.getCollectedfrom().get(0).getValue()); } @Test @@ -322,12 +328,20 @@ public class MigrateDbEntitiesApplicationTest { } private String getValueAsString(final String name, final List fields) { + return getValueAs(name, fields); + } + + private Float getValueAsFloat(final String name, final List fields) { + return new Float(getValueAs(name, fields).toString()); + } + + private T getValueAs(final String name, final List fields) { return fields .stream() .filter(f -> f.getField().equals(name)) .map(TypedField::getValue) .filter(Objects::nonNull) - .map(o -> o.toString()) + .map(o -> (T) o) .findFirst() .get(); } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_openapc.xml b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/oaf_openapc.xml new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json index d6109cac1f..a25215ca36 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/raw/projects_resultset_entry.json @@ -142,12 +142,12 @@ { "field": "totalcost", "type": "double", - "value": null + "value": 157846 }, { "field": "fundedamount", "type": "double", - "value": null + "value": 157846 }, { "field": "collectedfromid",