From 71813795f61a1ce23944d107145c4a994a7f9425 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Sat, 18 Apr 2020 12:06:23 +0200 Subject: [PATCH] various refactorings on the dnet-dedup-openaire workflow --- .../dhp/oa/dedup/AbstractSparkAction.java | 47 +-- .../dhp/oa/dedup/DedupRecordFactory.java | 303 ++++-------------- .../java/eu/dnetlib/dhp/oa/dedup/Deduper.java | 58 +--- .../dnetlib/dhp/oa/dedup/OafEntityType.java | 15 - .../dhp/oa/dedup/SparkCreateDedupRecord.java | 27 +- .../dhp/oa/dedup/SparkCreateMergeRels.java | 33 +- .../dhp/oa/dedup/SparkCreateSimRels.java | 45 ++- .../dhp/oa/dedup/SparkPropagateRelation.java | 35 +- .../dnetlib/dhp/oa/dedup/SparkReporter.java | 6 +- .../dhp/oa/dedup/SparkUpdateEntity.java | 24 +- .../eu/dnetlib/dhp/oa/dedup/model/Block.java | 76 +++++ .../dedup/consistency/oozie_app/workflow.xml | 1 - .../dhp/oa/dedup/createCC_parameters.json | 6 - .../dedup/createDedupRecord_parameters.json | 6 - .../oa/dedup/createSimRels_parameters.json | 6 - .../dedup/propagateRelation_parameters.json | 6 - .../dhp/oa/dedup/scan/oozie_app/workflow.xml | 4 - .../dhp/oa/dedup/updateEntity_parameters.json | 6 - 18 files changed, 262 insertions(+), 442 deletions(-) delete mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OafEntityType.java create mode 100644 dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Block.java diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java index 6213f342a..f52e4bb39 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java @@ -4,11 +4,14 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.config.DedupConfig; import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.dom4j.Document; import org.dom4j.DocumentException; @@ -73,47 +76,21 @@ abstract class AbstractSparkAction implements Serializable { abstract void run(ISLookUpService isLookUpService) throws DocumentException, IOException, ISLookUpException; - protected static SparkSession getSparkSession(ArgumentApplicationParser parser) { - SparkConf conf = new SparkConf(); - - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(new Class[] { - Author.class, - Context.class, - Country.class, - DataInfo.class, - Dataset.class, - Datasource.class, - ExternalReference.class, - ExtraInfo.class, - Field.class, - GeoLocation.class, - Instance.class, - Journal.class, - KeyValue.class, - Oaf.class, - OafEntity.class, - OAIProvenance.class, - Organization.class, - OriginDescription.class, - OtherResearchProduct.class, - Project.class, - Publication.class, - Qualifier.class, - Relation.class, - Result.class, - Software.class, - StructuredProperty.class - }); - + protected static SparkSession getSparkSession(SparkConf conf) { return SparkSession .builder() - .appName(SparkCreateSimRels.class.getSimpleName()) - .master(parser.get("master")) .config(conf) .getOrCreate(); } + protected static void save(Dataset dataset, String outPath, SaveMode mode) { + dataset + .write() + .option("compression", "gzip") + .mode(mode) + .json(outPath); + } + protected static void removeOutputDir(SparkSession spark, String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index df64d1011..da16e6dff 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -1,274 +1,95 @@ package eu.dnetlib.dhp.oa.dedup; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.pace.config.DedupConfig; -import eu.dnetlib.pace.util.MapDocumentUtil; import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; -import org.codehaus.jackson.map.ObjectMapper; + import scala.Tuple2; import java.util.Collection; +import java.util.Iterator; public class DedupRecordFactory { - public static JavaRDD createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf) { + protected static final ObjectMapper OBJECT_MAPPER = new com.fasterxml.jackson.databind.ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + public static Dataset createDedupRecord( + final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final Class clazz, final DedupConfig dedupConf) { + long ts = System.currentTimeMillis(); + // - final JavaPairRDD inputJsonEntities = sc.textFile(entitiesInputPath) - .mapToPair((PairFunction) it -> - new Tuple2(MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), it), it) - ); + Dataset> entities = spark.read() + .textFile(entitiesInputPath) + .map((MapFunction>) it -> { + T entity = OBJECT_MAPPER.readValue(it, clazz); + return new Tuple2<>(entity.getId(), entity); + }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))); + //: source is the dedup_id, target is the id of the mergedIn - JavaPairRDD mergeRels = spark - .read().load(mergeRelsInputPath).as(Encoders.bean(Relation.class)) - .where("relClass=='merges'") - .javaRDD() - .mapToPair( - (PairFunction) r -> - new Tuple2(r.getTarget(), r.getSource()) - ); + Dataset> mergeRels = spark + .read() + .load(mergeRelsInputPath) + .as(Encoders.bean(Relation.class)) + .where("relClass == 'merges'") + .map((MapFunction>) + r -> new Tuple2<>(r.getSource(), r.getTarget()), Encoders.tuple(Encoders.STRING(), Encoders.STRING())); // - final JavaPairRDD joinResult = mergeRels.join(inputJsonEntities).mapToPair((PairFunction>, String, String>) Tuple2::_2); - - JavaPairRDD> sortedJoinResult = joinResult.groupByKey(); - - switch (entityType) { - case publication: - return sortedJoinResult.map(p -> DedupRecordFactory.publicationMerger(p, ts)); - case dataset: - return sortedJoinResult.map(d -> DedupRecordFactory.datasetMerger(d, ts)); - case project: - return sortedJoinResult.map(p -> DedupRecordFactory.projectMerger(p, ts)); - case software: - return sortedJoinResult.map(s -> DedupRecordFactory.softwareMerger(s, ts)); - case datasource: - return sortedJoinResult.map(d -> DedupRecordFactory.datasourceMerger(d, ts)); - case organization: - return sortedJoinResult.map(o -> DedupRecordFactory.organizationMerger(o, ts)); - case otherresearchproduct: - return sortedJoinResult.map(o -> DedupRecordFactory.otherresearchproductMerger(o, ts)); - default: - return null; - } - + return mergeRels.joinWith(entities, mergeRels.col("_1").equalTo(entities.col("_1")), "left_outer") + .filter((FilterFunction, Tuple2>>) value -> value._2() != null) + .map((MapFunction, Tuple2>, T>) + value -> value._2()._2(), Encoders.kryo(clazz)) + .groupByKey((MapFunction) value -> value.getId(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) + (key, values) -> entityMerger(key, values, ts, clazz), Encoders.bean(clazz)); } - private static Publication publicationMerger(Tuple2> e, final long ts) { + private static T entityMerger(String id, Iterator entities, final long ts, Class clazz) { + try { + T entity = clazz.newInstance(); + entity.setId(id); + if (entity.getDataInfo() == null) { + entity.setDataInfo(new DataInfo()); + } + entity.getDataInfo().setTrust("0.9"); + entity.setLastupdatetimestamp(ts); - Publication p = new Publication(); //the result of the merge, to be returned at the end + final Collection dates = Lists.newArrayList(); + entities.forEachRemaining(e -> { + entity.mergeFrom(e); + if (ModelSupport.isSubClass(e, Result.class)) { + Result r1 = (Result) e; + Result er = (Result) entity; + er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor())); - p.setId(e._1()); - - final ObjectMapper mapper = new ObjectMapper(); - - final Collection dateofacceptance = Lists.newArrayList(); - - if (e._2() != null) - e._2().forEach(pub -> { - try { - Publication publication = mapper.readValue(pub, Publication.class); - - p.mergeFrom(publication); - p.setAuthor(DedupUtility.mergeAuthor(p.getAuthor(), publication.getAuthor())); - //add to the list if they are not null - if (publication.getDateofacceptance() != null) - dateofacceptance.add(publication.getDateofacceptance().getValue()); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - }); - p.setDateofacceptance(DatePicker.pick(dateofacceptance)); - if (p.getDataInfo() == null) - p.setDataInfo(new DataInfo()); - p.getDataInfo().setTrust("0.9"); - p.setLastupdatetimestamp(ts); - return p; - } - - private static Dataset datasetMerger(Tuple2> e, final long ts) { - - Dataset d = new Dataset(); //the result of the merge, to be returned at the end - - d.setId(e._1()); - - final ObjectMapper mapper = new ObjectMapper(); - - final Collection dateofacceptance = Lists.newArrayList(); - - if (e._2() != null) - e._2().forEach(dat -> { - try { - Dataset dataset = mapper.readValue(dat, Dataset.class); - - d.mergeFrom(dataset); - d.setAuthor(DedupUtility.mergeAuthor(d.getAuthor(), dataset.getAuthor())); - //add to the list if they are not null - if (dataset.getDateofacceptance() != null) - dateofacceptance.add(dataset.getDateofacceptance().getValue()); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - }); - d.setDateofacceptance(DatePicker.pick(dateofacceptance)); - if (d.getDataInfo() == null) - d.setDataInfo(new DataInfo()); - d.getDataInfo().setTrust("0.9"); - d.setLastupdatetimestamp(ts); - return d; - } - - private static Project projectMerger(Tuple2> e, final long ts) { - - Project p = new Project(); //the result of the merge, to be returned at the end - - p.setId(e._1()); - - final ObjectMapper mapper = new ObjectMapper(); - if (e._2() != null) - e._2().forEach(proj -> { - try { - Project project = mapper.readValue(proj, Project.class); - - p.mergeFrom(project); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - }); - if (p.getDataInfo() == null) - p.setDataInfo(new DataInfo()); - p.getDataInfo().setTrust("0.9"); - p.setLastupdatetimestamp(ts); - return p; - } - - private static Software softwareMerger(Tuple2> e, final long ts) { - - Software s = new Software(); //the result of the merge, to be returned at the end - - s.setId(e._1()); - final ObjectMapper mapper = new ObjectMapper(); - final Collection dateofacceptance = Lists.newArrayList(); - if (e._2() != null) - e._2().forEach(soft -> { - try { - Software software = mapper.readValue(soft, Software.class); - - s.mergeFrom(software); - s.setAuthor(DedupUtility.mergeAuthor(s.getAuthor(), software.getAuthor())); - //add to the list if they are not null - if (software.getDateofacceptance() != null) - dateofacceptance.add(software.getDateofacceptance().getValue()); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - }); - s.setDateofacceptance(DatePicker.pick(dateofacceptance)); - if (s.getDataInfo() == null) - s.setDataInfo(new DataInfo()); - s.getDataInfo().setTrust("0.9"); - s.setLastupdatetimestamp(ts); - return s; - } - - private static Datasource datasourceMerger(Tuple2> e, final long ts) { - Datasource d = new Datasource(); //the result of the merge, to be returned at the end - d.setId(e._1()); - final ObjectMapper mapper = new ObjectMapper(); - if (e._2() != null) - e._2().forEach(dat -> { - try { - Datasource datasource = mapper.readValue(dat, Datasource.class); - - d.mergeFrom(datasource); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - }); - if (d.getDataInfo() == null) - d.setDataInfo(new DataInfo()); - d.getDataInfo().setTrust("0.9"); - d.setLastupdatetimestamp(ts); - return d; - } - - private static Organization organizationMerger(Tuple2> e, final long ts) { - - Organization o = new Organization(); //the result of the merge, to be returned at the end - - o.setId(e._1()); - - final ObjectMapper mapper = new ObjectMapper(); - - - StringBuilder trust = new StringBuilder("0.0"); - - if (e._2() != null) - e._2().forEach(pub -> { - try { - Organization organization = mapper.readValue(pub, Organization.class); - - final String currentTrust = organization.getDataInfo().getTrust(); - if (!"1.0".equals(currentTrust)) { - trust.setLength(0); - trust.append(currentTrust); + if (er.getDateofacceptance() != null) { + dates.add(r1.getDateofacceptance().getValue()); } - o.mergeFrom(organization); - - } catch (Exception exc) { - throw new RuntimeException(exc); } }); - if (o.getDataInfo() == null) - { - o.setDataInfo(new DataInfo()); + if (ModelSupport.isSubClass(entity, Result.class)) { + ((Result) entity).setDateofacceptance(DatePicker.pick(dates)); + } + return entity; + } catch (IllegalAccessException | InstantiationException e) { + throw new RuntimeException(e); } - if (o.getDataInfo() == null) - o.setDataInfo(new DataInfo()); - o.getDataInfo().setTrust("0.9"); - o.setLastupdatetimestamp(ts); - - return o; - } - - private static OtherResearchProduct otherresearchproductMerger(Tuple2> e, final long ts) { - - OtherResearchProduct o = new OtherResearchProduct(); //the result of the merge, to be returned at the end - - o.setId(e._1()); - - final ObjectMapper mapper = new ObjectMapper(); - - final Collection dateofacceptance = Lists.newArrayList(); - - if (e._2() != null) - e._2().forEach(orp -> { - try { - OtherResearchProduct otherResearchProduct = mapper.readValue(orp, OtherResearchProduct.class); - - o.mergeFrom(otherResearchProduct); - o.setAuthor(DedupUtility.mergeAuthor(o.getAuthor(), otherResearchProduct.getAuthor())); - //add to the list if they are not null - if (otherResearchProduct.getDateofacceptance() != null) - dateofacceptance.add(otherResearchProduct.getDateofacceptance().getValue()); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - }); - if (o.getDataInfo() == null) - o.setDataInfo(new DataInfo()); - o.setDateofacceptance(DatePicker.pick(dateofacceptance)); - o.getDataInfo().setTrust("0.9"); - o.setLastupdatetimestamp(ts); - return o; } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java index 1f9f84c55..28b85853f 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/Deduper.java @@ -1,74 +1,50 @@ package eu.dnetlib.dhp.oa.dedup; +import eu.dnetlib.dhp.oa.dedup.model.Block; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.model.MapDocument; import eu.dnetlib.pace.util.BlockProcessor; -import eu.dnetlib.pace.util.MapDocumentUtil; import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFlatMapFunction; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.util.LongAccumulator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.Serializable; import scala.Tuple2; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; import java.util.Map; import java.util.stream.Collectors; public class Deduper implements Serializable { - private static final Logger log = LoggerFactory.getLogger(Deduper.class); - - public static JavaPairRDD computeRelations(JavaSparkContext context, JavaPairRDD> blocks, DedupConfig config) { + public static JavaPairRDD computeRelations(JavaSparkContext context, JavaPairRDD blocks, DedupConfig config) { Map accumulators = DedupUtility.constructAccumulator(config, context.sc()); - return blocks.flatMapToPair((PairFlatMapFunction>, String, String>) it -> { - try { - final SparkReporter reporter = new SparkReporter(accumulators); - new BlockProcessor(config).processSortedBlock(it._1(), it._2(), reporter); - return reporter.getRelations().iterator(); - } catch (Exception e) { - throw new RuntimeException(it._2().get(0).getIdentifier(), e); - } - }).mapToPair( - (PairFunction, String, Tuple2>) item -> - new Tuple2>(item._1() + item._2(), item)) + return blocks + .flatMapToPair(it -> { + final SparkReporter reporter = new SparkReporter(accumulators); + new BlockProcessor(config).processSortedBlock(it._1(), it._2().getDocuments(), reporter); + return reporter.getRelations().iterator(); + }) + .mapToPair(it -> new Tuple2<>(it._1() + it._2(), it)) .reduceByKey((a, b) -> a) - .mapToPair((PairFunction>, String, String>) Tuple2::_2); + .mapToPair(Tuple2::_2); } - public static JavaPairRDD> createSortedBlocks(JavaSparkContext context, JavaPairRDD mapDocs, DedupConfig config) { + public static JavaPairRDD createSortedBlocks(JavaPairRDD mapDocs, DedupConfig config) { final String of = config.getWf().getOrderField(); final int maxQueueSize = config.getWf().getGroupMaxSize(); + return mapDocs //the reduce is just to be sure that we haven't document with same id .reduceByKey((a, b) -> a) .map(Tuple2::_2) //Clustering: from to List - .flatMapToPair((PairFlatMapFunction>) a -> - DedupUtility.getGroupingKeys(config, a) + .flatMap(a -> DedupUtility.getGroupingKeys(config, a) .stream() - .map(it -> { - List tmp = new ArrayList<>(); - tmp.add(a); - return new Tuple2<>(it, tmp); - } - ) + .map(it -> Block.from(it, a)) .collect(Collectors.toList()) .iterator()) - .reduceByKey((Function2, List, List>) (v1, v2) -> { - v1.addAll(v2); - v1.sort(Comparator.comparing(a -> a.getFieldMap().get(of).stringValue())); - if (v1.size() > maxQueueSize) - return new ArrayList<>(v1.subList(0, maxQueueSize)); - return v1; - }); + .mapToPair(block -> new Tuple2<>(block.getKey(), block)) + .reduceByKey((b1, b2) -> Block.from(b1, b2, of, maxQueueSize)); } + } \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OafEntityType.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OafEntityType.java deleted file mode 100644 index da2bc3a37..000000000 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/OafEntityType.java +++ /dev/null @@ -1,15 +0,0 @@ -package eu.dnetlib.dhp.oa.dedup; - -public enum OafEntityType { - - datasource, - organization, - project, - dataset, - otherresearchproduct, - software, - publication - - - -} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java index 02cf6587a..c2b1fc9c2 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java @@ -1,14 +1,21 @@ package eu.dnetlib.dhp.oa.dedup; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.config.DedupConfig; import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.dom4j.DocumentException; import org.slf4j.Logger; @@ -30,7 +37,12 @@ public class SparkCreateDedupRecord extends AbstractSparkAction { SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json"))); parser.parseArgument(args); - new SparkCreateDedupRecord(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + + new SparkCreateDedupRecord(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); } @Override @@ -46,8 +58,6 @@ public class SparkCreateDedupRecord extends AbstractSparkAction { log.info("actionSetId: '{}'", actionSetId); log.info("workingPath: '{}'", workingPath); - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) { String subEntity = dedupConf.getWf().getSubEntityValue(); log.info("Creating deduprecords for: '{}'", subEntity); @@ -57,11 +67,14 @@ public class SparkCreateDedupRecord extends AbstractSparkAction { final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity); final String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity); - final OafEntityType entityType = OafEntityType.valueOf(subEntity); - final JavaRDD dedupRecord = - DedupRecordFactory.createDedupRecord(sc, spark, mergeRelPath, entityPath, entityType, dedupConf); - dedupRecord.map(r -> OBJECT_MAPPER.writeValueAsString(r)).saveAsTextFile(outputPath); + Class clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity)); + + DedupRecordFactory.createDedupRecord(spark, mergeRelPath, entityPath, clazz, dedupConf) + .map((MapFunction) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING()) + .write() + .mode(SaveMode.Overwrite) + .json(outputPath); } } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java index ef82b4eaf..9c46404b7 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java @@ -4,6 +4,7 @@ import com.google.common.hash.Hashing; import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Relation; @@ -13,8 +14,8 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.config.DedupConfig; import eu.dnetlib.pace.util.MapDocumentUtil; import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.graphx.Edge; @@ -52,7 +53,11 @@ public class SparkCreateMergeRels extends AbstractSparkAction { final String isLookUpUrl = parser.get("isLookUpUrl"); log.info("isLookupUrl {}", isLookUpUrl); - new SparkCreateMergeRels(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(isLookUpUrl)); + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + + new SparkCreateMergeRels(parser, getSparkSession(conf)).run(ISLookupClientFactory.getLookUpService(isLookUpUrl)); } @Override @@ -79,32 +84,30 @@ public class SparkCreateMergeRels extends AbstractSparkAction { log.info("Max iterations {}", maxIterations); final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity); + final JavaPairRDD vertexes = sc.textFile(graphBasePath + "/" + subEntity) .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s)) - .mapToPair((PairFunction) - s -> new Tuple2<>(getHashcode(s), s)); + .mapToPair((PairFunction) s -> new Tuple2<>(hash(s), s)); - final Dataset similarityRelations = spark + final RDD> edgeRdd = spark .read() .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)) - .as(Encoders.bean(Relation.class)); - - final RDD> edgeRdd = similarityRelations + .as(Encoders.bean(Relation.class)) .javaRDD() - .map(it -> new Edge<>(getHashcode(it.getSource()), getHashcode(it.getTarget()), it.getRelClass())) + .map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass())) .rdd(); - final RDD connectedComponents = GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, maxIterations) + final Dataset mergeRels = spark + .createDataset(GraphProcessor.findCCs(vertexes.rdd(), edgeRdd, maxIterations) .toJavaRDD() .filter(k -> k.getDocIds().size() > 1) .flatMap(cc -> ccToMergeRel(cc, dedupConf)) - .rdd(); + .rdd(), Encoders.bean(Relation.class)); - spark - .createDataset(connectedComponents, Encoders.bean(Relation.class)) + mergeRels .write() .mode(SaveMode.Append) - .save(mergeRelPath); + .parquet(mergeRelPath); } } @@ -148,7 +151,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction { return r; } - public static long getHashcode(final String id) { + public static long hash(final String id) { return Hashing.murmur3_128().hashString(id).asLong(); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index de842d822..d02aef64c 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -1,23 +1,26 @@ package eu.dnetlib.dhp.oa.dedup; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.oa.dedup.model.Block; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.model.FieldListImpl; +import eu.dnetlib.pace.model.FieldValueImpl; import eu.dnetlib.pace.model.MapDocument; import eu.dnetlib.pace.util.MapDocumentUtil; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.io.Text; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; @@ -27,7 +30,6 @@ import org.slf4j.LoggerFactory; import scala.Tuple2; import java.io.IOException; -import java.util.List; public class SparkCreateSimRels extends AbstractSparkAction { @@ -43,7 +45,17 @@ public class SparkCreateSimRels extends AbstractSparkAction { SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"))); parser.parseArgument(args); - new SparkCreateSimRels(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(new Class[] { + MapDocument.class, + FieldListImpl.class, + FieldValueImpl.class, + Block.class + }); + + new SparkCreateSimRels(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); } @Override @@ -60,8 +72,6 @@ public class SparkCreateSimRels extends AbstractSparkAction { log.info("actionSetId: '{}'", actionSetId); log.info("workingPath: '{}'", workingPath); - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - //for each dedup configuration for (DedupConfig dedupConf: getConfigurations(isLookUpService, actionSetId)) { @@ -72,29 +82,30 @@ public class SparkCreateSimRels extends AbstractSparkAction { final String outputPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity); removeOutputDir(spark, outputPath); - JavaPairRDD mapDocument = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) - .mapToPair((PairFunction) s -> { + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaPairRDD mapDocuments = sc.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) + .mapToPair((PairFunction) s -> { MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s); return new Tuple2<>(d.getIdentifier(), d); }); //create blocks for deduplication - JavaPairRDD> blocks = Deduper.createSortedBlocks(sc, mapDocument, dedupConf); + JavaPairRDD blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf); //create relations by comparing only elements in the same group - final JavaPairRDD dedupRels = Deduper.computeRelations(sc, blocks, dedupConf); - - JavaRDD relationsRDD = dedupRels.map(r -> createSimRel(r._1(), r._2(), entity)); + JavaRDD relations = Deduper.computeRelations(sc, blocks, dedupConf) + .map(t -> createSimRel(t._1(), t._2(), entity)); //save the simrel in the workingdir - spark.createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class)) + spark.createDataset(relations.rdd(), Encoders.bean(Relation.class)) .write() .mode(SaveMode.Append) .save(outputPath); } } - public Relation createSimRel(String source, String target, String entity) { + private Relation createSimRel(String source, String target, String entity) { final Relation r = new Relation(); r.setSource(source); r.setTarget(target); @@ -102,7 +113,7 @@ public class SparkCreateSimRels extends AbstractSparkAction { r.setRelClass("isSimilarTo"); r.setDataInfo(new DataInfo()); - switch(entity){ + switch(entity) { case "result": r.setRelType("resultResult"); break; diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 371d70ba2..86d19d96d 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.oa.dedup; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; @@ -10,11 +11,9 @@ import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Tuple2; @@ -43,7 +42,11 @@ public class SparkPropagateRelation extends AbstractSparkAction { parser.parseArgument(args); - new SparkPropagateRelation(parser, getSparkSession(parser)) + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + + new SparkPropagateRelation(parser, getSparkSession(conf)) .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); } @@ -90,7 +93,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { processDataset(rels, mergedIds, FieldType.SOURCE, getDeletedFn()), mergedIds, FieldType.TARGET, getDeletedFn()); - save(newRels.union(updated), outputRelationPath); + save(newRels.union(updated), outputRelationPath, SaveMode.Overwrite); } @@ -164,26 +167,6 @@ public class SparkPropagateRelation extends AbstractSparkAction { }; } - private void deletePath(String path) { - try { - Path p = new Path(path); - FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); - - if (fs.exists(p)) { - fs.delete(p, true); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private static void save(Dataset dataset, String outPath) { - dataset - .write() - .option("compression", "gzip") - .json(outPath); - } - private static boolean containsDedup(final Relation r) { return r.getSource().toLowerCase().contains("dedup") || r.getTarget().toLowerCase().contains("dedup"); } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkReporter.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkReporter.java index cc03db385..98ee37e14 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkReporter.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkReporter.java @@ -13,9 +13,9 @@ import java.util.Map; public class SparkReporter implements Serializable, Reporter { - final List> relations = new ArrayList<>(); - private static final Log log = LogFactory.getLog(SparkReporter.class); - Map accumulators; + private final List> relations = new ArrayList<>(); + + private Map accumulators; public SparkReporter(Map accumulators){ this.accumulators = accumulators; diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java index 5347489e7..ea0a06bbe 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java @@ -1,13 +1,17 @@ package eu.dnetlib.dhp.oa.dedup; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.oa.dedup.model.Block; import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.dhp.schema.oaf.DataInfo; +import eu.dnetlib.dhp.schema.oaf.Oaf; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.model.FieldListImpl; +import eu.dnetlib.pace.model.FieldValueImpl; +import eu.dnetlib.pace.model.MapDocument; import eu.dnetlib.pace.util.MapDocumentUtil; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; @@ -15,6 +19,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -28,7 +33,6 @@ import org.slf4j.LoggerFactory; import scala.Tuple2; import java.io.IOException; -import java.util.Map; public class SparkUpdateEntity extends AbstractSparkAction { @@ -43,10 +47,16 @@ public class SparkUpdateEntity extends AbstractSparkAction { public static void main(String[] args) throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( IOUtils.toString( - SparkUpdateEntity.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); + SparkUpdateEntity.class.getResourceAsStream( + "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"))); parser.parseArgument(args); - new SparkUpdateEntity(parser, getSparkSession(parser)).run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + + new SparkUpdateEntity(parser, getSparkSession(conf)) + .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl"))); } public void run(ISLookUpService isLookUpService) throws IOException { diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Block.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Block.java new file mode 100644 index 000000000..e1ccf143c --- /dev/null +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Block.java @@ -0,0 +1,76 @@ +package eu.dnetlib.dhp.oa.dedup.model; + +import com.google.common.collect.Lists; +import eu.dnetlib.pace.model.MapDocument; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public class Block implements Serializable { + + private String key; + + private List documents; + + public Block() { + super(); + } + + public static Block from(String key, MapDocument doc) { + Block block = new Block(); + block.setKey(key); + block.setDocuments(Lists.newArrayList(doc)); + return block; + } + + public static Block from(String key, Iterator blocks, String orderField, int maxSize) { + Block block = new Block(); + block.setKey(key); + + Iterable it = () -> blocks; + + block.setDocuments( + StreamSupport.stream(it.spliterator(), false) + .flatMap(b -> b.getDocuments().stream()) + .sorted(Comparator.comparing(a -> a.getFieldMap().get(orderField).stringValue())) + .limit(maxSize) + .collect(Collectors.toCollection(ArrayList::new))); + return block; + } + + public static Block from(Block b1, Block b2, String orderField, int maxSize) { + Block block = new Block(); + block.setKey(b1.getKey()); + block.setDocuments( + Stream.concat( + b1.getDocuments().stream(), + b2.getDocuments().stream()) + .sorted(Comparator.comparing(a -> a.getFieldMap().get(orderField).stringValue())) + .limit(maxSize) + .collect(Collectors.toCollection(ArrayList::new))); + + return block; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public List getDocuments() { + return documents; + } + + public void setDocuments(List documents) { + this.documents = documents; + } +} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml index df6877dfa..926287032 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml @@ -91,7 +91,6 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 - -mtyarn --i${graphBasePath} --o${dedupGraphPath} --w${workingPath} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json index 42ef2b78e..6eedd5432 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json @@ -1,10 +1,4 @@ [ - { - "paramName": "mt", - "paramLongName": "master", - "paramDescription": "should be local or yarn", - "paramRequired": true - }, { "paramName": "asi", "paramLongName": "actionSetId", diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json index f7bf5e518..3d48cb2d8 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json @@ -1,10 +1,4 @@ [ - { - "paramName": "mt", - "paramLongName": "master", - "paramDescription": "should be local or yarn", - "paramRequired": true - }, { "paramName": "i", "paramLongName": "graphBasePath", diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json index 8cffa86dc..ce38dc6f0 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json @@ -1,10 +1,4 @@ [ - { - "paramName": "mt", - "paramLongName": "master", - "paramDescription": "should be local or yarn", - "paramRequired": true - }, { "paramName": "la", "paramLongName": "isLookUpUrl", diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json index 721a783e1..6a2a48746 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json @@ -1,10 +1,4 @@ [ - { - "paramName": "mt", - "paramLongName": "master", - "paramDescription": "should be local or yarn", - "paramRequired": true - }, { "paramName": "i", "paramLongName": "graphBasePath", diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml index 6790dde32..2451947a1 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/scan/oozie_app/workflow.xml @@ -98,7 +98,6 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - -mtyarn --i${graphBasePath} --la${isLookUpUrl} --asi${actionSetId} @@ -125,7 +124,6 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - -mtyarn --i${graphBasePath} --w${workingPath} --la${isLookUpUrl} @@ -152,7 +150,6 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - -mtyarn --i${graphBasePath} --w${workingPath} --la${isLookUpUrl} @@ -179,7 +176,6 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=3840 - -mtyarn --i${graphBasePath} --w${workingPath} --o${dedupGraphPath} diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json index 06b67f732..c91f3c04b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json @@ -1,10 +1,4 @@ [ -{ - "paramName": "mt", - "paramLongName": "master", - "paramDescription": "should be local or yarn", - "paramRequired": true -}, { "paramName": "i", "paramLongName": "graphBasePath",