1
0
Fork 0

code formatting

This commit is contained in:
Claudio Atzori 2023-08-29 11:11:00 +02:00
parent 0515d81c7c
commit bf35280ea6
5 changed files with 173 additions and 163 deletions

View File

@ -226,19 +226,19 @@ public class GraphCleaningFunctions extends CleaningFunctions {
public static <T extends Oaf> boolean filter(T value) {
if (!(value instanceof Relation) && (Boolean.TRUE
.equals(
Optional
.ofNullable(value)
.map(
o -> Optional
.ofNullable(o.getDataInfo())
.map(
d -> Optional
.ofNullable(d.getInvisible())
.orElse(true))
.orElse(false))
.orElse(true)))) {
return true;
.equals(
Optional
.ofNullable(value)
.map(
o -> Optional
.ofNullable(o.getDataInfo())
.map(
d -> Optional
.ofNullable(d.getInvisible())
.orElse(true))
.orElse(false))
.orElse(true)))) {
return true;
}
if (value instanceof Datasource) {
@ -292,9 +292,10 @@ public class GraphCleaningFunctions extends CleaningFunctions {
} else if (value instanceof Result) {
Result r = (Result) value;
if (Objects.nonNull(r.getFulltext()) && (ModelConstants.SOFTWARE_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()) ||
ModelConstants.DATASET_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()))) {
r.setFulltext(null);
if (Objects.nonNull(r.getFulltext())
&& (ModelConstants.SOFTWARE_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()) ||
ModelConstants.DATASET_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()))) {
r.setFulltext(null);
}
@ -326,10 +327,13 @@ public class GraphCleaningFunctions extends CleaningFunctions {
if (StringUtils.isBlank(r.getPublisher().getValue())) {
r.setPublisher(null);
} else {
r.getPublisher().setValue(
r.getPublisher().getValue()
.replaceAll(NAME_CLEANING_REGEX, " ")
);
r
.getPublisher()
.setValue(
r
.getPublisher()
.getValue()
.replaceAll(NAME_CLEANING_REGEX, " "));
}
}
if (Objects.isNull(r.getLanguage()) || StringUtils.isBlank(r.getLanguage().getClassid())) {
@ -498,8 +502,8 @@ public class GraphCleaningFunctions extends CleaningFunctions {
}
}
if (StringUtils.isNotBlank(i.getFulltext()) &&
(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()) ||
ModelConstants.DATASET_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()))) {
(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()) ||
ModelConstants.DATASET_RESULTTYPE_CLASSID.equals(r.getResulttype().getClassid()))) {
i.setFulltext(null);
}
}
@ -623,25 +627,28 @@ public class GraphCleaningFunctions extends CleaningFunctions {
private static Author cleanupAuthor(Author author) {
if (StringUtils.isNotBlank(author.getFullname())) {
author.setFullname(
author.getFullname()
.replaceAll(NAME_CLEANING_REGEX, " ")
.replace("\"", "\\\"")
);
author
.setFullname(
author
.getFullname()
.replaceAll(NAME_CLEANING_REGEX, " ")
.replace("\"", "\\\""));
}
if (StringUtils.isNotBlank(author.getName())) {
author.setName(
author.getName()
.replaceAll(NAME_CLEANING_REGEX, " ")
.replace("\"", "\\\"")
);
author
.setName(
author
.getName()
.replaceAll(NAME_CLEANING_REGEX, " ")
.replace("\"", "\\\""));
}
if (StringUtils.isNotBlank(author.getSurname())) {
author.setSurname(
author.getSurname()
.replaceAll(NAME_CLEANING_REGEX, " ")
.replace("\"", "\\\"")
);
author
.setSurname(
author
.getSurname()
.replaceAll(NAME_CLEANING_REGEX, " ")
.replace("\"", "\\\""));
}
return author;

View File

@ -18,7 +18,6 @@ package eu.dnetlib.pace.util;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

View File

@ -1,13 +1,13 @@
package eu.dnetlib.dhp.oa.dedup;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import static org.apache.spark.sql.functions.col;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
@ -18,139 +18,141 @@ import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import scala.Tuple2;
import scala.Tuple3;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import static org.apache.spark.sql.functions.col;
public class SparkPropagateRelation extends AbstractSparkAction {
private static final Logger log = LoggerFactory.getLogger(SparkPropagateRelation.class);
private static final Logger log = LoggerFactory.getLogger(SparkPropagateRelation.class);
private static Encoder<Relation> REL_BEAN_ENC = Encoders.bean(Relation.class);
private static Encoder<Relation> REL_BEAN_ENC = Encoders.bean(Relation.class);
private static Encoder<Relation> REL_KRYO_ENC = Encoders.kryo(Relation.class);
private static Encoder<Relation> REL_KRYO_ENC = Encoders.kryo(Relation.class);
public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkPropagateRelation.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
public static void main(String[] args) throws Exception {
ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
SparkPropagateRelation.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json")));
parser.parseArgument(args);
parser.parseArgument(args);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
new SparkPropagateRelation(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
new SparkPropagateRelation(parser, getSparkSession(conf))
.run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
}
@Override
public void run(ISLookUpService isLookUpService) {
@Override
public void run(ISLookUpService isLookUpService) {
final String graphBasePath = parser.get("graphBasePath");
final String workingPath = parser.get("workingPath");
final String graphOutputPath = parser.get("graphOutputPath");
final String graphBasePath = parser.get("graphBasePath");
final String workingPath = parser.get("workingPath");
final String graphOutputPath = parser.get("graphOutputPath");
log.info("graphBasePath: '{}'", graphBasePath);
log.info("workingPath: '{}'", workingPath);
log.info("graphOutputPath: '{}'", graphOutputPath);
log.info("graphBasePath: '{}'", graphBasePath);
log.info("workingPath: '{}'", workingPath);
log.info("graphOutputPath: '{}'", graphOutputPath);
final String outputRelationPath = DedupUtility.createEntityPath(graphOutputPath, "relation");
removeOutputDir(spark, outputRelationPath);
final String outputRelationPath = DedupUtility.createEntityPath(graphOutputPath, "relation");
removeOutputDir(spark, outputRelationPath);
Dataset<Relation> mergeRels = spark
.read()
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
.as(REL_BEAN_ENC);
Dataset<Relation> mergeRels = spark
.read()
.load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
.as(REL_BEAN_ENC);
// <mergedObjectID, dedupID>
Dataset<Row> mergedIds = mergeRels
.where(col("relClass").equalTo(ModelConstants.MERGES))
.select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
.distinct()
.cache();
// <mergedObjectID, dedupID>
Dataset<Row> mergedIds = mergeRels
.where(col("relClass").equalTo(ModelConstants.MERGES))
.select(col("source").as("dedupID"), col("target").as("mergedObjectID"))
.distinct()
.cache();
Dataset<Row> allRels = spark.read()
.schema(REL_BEAN_ENC.schema())
.json(DedupUtility.createEntityPath(graphBasePath, "relation"));
Dataset<Row> allRels = spark
.read()
.schema(REL_BEAN_ENC.schema())
.json(DedupUtility.createEntityPath(graphBasePath, "relation"));
Dataset<Relation> dedupedRels = allRels
.joinWith(mergedIds, allRels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
.joinWith(mergedIds, col("_1.target").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
.select("_1._1", "_1._2.dedupID", "_2.dedupID")
.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
.flatMap(SparkPropagateRelation::addInferredRelations, REL_KRYO_ENC);
Dataset<Relation> dedupedRels = allRels
.joinWith(mergedIds, allRels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
.joinWith(mergedIds, col("_1.target").equalTo(mergedIds.col("mergedObjectID")), "left_outer")
.select("_1._1", "_1._2.dedupID", "_2.dedupID")
.as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
.flatMap(SparkPropagateRelation::addInferredRelations, REL_KRYO_ENC);
Dataset<Relation> processedRelations = distinctRelations(dedupedRels.union(mergeRels.map((MapFunction<Relation, Relation>) r -> r, REL_KRYO_ENC)))
.filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget()));
Dataset<Relation> processedRelations = distinctRelations(
dedupedRels.union(mergeRels.map((MapFunction<Relation, Relation>) r -> r, REL_KRYO_ENC)))
.filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget()));
save(processedRelations, outputRelationPath, SaveMode.Overwrite);
}
save(processedRelations, outputRelationPath, SaveMode.Overwrite);
}
private static Iterator<Relation> addInferredRelations(Tuple3<Relation, String, String> t) throws Exception {
Relation existingRel = t._1();
String newSource = t._2();
String newTarget = t._3();
private static Iterator<Relation> addInferredRelations(Tuple3<Relation, String, String> t) throws Exception {
Relation existingRel = t._1();
String newSource = t._2();
String newTarget = t._3();
if (newSource == null && newTarget == null) {
return Collections.singleton(t._1()).iterator();
}
if (newSource == null && newTarget == null) {
return Collections.singleton(t._1()).iterator();
}
// update existing relation
if (existingRel.getDataInfo() == null) {
existingRel.setDataInfo(new DataInfo());
}
existingRel.getDataInfo().setDeletedbyinference(true);
// update existing relation
if (existingRel.getDataInfo() == null) {
existingRel.setDataInfo(new DataInfo());
}
existingRel.getDataInfo().setDeletedbyinference(true);
// Create new relation inferred by dedupIDs
Relation inferredRel = (Relation) BeanUtils.cloneBean(existingRel);
// Create new relation inferred by dedupIDs
Relation inferredRel = (Relation) BeanUtils.cloneBean(existingRel);
inferredRel.setDataInfo((DataInfo) BeanUtils.cloneBean(existingRel.getDataInfo()));
inferredRel.getDataInfo().setDeletedbyinference(false);
inferredRel.setDataInfo((DataInfo) BeanUtils.cloneBean(existingRel.getDataInfo()));
inferredRel.getDataInfo().setDeletedbyinference(false);
if (newSource != null)
inferredRel.setSource(newSource);
if (newSource != null)
inferredRel.setSource(newSource);
if (newTarget != null)
inferredRel.setTarget(newTarget);
if (newTarget != null)
inferredRel.setTarget(newTarget);
return Arrays.asList(existingRel, inferredRel).iterator();
}
return Arrays.asList(existingRel, inferredRel).iterator();
}
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
return rels
.filter(getRelationFilterFunction())
.groupByKey(
(MapFunction<Relation, String>) r -> String
.join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
Encoders.STRING())
.reduceGroups((ReduceFunction<Relation>) (b, a) -> {
b.mergeFrom(a);
return b;
}
)
.map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);
}
private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
return rels
.filter(getRelationFilterFunction())
.groupByKey(
(MapFunction<Relation, String>) r -> String
.join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
Encoders.STRING())
.reduceGroups((ReduceFunction<Relation>) (b, a) -> {
b.mergeFrom(a);
return b;
})
.map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);
}
private FilterFunction<Relation> getRelationFilterFunction() {
return r -> StringUtils.isNotBlank(r.getSource()) ||
StringUtils.isNotBlank(r.getTarget()) ||
StringUtils.isNotBlank(r.getRelType()) ||
StringUtils.isNotBlank(r.getSubRelType()) ||
StringUtils.isNotBlank(r.getRelClass());
}
private FilterFunction<Relation> getRelationFilterFunction() {
return r -> StringUtils.isNotBlank(r.getSource()) ||
StringUtils.isNotBlank(r.getTarget()) ||
StringUtils.isNotBlank(r.getRelType()) ||
StringUtils.isNotBlank(r.getSubRelType()) ||
StringUtils.isNotBlank(r.getRelClass());
}
}

View File

@ -371,7 +371,7 @@ public class GraphCleaningFunctionsTest {
@Test
public void testFilterProject() throws IOException {
String json = IOUtils
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/project.json"));
.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/oa/graph/clean/project.json"));
Project p_in = MAPPER.readValue(json, Project.class);
Assertions.assertEquals(false, GraphCleaningFunctions.filter(p_in));

View File

@ -1,14 +1,14 @@
package eu.dnetlib.dhp.oa.graph.group;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
@ -19,13 +19,15 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.*;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import static org.junit.jupiter.api.Assertions.assertEquals;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob;
import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.utils.DHPUtils;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class GroupEntitiesSparkJobTest {