forked from D-Net/dnet-hadoop

minor changes and bug fix

This commit is contained in:
parent 28c1cdd132
commit 2355cc4e9b
@@ -13,6 +13,11 @@ public class OrganizationPidComparator implements Comparator<StructuredProperty>
 		PidType lClass = PidType.tryValueOf(left.getQualifier().getClassid());
 		PidType rClass = PidType.tryValueOf(right.getQualifier().getClassid());
 
+		if (lClass.equals(PidType.openorgs))
+			return -1;
+		if (rClass.equals(PidType.openorgs))
+			return 1;
+
 		if (lClass.equals(PidType.GRID))
 			return -1;
 		if (rClass.equals(PidType.GRID))
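For context, the new checks mean an openorgs pid now outranks GRID when picking the best organization pid. A minimal, self-contained sketch of that priority logic (reduced hypothetical enum, not part of the commit):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Reduced stand-in for PidType; the real enum carries many more types.
enum Pid { openorgs, GRID, mag_id, undefined }

public class PidPriorityDemo {

	// Mirrors the checks above: openorgs wins first, then GRID.
	static final Comparator<Pid> BY_PRIORITY = (l, r) -> {
		if (l == r)
			return 0;
		if (l.equals(Pid.openorgs))
			return -1;
		if (r.equals(Pid.openorgs))
			return 1;
		if (l.equals(Pid.GRID))
			return -1;
		if (r.equals(Pid.GRID))
			return 1;
		return 0;
	};

	public static void main(String[] args) {
		List<Pid> pids = new ArrayList<>(Arrays.asList(Pid.mag_id, Pid.GRID, Pid.openorgs));
		pids.sort(BY_PRIORITY);
		System.out.println(pids); // [openorgs, GRID, mag_id]
	}
}
```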
@@ -9,7 +9,7 @@ public enum PidType {
 	doi, pmid, pmc, handle, arXiv, nct, pdb,
 
 	// Organization
-	GRID, mag_id, urn,
+	openorgs, corda, corda_h2020, GRID, mag_id, urn,
 
 	// Used by dedup
 	undefined, original;
@@ -99,6 +99,10 @@ abstract class AbstractSparkAction implements Serializable {
 		dataset.write().option("compression", "gzip").mode(mode).json(outPath);
 	}
 
+	protected static <T> void saveParquet(Dataset<T> dataset, String outPath, SaveMode mode) {
+		dataset.write().option("compression", "gzip").mode(mode).parquet(outPath);
+	}
+
 	protected static void removeOutputDir(SparkSession spark, String path) {
 		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
 	}
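The new helper mirrors the existing `save`, swapping the JSON sink for Parquet; a standalone sketch of the pair (assuming Spark SQL on the classpath):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;

// Sketch of the two sibling helpers: same gzip compression and SaveMode,
// different sink format. The simrel jobs below switch from save() to
// saveParquet(), so simrels land as Parquet instead of JSON text.
class SaveHelpers {

	static <T> void save(Dataset<T> dataset, String outPath, SaveMode mode) {
		dataset.write().option("compression", "gzip").mode(mode).json(outPath);
	}

	static <T> void saveParquet(Dataset<T> dataset, String outPath, SaveMode mode) {
		dataset.write().option("compression", "gzip").mode(mode).parquet(outPath);
	}
}
```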
@@ -89,7 +89,7 @@ public class DedupRecordFactory {
 				t -> {
 					T duplicate = t._2();
 
-					// prepare the list of pids to use for the id generation
+					// prepare the list of pids to be used for the id generation
 					bestPids.add(Identifier.newInstance(duplicate));
 
 					entity.mergeFrom(duplicate);
@@ -36,7 +36,14 @@ public class IdGenerator implements Serializable {
 	}
 
 	private static String dedupify(String ns) {
-		StringBuilder prefix = new StringBuilder(substringBefore(ns, "_")).append("_dedup");
+
+		StringBuilder prefix;
+		if (PidType.valueOf(substringBefore(ns, "_")) == PidType.openorgs) {
+			prefix = new StringBuilder(substringBefore(ns, "_"));
+		} else {
+			prefix = new StringBuilder(substringBefore(ns, "_")).append("_dedup");
+		}
+
 		while (prefix.length() < 12) {
 			prefix.append("_");
 		}
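The reworked `dedupify` keeps the openorgs namespace prefix intact instead of appending `_dedup`, then pads to 12 characters as before. A runnable sketch (the boolean flag stands in for the `PidType.valueOf(...) == PidType.openorgs` check, and `prefixOf` for `StringUtils.substringBefore`):

```java
public class DedupifyDemo {

	// stands in for org.apache.commons.lang3.StringUtils.substringBefore(ns, "_")
	static String prefixOf(String ns) {
		return ns.substring(0, ns.indexOf('_'));
	}

	// isOpenorgs stands in for: PidType.valueOf(prefixOf(ns)) == PidType.openorgs
	static String dedupify(String ns, boolean isOpenorgs) {
		StringBuilder prefix = new StringBuilder(prefixOf(ns));
		if (!isOpenorgs) {
			prefix.append("_dedup");
		}
		while (prefix.length() < 12) {
			prefix.append("_");
		}
		return prefix.toString();
	}

	public static void main(String[] args) {
		System.out.println(dedupify("grid_______::abc", false)); // grid_dedup__
		System.out.println(dedupify("openorgs____::abc", true)); // openorgs____
	}
}
```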
@@ -23,12 +23,13 @@ import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.KeyValue;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.utils.DHPUtils;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
+import net.sf.saxon.ma.trie.Tuple2;
 
-//copy simrels (verified) from relation to the workdir in order to make them available for the deduplication
 public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
 	private static final Logger log = LoggerFactory.getLogger(SparkCopyOpenorgsMergeRels.class);
 	public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
@@ -84,24 +85,32 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
 			.map(patchRelFn(), Encoders.bean(Relation.class))
 			.toJavaRDD()
 			.filter(this::isOpenorgs)
-			.filter(this::filterOpenorgsRels)
-			.filter(this::excludeOpenorgsMesh)
-			.filter(this::excludeNonOpenorgs); // excludes relations with no openorgs id involved
+			.filter(this::filterOpenorgsRels);
+
+		JavaRDD<Relation> selfRawRels = rawRels
+			.map(r -> r.getSource())
+			.distinct()
+			.map(s -> rel(s, s, "isSimilarTo", dedupConf));
 
 		log.info("Number of raw Openorgs Relations collected: {}", rawRels.count());
 
 		// turn openorgs isSimilarTo relations into mergerels
-		JavaRDD<Relation> mergeRelsRDD = rawRels.flatMap(rel -> {
-			List<Relation> mergerels = new ArrayList<>();
-
-			String openorgsId = rel.getSource().contains("openorgs____") ? rel.getSource() : rel.getTarget();
-			String mergedId = rel.getSource().contains("openorgs____") ? rel.getTarget() : rel.getSource();
-
-			mergerels.add(rel(openorgsId, mergedId, "merges", dedupConf));
-			mergerels.add(rel(mergedId, openorgsId, "isMergedIn", dedupConf));
-
-			return mergerels.iterator();
-		});
+		JavaRDD<Relation> mergeRelsRDD = rawRels
+			.union(selfRawRels)
+			.map(r -> {
+				r.setSource(createDedupID(r.getSource())); // create the dedup_id to align it to the openaire dedup format
+				return r;
+			})
+			.flatMap(rel -> {
+				List<Relation> mergerels = new ArrayList<>();
+
+				mergerels.add(rel(rel.getSource(), rel.getTarget(), "merges", dedupConf));
+				mergerels.add(rel(rel.getTarget(), rel.getSource(), "isMergedIn", dedupConf));
+
+				return mergerels.iterator();
+			});
 
 		log.info("Number of Openorgs Merge Relations created: {}", mergeRelsRDD.count());
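Put together, the rework adds an `isSimilarTo` self-relation for each source, rewrites every source into its dedup id, and only then expands each pair into a merges/isMergedIn couple, so the dedup root also merges the openorgs organization itself. A tiny standalone walk-through with hypothetical ids and strings in place of `Relation` objects:

```java
import java.util.ArrayList;
import java.util.List;

public class MergeRelsDemo {

	public static void main(String[] args) {
		List<String[]> rawRels = new ArrayList<>();
		// one verified openorgs simrel: source -> target (hypothetical ids)
		rawRels.add(new String[] { "20|openorgs____::aaa", "20|grid________::bbb" });
		// selfRawRels contributes an isSimilarTo self-relation per distinct source
		rawRels.add(new String[] { "20|openorgs____::aaa", "20|openorgs____::aaa" });

		for (String[] r : rawRels) {
			// stand-in for createDedupID(r[0])
			String root = "20|dedup_wf_001::" + Integer.toHexString(r[0].hashCode());
			System.out.println(root + "  merges      " + r[1]);
			System.out.println(r[1] + "  isMergedIn  " + root);
		}
	}
}
```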
@@ -144,22 +153,6 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
 		return false;
 	}
 
-	private boolean excludeOpenorgsMesh(Relation rel) {
-
-		if (rel.getSource().contains("openorgsmesh") || rel.getTarget().contains("openorgsmesh")) {
-			return false;
-		}
-		return true;
-	}
-
-	private boolean excludeNonOpenorgs(Relation rel) {
-
-		if (rel.getSource().contains("openorgs____") || rel.getTarget().contains("openorgs____")) {
-			return true;
-		}
-		return false;
-	}
-
 	private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
 
 		String entityType = dedupConf.getWf().getEntityType();
@@ -189,4 +182,10 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
 		r.setDataInfo(info);
 		return r;
 	}
+
+	public String createDedupID(String id) {
+
+		String prefix = id.split("\\|")[0];
+		return prefix + "|dedup_wf_001::" + DHPUtils.md5(id);
+	}
 }
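A hedged usage sketch of the new `createDedupID`, assuming `DHPUtils.md5` is a plain hex-encoded MD5 (the local `md5` helper below stands in for it, and the id is hypothetical):

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class DedupIdDemo {

	// local stand-in for DHPUtils.md5
	static String md5(String s) throws Exception {
		byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
		return String.format("%032x", new BigInteger(1, digest));
	}

	static String createDedupID(String id) throws Exception {
		String prefix = id.split("\\|")[0]; // e.g. "20" for organizations
		return prefix + "|dedup_wf_001::" + md5(id);
	}

	public static void main(String[] args) throws Exception {
		System.out.println(createDedupID("20|openorgs____::0000aaabbbccc"));
		// -> 20|dedup_wf_001::<md5 of the full id>
	}
}
```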
@@ -82,7 +82,7 @@ public class SparkCopyOpenorgsSimRels extends AbstractSparkAction {
 			.map(patchRelFn(), Encoders.bean(Relation.class))
 			.filter(this::filterOpenorgsRels);
 
-		save(rawRels, outputPath, SaveMode.Append);
+		saveParquet(rawRels, outputPath, SaveMode.Append);
 
 		log.info("Copied " + rawRels.count() + " Similarity Relations");
 	}
@@ -109,7 +109,7 @@ public class SparkCreateSimRels extends AbstractSparkAction {
 				.rdd(),
 			Encoders.bean(Relation.class));
 
-		save(simRels, outputPath, SaveMode.Append);
+		saveParquet(simRels, outputPath, SaveMode.Append);
 
 		log.info("Generated " + simRels.count() + " Similarity Relations");
@@ -13,6 +13,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -22,11 +23,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
-import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.PidType;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.util.MapDocumentUtil;
@@ -103,12 +103,22 @@ public class SparkUpdateEntity extends AbstractSparkAction {
 							MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
 				JavaRDD<String> map = entitiesWithId
 					.leftOuterJoin(mergedIds)
-					.map(
-						k -> k._2()._2().isPresent()
-							? updateDeletedByInference(k._2()._1(), clazz)
-							: k._2()._1());
+					.map(k -> {
+						if (k._2()._2().isPresent()) {
+							return updateDeletedByInference(k._2()._1(), clazz);
+						}
+						return k._2()._1();
+					});
+
+				if (type == EntityType.organization) // exclude openorgs with deletedbyinference=true
+					map = map.filter(it -> {
+						Organization org = OBJECT_MAPPER.readValue(it, Organization.class);
+						return !org.getId().contains("openorgs____") || (org.getId().contains("openorgs____")
+							&& !org.getDataInfo().getDeletedbyinference());
+					});
 
 				sourceEntity = map.union(sc.textFile(dedupRecordPath));
 			}
 
 			sourceEntity.saveAsTextFile(outputPath, GzipCodec.class);
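The new organization filter keeps every non-openorgs record and drops only openorgs records flagged `deletedbyinference`; logically `!a || (a && !b)` reduces to `!(a && b)`. A minimal sketch of the predicate:

```java
public class OrgFilterDemo {

	// keep unless it is an openorgs record flagged deletedbyinference
	static boolean keep(boolean isOpenorgs, boolean deletedByInference) {
		return !(isOpenorgs && deletedByInference);
	}

	public static void main(String[] args) {
		System.out.println(keep(true, true));   // false: deleted openorgs record excluded
		System.out.println(keep(true, false));  // true
		System.out.println(keep(false, true));  // true: non-openorgs records always pass
	}
}
```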
@@ -83,7 +83,7 @@
 		</configuration>
 	</global>
 
-	<start to="resetWorkingPath"/>
+	<start to="testOpenorgs"/>
 
 	<kill name="Kill">
 		<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@@ -98,16 +98,14 @@
 		<error to="Kill"/>
 	</action>
 
-	<!--<action name="testOpenorgs">-->
-		<!--<fs>-->
-			<!--<delete path="${workingPath}/${actionSetIdOpenorgs}/organization_simrel"/>-->
-			<!--<delete path="${workingPath}/${actionSetIdOpenorgs}/organization_mergerel"/>-->
-			<!--<delete path="${workingPath}/${actionSetIdOpenorgs}/organization_deduprecord"/>-->
-			<!--<delete path="${dedupGraphPath}"/>-->
-		<!--</fs>-->
-		<!--<ok to="CopyOpenorgsMergeRels"/>-->
-		<!--<error to="Kill"/>-->
-	<!--</action>-->
+	<action name="testOpenorgs">
+		<fs>
+			<delete path="${workingPath}/${actionSetIdOpenorgs}"/>
+			<delete path="${dedupGraphPath}"/>
+		</fs>
+		<ok to="CopyOpenorgsMergeRels"/>
+		<error to="Kill"/>
+	</action>
 
 	<action name="CreateSimRel">
 		<spark xmlns="uri:oozie:spark-action:0.2">
@@ -213,17 +211,16 @@
 			<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
 			<arg>--numPartitions</arg><arg>8000</arg>
 		</spark>
-		<ok to="CopyOpenorgs"/>
+		<ok to="CreateOrgsDedupRecord"/>
 		<error to="Kill"/>
 	</action>
 
-	<!-- copy openorgs to the working dir (in the organization_deduprecord dir)-->
-	<action name="CopyOpenorgs">
+	<action name="CreateOrgsDedupRecord">
 		<spark xmlns="uri:oozie:spark-action:0.2">
 			<master>yarn</master>
 			<mode>cluster</mode>
-			<name>Copy Openorgs Entities</name>
-			<class>eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgs</class>
+			<name>Create Organizations Dedup Records</name>
+			<class>eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord</class>
 			<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
 			<spark-opts>
 				--executor-memory=${sparkExecutorMemory}
@@ -237,12 +234,40 @@
 			</spark-opts>
 			<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
 			<arg>--workingPath</arg><arg>${workingPath}</arg>
+			<arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
 			<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
 		</spark>
 		<ok to="UpdateEntity"/>
 		<error to="Kill"/>
 	</action>
 
+	<!--TODO replace with job for the creation of deduprecord for openorgs organizations -->
+	<!-- copy openorgs to the working dir (in the organization_deduprecord dir)-->
+	<!--<action name="CopyOpenorgs">-->
+		<!--<spark xmlns="uri:oozie:spark-action:0.2">-->
+			<!--<master>yarn</master>-->
+			<!--<mode>cluster</mode>-->
+			<!--<name>Copy Openorgs Entities</name>-->
+			<!--<class>eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgs</class>-->
+			<!--<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>-->
+			<!--<spark-opts>-->
+				<!----executor-memory=${sparkExecutorMemory}-->
+				<!----executor-cores=${sparkExecutorCores}-->
+				<!----driver-memory=${sparkDriverMemory}-->
+				<!----conf spark.extraListeners=${spark2ExtraListeners}-->
+				<!----conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
+				<!----conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
+				<!----conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
+				<!----conf spark.sql.shuffle.partitions=3840-->
+			<!--</spark-opts>-->
+			<!--<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>-->
+			<!--<arg>--workingPath</arg><arg>${workingPath}</arg>-->
+			<!--<arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>-->
+		<!--</spark>-->
+		<!--<ok to="UpdateEntity"/>-->
+		<!--<error to="Kill"/>-->
+	<!--</action>-->
 
 	<action name="UpdateEntity">
 		<spark xmlns="uri:oozie:spark-action:0.2">
 			<master>yarn</master>
@@ -112,7 +112,7 @@ public class EntityMergerTest implements Serializable {
 		assertEquals("2018-09-30", pub_merged.getDateofacceptance().getValue());
 
 		// verify authors
-		assertEquals(9, pub_merged.getAuthor().size());
+		assertEquals(13, pub_merged.getAuthor().size());
 		assertEquals(4, AuthorMerger.countAuthorsPids(pub_merged.getAuthor()));
 
 		// verify title
@@ -36,6 +36,8 @@ public class IdGeneratorTest {
 	private static List<Identifier<Publication>> bestIds2;
 	private static List<Identifier<Publication>> bestIds3;
 
+	private static List<Identifier<Organization>> bestIdsOrg;
+
 	private static String testEntityBasePath;
 
 	@BeforeAll
@@ -48,6 +50,8 @@ public class IdGeneratorTest {
 		bestIds = createBestIds(testEntityBasePath + "/publication_idgeneration.json", Publication.class);
 		bestIds2 = createBestIds(testEntityBasePath + "/publication_idgeneration2.json", Publication.class);
 		bestIds3 = createBestIds(testEntityBasePath + "/publication_idgeneration3.json", Publication.class);
+
+		bestIdsOrg = createBestIds(testEntityBasePath + "/organization_idgeneration.json", Organization.class);
 	}
 
 	@Test
@@ -76,6 +80,13 @@ public class IdGeneratorTest {
 		assertEquals("50|dedup_wf_001::0829b5191605bdbea36d6502b8c1ce1g", id2);
 	}
 
+	@Test
+	public void generateIdOrganizationTest() {
+		String id1 = IdGenerator.generate(bestIdsOrg, "20|defaultID");
+
+		assertEquals("20|openorgs____::599c15a70fcb03be6ba08f75f14d6076", id1);
+	}
+
 	protected static <T extends OafEntity> List<Identifier<T>> createBestIds(String path, Class<T> clazz) {
 		final Stream<Identifier<T>> ids = readSample(path, clazz)
 			.stream()
@@ -174,27 +174,27 @@ public class SparkDedupTest implements Serializable {
 
 		long orgs_simrel = spark
 			.read()
-			.load(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
+			.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
 			.count();
 
 		long pubs_simrel = spark
 			.read()
-			.load(testOutputBasePath + "/" + testActionSetId + "/publication_simrel")
+			.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication"))
 			.count();
 
 		long sw_simrel = spark
 			.read()
-			.load(testOutputBasePath + "/" + testActionSetId + "/software_simrel")
+			.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software"))
 			.count();
 
 		long ds_simrel = spark
 			.read()
-			.load(testOutputBasePath + "/" + testActionSetId + "/dataset_simrel")
+			.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset"))
 			.count();
 
 		long orp_simrel = spark
 			.read()
-			.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
+			.load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct"))
 			.count();
 
 		assertEquals(3082, orgs_simrel);
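These tests now centralize path construction. The diff does not show `DedupUtility.createSimRelPath`, but the unchanged assertions suggest it reproduces the old `<base>/<actionSet>/<entity>_simrel` layout; an assumed sketch:

```java
public class SimRelPathDemo {

	// assumed layout, not shown in this diff
	static String createSimRelPath(String basePath, String actionSetId, String entityType) {
		return String.format("%s/%s/%s_simrel", basePath, actionSetId, entityType);
	}

	public static void main(String[] args) {
		System.out.println(createSimRelPath("/tmp/out", "test-set", "organization"));
		// -> /tmp/out/test-set/organization_simrel
	}
}
```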
@@ -204,6 +204,7 @@ public class SparkDedupTest implements Serializable {
 		assertEquals(6750, orp_simrel);
 	}
 
+	@Disabled
 	@Test
 	@Order(2)
 	public void collectSimRelsTest() throws Exception {
|
||||||
|
|
||||||
long orp_simrel = spark
|
long orp_simrel = spark
|
||||||
.read()
|
.read()
|
||||||
.load(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
|
.json(testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_simrel")
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
|
// System.out.println("orgs_simrel = " + orgs_simrel);
|
||||||
|
// System.out.println("pubs_simrel = " + pubs_simrel);
|
||||||
|
// System.out.println("sw_simrel = " + sw_simrel);
|
||||||
|
// System.out.println("ds_simrel = " + ds_simrel);
|
||||||
|
// System.out.println("orp_simrel = " + orp_simrel);
|
||||||
|
|
||||||
assertEquals(3672, orgs_simrel);
|
assertEquals(3672, orgs_simrel);
|
||||||
assertEquals(10459, pubs_simrel);
|
assertEquals(10459, pubs_simrel);
|
||||||
assertEquals(3767, sw_simrel);
|
assertEquals(3767, sw_simrel);
|
||||||
|
@ -456,7 +463,7 @@ public class SparkDedupTest implements Serializable {
|
||||||
testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord")
|
testOutputBasePath + "/" + testActionSetId + "/otherresearchproduct_deduprecord")
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
assertEquals(84, orgs_deduprecord);
|
assertEquals(85, orgs_deduprecord);
|
||||||
assertEquals(65, pubs_deduprecord);
|
assertEquals(65, pubs_deduprecord);
|
||||||
assertEquals(51, sw_deduprecord);
|
assertEquals(51, sw_deduprecord);
|
||||||
assertEquals(97, ds_deduprecord);
|
assertEquals(97, ds_deduprecord);
|
||||||
|
@ -540,7 +547,7 @@ public class SparkDedupTest implements Serializable {
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
assertEquals(896, publications);
|
assertEquals(896, publications);
|
||||||
assertEquals(837, organizations);
|
assertEquals(838, organizations);
|
||||||
assertEquals(100, projects);
|
assertEquals(100, projects);
|
||||||
assertEquals(100, datasource);
|
assertEquals(100, datasource);
|
||||||
assertEquals(200, softwares);
|
assertEquals(200, softwares);
|
||||||
|
|
|
@ -110,6 +110,7 @@ public class SparkOpenorgsTest implements Serializable {
|
||||||
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
|
"/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Disabled
|
||||||
@Test
|
@Test
|
||||||
public void copyOpenorgsTest() throws Exception {
|
public void copyOpenorgsTest() throws Exception {
|
||||||
|
|
||||||
|
@ -162,7 +163,7 @@ public class SparkOpenorgsTest implements Serializable {
|
||||||
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
|
.load(testOutputBasePath + "/" + testActionSetId + "/organization_mergerel")
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
assertEquals(6, orgs_mergerel);
|
assertEquals(384, orgs_mergerel);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -191,7 +192,7 @@ public class SparkOpenorgsTest implements Serializable {
|
||||||
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
|
.textFile(testOutputBasePath + "/" + testActionSetId + "/organization_simrel")
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
assertEquals(96, orgs_simrel);
|
assertEquals(73, orgs_simrel);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
File diff suppressed because one or more lines are too long
@@ -163,14 +163,25 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
 					.execute(
 						"queryProjectOrganization.sql", smdbe::processProjectOrganization, verifyNamespacePrefix);
 				break;
-			case openorgs:
+			case openorgs_dedup:
 				log.info("Processing Openorgs...");
 				smdbe
 					.execute(
-						"queryOrganizationsFromOpenOrgsDB.sql", smdbe::processOrganization, verifyNamespacePrefix);
+						"queryOpenOrgsForOrgsDedup.sql", smdbe::processOrganization, verifyNamespacePrefix);
 
 				log.info("Processing Openorgs Merge Rels...");
-				smdbe.execute("querySimilarityFromOpenOrgsDB.sql", smdbe::processOrgOrgSimRels);
+				smdbe.execute("queryOpenOrgsSimilarityForOrgsDedup.sql", smdbe::processOrgOrgSimRels);
+
+				break;
+
+			case openorgs:
+				log.info("Processing Openorgs For Provision...");
+				smdbe
+					.execute(
+						"queryOpenOrgsForProvision.sql", smdbe::processOrganization, verifyNamespacePrefix);
+
+				log.info("Processing Openorgs Merge Rels...");
+				smdbe.execute("queryOpenOrgsSimilarityForProvision.sql", smdbe::processOrgOrgSimRels);
 
 				break;
@@ -647,17 +658,19 @@ public class MigrateDbEntitiesApplication extends AbstractMigrationApplication i
 			r1.setDataInfo(info);
 			r1.setLastupdatetimestamp(lastUpdateTimestamp);
 
-			final Relation r2 = new Relation();
-			r2.setRelType(ORG_ORG_RELTYPE);
-			r2.setSubRelType(ORG_ORG_SUBRELTYPE);
-			r2.setRelClass(relClass);
-			r2.setSource(orgId2);
-			r2.setTarget(orgId1);
-			r2.setCollectedfrom(collectedFrom);
-			r2.setDataInfo(info);
-			r2.setLastupdatetimestamp(lastUpdateTimestamp);
+			// removed because there is no difference between the two sides // TODO
+			// final Relation r2 = new Relation();
+			// r2.setRelType(ORG_ORG_RELTYPE);
+			// r2.setSubRelType(ORG_ORG_SUBRELTYPE);
+			// r2.setRelClass(relClass);
+			// r2.setSource(orgId2);
+			// r2.setTarget(orgId1);
+			// r2.setCollectedfrom(collectedFrom);
+			// r2.setDataInfo(info);
+			// r2.setLastupdatetimestamp(lastUpdateTimestamp);
+			// return Arrays.asList(r1, r2);
 
-			return Arrays.asList(r1, r2);
+			return Arrays.asList(r1);
 		} catch (final Exception e) {
 			throw new RuntimeException(e);
 		}
@@ -4,7 +4,8 @@ package eu.dnetlib.dhp.oa.graph.raw.common;
 //enum to specify the different actions available for the MigrateDbEntitiesApplication job
 public enum MigrateAction {
 	claims, // migrate claims to the raw graph
-	openorgs, // migrate organizations from openorgs to the raw graph
+	openorgs_dedup, // migrate organizations from openorgs to the raw graph
+	openorgs, // migrate organizations from openorgs to the raw graph for provision
 	openaire, // migrate openaire entities to the raw graph
 	openaire_organizations // migrate openaire organizations entities to the raw graph
 }
@@ -156,7 +156,7 @@
 			<arg>--postgresUser</arg><arg>${postgresOpenOrgsUser}</arg>
 			<arg>--postgresPassword</arg><arg>${postgresOpenOrgsPassword}</arg>
 			<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
-			<arg>--action</arg><arg>openorgs</arg>
+			<arg>--action</arg><arg>openorgs_dedup</arg>
 			<arg>--dbschema</arg><arg>${dbSchema}</arg>
 			<arg>--nsPrefixBlacklist</arg><arg>${nsPrefixBlacklist}</arg>
 		</java>
@@ -0,0 +1,41 @@
+SELECT
+	o.id AS organizationid,
+	coalesce((array_agg(a.acronym))[1], o.name) AS legalshortname,
+	o.name AS legalname,
+	array_agg(DISTINCT n.name) AS "alternativeNames",
+	(array_agg(u.url))[1] AS websiteurl,
+	'' AS logourl,
+	o.creation_date AS dateofcollection,
+	o.modification_date AS dateoftransformation,
+	false AS inferred,
+	false AS deletedbyinference,
+	0.95 AS trust,
+	'' AS inferenceprovenance,
+	'openaire____::openorgs' AS collectedfromid,
+	'OpenOrgs Database' AS collectedfromname,
+	o.country || '@@@dnet:countries' AS country,
+	'sysimport:crosswalk:entityregistry@@@dnet:provenance_actions' AS provenanceaction,
+	array_agg(DISTINCT i.otherid || '###' || i.type || '@@@dnet:pid_types') AS pid,
+	null AS eclegalbody,
+	null AS eclegalperson,
+	null AS ecnonprofit,
+	null AS ecresearchorganization,
+	null AS echighereducation,
+	null AS ecinternationalorganizationeurinterests,
+	null AS ecinternationalorganization,
+	null AS ecenterprise,
+	null AS ecsmevalidated,
+	null AS ecnutscode
+FROM organizations o
+	LEFT OUTER JOIN acronyms a ON (a.id = o.id)
+	LEFT OUTER JOIN urls u ON (u.id = o.id)
+	LEFT OUTER JOIN other_ids i ON (i.id = o.id)
+	LEFT OUTER JOIN other_names n ON (n.id = o.id)
+WHERE
+	o.status = 'approved'
+GROUP BY
+	o.id,
+	o.name,
+	o.creation_date,
+	o.modification_date,
+	o.country;
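The `pid` column above packs each identifier as `<value>###<type>@@@dnet:pid_types`. A hypothetical decoder for one packed entry:

```java
public class PidDecodeDemo {

	public static void main(String[] args) {
		String packed = "grid.1234.5###GRID@@@dnet:pid_types";
		String value = packed.split("###")[0];
		String qualifier = packed.split("###")[1]; // GRID@@@dnet:pid_types
		String type = qualifier.split("@@@")[0];
		System.out.println(value + " / " + type); // grid.1234.5 / GRID
	}
}
```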
@@ -23,7 +23,7 @@
 	false AS deletedbyinference,
 	0.99 AS trust,
 	'' AS inferenceprovenance,
 	'isSimilarTo' AS relclass
 FROM other_names n
 	LEFT OUTER JOIN organizations o ON (n.id = o.id)
@@ -40,8 +40,4 @@
 	0.99 AS trust,
 	'' AS inferenceprovenance,
 	'isDifferentFrom' AS relclass
 FROM oa_duplicates WHERE reltype = 'is_different'
-
-
---TODO ???
---Create isDifferentFrom relations between the suggestions too: (A is_similar B) and (A is_different C) => (B is_different C)
@@ -0,0 +1,12 @@
+-- relations approved by the user and suggested by the dedup
+SELECT
+	local_id AS id1,
+	oa_original_id AS id2,
+	'openaire____::openorgs' AS collectedfromid,
+	'OpenOrgs Database' AS collectedfromname,
+	false AS inferred,
+	false AS deletedbyinference,
+	0.99 AS trust,
+	'' AS inferenceprovenance,
+	'isSimilarTo' AS relclass
+FROM oa_duplicates WHERE reltype = 'is_similar' OR reltype = 'suggested';