diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java index 4b563d381a..ab2735f2a8 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMissingProject.java @@ -15,7 +15,7 @@ public class EnrichMissingProject extends UpdateMatcher { super(20, prj -> Topic.ENRICH_MISSING_PROJECT, (p, prj) -> p.getProjects().add(prj), - prj -> prj.getFunder() + "::" + prj.getFundingProgram() + prj.getCode()); + prj -> prj.getOpenaireId()); } @Override diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java index 6a10f19be5..c38fd81a67 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/relatedProjects/EnrichMoreProject.java @@ -18,31 +18,25 @@ public class EnrichMoreProject extends UpdateMatcher { super(20, prj -> Topic.ENRICH_MORE_PROJECT, (p, prj) -> p.getProjects().add(prj), - prj -> projectAsString(prj)); - } - - private static String projectAsString(final OaBrokerProject prj) { - return prj.getFunder() + "::" + prj.getFundingProgram() + "::" + prj.getCode(); + prj -> prj.getOpenaireId()); } @Override protected List findDifferences(final OaBrokerMainEntity source, final OaBrokerMainEntity target) { - if (target.getProjects().size() >= BrokerConstants.MAX_LIST_SIZE) { - return new ArrayList<>(); - } + if (target.getProjects().size() >= BrokerConstants.MAX_LIST_SIZE) { return new ArrayList<>(); } final Set existingProjects = target .getProjects() .stream() - .map(EnrichMoreProject::projectAsString) + .map(p -> p.getOpenaireId()) .collect(Collectors.toSet()); return source .getProjects() .stream() - .filter(p -> !existingProjects.contains(projectAsString(p))) + .filter(p -> !existingProjects.contains(p.getOpenaireId())) .collect(Collectors.toList()); } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java index 1ce84283af..3a2cdc5f34 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/ConversionUtils.java @@ -44,9 +44,7 @@ public class ConversionUtils { private static final Logger log = LoggerFactory.getLogger(ConversionUtils.class); public static List oafInstanceToBrokerInstances(final Instance i) { - if (i == null) { - return new ArrayList<>(); - } + if (i == null) { return new ArrayList<>(); } return mappedList(i.getUrl(), url -> { final OaBrokerInstance res = new OaBrokerInstance(); @@ -67,9 +65,7 @@ public class ConversionUtils { } public static final OaBrokerRelatedDataset oafDatasetToBrokerDataset(final Dataset d) { - if (d == null) { - return null; - } + if (d == null) { return null; } final OaBrokerRelatedDataset res = new OaBrokerRelatedDataset(); res.setOpenaireId(d.getId()); @@ -82,9 +78,7 @@ public class ConversionUtils { } public static OaBrokerRelatedPublication oafPublicationToBrokerPublication(final Publication p) { - if (p == null) { - return null; - } + if (p == null) { return null; } final OaBrokerRelatedPublication res = new OaBrokerRelatedPublication(); res.setOpenaireId(p.getId()); @@ -98,9 +92,7 @@ public class ConversionUtils { } public static final OaBrokerMainEntity oafResultToBrokerResult(final Result result) { - if (result == null) { - return null; - } + if (result == null) { return null; } final OaBrokerMainEntity res = new OaBrokerMainEntity(); @@ -117,8 +109,7 @@ public class ConversionUtils { res.setEmbargoenddate(fieldValue(result.getEmbargoenddate())); res.setContributor(fieldList(result.getContributor())); res - .setJournal( - result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null); + .setJournal(result instanceof Publication ? oafJournalToBrokerJournal(((Publication) result).getJournal()) : null); res.setCollectedFromId(mappedFirst(result.getCollectedfrom(), KeyValue::getKey)); res.setCollectedFromName(mappedFirst(result.getCollectedfrom(), KeyValue::getValue)); res.setPids(mappedList(result.getPid(), ConversionUtils::oafPidToBrokerPid)); @@ -130,9 +121,7 @@ public class ConversionUtils { } private static OaBrokerAuthor oafAuthorToBrokerAuthor(final Author author) { - if (author == null) { - return null; - } + if (author == null) { return null; } final String pids = author.getPid() != null ? author .getPid() @@ -142,6 +131,7 @@ public class ConversionUtils { .filter(pid -> pid.getQualifier().getClassid() != null) .filter(pid -> pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) .map(pid -> pid.getValue()) + .map(pid -> cleanOrcid(pid)) .filter(StringUtils::isNotBlank) .findFirst() .orElse(null) : null; @@ -149,10 +139,13 @@ public class ConversionUtils { return new OaBrokerAuthor(author.getFullname(), pids); } + private static String cleanOrcid(final String s) { + final String match = "//orcid.org/"; + return s.contains(match) ? StringUtils.substringAfter(s, match) : s; + } + private static OaBrokerJournal oafJournalToBrokerJournal(final Journal journal) { - if (journal == null) { - return null; - } + if (journal == null) { return null; } final OaBrokerJournal res = new OaBrokerJournal(); res.setName(journal.getName()); @@ -164,9 +157,7 @@ public class ConversionUtils { } private static OaBrokerExternalReference oafExtRefToBrokerExtRef(final ExternalReference ref) { - if (ref == null) { - return null; - } + if (ref == null) { return null; } final OaBrokerExternalReference res = new OaBrokerExternalReference(); res.setRefidentifier(ref.getRefidentifier()); @@ -177,9 +168,7 @@ public class ConversionUtils { } public static final OaBrokerProject oafProjectToBrokerProject(final Project p) { - if (p == null) { - return null; - } + if (p == null) { return null; } final OaBrokerProject res = new OaBrokerProject(); res.setOpenaireId(p.getId()); @@ -203,9 +192,7 @@ public class ConversionUtils { } public static final OaBrokerRelatedSoftware oafSoftwareToBrokerSoftware(final Software sw) { - if (sw == null) { - return null; - } + if (sw == null) { return null; } final OaBrokerRelatedSoftware res = new OaBrokerRelatedSoftware(); res.setOpenaireId(sw.getId()); @@ -268,9 +255,7 @@ public class ConversionUtils { } private static List structPropTypedList(final List list) { - if (list == null) { - return new ArrayList<>(); - } + if (list == null) { return new ArrayList<>(); } return list .stream() @@ -280,9 +265,7 @@ public class ConversionUtils { } private static List mappedList(final List list, final Function func) { - if (list == null) { - return new ArrayList<>(); - } + if (list == null) { return new ArrayList<>(); } return list .stream() @@ -293,9 +276,7 @@ public class ConversionUtils { } private static List flatMappedList(final List list, final Function> func) { - if (list == null) { - return new ArrayList<>(); - } + if (list == null) { return new ArrayList<>(); } return list .stream() @@ -307,9 +288,7 @@ public class ConversionUtils { } private static T mappedFirst(final List list, final Function func) { - if (list == null) { - return null; - } + if (list == null) { return null; } return list .stream() diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java index c72940deb8..180f9f8460 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java @@ -37,7 +37,7 @@ public class Deduper implements Serializable { public static JavaPairRDD createSortedBlocks( JavaPairRDD mapDocs, DedupConfig config) { final String of = config.getWf().getOrderField(); - final int maxQueueSize = config.getWf().getGroupMaxSize(); + final int maxQueueSize = config.getWf().getQueueMaxSize(); return mapDocs // the reduce is just to be sure that we haven't document with same id diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/workflow.xml new file mode 100644 index 0000000000..66eaeeb263 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/workflow.xml @@ -0,0 +1,157 @@ + + + + reuseContent + false + should import content from the aggregator or reuse a previous version + + + contentPath + path location to store (or reuse) content from the aggregator + + + postgresURL + the postgres URL to access to the database + + + postgresUser + the user postgres + + + postgresPassword + the password postgres + + + dbSchema + beta + the database schema according to the D-Net infrastructure (beta or production) + + + mongoURL + mongoDB url, example: mongodb://[username:password@]host[:port] + + + mongoDb + mongo database + + + isLookupUrl + the address of the lookUp service + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication + --hdfsPath${contentPath}/db_claims + --postgresUrl${postgresURL} + --postgresUser${postgresUser} + --postgresPassword${postgresPassword} + --isLookupUrl${isLookupUrl} + --actionclaims + --dbschema${dbSchema} + + + + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication + -p${contentPath}/odf_claims + -mongourl${mongoURL} + -mongodb${mongoDb} + -fODF + -lstore + -iclaim + + + + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication + -p${contentPath}/oaf_claims + -mongourl${mongoURL} + -mongodb${mongoDb} + -fOAF + -lstore + -iclaim + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/workflow.xml deleted file mode 100644 index 1ac456976d..0000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/workflow.xml +++ /dev/null @@ -1,169 +0,0 @@ - - - - migrationClaimsPathStep1 - the base path to store hdfs file - - - migrationClaimsPathStep2 - the temporary path to store entities before dispatching - - - migrationClaimsPathStep3 - the graph Raw base path - - - postgresURL - the postgres URL to access to the database - - - postgresUser - the user postgres - - - postgresPassword - the password postgres - - - mongoURL - mongoDB url, example: mongodb://[username:password@]host[:port] - - - mongoDb - mongo database - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication - -p${migrationClaimsPathStep1}/db_claims - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - -aclaims - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication - -p${migrationClaimsPathStep1}/odf_claims - -mongourl${mongoURL} - -mongodb${mongoDb} - -fODF - -lstore - -iclaim - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication - -p${migrationClaimsPathStep1}/oaf_claims - -mongourl${mongoURL} - -mongodb${mongoDb} - -fOAF - -lstore - -iclaim - - - - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - GenerateClaimEntities - eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication - dhp-aggregation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - -s${migrationClaimsPathStep1}/db_claims,${migrationClaimsPathStep1}/oaf_claims,${migrationClaimsPathStep1}/odf_claims - -t${migrationClaimsPathStep2}/claim_entities - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - - - - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - GenerateClaimGraph - eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication - dhp-aggregation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - -s${migrationClaimsPathStep2}/claim_entities - -g${migrationClaimsPathStep3} - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java index c16bbc6fba..528532eddd 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java @@ -1,9 +1,10 @@ package eu.dnetlib.dhp.oa.provision; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; -import eu.dnetlib.dhp.schema.oaf.Relation; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -19,9 +20,10 @@ import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; +import eu.dnetlib.dhp.schema.oaf.Relation; public class PrepareRelationsJobTest { @@ -74,14 +76,19 @@ public class PrepareRelationsJobTest { "-maxRelations", String.valueOf(maxRelations) }); - Dataset out = spark.read() - .parquet(testPath.toString()) - .as(Encoders.bean(Relation.class)) - .cache(); + Dataset out = spark + .read() + .parquet(testPath.toString()) + .as(Encoders.bean(Relation.class)) + .cache(); Assertions.assertEquals(10, out.count()); - Dataset freq = out.toDF().cube(SUBRELTYPE).count().filter((FilterFunction) value -> !value.isNullAt(0)); + Dataset freq = out + .toDF() + .cube(SUBRELTYPE) + .count() + .filter((FilterFunction) value -> !value.isNullAt(0)); long outcome = freq.filter(freq.col(SUBRELTYPE).equalTo(OUTCOME)).collectAsList().get(0).getAs("count"); long supplement = freq.filter(freq.col(SUBRELTYPE).equalTo(SUPPLEMENT)).collectAsList().get(0).getAs("count");