From 5d3739b5cf0ffd2a967ffc6f8e86082cafd39a3f Mon Sep 17 00:00:00 2001 From: Michele Artini Date: Wed, 19 Feb 2020 15:11:17 +0100 Subject: [PATCH] migration of claims --- .../migration/AbstractMigrationExecutor.java | 14 ++ .../dhp/migration/AbstractMongoExecutor.java | 2 + .../migration/ExtractEntitiesFromHDFSJob.java | 84 ++++--- .../MigrateDbEntitiesApplication.java | 113 +++++++-- ...extract_entities_from_hdfs_parameters.json | 10 +- .../migrate_db_entities_parameters.json | 6 + .../dhp/migration/oozie_app/workflow.xml | 215 ++++++------------ .../dnetlib/dhp/migration/sql/queryClaims.sql | 1 + 8 files changed, 238 insertions(+), 207 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryClaims.sql diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java index e91a53045..b0db3c76f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMigrationExecutor.java @@ -227,7 +227,21 @@ public class AbstractMigrationExecutor implements Closeable { final String nsPrefix = StringUtils.substringBefore(originalId, "::"); final String rest = StringUtils.substringAfter(originalId, "::"); return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest)); + } + public static String createOpenaireId(final String type, final String originalId) { + switch (type) { + case "datasource": + return createOpenaireId(10, originalId); + case "organization": + return createOpenaireId(20, originalId); + case "person": + return createOpenaireId(30, originalId); + case "project": + return createOpenaireId(40, originalId); + default: + return createOpenaireId(50, originalId); + } } public static String asString(final Object o) { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMongoExecutor.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMongoExecutor.java index 00d1aa60d..0595726d4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMongoExecutor.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/AbstractMongoExecutor.java @@ -398,6 +398,8 @@ public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor { protected DataInfo prepareDataInfo(final Document doc) { final Node n = doc.selectSingleNode("//oaf:datainfo"); + if (n == null) { return null; } + final String paClassId = n.valueOf("./oaf:provenanceaction/@classid"); final String paClassName = n.valueOf("./oaf:provenanceaction/@classname"); final String paSchemeId = n.valueOf("./oaf:provenanceaction/@schemeid"); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java index f2d9caebf..22b61798e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/ExtractEntitiesFromHDFSJob.java @@ -1,56 +1,68 @@ package eu.dnetlib.dhp.migration; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import java.util.Arrays; +import java.util.List; + import 
org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SparkSession;
-import scala.Tuple2;
-import java.util.Arrays;
-import java.util.List;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.oaf.Dataset;
+import eu.dnetlib.dhp.schema.oaf.Datasource;
+import eu.dnetlib.dhp.schema.oaf.Organization;
+import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
+import eu.dnetlib.dhp.schema.oaf.Project;
+import eu.dnetlib.dhp.schema.oaf.Publication;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.Software;
+import scala.Tuple2;
 public class ExtractEntitiesFromHDFSJob {
+	public static void main(final String[] args) throws Exception {
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+				IOUtils.toString(MigrateMongoMdstoresApplication.class
+						.getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json")));
+		parser.parseArgument(args);
-	private static List<String> folderNames = Arrays.asList("db_entities", "oaf_entities", "odf_entities");
+		final SparkSession spark = SparkSession
+				.builder()
+				.appName(ExtractEntitiesFromHDFSJob.class.getSimpleName())
+				.master(parser.get("master"))
+				.getOrCreate();
-	public static void main(String[] args) throws Exception {
-		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-				IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json")));
-		parser.parseArgument(args);
+		final List<String> sourcePaths = Arrays.asList(parser.get("sourcePaths").split(","));
+		final String targetPath = parser.get("graphRawPath");
-		final SparkSession spark = SparkSession
-				.builder()
-				.appName(ExtractEntitiesFromHDFSJob.class.getSimpleName())
-				.master(parser.get("master"))
-				.getOrCreate();
+		try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
+			processEntity(sc, Publication.class, sourcePaths, targetPath);
+			processEntity(sc, Dataset.class, sourcePaths, targetPath);
+			processEntity(sc, Software.class, sourcePaths, targetPath);
+			processEntity(sc, OtherResearchProduct.class, sourcePaths, targetPath);
+			processEntity(sc, Datasource.class, sourcePaths, targetPath);
+			processEntity(sc, Organization.class, sourcePaths, targetPath);
+			processEntity(sc, Project.class, sourcePaths, targetPath);
+			processEntity(sc, Relation.class, sourcePaths, targetPath);
+		}
+	}
-		final String sourcePath = parser.get("sourcePath");
-		final String targetPath = parser.get("graphRawPath");
-		final String entity = parser.get("entity");
+	private static void processEntity(final JavaSparkContext sc, final Class<?> clazz, final List<String> sourcePaths, final String targetPath) {
+		final String type = clazz.getSimpleName().toLowerCase();
+		// union() returns a new RDD, so accumulate the result rather than discarding it
+		JavaRDD<String> inputRdd = sc.emptyRDD();
+		for (final String sourcePath : sourcePaths) {
+			inputRdd = inputRdd.union(sc.sequenceFile(sourcePath, Text.class, Text.class)
+					.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
+					.filter(k -> isEntityType(k._1(), type))
+					.map(Tuple2::_2));
+		}
-		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+		inputRdd.saveAsTextFile(targetPath + "/" + type);
+	}
-
-		JavaRDD<String> inputRdd = sc.emptyRDD();
-
-
-		folderNames.forEach(p -> inputRdd.union(
-				sc.sequenceFile(sourcePath+"/"+p, Text.class, Text.class)
-				.map(k -> new 
Tuple2<>(k._1().toString(), k._2().toString())) - .filter(k -> isEntityType(k._1(), entity)) - .map(Tuple2::_2)) - ); - - inputRdd.saveAsTextFile(targetPath+"/"+entity); - } - - - private static boolean isEntityType(final String item, final String entity) { - return StringUtils.substringAfter(item, ":").equalsIgnoreCase(entity); - } + private static boolean isEntityType(final String item, final String entity) { + return StringUtils.substringAfter(item, ":").equalsIgnoreCase(entity); + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java index d22e8e5b3..1ccfd09ef 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/MigrateDbEntitiesApplication.java @@ -17,15 +17,21 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Dataset; import eu.dnetlib.dhp.schema.oaf.Datasource; import eu.dnetlib.dhp.schema.oaf.Field; import eu.dnetlib.dhp.schema.oaf.Journal; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Relation; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.schema.oaf.Software; import eu.dnetlib.dhp.schema.oaf.StructuredProperty; public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor implements Closeable { @@ -53,22 +59,28 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl final String hdfsNameNode = parser.get("namenode"); final String hdfsUser = parser.get("hdfsUser"); + final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims"); + try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, hdfsNameNode, hdfsUser, dbUrl, dbUser, dbPassword)) { - log.info("Processing datasources..."); - smdbe.execute("queryDatasources.sql", smdbe::processDatasource); + if (processClaims) { + log.info("Processing claims..."); + smdbe.execute("queryClaims.sql", smdbe::processClaims); + } else { + log.info("Processing datasources..."); + smdbe.execute("queryDatasources.sql", smdbe::processDatasource); - log.info("Processing projects..."); - smdbe.execute("queryProjects.sql", smdbe::processProject); + log.info("Processing projects..."); + smdbe.execute("queryProjects.sql", smdbe::processProject); - log.info("Processing orgs..."); - smdbe.execute("queryOrganizations.sql", smdbe::processOrganization); + log.info("Processing orgs..."); + smdbe.execute("queryOrganizations.sql", smdbe::processOrganization); - log.info("Processing relations ds <-> orgs ..."); - smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization); - - log.info("Processing projects <-> orgs ..."); - smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization); + log.info("Processing relations ds <-> orgs ..."); + smdbe.execute("queryDatasourceOrganization.sql", 
smdbe::processDatasourceOrganization); + log.info("Processing projects <-> orgs ..."); + smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization); + } log.info("All done."); } } @@ -377,7 +389,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl r2.setTarget(dsId); r2.setCollectedFrom(collectedFrom); r2.setDataInfo(info); - r1.setLastupdatetimestamp(lastUpdateTimestamp); + r2.setLastupdatetimestamp(lastUpdateTimestamp); emitOaf(r2); // rs.getString("datasource"); @@ -426,7 +438,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl r2.setTarget(projectId); r2.setCollectedFrom(collectedFrom); r2.setDataInfo(info); - r1.setLastupdatetimestamp(lastUpdateTimestamp); + r2.setLastupdatetimestamp(lastUpdateTimestamp); emitOaf(r2); // rs.getString("project"); @@ -450,6 +462,81 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl } } + public void processClaims(final ResultSet rs) { + + final DataInfo info = + dataInfo(false, null, false, false, qualifier("user:claim", "user:claim", "dnet:provenanceActions", "dnet:provenanceActions"), "0.9"); + + try { + + if (rs.getString("source_type").equals("context")) { + final Result r; + + if (rs.getString("target_type").equals("dataset")) { + r = new Dataset(); + } else if (rs.getString("target_type").equals("software")) { + r = new Software(); + } else if (rs.getString("target_type").equals("other")) { + r = new OtherResearchProduct(); + } else { + r = new Publication(); + } + r.setId(createOpenaireId(50, rs.getString("target_id"))); + r.setLastupdatetimestamp(lastUpdateTimestamp); + r.setContext(prepareContext(rs.getString("source_id"), info)); + r.setDataInfo(info); + emitOaf(r); + } else { + final String sourceId = createOpenaireId(rs.getString("source_type"), rs.getString("source_id")); + final String targetId = createOpenaireId(rs.getString("target_type"), rs.getString("target_id")); + + final Relation r1 = new Relation(); + final Relation r2 = new Relation(); + + if (rs.getString("source_type").equals("project")) { + r1.setRelType("resultProject"); + r1.setSubRelType("outcome"); + r1.setRelClass("produces"); + + r2.setRelType("resultProject"); + r2.setSubRelType("outcome"); + r2.setRelClass("isProducedBy"); + } else { + r1.setRelType("resultResult"); + r1.setSubRelType("relationship"); + r1.setRelClass("isRelatedTo"); + + r2.setRelType("resultResult"); + r2.setSubRelType("relationship"); + r2.setRelClass("isRelatedTo"); + } + + r1.setSource(sourceId); + r1.setTarget(targetId); + r1.setDataInfo(info); + r1.setLastupdatetimestamp(lastUpdateTimestamp); + emitOaf(r1); + + r2.setSource(targetId); + r2.setTarget(sourceId); + r2.setDataInfo(info); + r2.setLastupdatetimestamp(lastUpdateTimestamp); + emitOaf(r2); + + } + + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + private List prepareContext(final String id, final DataInfo dataInfo) { + final Context context = new Context(); + context.setId(id); + context.setDataInfo(Arrays.asList(dataInfo)); + return Arrays.asList(context); + } + private DataInfo prepareDataInfo(final ResultSet rs) throws SQLException { final Boolean deletedbyinference = rs.getBoolean("deletedbyinference"); final String inferenceprovenance = rs.getString("inferenceprovenance"); diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json 
b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json index f179ee0f8..0039493e7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json @@ -1,8 +1,8 @@ [ { "paramName": "s", - "paramLongName": "sourcePath", - "paramDescription": "the HDFS source path which contains the sequential file", + "paramLongName": "sourcePaths", + "paramDescription": "the HDFS source paths which contains the sequential file (comma separated)", "paramRequired": true }, { @@ -16,11 +16,5 @@ "paramLongName": "graphRawPath", "paramDescription": "the path of the graph Raw in hdfs", "paramRequired": true - }, - { - "paramName": "e", - "paramLongName": "entity", - "paramDescription": "The entity to extract", - "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json index 5e9f378f5..4506e2ae1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/migrate_db_entities_parameters.json @@ -34,5 +34,11 @@ "paramLongName": "postgresPassword", "paramDescription": "postgres password", "paramRequired": false + }, + { + "paramName": "a", + "paramLongName": "action", + "paramDescription": "process claims", + "paramRequired": false } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml index ff23fff4a..b11cddfcf 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/oozie_app/workflow.xml @@ -43,8 +43,7 @@ - - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -71,6 +70,23 @@ -dbuser${postgresUser} -dbpasswd${postgresPassword} + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.MigrateDbEntitiesApplication + -p${workingPath}/db_claims + -n${nameNode} + -u${hdfsUser} + -dburl${postgresURL} + -dbuser${postgresUser} + -dbpasswd${postgresPassword} + -aclaims + @@ -113,170 +129,69 @@ -pguser${postgresUser} -pgpasswd${postgresPassword} - + + + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication + -p${workingPath}/odf_claims + -n${nameNode} + -u${hdfsUser} + -mongourl${mongourl} + -db${mongoDb} + -fODF + -lstore + -iclaim + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + + + + + ${jobTracker} + ${nameNode} + eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication + -p${workingPath}/oaf_claims + -n${nameNode} + -u${hdfsUser} + -mongourl${mongourl} + -db${mongoDb} + -fOAF + -lstore + -iclaim + -pgurl${postgresURL} + -pguser${postgresUser} + -pgpasswd${postgresPassword} + + + + - + ${jobTracker} ${nameNode} yarn-cluster cluster - ExtractEntities: publication + ExtractEntities eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob dhp-aggregation-${projectVersion}.jar --executor-memory 
${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" -mt yarn-cluster - --sourcePath${workingPath} - -g${graphRawPath}/publication - -epublication - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - ExtractEntities: dataset - eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob - dhp-aggregation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - --sourcePath${workingPath} - -g${graphRawPath}/dataset - -edataset - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - ExtractEntities: software - eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob - dhp-aggregation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - --sourcePath${workingPath} - -g${graphRawPath}/software - -esoftware - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - ExtractEntities: otherresearchproduct - eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob - dhp-aggregation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - --sourcePath${workingPath} - -g${graphRawPath}/otherresearchproduct - -eotherresearchproduct - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - ExtractEntities: datasource - eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob - dhp-aggregation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - --sourcePath${workingPath} - -g${graphRawPath}/datasource - -edatasource - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - ExtractEntities: organization - eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob - dhp-aggregation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - --sourcePath${workingPath} 
- -g${graphRawPath}/organization - -eorganization - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - ExtractEntities: project - eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob - dhp-aggregation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - --sourcePath${workingPath} - -g${graphRawPath}/project - -eproject - - - - - - - - - ${jobTracker} - ${nameNode} - yarn-cluster - cluster - ExtractEntities: relation - eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob - dhp-aggregation-${projectVersion}.jar - --executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse" - -mt yarn-cluster - --sourcePath${workingPath} - -g${graphRawPath}/relation - -erelation + -s${workingPath}/db_entities,${workingPath}/oaf_entities,${workingPath}/odf_entities + -g${graphRawPath} - - + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryClaims.sql b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryClaims.sql new file mode 100644 index 000000000..0390c11aa --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/sql/queryClaims.sql @@ -0,0 +1 @@ +SELECT source_type, source_id, target_type, target_id, semantics FROM claim WHERE approved=TRUE; \ No newline at end of file
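
Note on the new identifier helper: createOpenaireId(type, originalId) maps claim endpoint types onto the usual OpenAIRE id prefixes (10 datasource, 20 organization, 30 person, 40 project, 50 for any result type) and delegates to the existing prefix-based overload, which hashes the local part of the original id. A minimal, self-contained sketch of the resulting id shape, assuming DHPUtils.md5 is a hex-encoded MD5; the class name and the sample id below are illustrative only, not part of the patch:

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;

public class OpenaireIdSketch {

	// Mirrors AbstractMigrationExecutor.createOpenaireId(int, String):
	// "<prefix>|<nsPrefix>::<md5(local part)>", assuming DHPUtils.md5 is a hex MD5.
	static String createOpenaireId(final int prefix, final String originalId) {
		final String nsPrefix = StringUtils.substringBefore(originalId, "::");
		final String rest = StringUtils.substringAfter(originalId, "::");
		return String.format("%s|%s::%s", prefix, nsPrefix, DigestUtils.md5Hex(rest));
	}

	public static void main(final String[] args) {
		// A claim target whose type is not datasource/organization/person/project
		// falls into the default branch of the new overload, i.e. prefix 50.
		System.out.println(createOpenaireId(50, "userclaim___::12345"));
		// -> 50|userclaim___::<32-char hex digest of "12345">
	}
}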
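For quick reference, processClaims emits two inverse relations for every non-context claim; their semantics depend only on whether the claim source is a project, and the semantics column selected by queryClaims.sql is not read in this version of the method. A condensed sketch of that mapping, using plain "relType/subRelType/relClass" strings instead of Relation objects (the helper below is illustrative only):

public class ClaimRelationSemanticsSketch {

	// Direct and inverse relation semantics emitted by processClaims for a non-context claim.
	static String[] claimRelationSemantics(final String sourceType) {
		if ("project".equals(sourceType)) {
			return new String[] { "resultProject/outcome/produces", "resultProject/outcome/isProducedBy" };
		}
		return new String[] { "resultResult/relationship/isRelatedTo", "resultResult/relationship/isRelatedTo" };
	}

	public static void main(final String[] args) {
		System.out.println(String.join(" , ", claimRelationSemantics("project")));     // project -> result claim
		System.out.println(String.join(" , ", claimRelationSemantics("publication"))); // result -> result claim
	}
}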
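The extraction step now takes a comma-separated list of source paths (-s/--sourcePaths) and writes one output folder per entity type under a single graph raw path (-g), replacing the eight per-entity Spark actions in the previous workflow. A hypothetical local smoke test of the reworked job, with placeholder paths (on the cluster the Oozie action passes ${workingPath}/db_entities,... and ${graphRawPath} instead):

import eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob;

public class ExtractEntitiesLocalRun {

	public static void main(final String[] args) throws Exception {
		// Placeholder paths for a local run; requires Spark on the classpath.
		ExtractEntitiesFromHDFSJob.main(new String[] {
				"-mt", "local[*]",
				"-s", "/tmp/migration/db_entities,/tmp/migration/oaf_entities,/tmp/migration/odf_entities",
				"-g", "/tmp/migration/graph_raw"
		});
		// Expected layout: /tmp/migration/graph_raw/{publication,dataset,software,
		// otherresearchproduct,datasource,organization,project,relation}
	}
}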