Compare commits

...

5 Commits

27 changed files with 148 additions and 84 deletions

View File

@@ -195,10 +195,10 @@ public class SparkDedupTest implements Serializable {
.count();
assertEquals(3432, orgs_simrel);
- assertEquals(7152, pubs_simrel);
- assertEquals(344, sw_simrel);
+ assertEquals(6944, pubs_simrel);
+ assertEquals(318, sw_simrel);
assertEquals(458, ds_simrel);
- assertEquals(6750, orp_simrel);
+ assertEquals(6746, orp_simrel);
}
@Test
@@ -344,10 +344,10 @@ public class SparkDedupTest implements Serializable {
.count();
assertEquals(1276, orgs_mergerel);
- assertEquals(1442, pubs_mergerel);
- assertEquals(288, sw_mergerel);
+ assertEquals(1418, pubs_mergerel);
+ assertEquals(276, sw_mergerel);
assertEquals(472, ds_mergerel);
- assertEquals(718, orp_mergerel);
+ assertEquals(716, orp_mergerel);
}
@Test
@@ -391,8 +391,8 @@ public class SparkDedupTest implements Serializable {
.count();
assertEquals(82, orgs_deduprecord);
- assertEquals(66, pubs_deduprecord);
- assertEquals(51, sw_deduprecord);
+ assertEquals(65, pubs_deduprecord);
+ assertEquals(50, sw_deduprecord);
assertEquals(96, ds_deduprecord);
assertEquals(89, orp_deduprecord);
}
@@ -473,11 +473,11 @@
.distinct()
.count();
- assertEquals(897, publications);
+ assertEquals(896, publications);
assertEquals(835, organizations);
assertEquals(100, projects);
assertEquals(100, datasource);
- assertEquals(200, softwares);
+ assertEquals(199, softwares);
assertEquals(388, dataset);
assertEquals(517, otherresearchproduct);
@@ -533,7 +533,7 @@
long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
- assertEquals(4866, relations);
+ assertEquals(4828, relations);
// check deletedbyinference
final Dataset<Relation> mergeRels = spark

View File

@@ -168,10 +168,10 @@ public class SparkStatsTest implements Serializable {
.textFile(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_blockstats")
.count();
- assertEquals(121, orgs_blocks);
- assertEquals(110, pubs_blocks);
- assertEquals(21, sw_blocks);
- assertEquals(67, ds_blocks);
- assertEquals(55, orp_blocks);
+ assertEquals(549, orgs_blocks);
+ assertEquals(868, pubs_blocks);
+ assertEquals(473, sw_blocks);
+ assertEquals(523, ds_blocks);
+ assertEquals(756, orp_blocks);
}
}

View File

@@ -17,8 +17,7 @@
},
"pace" : {
"clustering" : [
- { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
- { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+ { "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree" : {

View File

@@ -17,8 +17,7 @@
},
"pace" : {
"clustering" : [
- { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
- { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+ { "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree" : {

View File

@@ -29,8 +29,7 @@
},
"pace": {
"clustering" : [
- { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
- { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+ { "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree": {

View File

@@ -17,8 +17,7 @@
},
"pace" : {
"clustering" : [
- { "name" : "ngrampairs", "fields" : [ "title" ], "params" : { "max" : "1", "ngramLen" : "3"} },
- { "name" : "suffixprefix", "fields" : [ "title" ], "params" : { "max" : "1", "len" : "3" } },
+ { "name" : "wordsStatsSuffixPrefixChain", "fields" : [ "title" ], "params" : { "mod" : "10" } },
{ "name" : "lowercase", "fields" : [ "doi" ], "params" : { } }
],
"decisionTree": {

View File

@@ -71,6 +71,11 @@
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>eu.dnetlib.dhp</groupId>
+ <artifactId>dhp-workflows-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>

View File

@@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.graph.clean;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.BufferedInputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -22,10 +21,8 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
- import eu.dnetlib.dhp.common.HdfsSupport;
+ import eu.dnetlib.dhp.common.*;
import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper;
- import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
- import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;

View File

@@ -1,7 +1,7 @@
package eu.dnetlib.dhp.oa.graph.raw;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.*;
+ import static eu.dnetlib.dhp.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import java.util.*;
@@ -12,7 +12,7 @@ import org.dom4j.DocumentFactory;
import org.dom4j.DocumentHelper;
import org.dom4j.Node;
- import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+ import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.LicenseComparator;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.*;

View File

@@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
- import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+ import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;

View File

@@ -1,15 +1,7 @@
package eu.dnetlib.dhp.oa.graph.raw;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.asString;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.dataInfo;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.journal;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listFields;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.listKeyValues;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.qualifier;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
+ import static eu.dnetlib.dhp.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASET_DEFAULT_RESULTTYPE;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DATASOURCE_ORGANIZATION;
import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
@@ -52,8 +44,8 @@ import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.DbClient;
+ import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
- import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;

View File

@@ -1,9 +1,7 @@
package eu.dnetlib.dhp.oa.graph.raw;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
+ import static eu.dnetlib.dhp.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import java.util.ArrayList;
@@ -18,7 +16,7 @@ import org.dom4j.Node;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.common.PacePerson;
- import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+ import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;

View File

@@ -1,9 +1,7 @@
package eu.dnetlib.dhp.oa.graph.raw;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.createOpenaireId;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.field;
- import static eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils.structuredProperty;
+ import static eu.dnetlib.dhp.common.OafMapperUtils.*;
import static eu.dnetlib.dhp.schema.common.ModelConstants.*;
import java.util.ArrayList;
@@ -17,7 +15,7 @@ import org.dom4j.Document;
import org.dom4j.Node;
import eu.dnetlib.dhp.common.PacePerson;
- import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+ import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Field;

View File

@@ -18,7 +18,9 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
- import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+ import eu.dnetlib.dhp.common.CleaningRuleMap;
+ import eu.dnetlib.dhp.common.OafCleaner;
+ import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;

View File

@@ -21,8 +21,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
+ import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
- import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.Dataset;

View File

@@ -27,8 +27,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
- import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils;
- import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+ import eu.dnetlib.dhp.common.OafMapperUtils;
+ import eu.dnetlib.dhp.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;

View File

@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.provision;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+ import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
@@ -20,8 +21,6 @@ import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
@@ -31,7 +30,8 @@ public class PrepareRelationsJobTest {
public static final String SUBRELTYPE = "subRelType";
public static final String OUTCOME = "outcome";
- public static final String SUPPLEMENT = "supplement";
+ public static final String PARTICIPATION = "participation";
+ public static final String AFFILIATION = "affiliation";
private static SparkSession spark;
@@ -64,7 +64,7 @@ public class PrepareRelationsJobTest {
@Test
public void testRunPrepareRelationsJob(@TempDir Path testPath) throws Exception {
- final int maxRelations = 10;
+ final int maxRelations = 20;
PrepareRelationsJob
.main(
new String[] {
@@ -73,7 +73,8 @@ public class PrepareRelationsJobTest {
"-outputPath", testPath.toString(),
"-relPartitions", "10",
"-relationFilter", "asd",
- "-maxRelations", String.valueOf(maxRelations)
+ "-sourceMaxRelations", String.valueOf(maxRelations),
+ "-targetMaxRelations", String.valueOf(maxRelations * 100)
});
Dataset<Relation> out = spark
@@ -82,19 +83,31 @@ public class PrepareRelationsJobTest {
.as(Encoders.bean(Relation.class))
.cache();
- Assertions.assertEquals(10, out.count());
+ Assertions.assertEquals(maxRelations, out.count());
Dataset<Row> freq = out
.toDF()
.cube(SUBRELTYPE)
.count()
.filter((FilterFunction<Row>) value -> !value.isNullAt(0));
- long outcome = freq.filter(freq.col(SUBRELTYPE).equalTo(OUTCOME)).collectAsList().get(0).getAs("count");
- long supplement = freq.filter(freq.col(SUBRELTYPE).equalTo(SUPPLEMENT)).collectAsList().get(0).getAs("count");
- Assertions.assertTrue(outcome > supplement);
+ log.info(freq.collectAsList().toString());
+ long outcome = getRows(freq, OUTCOME).get(0).getAs("count");
+ long participation = getRows(freq, PARTICIPATION).get(0).getAs("count");
+ long affiliation = getRows(freq, AFFILIATION).get(0).getAs("count");
+ Assertions.assertTrue(participation == outcome);
+ Assertions.assertTrue(outcome > affiliation);
+ Assertions.assertTrue(participation > affiliation);
Assertions.assertEquals(7, outcome);
- Assertions.assertEquals(3, supplement);
+ Assertions.assertEquals(7, participation);
+ Assertions.assertEquals(6, affiliation);
}
+ protected List<Row> getRows(Dataset<Row> freq, String col) {
+ return freq.filter(freq.col(SUBRELTYPE).equalTo(col)).collectAsList();
+ }
}
}
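
The parameter change above splits the old single -maxRelations cap into -sourceMaxRelations and -targetMaxRelations, and with maxRelations = 20 the expected per-subRelType frequencies (7 outcome, 7 participation, 6 affiliation) add up to the capped total. As a hedged sketch of what per-endpoint capping typically looks like in Spark (column names are assumptions; this is not the PrepareRelationsJob implementation):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

public class RelationCapSketch {

	// Keep at most sourceMax relations per source id and targetMax per target id.
	// The "source" and "target" column names are assumed for illustration.
	public static Dataset<Row> cap(Dataset<Row> rels, int sourceMax, int targetMax) {
		WindowSpec bySource = Window.partitionBy("source").orderBy("target");
		WindowSpec byTarget = Window.partitionBy("target").orderBy("source");
		return rels
			.withColumn("s_rank", row_number().over(bySource))
			.filter(col("s_rank").leq(sourceMax))
			.withColumn("t_rank", row_number().over(byTarget))
			.filter(col("t_rank").leq(targetMax))
			.drop("s_rank", "t_rank");
	}
}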

View File

@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dhp-workflows</artifactId>
<groupId>eu.dnetlib.dhp</groupId>
<version>1.2.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>dhp-workflows-common</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-schemas</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@@ -1,5 +1,5 @@
- package eu.dnetlib.dhp.oa.graph.clean;
+ package eu.dnetlib.dhp.common;
import java.io.Serializable;
import java.util.HashMap;
@@ -7,7 +7,6 @@ import java.util.HashMap;
import org.apache.commons.lang3.StringUtils;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
- import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Country;
import eu.dnetlib.dhp.schema.oaf.Qualifier;

View File

@@ -0,0 +1,35 @@
package eu.dnetlib.dhp.common;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
public class GraphSupport {
private static final Logger log = LoggerFactory.getLogger(GraphSupport.class);
private static <T extends Oaf> void saveGraphTable(Dataset<T> dataset, Class<T> clazz, String outputGraph,
eu.dnetlib.dhp.common.SaveMode saveMode) {
log.info("saving graph in {} mode to {}", saveMode.toString(), outputGraph);
final DataFrameWriter<T> writer = dataset.write().mode(SaveMode.Overwrite);
switch (saveMode) {
case JSON:
writer.option("compression", "gzip").json(outputGraph);
break;
case PARQUET:
final String db_table = ModelSupport.tableIdentifier(outputGraph, clazz);
writer.saveAsTable(db_table);
break;
}
}
}
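
The switch above either dumps a graph table as gzip-compressed JSON under a path or registers it as a catalog table. Below is a standalone sketch of the same dispatch, runnable without the dhp modules (class name, paths and table name are invented for illustration):

import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SaveDispatchSketch {

	enum OutputMode {
		JSON, PARQUET
	}

	// Same pattern as saveGraphTable above: one writer, two output shapes.
	static void save(Dataset<Row> ds, String target, OutputMode mode) {
		DataFrameWriter<Row> writer = ds.write().mode(SaveMode.Overwrite);
		switch (mode) {
			case JSON:
				writer.option("compression", "gzip").json(target); // gzip JSON parts under the target path
				break;
			case PARQUET:
				writer.saveAsTable(target); // catalog table; needs Hive support in the session
				break;
		}
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.master("local[*]")
			.enableHiveSupport() // assumes Hive classes are on the classpath
			.getOrCreate();
		Dataset<Row> demo = spark.range(10).toDF("id");
		save(demo, "/tmp/demo_json", OutputMode.JSON);
		save(demo, "demo_table", OutputMode.PARQUET);
	}
}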

View File

@@ -1,5 +1,5 @@
- package eu.dnetlib.dhp.oa.graph.clean;
+ package eu.dnetlib.dhp.common;
import java.io.Serializable;
import java.lang.reflect.Field;

View File

@@ -1,11 +1,7 @@
- package eu.dnetlib.dhp.oa.graph.raw.common;
+ package eu.dnetlib.dhp.common;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.List;
- import java.util.Map;
- import java.util.Objects;
+ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -13,15 +9,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
- import eu.dnetlib.dhp.schema.oaf.DataInfo;
- import eu.dnetlib.dhp.schema.oaf.ExtraInfo;
- import eu.dnetlib.dhp.schema.oaf.Field;
- import eu.dnetlib.dhp.schema.oaf.Journal;
- import eu.dnetlib.dhp.schema.oaf.KeyValue;
- import eu.dnetlib.dhp.schema.oaf.OAIProvenance;
- import eu.dnetlib.dhp.schema.oaf.OriginDescription;
- import eu.dnetlib.dhp.schema.oaf.Qualifier;
- import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
+ import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.DHPUtils;
public class OafMapperUtils {
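
The static imports collapsed to a wildcard in this changeset cover small factory helpers such as field, qualifier and structuredProperty. A hedged usage sketch follows (signatures inferred from the imports being replaced; the argument values are invented, and DataInfo is left null only to keep the sketch short):

import static eu.dnetlib.dhp.common.OafMapperUtils.*;

import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;

public class OafMapperUtilsUsageSketch {
	public static void main(String[] args) {
		Qualifier q = qualifier("main title", "main title", "dnet:dataCite_title", "dnet:dataCite_title");
		StructuredProperty title = structuredProperty("An example title", q, null);
		Field<String> description = field("An example description", null);
		System.out.println(title.getValue() + " / " + description.getValue());
	}
}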

View File

@@ -0,0 +1,8 @@
package eu.dnetlib.dhp.common;
public enum SaveMode {
JSON, PARQUET
}
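
A job would presumably select the enum from a workflow parameter; a minimal sketch, assuming a parameter conventionally named saveMode (the name is not part of this diff):

import eu.dnetlib.dhp.common.SaveMode;

public class SaveModeParseSketch {
	public static void main(String[] args) {
		String param = "parquet"; // e.g. read via ArgumentApplicationParser in a real job
		SaveMode mode = SaveMode.valueOf(param.trim().toUpperCase());
		System.out.println(mode); // PARQUET
	}
}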

View File

@@ -1,5 +1,5 @@
- package eu.dnetlib.dhp.oa.graph.raw.common;
+ package eu.dnetlib.dhp.common;
import java.io.Serializable;
import java.util.HashMap;

View File

@@ -1,5 +1,5 @@
- package eu.dnetlib.dhp.oa.graph.raw.common;
+ package eu.dnetlib.dhp.common;
import java.io.Serializable;
import java.util.*;

View File

@@ -1,5 +1,5 @@
- package eu.dnetlib.dhp.oa.graph.raw.common;
+ package eu.dnetlib.dhp.common;
import java.io.Serializable;

View File

@@ -17,6 +17,7 @@
<modules>
<module>dhp-workflow-profiles</module>
+ <module>dhp-workflows-common</module>
<module>dhp-aggregation</module>
<module>dhp-distcp</module>
<module>dhp-actionmanager</module>