implementation of the mechanism that checks the existance of a mergerel file

This commit is contained in:
miconis 2020-03-23 17:13:30 +01:00
parent c20e179f5a
commit f7890a90df
6 changed files with 137 additions and 94 deletions

View File

@ -4,10 +4,10 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil; import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaPairRDD;
@ -18,13 +18,14 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import scala.Tuple2; import scala.Tuple2;
import java.io.IOException; import java.io.IOException;
public class SparkUpdateEntity { public class SparkUpdateEntity {
final String IDJSONPATH = "$.id";
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntity.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/updateEntity_parameters.json"))); final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntity.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/updateEntity_parameters.json")));
parser.parseArgument(args); parser.parseArgument(args);
@ -32,65 +33,82 @@ public class SparkUpdateEntity {
new SparkUpdateEntity().run(parser); new SparkUpdateEntity().run(parser);
} }
public void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException { public boolean mergeRelExists(String basePath, String entity) throws IOException {
boolean result = false;
FileSystem fileSystem = FileSystem.get(new Configuration());
FileStatus[] fileStatuses = fileSystem.listStatus(new Path(basePath));
for (FileStatus fs : fileStatuses) {
if (fs.isDirectory())
if (fileSystem.exists(new Path(DedupUtility.createMergeRelPath(basePath, fs.getPath().getName(), entity))))
result = true;
}
return result;
}
public void run(ArgumentApplicationParser parser) throws IOException {
final String graphBasePath = parser.get("graphBasePath"); final String graphBasePath = parser.get("graphBasePath");
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
final String dedupGraphPath = parser.get("dedupGraphPath"); final String dedupGraphPath = parser.get("dedupGraphPath");
final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId");
try (SparkSession spark = getSparkSession(parser)) { try (SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
for (DedupConfig dedupConf : DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) { //for each entity
for (OafEntityType entity: OafEntityType.values()) {
String subEntity = dedupConf.getWf().getSubEntityValue(); JavaRDD<String> sourceEntity = sc.textFile(DedupUtility.createEntityPath(graphBasePath, entity.toString()));
final Dataset<Relation> df = spark.read().load(DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity)).as(Encoders.bean(Relation.class)); if (mergeRelExists(workingPath, entity.toString())) {
final JavaPairRDD<String, String> mergedIds = df
final Dataset<Relation> rel = spark.read().load(DedupUtility.createMergeRelPath(workingPath, "*", entity.toString())).as(Encoders.bean(Relation.class));
final JavaPairRDD<String, String> mergedIds = rel
.where("relClass == 'merges'") .where("relClass == 'merges'")
.select(df.col("target")) .select(rel.col("target"))
.distinct() .distinct()
.toJavaRDD() .toJavaRDD()
.mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d")); .mapToPair((PairFunction<Row, String, String>) r -> new Tuple2<>(r.getString(0), "d"));
final JavaRDD<String> sourceEntity = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)); final JavaRDD<String> dedupEntity = sc.textFile(DedupUtility.createDedupRecordPath(workingPath, "*", entity.toString()));
final JavaRDD<String> dedupEntity = sc.textFile(DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity)); JavaPairRDD<String, String> entitiesWithId = sourceEntity.mapToPair((PairFunction<String, String, String>) s -> new Tuple2<>(MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
JavaPairRDD<String, String> entitiesWithId = sourceEntity.mapToPair((PairFunction<String, String, String>) s -> new Tuple2<>(MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s), s)); JavaRDD<String> map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), getOafClass(entity)) : k._2()._1());
sourceEntity = map.union(dedupEntity);
}
Class<? extends Oaf> mainClass; sourceEntity.saveAsTextFile(dedupGraphPath + "/" + entity, GzipCodec.class);
switch (subEntity) {
}
}
}
public Class<? extends Oaf> getOafClass(OafEntityType className) {
switch (className.toString()) {
case "publication": case "publication":
mainClass = Publication.class; return Publication.class;
break;
case "dataset": case "dataset":
mainClass = eu.dnetlib.dhp.schema.oaf.Dataset.class; return eu.dnetlib.dhp.schema.oaf.Dataset.class;
break;
case "datasource": case "datasource":
mainClass = Datasource.class; return Datasource.class;
break;
case "software": case "software":
mainClass = Software.class; return Software.class;
break;
case "organization": case "organization":
mainClass = Organization.class; return Organization.class;
break;
case "otherresearchproduct": case "otherresearchproduct":
mainClass = OtherResearchProduct.class; return OtherResearchProduct.class;
break; case "project":
return Project.class;
default: default:
throw new IllegalArgumentException("Illegal type " + subEntity); throw new IllegalArgumentException("Illegal type " + className);
} }
JavaRDD<String> map = entitiesWithId.leftOuterJoin(mergedIds).map(k -> k._2()._2().isPresent() ? updateDeletedByInference(k._2()._1(), mainClass) : k._2()._1());
map.union(dedupEntity).saveAsTextFile(dedupGraphPath + "/" + subEntity, GzipCodec.class);
}
}
} }
private static <T extends Oaf> String updateDeletedByInference(final String json, final Class<T> clazz) { private static <T extends Oaf> String updateDeletedByInference(final String json, final Class<T> clazz) {

View File

@ -1,12 +1,20 @@
<workflow-app name="Create Similarity Relations" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="Build Root Records" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
<name>graphBasePath</name> <name>graphBasePath</name>
<description>the raw graph base path</description> <description>the raw graph base path</description>
</property> </property>
<property>
<name>isLookUpUrl</name>
<description>the address of the lookUp service</description>
</property>
<property>
<name>actionSetId</name>
<description>id of the actionSet</description>
</property>
<property> <property>
<name>workingPath</name> <name>workingPath</name>
<description>path for the working directory</description> <description>path of the working directory</description>
</property> </property>
<property> <property>
<name>dedupGraphPath</name> <name>dedupGraphPath</name>
@ -26,12 +34,41 @@
</property> </property>
</parameters> </parameters>
<start to="PropagateRelation"/> <start to="UpdateEntity"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="UpdateEntity">
<spark xmlns="uri:oozie:spark-action:0.2">
<prepare>
<delete path='${dedupGraphPath}'/>
</prepare>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create Dedup Record</name>
<class>eu.dnetlib.dhp.dedup.SparkUpdateEntity</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
<arg>--o</arg><arg>${dedupGraphPath}</arg>
</spark>
<ok to="PropagateRelation"/>
<error to="Kill"/>
</action>
<action name="PropagateRelation"> <action name="PropagateRelation">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<prepare> <prepare>

View File

@ -82,11 +82,13 @@
<name>Create Dedup Record</name> <name>Create Dedup Record</name>
<class>eu.dnetlib.dhp.dedup.SparkCreateDedupRecord</class> <class>eu.dnetlib.dhp.dedup.SparkCreateDedupRecord</class>
<jar>dhp-dedup-${projectVersion}.jar</jar> <jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores} <spark-opts>
--driver-memory=${sparkDriverMemory} --conf --executor-memory ${sparkExecutorMemory}
spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf --executor-cores ${sparkExecutorCores}
spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf --driver-memory=${sparkDriverMemory}
spark.sql.warehouse.dir="/user/hive/warehouse" --conf spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener"
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts> </spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg> <arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--i</arg><arg>${graphBasePath}</arg>
@ -94,32 +96,6 @@
<arg>--la</arg><arg>${isLookUpUrl}</arg> <arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg> <arg>--asi</arg><arg>${actionSetId}</arg>
</spark> </spark>
<ok to="UpdateEntity"/>
<error to="Kill"/>
</action>
<action name="UpdateEntity">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create Dedup Record</name>
<class>eu.dnetlib.dhp.dedup.SparkUpdateEntity</class>
<jar>dhp-dedup-${projectVersion}.jar</jar>
<spark-opts>--executor-memory ${sparkExecutorMemory} --executor-cores ${sparkExecutorCores}
--driver-memory=${sparkDriverMemory} --conf
spark.extraListeners="com.cloudera.spark.lineage.NavigatorAppListener" --conf
spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf
spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg>
<arg>--asi</arg><arg>${actionSetId}</arg>
<arg>--o</arg><arg>${dedupGraphPath}</arg>
</spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>

View File

@ -16,18 +16,6 @@
"paramLongName": "workingPath", "paramLongName": "workingPath",
"paramDescription": "the working directory path", "paramDescription": "the working directory path",
"paramRequired": true "paramRequired": true
},
{
"paramName": "la",
"paramLongName": "isLookUpUrl",
"paramDescriptions": "the url of the lookup service",
"paramRequired": true
},
{
"paramName": "asi",
"paramLongName": "actionSetId",
"paramDescriptions": "the id of the actionset (orchestrator)",
"paramRequired": true
}, },
{ {
"paramName": "o", "paramName": "o",

View File

@ -4,6 +4,8 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing; import com.google.common.hash.Hashing;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -17,7 +19,8 @@ public class SparkCreateDedupTest {
@Before @Before
public void setUp() throws IOException { public void setUp() throws IOException {
configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json")); // configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json"));
configuration = "";
} }
@Test @Test
@ -62,11 +65,13 @@ public class SparkCreateDedupTest {
} }
@Test @Test
@Ignore
public void printConfiguration() throws Exception { public void printConfiguration() throws Exception {
System.out.println(ArgumentApplicationParser.compressArgument(configuration)); System.out.println(ArgumentApplicationParser.compressArgument(configuration));
} }
@Test @Test
@Ignore
public void testHashCode() { public void testHashCode() {
final String s1 = "20|grid________::6031f94bef015a37783268ec1e75f17f"; final String s1 = "20|grid________::6031f94bef015a37783268ec1e75f17f";
final String s2 = "20|nsf_________::b12be9edf414df8ee66b4c52a2d8da46"; final String s2 = "20|nsf_________::b12be9edf414df8ee66b4c52a2d8da46";
@ -79,4 +84,23 @@ public class SparkCreateDedupTest {
System.out.println(hashFunction.hashString(s2).asLong()); System.out.println(hashFunction.hashString(s2).asLong());
} }
@Test
public void fileExistsTest() throws IOException {
boolean result = false;
FileSystem fileSystem = FileSystem.get(new Configuration());
FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/tmp"));
for (FileStatus fs : fileStatuses) {
if (fs.isDirectory()) {
if (fileSystem.exists(new Path(DedupUtility.createMergeRelPath("/tmp", fs.getPath().getName(), "cicciopasticcio")))) {
System.out.println("fs = " + DedupUtility.createMergeRelPath("/tmp", fs.getPath().getName(), "cicciopasticcio"));
result = true;
}
}
}
}
} }