migration of claims
This commit is contained in:
parent
173f1df1e5
commit
5d3739b5cf
|
@ -227,7 +227,21 @@ public class AbstractMigrationExecutor implements Closeable {
|
|||
final String nsPrefix = StringUtils.substringBefore(originalId, "::");
|
||||
final String rest = StringUtils.substringAfter(originalId, "::");
|
||||
return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(rest));
|
||||
}
|
||||
|
||||
public static String createOpenaireId(final String type, final String originalId) {
|
||||
switch (type) {
|
||||
case "datasource":
|
||||
return createOpenaireId(10, originalId);
|
||||
case "organization":
|
||||
return createOpenaireId(20, originalId);
|
||||
case "person":
|
||||
return createOpenaireId(30, originalId);
|
||||
case "project":
|
||||
return createOpenaireId(40, originalId);
|
||||
default:
|
||||
return createOpenaireId(50, originalId);
|
||||
}
|
||||
}
|
||||
|
||||
public static String asString(final Object o) {
|
||||
|
|
|
@ -398,6 +398,8 @@ public abstract class AbstractMongoExecutor extends AbstractMigrationExecutor {
|
|||
protected DataInfo prepareDataInfo(final Document doc) {
|
||||
final Node n = doc.selectSingleNode("//oaf:datainfo");
|
||||
|
||||
if (n == null) { return null; }
|
||||
|
||||
final String paClassId = n.valueOf("./oaf:provenanceaction/@classid");
|
||||
final String paClassName = n.valueOf("./oaf:provenanceaction/@classname");
|
||||
final String paSchemeId = n.valueOf("./oaf:provenanceaction/@schemeid");
|
||||
|
|
|
@ -1,56 +1,68 @@
|
|||
package eu.dnetlib.dhp.migration;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import scala.Tuple2;
|
||||
|
||||
public class ExtractEntitiesFromHDFSJob {
|
||||
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils.toString(MigrateMongoMdstoresApplication.class
|
||||
.getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
|
||||
private static List<String> folderNames = Arrays.asList("db_entities", "oaf_entities", "odf_entities");
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(ExtractEntitiesFromHDFSJob.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.getOrCreate();
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||
IOUtils.toString(MigrateMongoMdstoresApplication.class.getResourceAsStream("/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json")));
|
||||
parser.parseArgument(args);
|
||||
final List<String> sourcePaths = Arrays.asList(parser.get("sourcePaths").split(","));
|
||||
final String targetPath = parser.get("graphRawPath");
|
||||
|
||||
final SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName(ExtractEntitiesFromHDFSJob.class.getSimpleName())
|
||||
.master(parser.get("master"))
|
||||
.getOrCreate();
|
||||
try (final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
|
||||
processEntity(sc, Publication.class, sourcePaths, targetPath);
|
||||
processEntity(sc, Dataset.class, sourcePaths, targetPath);
|
||||
processEntity(sc, Software.class, sourcePaths, targetPath);
|
||||
processEntity(sc, OtherResearchProduct.class, sourcePaths, targetPath);
|
||||
processEntity(sc, Datasource.class, sourcePaths, targetPath);
|
||||
processEntity(sc, Organization.class, sourcePaths, targetPath);
|
||||
processEntity(sc, Project.class, sourcePaths, targetPath);
|
||||
processEntity(sc, Relation.class, sourcePaths, targetPath);
|
||||
}
|
||||
}
|
||||
|
||||
final String sourcePath = parser.get("sourcePath");
|
||||
final String targetPath = parser.get("graphRawPath");
|
||||
final String entity = parser.get("entity");
|
||||
private static void processEntity(final JavaSparkContext sc, final Class<?> clazz, final List<String> sourcePaths, final String targetPath) {
|
||||
final String type = clazz.getSimpleName().toLowerCase();
|
||||
|
||||
final JavaRDD<String> inputRdd = sc.emptyRDD();
|
||||
sourcePaths.forEach(sourcePath -> inputRdd.union(sc.sequenceFile(sourcePath, Text.class, Text.class)
|
||||
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
|
||||
.filter(k -> isEntityType(k._1(), type))
|
||||
.map(Tuple2::_2)));
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
inputRdd.saveAsTextFile(targetPath + "/" + type);
|
||||
}
|
||||
|
||||
|
||||
JavaRDD<String> inputRdd = sc.emptyRDD();
|
||||
|
||||
|
||||
folderNames.forEach(p -> inputRdd.union(
|
||||
sc.sequenceFile(sourcePath+"/"+p, Text.class, Text.class)
|
||||
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
|
||||
.filter(k -> isEntityType(k._1(), entity))
|
||||
.map(Tuple2::_2))
|
||||
);
|
||||
|
||||
inputRdd.saveAsTextFile(targetPath+"/"+entity);
|
||||
}
|
||||
|
||||
|
||||
private static boolean isEntityType(final String item, final String entity) {
|
||||
return StringUtils.substringAfter(item, ":").equalsIgnoreCase(entity);
|
||||
}
|
||||
private static boolean isEntityType(final String item, final String entity) {
|
||||
return StringUtils.substringAfter(item, ":").equalsIgnoreCase(entity);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,15 +17,21 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.Context;
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.Datasource;
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
import eu.dnetlib.dhp.schema.oaf.Journal;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.Organization;
|
||||
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||
import eu.dnetlib.dhp.schema.oaf.Qualifier;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.Software;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
|
||||
public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor implements Closeable {
|
||||
|
@ -53,22 +59,28 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
|
|||
final String hdfsNameNode = parser.get("namenode");
|
||||
final String hdfsUser = parser.get("hdfsUser");
|
||||
|
||||
final boolean processClaims = parser.get("action") != null && parser.get("action").equalsIgnoreCase("claims");
|
||||
|
||||
try (final MigrateDbEntitiesApplication smdbe = new MigrateDbEntitiesApplication(hdfsPath, hdfsNameNode, hdfsUser, dbUrl, dbUser, dbPassword)) {
|
||||
log.info("Processing datasources...");
|
||||
smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
|
||||
if (processClaims) {
|
||||
log.info("Processing claims...");
|
||||
smdbe.execute("queryClaims.sql", smdbe::processClaims);
|
||||
} else {
|
||||
log.info("Processing datasources...");
|
||||
smdbe.execute("queryDatasources.sql", smdbe::processDatasource);
|
||||
|
||||
log.info("Processing projects...");
|
||||
smdbe.execute("queryProjects.sql", smdbe::processProject);
|
||||
log.info("Processing projects...");
|
||||
smdbe.execute("queryProjects.sql", smdbe::processProject);
|
||||
|
||||
log.info("Processing orgs...");
|
||||
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);
|
||||
log.info("Processing orgs...");
|
||||
smdbe.execute("queryOrganizations.sql", smdbe::processOrganization);
|
||||
|
||||
log.info("Processing relations ds <-> orgs ...");
|
||||
smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization);
|
||||
|
||||
log.info("Processing projects <-> orgs ...");
|
||||
smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization);
|
||||
log.info("Processing relations ds <-> orgs ...");
|
||||
smdbe.execute("queryDatasourceOrganization.sql", smdbe::processDatasourceOrganization);
|
||||
|
||||
log.info("Processing projects <-> orgs ...");
|
||||
smdbe.execute("queryProjectOrganization.sql", smdbe::processProjectOrganization);
|
||||
}
|
||||
log.info("All done.");
|
||||
}
|
||||
}
|
||||
|
@ -377,7 +389,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
|
|||
r2.setTarget(dsId);
|
||||
r2.setCollectedFrom(collectedFrom);
|
||||
r2.setDataInfo(info);
|
||||
r1.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
r2.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
emitOaf(r2);
|
||||
|
||||
// rs.getString("datasource");
|
||||
|
@ -426,7 +438,7 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
|
|||
r2.setTarget(projectId);
|
||||
r2.setCollectedFrom(collectedFrom);
|
||||
r2.setDataInfo(info);
|
||||
r1.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
r2.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
emitOaf(r2);
|
||||
|
||||
// rs.getString("project");
|
||||
|
@ -450,6 +462,81 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationExecutor impl
|
|||
}
|
||||
}
|
||||
|
||||
public void processClaims(final ResultSet rs) {
|
||||
|
||||
final DataInfo info =
|
||||
dataInfo(false, null, false, false, qualifier("user:claim", "user:claim", "dnet:provenanceActions", "dnet:provenanceActions"), "0.9");
|
||||
|
||||
try {
|
||||
|
||||
if (rs.getString("source_type").equals("context")) {
|
||||
final Result r;
|
||||
|
||||
if (rs.getString("target_type").equals("dataset")) {
|
||||
r = new Dataset();
|
||||
} else if (rs.getString("target_type").equals("software")) {
|
||||
r = new Software();
|
||||
} else if (rs.getString("target_type").equals("other")) {
|
||||
r = new OtherResearchProduct();
|
||||
} else {
|
||||
r = new Publication();
|
||||
}
|
||||
r.setId(createOpenaireId(50, rs.getString("target_id")));
|
||||
r.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
r.setContext(prepareContext(rs.getString("source_id"), info));
|
||||
r.setDataInfo(info);
|
||||
emitOaf(r);
|
||||
} else {
|
||||
final String sourceId = createOpenaireId(rs.getString("source_type"), rs.getString("source_id"));
|
||||
final String targetId = createOpenaireId(rs.getString("target_type"), rs.getString("target_id"));
|
||||
|
||||
final Relation r1 = new Relation();
|
||||
final Relation r2 = new Relation();
|
||||
|
||||
if (rs.getString("source_type").equals("project")) {
|
||||
r1.setRelType("resultProject");
|
||||
r1.setSubRelType("outcome");
|
||||
r1.setRelClass("produces");
|
||||
|
||||
r2.setRelType("resultProject");
|
||||
r2.setSubRelType("outcome");
|
||||
r2.setRelClass("isProducedBy");
|
||||
} else {
|
||||
r1.setRelType("resultResult");
|
||||
r1.setSubRelType("relationship");
|
||||
r1.setRelClass("isRelatedTo");
|
||||
|
||||
r2.setRelType("resultResult");
|
||||
r2.setSubRelType("relationship");
|
||||
r2.setRelClass("isRelatedTo");
|
||||
}
|
||||
|
||||
r1.setSource(sourceId);
|
||||
r1.setTarget(targetId);
|
||||
r1.setDataInfo(info);
|
||||
r1.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
emitOaf(r1);
|
||||
|
||||
r2.setSource(targetId);
|
||||
r2.setTarget(sourceId);
|
||||
r2.setDataInfo(info);
|
||||
r2.setLastupdatetimestamp(lastUpdateTimestamp);
|
||||
emitOaf(r2);
|
||||
|
||||
}
|
||||
|
||||
} catch (final Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private List<Context> prepareContext(final String id, final DataInfo dataInfo) {
|
||||
final Context context = new Context();
|
||||
context.setId(id);
|
||||
context.setDataInfo(Arrays.asList(dataInfo));
|
||||
return Arrays.asList(context);
|
||||
}
|
||||
|
||||
private DataInfo prepareDataInfo(final ResultSet rs) throws SQLException {
|
||||
final Boolean deletedbyinference = rs.getBoolean("deletedbyinference");
|
||||
final String inferenceprovenance = rs.getString("inferenceprovenance");
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
[
|
||||
{
|
||||
"paramName": "s",
|
||||
"paramLongName": "sourcePath",
|
||||
"paramDescription": "the HDFS source path which contains the sequential file",
|
||||
"paramLongName": "sourcePaths",
|
||||
"paramDescription": "the HDFS source paths which contains the sequential file (comma separated)",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
|
@ -16,11 +16,5 @@
|
|||
"paramLongName": "graphRawPath",
|
||||
"paramDescription": "the path of the graph Raw in hdfs",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "e",
|
||||
"paramLongName": "entity",
|
||||
"paramDescription": "The entity to extract",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -34,5 +34,11 @@
|
|||
"paramLongName": "postgresPassword",
|
||||
"paramDescription": "postgres password",
|
||||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "a",
|
||||
"paramLongName": "action",
|
||||
"paramDescription": "process claims",
|
||||
"paramRequired": false
|
||||
}
|
||||
]
|
|
@ -43,8 +43,7 @@
|
|||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ImportODFEntitiesFromMongoDB"/>
|
||||
|
||||
<start to="ResetWorkingPath"/>
|
||||
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
|
@ -71,6 +70,23 @@
|
|||
<arg>-dbuser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-dbpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="ImportClaimsFromPostgres"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportClaimsFromPostgres">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.MigrateDbEntitiesApplication</main-class>
|
||||
<arg>-p</arg><arg>${workingPath}/db_claims</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-dburl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-dbuser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-dbpasswd</arg><arg>${postgresPassword}</arg>
|
||||
<arg>-a</arg><arg>claims</arg>
|
||||
</java>
|
||||
<ok to="ImportODFEntitiesFromMongoDB"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
@ -113,170 +129,69 @@
|
|||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="ExtractPublication"/>
|
||||
<ok to="ImportODFClaimsFromMongoDB"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportODFClaimsFromMongoDB">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${workingPath}/odf_claims</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongourl}</arg>
|
||||
<arg>-db</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>ODF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>claim</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="ImportOAFClaimsFromMongoDB"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ImportOAFClaimsFromMongoDB">
|
||||
<java>
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<main-class>eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication</main-class>
|
||||
<arg>-p</arg><arg>${workingPath}/oaf_claims</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>-u</arg><arg>${hdfsUser}</arg>
|
||||
<arg>-mongourl</arg><arg>${mongourl}</arg>
|
||||
<arg>-db</arg><arg>${mongoDb}</arg>
|
||||
<arg>-f</arg><arg>OAF</arg>
|
||||
<arg>-l</arg><arg>store</arg>
|
||||
<arg>-i</arg><arg>claim</arg>
|
||||
<arg>-pgurl</arg><arg>${postgresURL}</arg>
|
||||
<arg>-pguser</arg><arg>${postgresUser}</arg>
|
||||
<arg>-pgpasswd</arg><arg>${postgresPassword}</arg>
|
||||
</java>
|
||||
<ok to="ExtractEntities"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ExtractPublication">
|
||||
<action name="ExtractEntities">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: publication</name>
|
||||
<name>ExtractEntities</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/publication</arg>
|
||||
<arg>-e</arg><arg>publication</arg>
|
||||
</spark>
|
||||
<ok to="ExtractDataset"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractDataset">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: dataset</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/dataset</arg>
|
||||
<arg>-e</arg><arg>dataset</arg>
|
||||
</spark>
|
||||
<ok to="ExtractSoftware"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractSoftware">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: software</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/software</arg>
|
||||
<arg>-e</arg><arg>software</arg>
|
||||
</spark>
|
||||
<ok to="ExtractORP"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractORP">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: otherresearchproduct</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/otherresearchproduct</arg>
|
||||
<arg>-e</arg><arg>otherresearchproduct</arg>
|
||||
</spark>
|
||||
<ok to="ExtractDatasource"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractDatasource">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: datasource</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/datasource</arg>
|
||||
<arg>-e</arg><arg>datasource</arg>
|
||||
</spark>
|
||||
<ok to="ExtractOrganization"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractOrganization">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: organization</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/organization</arg>
|
||||
<arg>-e</arg><arg>organization</arg>
|
||||
</spark>
|
||||
<ok to="ExtractProject"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractProject">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: project</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/project</arg>
|
||||
<arg>-e</arg><arg>project</arg>
|
||||
</spark>
|
||||
<ok to="ExtractRelation"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExtractRelation">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<job-tracker>${jobTracker}</job-tracker>
|
||||
<name-node>${nameNode}</name-node>
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExtractEntities: relation</name>
|
||||
<class>eu.dnetlib.dhp.migration.ExtractEntitiesFromHDFSJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.warehouse.dir="/user/hive/warehouse"</spark-opts>
|
||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||
<arg>--sourcePath</arg><arg>${workingPath}</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}/relation</arg>
|
||||
<arg>-e</arg><arg>relation</arg>
|
||||
<arg>-s</arg><arg>${workingPath}/db_entities,${workingPath}/oaf_entities,${workingPath}/odf_entities</arg>
|
||||
<arg>-g</arg><arg>${graphRawPath}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -0,0 +1 @@
|
|||
SELECT source_type, source_id, target_type, target_id, semantics FROM claim WHERE approved=TRUE;
|
Loading…
Reference in New Issue