From dcff9cecdf2267b2689bc71dbe67a15bb15b7f34 Mon Sep 17 00:00:00 2001
From: miconis
Date: Mon, 12 Apr 2021 15:55:27 +0200
Subject: [PATCH 1/4] bug fix: ids in self mergerels are not marked
 deletedbyinference=true

---
 .../main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
index 779fb91d6..b65822635 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
@@ -13,6 +13,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -91,6 +92,7 @@ public class SparkUpdateEntity extends AbstractSparkAction {
 
             final JavaPairRDD<String, String> mergedIds = rel
                 .where("relClass == 'merges'")
+                .where("source != target")
                 .select(rel.col("target"))
                 .distinct()
                 .toJavaRDD()
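Note on PATCH 1/4: the added "source != target" condition keeps self-referencing 'merges' relations out of the merged-id set, so a record that is both source and target of such a relation (i.e. the dedup representative itself) is no longer collected among the ids to be flagged deletedbyinference=true. The sketch below restates that effect using only standard Spark SQL Java APIs; the Dataset names rel and entities, the class name, and the marking step are illustrative assumptions, not the actual SparkUpdateEntity code.

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class SelfMergeFilterSketch {

    // ids of records merged into a *different* dedup root; the second filter is
    // the one-line fix, keeping self merge relations out of the result
    static Dataset<Row> mergedIds(Dataset<Row> rel) {
        return rel
            .where("relClass == 'merges'")
            .where("source != target")
            .select(col("target"))
            .distinct();
    }

    // entities whose id appears among the merged ids are the ones to be flagged
    // deletedbyinference=true; the representative record is left untouched
    static Dataset<Row> toBeFlagged(Dataset<Row> entities, Dataset<Row> mergedIds) {
        return entities
            .join(mergedIds, entities.col("id").equalTo(mergedIds.col("target")), "left_semi");
    }
}

Whatever the real marking step looks like, the point of the fix is simply that the representative's own id no longer appears in mergedIds.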
From 03d36fadea1d301975fe929f780eda80918a10ac Mon Sep 17 00:00:00 2001
From: antleb
Date: Thu, 15 Apr 2021 13:34:22 +0300
Subject: [PATCH 2/4] properly invalidating impala metadata

---
 .../eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh
index 57acb2ee7..d04c5ccfd 100644
--- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh
+++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh
@@ -10,6 +10,7 @@ export SOURCE=$1
 export SHADOW=$2
 
 echo "Updating shadow database"
+impala-shell -q "invalidate metadata"
 impala-shell -d ${SOURCE} -q "invalidate metadata"
 impala-shell -d ${SOURCE} -q "show tables" --delimited | sed "s/^\(.*\)/compute stats ${SOURCE}.\1;/" | impala-shell -c -f -
 impala-shell -q "create database if not exists ${SHADOW}"

From 4ae6fba01d40f53ad09b7cf29768abce8a170374 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Fri, 23 Apr 2021 12:09:19 +0200
Subject: [PATCH 3/4] refactoring

---
 .../main/java/eu/dnetlib/dhp/schema/oaf/Relation.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java
index 8825d7137..adfc6af95 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java
@@ -1,8 +1,6 @@
 
 package eu.dnetlib.dhp.schema.oaf;
 
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.text.ParseException;
@@ -10,6 +8,8 @@ import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+
 /**
  * Relation models any edge between two nodes in the OpenAIRE graph. It has a source id and a target id pointing to
  * graph node identifiers and it is further characterised by the semantic of the link through the fields relType,
@@ -137,7 +137,10 @@ public class Relation extends Oaf {
         try {
             setValidationDate(ModelSupport.oldest(getValidationDate(), r.getValidationDate()));
         } catch (ParseException e) {
-            throw new IllegalArgumentException(String.format("invalid validation date format in relation [s:%s, t:%s]: %s", getSource(), getTarget(), getValidationDate()));
+            throw new IllegalArgumentException(String
+                .format(
+                    "invalid validation date format in relation [s:%s, t:%s]: %s", getSource(), getTarget(),
+                    getValidationDate()));
         }
 
         super.mergeFrom(r);

From 72e5aa3b42ff8011d19d81eb11805ee8d5f594e1 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Fri, 23 Apr 2021 12:10:30 +0200
Subject: [PATCH 4/4] refactoring

---
 .../PrepareResultInstRepoAssociation.java | 20 +++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java
index 92c09fb28..a41399627 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/PrepareResultInstRepoAssociation.java
@@ -4,6 +4,11 @@ package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;
 import static eu.dnetlib.dhp.PropagationConstant.*;
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
@@ -22,11 +27,6 @@ import eu.dnetlib.dhp.schema.oaf.Datasource;
 import eu.dnetlib.dhp.schema.oaf.Organization;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-
 public class PrepareResultInstRepoAssociation {
 
     private static final Logger log = LoggerFactory.getLogger(PrepareResultInstRepoAssociation.class);
@@ -56,9 +56,10 @@ public class PrepareResultInstRepoAssociation {
         final String alreadyLinkedPath = parser.get("alreadyLinkedPath");
         log.info("alreadyLinkedPath {}: ", alreadyLinkedPath);
 
-        List<String> blacklist = Optional.ofNullable(parser.get("blacklist"))
-            .map(v -> Arrays.asList(v.split(";")))
-            .orElse(new ArrayList<>());
+        List<String> blacklist = Optional
+            .ofNullable(parser.get("blacklist"))
+            .map(v -> Arrays.asList(v.split(";")))
+            .orElse(new ArrayList<>());
 
         SparkConf conf = new SparkConf();
         conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
@@ -91,14 +92,13 @@ public class PrepareResultInstRepoAssociation {
 
     private static void prepareDatasourceOrganization(
         SparkSession spark, String datasourceOrganizationPath, List<String> blacklist) {
         String blacklisted = "";
-        if(blacklist.size() > 0 ){
+        if (blacklist.size() > 0) {
             blacklisted = " AND d.id != '" + blacklist.get(0) + "'";
             for (int i = 1; i < blacklist.size(); i++) {
                 blacklisted += " AND d.id != '" + blacklist.get(i) + "'";
             }
         }
-
         String query = "SELECT source datasourceId, target organizationId "
             + "FROM ( SELECT id "
             + "FROM datasource "
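Note on PATCH 4/4: the change is a formatting pass over the blacklist handling. A semicolon-separated "blacklist" parameter is split into a list, and every entry is appended to the SQL being built as an extra AND d.id != '<id>' condition. Purely as an illustration of what that loop produces (this is not code from the patch, and the example ids are placeholders), the same clause can be assembled with a stream:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class BlacklistClauseSketch {

    // builds the same " AND d.id != '<id>'" chain as the patched loop;
    // an empty blacklist yields an empty string and leaves the query unchanged
    static String blacklistClause(List<String> blacklist) {
        return blacklist
            .stream()
            .map(id -> " AND d.id != '" + id + "'")
            .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        // prints " AND d.id != 'dsId1' AND d.id != 'dsId2'" (placeholder ids)
        System.out.println(blacklistClause(Arrays.asList("dsId1", "dsId2")));
    }
}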