code formatting

This commit is contained in:
Claudio Atzori 2020-07-13 16:46:13 +02:00
parent 8d2102d7d2
commit c6f6fb0f28
4 changed files with 19 additions and 13 deletions

View File

@ -78,9 +78,9 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
final String isLookUpUrl = parser.get("isLookUpUrl"); final String isLookUpUrl = parser.get("isLookUpUrl");
final String actionSetId = parser.get("actionSetId"); final String actionSetId = parser.get("actionSetId");
int cut = Optional int cut = Optional
.ofNullable(parser.get("cutConnectedComponent")) .ofNullable(parser.get("cutConnectedComponent"))
.map(Integer::valueOf) .map(Integer::valueOf)
.orElse(0); .orElse(0);
log.info("connected component cut: '{}'", cut); log.info("connected component cut: '{}'", cut);
log.info("graphBasePath: '{}'", graphBasePath); log.info("graphBasePath: '{}'", graphBasePath);
log.info("isLookUpUrl: '{}'", isLookUpUrl); log.info("isLookUpUrl: '{}'", isLookUpUrl);

View File

@ -65,9 +65,9 @@ public class SparkCreateSimRels extends AbstractSparkAction {
final String actionSetId = parser.get("actionSetId"); final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
final int numPartitions = Optional final int numPartitions = Optional
.ofNullable(parser.get("numPartitions")) .ofNullable(parser.get("numPartitions"))
.map(Integer::valueOf) .map(Integer::valueOf)
.orElse(NUM_PARTITIONS); .orElse(NUM_PARTITIONS);
log.info("numPartitions: '{}'", numPartitions); log.info("numPartitions: '{}'", numPartitions);
log.info("graphBasePath: '{}'", graphBasePath); log.info("graphBasePath: '{}'", graphBasePath);

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.dedup;
import static org.apache.spark.sql.functions.col; import static org.apache.spark.sql.functions.col;
import com.google.common.base.Joiner;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -13,6 +12,8 @@ import org.apache.spark.sql.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.DataInfo;
@ -99,17 +100,21 @@ public class SparkPropagateRelation extends AbstractSparkAction {
getDeletedFn()); getDeletedFn());
save( save(
distinctRelations(newRels distinctRelations(
.union(updated) newRels
.union(mergeRels) .union(updated)
.map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class))), .union(mergeRels)
.map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class))),
outputRelationPath, SaveMode.Overwrite); outputRelationPath, SaveMode.Overwrite);
} }
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) { private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
return rels return rels
.filter(getRelationFilterFunction()) .filter(getRelationFilterFunction())
.groupByKey((MapFunction<Relation, String>) r -> String.join(r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()), Encoders.STRING()) .groupByKey(
(MapFunction<Relation, String>) r -> String
.join(r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
Encoders.STRING())
.agg(new RelationAggregator().toColumn()) .agg(new RelationAggregator().toColumn())
.map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class)); .map((MapFunction<Tuple2<String, Relation>, Relation>) t -> t._2(), Encoders.bean(Relation.class));
} }

View File

@ -292,7 +292,8 @@ public class SparkDedupTest implements Serializable {
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel")); FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/publication_mergerel"));
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/software_mergerel")); FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/software_mergerel"));
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel")); FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/dataset_mergerel"));
FileUtils.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel")); FileUtils
.deleteDirectory(new File(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_mergerel"));
} }
@Test @Test