diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java index c72940deb..180f9f846 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java @@ -37,7 +37,7 @@ public class Deduper implements Serializable { public static JavaPairRDD createSortedBlocks( JavaPairRDD mapDocs, DedupConfig config) { final String of = config.getWf().getOrderField(); - final int maxQueueSize = config.getWf().getGroupMaxSize(); + final int maxQueueSize = config.getWf().getQueueMaxSize(); return mapDocs // the reduce is just to be sure that we haven't document with same id diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/config-default.xml similarity index 100% rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/config-default.xml rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/config-default.xml diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/workflow.xml new file mode 100644 index 000000000..66eaeeb26 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/oozie_app/workflow.xml @@ -0,0 +1,157 @@ + + + + reuseContent + false + should import content from the aggregator or reuse a previous version + + + contentPath + path location to store (or reuse) content from the aggregator + + + postgresURL + the postgres URL to access to the database + + + postgresUser + the user postgres + + + postgresPassword + the password postgres + + + dbSchema + beta + the database schema according to the D-Net infrastructure (beta or production) + + + mongoURL + mongoDB url, example: mongodb://[username:password@]host[:port] + + + mongoDb + mongo database + + + isLookupUrl + the address of the lookUp service + + + + sparkDriverMemory + memory for driver process + + + sparkExecutorMemory + memory for individual executor + + + sparkExecutorCores + number of cores used by single executor + + + oozieActionShareLibForSpark2 + oozie action sharelib for spark 2.* + + + spark2ExtraListeners + com.cloudera.spark.lineage.NavigatorAppListener + spark 2.* extra listeners classname + + + spark2SqlQueryExecutionListeners + com.cloudera.spark.lineage.NavigatorQueryListener + spark 2.* sql query execution listeners classname + + + spark2YarnHistoryServerAddress + spark 2.* yarn history server address + + + spark2EventLogDir + spark 2.* event log dir location + + + + + ${jobTracker} + ${nameNode} + + + mapreduce.job.queuename + ${queueName} + + + oozie.launcher.mapred.job.queue.name + ${oozieLauncherQueueName} + + + oozie.action.sharelib.for.spark + ${oozieActionShareLibForSpark2} + + + + + + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateDbEntitiesApplication + --hdfsPath${contentPath}/db_claims + --postgresUrl${postgresURL} + --postgresUser${postgresUser} + --postgresPassword${postgresPassword} + --isLookupUrl${isLookupUrl} + --actionclaims + --dbschema${dbSchema} + + + + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication + -p${contentPath}/odf_claims + -mongourl${mongoURL} + -mongodb${mongoDb} + -fODF + -lstore + -iclaim + + + + + + + + + + + eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication + -p${contentPath}/oaf_claims + -mongourl${mongoURL} + -mongodb${mongoDb} + -fOAF + -lstore + -iclaim + + + + + + + + \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/workflow.xml deleted file mode 100644 index 1ac456976..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_claims/workflow.xml +++ /dev/null @@ -1,169 +0,0 @@ - - - - migrationClaimsPathStep1 - the base path to store hdfs file - - - migrationClaimsPathStep2 - the temporary path to store entities before dispatching - - - migrationClaimsPathStep3 - the graph Raw base path - - - postgresURL - the postgres URL to access to the database - - - postgresUser - the user postgres - - - postgresPassword - the password postgres - - - mongoURL - mongoDB url, example: mongodb://[username:password@]host[:port] - - - mongoDb - mongo database - - - sparkDriverMemory - memory for driver process - - - sparkExecutorMemory - memory for individual executor - - - sparkExecutorCores - number of cores used by single executor - - - - - - - Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication - -p${migrationClaimsPathStep1}/db_claims - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - -aclaims - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication - -p${migrationClaimsPathStep1}/odf_claims - -mongourl${mongoURL} - -mongodb${mongoDb} - -fODF - -lstore - -iclaim - - - - - - - - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication - -p${migrationClaimsPathStep1}/oaf_claims - -mongourl${mongoURL} - -mongodb${mongoDb} - -fOAF - -lstore - -iclaim - - - - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - GenerateClaimEntities - eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication - dhp-aggregation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - -s${migrationClaimsPathStep1}/db_claims,${migrationClaimsPathStep1}/oaf_claims,${migrationClaimsPathStep1}/odf_claims - -t${migrationClaimsPathStep2}/claim_entities - -pgurl${postgresURL} - -pguser${postgresUser} - -pgpasswd${postgresPassword} - - - - - - - - - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - GenerateClaimGraph - eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication - dhp-aggregation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - -s${migrationClaimsPathStep2}/claim_entities - -g${migrationClaimsPathStep3} - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java index c16bbc6fb..528532edd 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java @@ -1,9 +1,10 @@ package eu.dnetlib.dhp.oa.provision; -import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; -import eu.dnetlib.dhp.schema.oaf.Relation; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + import org.apache.commons.io.FileUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; @@ -19,9 +20,10 @@ import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; +import eu.dnetlib.dhp.schema.oaf.Relation; public class PrepareRelationsJobTest { @@ -74,14 +76,19 @@ public class PrepareRelationsJobTest { "-maxRelations", String.valueOf(maxRelations) }); - Dataset out = spark.read() - .parquet(testPath.toString()) - .as(Encoders.bean(Relation.class)) - .cache(); + Dataset out = spark + .read() + .parquet(testPath.toString()) + .as(Encoders.bean(Relation.class)) + .cache(); Assertions.assertEquals(10, out.count()); - Dataset freq = out.toDF().cube(SUBRELTYPE).count().filter((FilterFunction) value -> !value.isNullAt(0)); + Dataset freq = out + .toDF() + .cube(SUBRELTYPE) + .count() + .filter((FilterFunction) value -> !value.isNullAt(0)); long outcome = freq.filter(freq.col(SUBRELTYPE).equalTo(OUTCOME)).collectAsList().get(0).getAs("count"); long supplement = freq.filter(freq.col(SUBRELTYPE).equalTo(SUPPLEMENT)).collectAsList().get(0).getAs("count");