Merge branch 'beta' of https://code-repo.d4science.org/D-Net/dnet-hadoop into beta
This commit is contained in:
commit
8893389895
|
@ -23,6 +23,8 @@ import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import me.xuender.unidecode.Unidecode;
|
import me.xuender.unidecode.Unidecode;
|
||||||
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.sql.Encoders;
|
||||||
|
|
||||||
public class GraphCleaningFunctions extends CleaningFunctions {
|
public class GraphCleaningFunctions extends CleaningFunctions {
|
||||||
|
|
||||||
|
@ -201,6 +203,13 @@ public class GraphCleaningFunctions extends CleaningFunctions {
|
||||||
.filter(sp -> StringUtils.isNotBlank(sp.getValue()))
|
.filter(sp -> StringUtils.isNotBlank(sp.getValue()))
|
||||||
.filter(sp -> Objects.nonNull(sp.getQualifier()))
|
.filter(sp -> Objects.nonNull(sp.getQualifier()))
|
||||||
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
|
.filter(sp -> StringUtils.isNotBlank(sp.getQualifier().getClassid()))
|
||||||
|
.map(s -> {
|
||||||
|
if ("dnet:result_subject".equals(s.getQualifier().getClassid())) {
|
||||||
|
s.getQualifier().setClassid(ModelConstants.DNET_SUBJECT_TYPOLOGIES);
|
||||||
|
s.getQualifier().setClassname(ModelConstants.DNET_SUBJECT_TYPOLOGIES);
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
})
|
||||||
.map(GraphCleaningFunctions::cleanValue)
|
.map(GraphCleaningFunctions::cleanValue)
|
||||||
.collect(
|
.collect(
|
||||||
Collectors
|
Collectors
|
||||||
|
|
|
@ -1,100 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.merge;
|
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
|
||||||
import java.io.FileReader;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.file.Paths;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import org.junit.jupiter.api.Assertions;
|
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Author;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
|
||||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
class AuthorMergerTest {
|
|
||||||
|
|
||||||
private String publicationsBasePath;
|
|
||||||
|
|
||||||
private List<List<Author>> authors;
|
|
||||||
|
|
||||||
@BeforeEach
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
|
|
||||||
publicationsBasePath = Paths
|
|
||||||
.get(AuthorMergerTest.class.getResource("/eu/dnetlib/dhp/oa/merge").toURI())
|
|
||||||
.toFile()
|
|
||||||
.getAbsolutePath();
|
|
||||||
|
|
||||||
authors = readSample(publicationsBasePath + "/publications_with_authors.json", Publication.class)
|
|
||||||
.stream()
|
|
||||||
.map(p -> p._2().getAuthor())
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
void mergeTest() { // used in the dedup: threshold set to 0.95
|
|
||||||
|
|
||||||
for (List<Author> authors1 : authors) {
|
|
||||||
System.out.println("List " + (authors.indexOf(authors1) + 1));
|
|
||||||
for (Author author : authors1) {
|
|
||||||
System.out.println(authorToString(author));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
List<Author> merge = AuthorMerger.merge(authors);
|
|
||||||
|
|
||||||
System.out.println("Merge ");
|
|
||||||
for (Author author : merge) {
|
|
||||||
System.out.println(authorToString(author));
|
|
||||||
}
|
|
||||||
|
|
||||||
Assertions.assertEquals(7, merge.size());
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
public <T> List<Tuple2<String, T>> readSample(String path, Class<T> clazz) {
|
|
||||||
List<Tuple2<String, T>> res = new ArrayList<>();
|
|
||||||
BufferedReader reader;
|
|
||||||
try {
|
|
||||||
reader = new BufferedReader(new FileReader(path));
|
|
||||||
String line = reader.readLine();
|
|
||||||
while (line != null) {
|
|
||||||
res
|
|
||||||
.add(
|
|
||||||
new Tuple2<>(
|
|
||||||
MapDocumentUtil.getJPathString("$.id", line),
|
|
||||||
new ObjectMapper().readValue(line, clazz)));
|
|
||||||
// read next line
|
|
||||||
line = reader.readLine();
|
|
||||||
}
|
|
||||||
reader.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String authorToString(Author a) {
|
|
||||||
|
|
||||||
String print = "Fullname = ";
|
|
||||||
print += a.getFullname() + " pid = [";
|
|
||||||
if (a.getPid() != null)
|
|
||||||
for (StructuredProperty sp : a.getPid()) {
|
|
||||||
print += sp.toComparableString() + " ";
|
|
||||||
}
|
|
||||||
print += "]";
|
|
||||||
return print;
|
|
||||||
}
|
|
||||||
}
|
|
File diff suppressed because one or more lines are too long
|
@ -228,10 +228,15 @@ public class PropagationConstant {
|
||||||
|
|
||||||
public static <R> Dataset<R> readPath(
|
public static <R> Dataset<R> readPath(
|
||||||
SparkSession spark, String inputPath, Class<R> clazz) {
|
SparkSession spark, String inputPath, Class<R> clazz) {
|
||||||
return spark
|
|
||||||
.read()
|
if (HdfsSupport.exists(inputPath, spark.sparkContext().hadoopConfiguration())) {
|
||||||
.textFile(inputPath)
|
return spark
|
||||||
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
.read()
|
||||||
|
.textFile(inputPath)
|
||||||
|
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||||
|
} else {
|
||||||
|
return spark.emptyDataset(Encoders.bean(clazz));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue