WIP: fixing dedup workflows

This commit is contained in:
Claudio Atzori 2020-03-20 19:17:24 +01:00
parent 6cb0a9bff0
commit a4c52661a0
6 changed files with 25 additions and 5 deletions

View File

@@ -65,6 +65,15 @@
<groupId>com.arakelian</groupId>
<artifactId>java-jq</artifactId>
</dependency>
<dependency>
<groupId>dom4j</groupId>
<artifactId>dom4j</artifactId>
</dependency>
<dependency>
<groupId>jaxen</groupId>
<artifactId>jaxen</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib</groupId>

View File

@@ -15,8 +15,6 @@ import scala.Tuple2;
import java.util.Collection;
import static java.util.stream.Collectors.toMap;
public class DedupRecordFactory {
public static JavaRDD<OafEntity> createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf) {

View File

@@ -34,8 +34,11 @@ public class SparkCreateDedupRecord {
for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
String subEntity = dedupConf.getWf().getSubEntityValue();
final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
final OafEntityType entityType = OafEntityType.valueOf(subEntity);
final JavaRDD<OafEntity> dedupRecord =
DedupRecordFactory.createDedupRecord(sc, spark, DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity), DedupUtility.createEntityPath(graphBasePath, subEntity), OafEntityType.valueOf(subEntity), dedupConf);
DedupRecordFactory.createDedupRecord(sc, spark, mergeRelPath, entityPath, entityType, dedupConf);
dedupRecord.map(r -> {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(r);

View File

@@ -47,6 +47,12 @@ public class SparkCreateSimRels implements Serializable {
final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath");
System.out.println(String.format("graphBasePath: '%s'", graphBasePath));
System.out.println(String.format("rawSet: '%s'", rawSet));
System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl));
System.out.println(String.format("actionSetId: '%s'", actionSetId));
System.out.println(String.format("workingPath: '%s'", workingPath));
try (SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
@@ -58,7 +64,7 @@ public class SparkCreateSimRels implements Serializable {
final String entity = dedupConf.getWf().getEntityType();
final String subEntity = dedupConf.getWf().getSubEntityValue();
JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(graphBasePath + "/" + subEntity)
JavaPairRDD<String, MapDocument> mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
.mapToPair(s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
return new Tuple2<>(d.getIdentifier(), d);

View File

@@ -19,7 +19,7 @@
},
{
"paramName": "i",
"paramLongName": "rawGraphBasePath",
"paramLongName": "graphBasePath",
"paramDescription": "the base path of the raw graph",
"paramRequired": true
},

View File

@@ -65,6 +65,10 @@
<action name="DuplicateScan">
<spark xmlns="uri:oozie:spark-action:0.2">
<prepare>
<delete path="${rawSet}"/>
<delete path="${workingPath}/${actionSetId}/*_simrel"/>
</prepare>
<master>yarn</master>
<mode>cluster</mode>
<name>Create Similarity Relations</name>