diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
index 6b1d14829..8afa41f95 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
@@ -226,19 +226,19 @@ public class GraphCleaningFunctions extends CleaningFunctions {
 
 	public static <T extends Oaf> boolean filter(T value) {
 		if (!(value instanceof Relation) && (Boolean.TRUE
-			.equals(
-				Optional
-					.ofNullable(value)
-					.map(
-						o -> Optional
-							.ofNullable(o.getDataInfo())
-							.map(
-								d -> Optional
-									.ofNullable(d.getInvisible())
-									.orElse(true))
-							.orElse(false))
-					.orElse(true)))) {
-			return true;
+				.equals(
+					Optional
+						.ofNullable(value)
+						.map(
+							o -> Optional
+								.ofNullable(o.getDataInfo())
+								.map(
+									d -> Optional
+										.ofNullable(d.getInvisible())
+										.orElse(true))
+								.orElse(false))
+						.orElse(true)))) {
+			return true;
 		}
 
 		if (value instanceof Datasource) {
@@ -292,9 +292,10 @@ public class GraphCleaningFunctions extends CleaningFunctions {
 		} else if (value instanceof Result) {
 
 			Result r = (Result) value;
 
-			if (Objects.nonNull(r.getFulltext()) && (ModelConstants.SOFTWARE_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()) ||
-				ModelConstants.DATASET_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()))) {
-				r.setFulltext(null);
+			if (Objects.nonNull(r.getFulltext())
+				&& (ModelConstants.SOFTWARE_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()) ||
+					ModelConstants.DATASET_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()))) {
+				r.setFulltext(null);
 			}
 
@@ -326,10 +327,13 @@ public class GraphCleaningFunctions extends CleaningFunctions {
 			if (StringUtils.isBlank(r.getPublisher().getValue())) {
 				r.setPublisher(null);
 			} else {
-				r.getPublisher().setValue(
-					r.getPublisher().getValue()
-						.replaceAll(NAME_CLEANING_REGEX, " ")
-				);
+				r
+					.getPublisher()
+					.setValue(
+						r
+							.getPublisher()
+							.getValue()
+							.replaceAll(NAME_CLEANING_REGEX, " "));
 			}
 		}
 		if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) {
@@ -498,8 +502,8 @@ public class GraphCleaningFunctions extends CleaningFunctions {
 				}
 			}
 			if (StringUtils.isNotBlank(i.getFulltext()) &&
-				(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()) ||
-					ModelConstants.DATASET_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()))) {
+					(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()) ||
+						ModelConstants.DATASET_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()))) {
 				i.setFulltext(null);
 			}
 		}
@@ -623,25 +627,28 @@ public class GraphCleaningFunctions extends CleaningFunctions {
 
 	private static Author cleanupAuthor(Author author) {
 		if (StringUtils.isNotBlank(author.getFullname())) {
-			author.setFullname(
-				author.getFullname()
-					.replaceAll(NAME_CLEANING_REGEX, " ")
-					.replace("\"", "\\\"")
-			);
+			author
+				.setFullname(
+					author
+						.getFullname()
+						.replaceAll(NAME_CLEANING_REGEX, " ")
+						.replace("\"", "\\\""));
 		}
 		if (StringUtils.isNotBlank(author.getName())) {
-			author.setName(
-				author.getName()
-					.replaceAll(NAME_CLEANING_REGEX, " ")
-					.replace("\"", "\\\"")
-			);
+			author
+				.setName(
+					author
+						.getName()
+						.replaceAll(NAME_CLEANING_REGEX, " ")
+						.replace("\"", "\\\""));
 		}
 		if (StringUtils.isNotBlank(author.getSurname())) {
-			author.setSurname(
-				author.getSurname()
-					.replaceAll(NAME_CLEANING_REGEX, " ")
-					.replace("\"", "\\\"")
-			);
+			author
+				.setSurname(
+					author
+						.getSurname()
+						.replaceAll(NAME_CLEANING_REGEX, " ")
+						.replace("\"", "\\\""));
 		}
 
 		return author;
diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java
index c885f2aeb..84d49bd5c 100644
--- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java
+++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/DiffPatchMatch.java
@@ -18,7 +18,6 @@ package eu.dnetlib.pace.util;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index d16a3a63d..175ebf8a6 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -1,13 +1,13 @@
 
 package eu.dnetlib.dhp.oa.dedup;
 
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import static org.apache.spark.sql.functions.col;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Objects;
+
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -18,139 +18,141 @@ import org.apache.spark.api.java.function.ReduceFunction;
 import org.apache.spark.sql.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import scala.Tuple2;
 import scala.Tuple3;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Objects;
-
-import static org.apache.spark.sql.functions.col;
-
 public class SparkPropagateRelation extends AbstractSparkAction {
 
-    private static final Logger log = LoggerFactory.getLogger(SparkPropagateRelation.class);
+	private static final Logger log = LoggerFactory.getLogger(SparkPropagateRelation.class);
 
-    private static Encoder<Relation> REL_BEAN_ENC = Encoders.bean(Relation.class);
+	private static Encoder<Relation> REL_BEAN_ENC = Encoders.bean(Relation.class);
 
-    private static Encoder<Relation> REL_KRYO_ENC = Encoders.kryo(Relation.class);
+	private static Encoder<Relation> REL_KRYO_ENC = Encoders.kryo(Relation.class);
 
-    public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) {
-        super(parser, spark);
-    }
+	public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) {
+		super(parser, spark);
+	}
 
-    public static void main(String[] args) throws Exception {
-        ArgumentApplicationParser parser = new ArgumentApplicationParser(
-            IOUtils
-                .toString(
-                    SparkPropagateRelation.class
-                        .getResourceAsStream(
-                            "/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
+	public static void main(String[] args) throws Exception {
+		ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					SparkPropagateRelation.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
 
-        parser.parseArgument(args);
+		parser.parseArgument(args);
 
-        SparkConf conf = new SparkConf();
-        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
+		SparkConf conf = new SparkConf();
+		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+		conf.registerKryoClasses(ModelSupport.getOafModelClasses());
 
-        new SparkPropagateRelation(parser, getSparkSession(conf))
-            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
-    }
+		new SparkPropagateRelation(parser, getSparkSession(conf))
+			.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
+	}
 
-    @Override
-    public void run(ISLookUpService isLookUpService) {
+	@Override
+	public void run(ISLookUpService isLookUpService) {
 
-        final String graphBasePath = parser.get("graphBasePath");
-        final String workingPath = parser.get("workingPath");
-        final String graphOutputPath = parser.get("graphOutputPath");
+		final String graphBasePath = parser.get("graphBasePath");
+		final String workingPath = parser.get("workingPath");
+		final String graphOutputPath = parser.get("graphOutputPath");
 
-        log.info("graphBasePath: '{}'", graphBasePath);
-        log.info("workingPath: '{}'", workingPath);
-        log.info("graphOutputPath: '{}'", graphOutputPath);
+		log.info("graphBasePath: '{}'", graphBasePath);
+		log.info("workingPath: '{}'", workingPath);
+		log.info("graphOutputPath: '{}'", graphOutputPath);
 
-        final String outputRelationPath = DedupUtility.createEntityPath(graphOutputPath, "relation");
-        removeOutputDir(spark, outputRelationPath);
+		final String outputRelationPath = DedupUtility.createEntityPath(graphOutputPath, "relation");
+		removeOutputDir(spark, outputRelationPath);
 
-        Dataset<Relation> mergeRels = spark
-            .read()
-            .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
-            .as(REL_BEAN_ENC);
+		Dataset<Relation> mergeRels = spark
+			.read()
+			.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
+			.as(REL_BEAN_ENC);
 
-        // <mergedObjectID, dedupID>
-        Dataset<Row> mergedIds = mergeRels
-            .where(col("relClass").equalTo(ModelConstants.MERGES))
-            .select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
-            .distinct()
-            .cache();
+		// <mergedObjectID, dedupID>
+		Dataset<Row> mergedIds = mergeRels
+			.where(col("relClass").equalTo(ModelConstants.MERGES))
+			.select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
+			.distinct()
+			.cache();
 
-        Dataset<Row> allRels = spark.read()
-            .schema(REL_BEAN_ENC.schema())
-            .json(DedupUtility.createEntityPath(graphBasePath, "relation"));
+		Dataset<Row> allRels = spark
+			.read()
+			.schema(REL_BEAN_ENC.schema())
+			.json(DedupUtility.createEntityPath(graphBasePath, "relation"));
 
-        Dataset<Relation> dedupedRels = allRels
-            .joinWith(mergedIds, allRels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
-            .joinWith(mergedIds, col("_1.target").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
-            .select("_1._1", "_1._2.dedupID", "_2.dedupID")
-            .as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
-            .flatMap(SparkPropagateRelation::addInferredRelations, REL_KRYO_ENC);
+		Dataset<Relation> dedupedRels = allRels
+			.joinWith(mergedIds, allRels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
+			.joinWith(mergedIds, col("_1.target").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
+			.select("_1._1", "_1._2.dedupID", "_2.dedupID")
+			.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
+			.flatMap(SparkPropagateRelation::addInferredRelations, REL_KRYO_ENC);
 
-        Dataset<Relation> processedRelations = distinctRelations(dedupedRels.union(mergeRels.map((MapFunction<Relation, Relation>) r -> r, REL_KRYO_ENC)))
-            .filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget()));
+		Dataset<Relation> processedRelations = distinctRelations(
+			dedupedRels.union(mergeRels.map((MapFunction<Relation, Relation>) r -> r, REL_KRYO_ENC)))
+				.filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget()));
 
-        save(processedRelations, outputRelationPath, SaveMode.Overwrite);
-    }
+		save(processedRelations, outputRelationPath, SaveMode.Overwrite);
+	}
 
-    private static Iterator<Relation> addInferredRelations(Tuple3<Relation, String, String> t) throws Exception {
-        Relation existingRel = t._1();
-        String newSource = t._2();
-        String newTarget = t._3();
+	private static Iterator<Relation> addInferredRelations(Tuple3<Relation, String, String> t) throws Exception {
+		Relation existingRel = t._1();
+		String newSource = t._2();
+		String newTarget = t._3();
 
-        if (newSource == null && newTarget == null) {
-            return Collections.singleton(t._1()).iterator();
-        }
+		if (newSource == null && newTarget == null) {
+			return Collections.singleton(t._1()).iterator();
+		}
 
-        // update existing relation
-        if (existingRel.getDataInfo() == null) {
-            existingRel.setDataInfo(new DataInfo());
-        }
-        existingRel.getDataInfo().setDeletedbyinference(true);
+		// update existing relation
+		if (existingRel.getDataInfo() == null) {
+			existingRel.setDataInfo(new DataInfo());
+		}
+		existingRel.getDataInfo().setDeletedbyinference(true);
 
-        // Create new relation inferred by dedupIDs
-        Relation inferredRel = (Relation) BeanUtils.cloneBean(existingRel);
+		// Create new relation inferred by dedupIDs
+		Relation inferredRel = (Relation) BeanUtils.cloneBean(existingRel);
 
-        inferredRel.setDataInfo((DataInfo) BeanUtils.cloneBean(existingRel.getDataInfo()));
-        inferredRel.getDataInfo().setDeletedbyinference(false);
+		inferredRel.setDataInfo((DataInfo) BeanUtils.cloneBean(existingRel.getDataInfo()));
+		inferredRel.getDataInfo().setDeletedbyinference(false);
 
-        if (newSource != null)
-            inferredRel.setSource(newSource);
+		if (newSource != null)
+			inferredRel.setSource(newSource);
 
-        if (newTarget != null)
-            inferredRel.setTarget(newTarget);
+		if (newTarget != null)
+			inferredRel.setTarget(newTarget);
 
-        return Arrays.asList(existingRel, inferredRel).iterator();
-    }
+		return Arrays.asList(existingRel, inferredRel).iterator();
+	}
 
-    private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
-        return rels
-            .filter(getRelationFilterFunction())
-            .groupByKey(
-                (MapFunction<Relation, String>) r -> String
-                    .join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
-                Encoders.STRING())
-            .reduceGroups((ReduceFunction<Relation>) (b, a) -> {
-                b.mergeFrom(a);
-                return b;
-            }
-            )
-            .map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);
-    }
+	private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
+		return rels
+			.filter(getRelationFilterFunction())
+			.groupByKey(
+				(MapFunction<Relation, String>) r -> String
+					.join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
+				Encoders.STRING())
+			.reduceGroups((ReduceFunction<Relation>) (b, a) -> {
+				b.mergeFrom(a);
+				return b;
+			})
+			.map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);
+	}
 
-    private FilterFunction<Relation>
-        getRelationFilterFunction() {
-        return r -> StringUtils.isNotBlank(r.getSource()) ||
-            StringUtils.isNotBlank(r.getTarget()) ||
-            StringUtils.isNotBlank(r.getRelType()) ||
-            StringUtils.isNotBlank(r.getSubRelType()) ||
-            StringUtils.isNotBlank(r.getRelClass());
-    }
+	private FilterFunction<Relation> getRelationFilterFunction() {
+		return r -> StringUtils.isNotBlank(r.getSource()) ||
+			StringUtils.isNotBlank(r.getTarget()) ||
+			StringUtils.isNotBlank(r.getRelType()) ||
+			StringUtils.isNotBlank(r.getSubRelType()) ||
+			StringUtils.isNotBlank(r.getRelClass());
+	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/GraphCleaningFunctionsTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/GraphCleaningFunctionsTest.java
index 1492c60fe..8d10508a9 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/GraphCleaningFunctionsTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/GraphCleaningFunctionsTest.java
@@ -371,7 +371,7 @@ public class GraphCleaningFunctionsTest {
 	@Test
 	public void testFilterProject() throws IOException {
 		String json = IOUtils
-            .toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/project.json"));
+			.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/project.json"));
 		Project p_in = MAPPER.readValue(json, Project.class);
 
 		Assertions.assertEquals(false, GraphCleaningFunctions.filter(p_in));
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java
index 4c86da47e..61baf80dc 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java
@@ -1,14 +1,14 @@
 
 package eu.dnetlib.dhp.oa.graph.group;
 
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
-import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.utils.DHPUtils;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
@@ -19,13 +19,15 @@ import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.*;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
+import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.utils.DHPUtils;
 
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 public class GroupEntitiesSparkJobTest {
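
Reviewer note on the SparkPropagateRelation hunk above: beyond the import reordering and re-indentation, the job's core is the double left-outer join that remaps both relation endpoints onto their dedup representative IDs. The following is a minimal standalone sketch of that remapping pattern, not the job itself: the class name PropagateRelationSketch, the local SparkSession, and the inline toy data are illustrative assumptions, and coalesce() simplifies what the real code does (which keeps the original relation flagged deletedbyinference and emits the remapped copy via addInferredRelations).

import static org.apache.spark.sql.functions.coalesce;
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PropagateRelationSketch {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

		// raw relations, standing in for graphBasePath/relation
		Dataset<Row> rels = spark.sql("SELECT * FROM VALUES ('a', 'b'), ('b', 'c') AS rels(source, target)");
		// merge mapping: each merged object id points at its dedup representative
		Dataset<Row> mergedIds = spark
			.sql("SELECT * FROM VALUES ('dedup_1', 'a'), ('dedup_1', 'b') AS m(dedupID, mergedObjectID)");

		// rename before joining twice so the two dedupID columns stay distinguishable
		Dataset<Row> bySource = mergedIds
			.withColumnRenamed("dedupID", "srcDedup")
			.withColumnRenamed("mergedObjectID", "srcMerged");
		Dataset<Row> byTarget = mergedIds
			.withColumnRenamed("dedupID", "tgtDedup")
			.withColumnRenamed("mergedObjectID", "tgtMerged");

		// left-outer join on source, then on target, mirroring the joinWith chain in the patch;
		// coalesce keeps the original endpoint whenever it was not merged into a dedup record
		Dataset<Row> remapped = rels
			.join(bySource, col("source").equalTo(col("srcMerged")), "left_outer")
			.join(byTarget, col("target").equalTo(col("tgtMerged")), "left_outer")
			.select(
				coalesce(col("srcDedup"), col("source")).as("source"),
				coalesce(col("tgtDedup"), col("target")).as("target"))
			.distinct()
			// drop self-loops, as the filter on processedRelations does
			.filter(col("source").notEqual(col("target")));

		// expected: a single row (dedup_1, c); (a, b) collapses to a self-loop and is filtered out
		remapped.show();
		spark.stop();
	}
}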