forked from antonis.lempesis/dnet-hadoop
[Enrichment: Propagation through parent-child relationships] Added counters, and changed constraint to verify if filtering out the relation (from classname = harvested to classid != propagation)
This commit is contained in:
parent
2aca6bfa0a
commit
b9d124bb7c
|
@ -6,9 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import org.apache.commons.collections.iterators.ArrayListIterator;
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.function.*;
|
import org.apache.spark.api.java.function.*;
|
||||||
|
@ -17,15 +15,12 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.sun.tools.internal.ws.processor.model.Model;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.KeyValueSet;
|
import eu.dnetlib.dhp.KeyValueSet;
|
||||||
import eu.dnetlib.dhp.PropagationConstant;
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
|
import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
import net.sf.saxon.expr.instruct.ForEach;
|
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
|
package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.PropagationConstant.*;
|
import static eu.dnetlib.dhp.PropagationConstant.*;
|
||||||
import static eu.dnetlib.dhp.common.Constants.*;
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
@ -22,12 +22,13 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.KeyValueSet;
|
import eu.dnetlib.dhp.KeyValueSet;
|
||||||
import eu.dnetlib.dhp.PropagationConstant;
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
|
import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
|
|
||||||
|
|
||||||
public class SparkResultToOrganizationFromSemRel implements Serializable {
|
public class SparkResultToOrganizationFromSemRel implements Serializable {
|
||||||
private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromSemRel.class);
|
private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromSemRel.class);
|
||||||
private static final int MAX_ITERATION = 5;
|
private static final int MAX_ITERATION = 5;
|
||||||
|
@ -113,7 +114,7 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
|
||||||
String resultOrganizationPath, String graphPath, String workingPath, String outputPath,
|
String resultOrganizationPath, String graphPath, String workingPath, String outputPath,
|
||||||
PropagationCounter propagationCounter) {
|
PropagationCounter propagationCounter) {
|
||||||
int iteration = 0;
|
int iteration = 0;
|
||||||
long leavesCount = 0;
|
long leavesCount;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
iteration++;
|
iteration++;
|
||||||
|
@ -199,30 +200,6 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
|
||||||
.json(outputPath);
|
.json(outputPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
// per ogni figlio nel file delle organizzazioni
|
|
||||||
// devo fare una roba iterativa che legge info da un file e le cambia via via
|
|
||||||
// passo 1: creo l'informazione iniale: organizzazioni che non hanno figli con almeno un padre
|
|
||||||
// ogni organizzazione punta alla lista di padri
|
|
||||||
// eseguo la propagazione dall'organizzazione figlio all'organizzazione padre
|
|
||||||
// ricerco nel dataset delle relazioni se l'organizzazione a cui ho propagato ha, a sua volta, dei padri
|
|
||||||
// e propago anche a quelli e cosi' via fino a che arrivo ad organizzazione senza padre
|
|
||||||
|
|
||||||
// organizationFile:
|
|
||||||
// f => {p1, p2, ..., pn}
|
|
||||||
// resultFile
|
|
||||||
// o => {r1, r2, ... rm}
|
|
||||||
|
|
||||||
// supponiamo che f => {r1, r2} e che nessuno dei padri abbia gia' l'associazione con questi result
|
|
||||||
// quindi
|
|
||||||
// p1 => {r1, r2}
|
|
||||||
// p2 => {r1, r2}
|
|
||||||
// pn => {r1, r2}
|
|
||||||
|
|
||||||
// mi serve un file con tutta la gerarchia per organizzazioni
|
|
||||||
// un file con le organizzazioni foglia da joinare con l'altro file
|
|
||||||
// un file con le associazioni organizzazione -> result forse meglio result -> organization
|
|
||||||
|
|
||||||
// eseguo gli step fino a che ho foglie nel set
|
|
||||||
// quando non ne ho piu' creo relazioni doppio verso per le nuove propagate che ho introdotto
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -131,11 +131,11 @@ public class StepActions implements Serializable {
|
||||||
if (relationList
|
if (relationList
|
||||||
.stream()
|
.stream()
|
||||||
.filter(
|
.filter(
|
||||||
rel -> rel
|
rel -> !rel
|
||||||
.getDataInfo()
|
.getDataInfo()
|
||||||
.getProvenanceaction()
|
.getProvenanceaction()
|
||||||
.getClassname()
|
.getClassid()
|
||||||
.equals(ModelConstants.HARVESTED))
|
.equals(PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID))
|
||||||
.count() > 0) {
|
.count() > 0) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -176,7 +176,7 @@
|
||||||
<arg>--graphPath</arg><arg>${sourcePath}</arg>
|
<arg>--graphPath</arg><arg>${sourcePath}</arg>
|
||||||
<arg>--outputPath</arg><arg>${outputPath}/relation</arg>
|
<arg>--outputPath</arg><arg>${outputPath}/relation</arg>
|
||||||
<arg>--leavesPath</arg><arg>${workingDir}/preparedInfo/leavesPath</arg>
|
<arg>--leavesPath</arg><arg>${workingDir}/preparedInfo/leavesPath</arg>
|
||||||
<arg>--leavesPath</arg><arg>${workingDir}/preparedInfo/childParentPath</arg>
|
<arg>--childParentPath</arg><arg>${workingDir}/preparedInfo/childParentPath</arg>
|
||||||
<arg>--resultOrgPath</arg><arg>${workingDir}/preparedInfo/resultOrgPath</arg>
|
<arg>--resultOrgPath</arg><arg>${workingDir}/preparedInfo/resultOrgPath</arg>
|
||||||
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
|
||||||
<arg>--workingDir</arg><arg>${workingDir}/working</arg>
|
<arg>--workingDir</arg><arg>${workingDir}/working</arg>
|
||||||
|
|
Loading…
Reference in New Issue