diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java
index fe4642ee9..c6c5cdcea 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactory.java
@@ -57,7 +57,7 @@ public class IdentifierFactory implements Serializable {
 	}
 
 	public static List<StructuredProperty> getPids(List<StructuredProperty> pid, KeyValue collectedFrom) {
-		return pidFromInstance(pid, collectedFrom).distinct().collect(Collectors.toList());
+		return pidFromInstance(pid, collectedFrom, true).distinct().collect(Collectors.toList());
 	}
 
 	public static <T extends Result> String createDOIBoostIdentifier(T entity) {
@@ -104,7 +104,7 @@
 
 		checkArgument(StringUtils.isNoneBlank(entity.getId()), "missing entity identifier");
 
-		final Map<String, List<StructuredProperty>> pids = extractPids(entity);
+		final Map<String, Set<StructuredProperty>> pids = extractPids(entity);
 
 		return pids
 			.values()
@@ -125,7 +125,7 @@
 			.orElseGet(entity::getId);
 	}
 
-	private static <T extends OafEntity> Map<String, List<StructuredProperty>> extractPids(T entity) {
+	private static <T extends OafEntity> Map<String, Set<StructuredProperty>> extractPids(T entity) {
 		if (entity instanceof Result) {
 			return Optional
 				.ofNullable(((Result) entity).getInstance())
@@ -142,23 +142,24 @@
 					Collectors
 						.groupingBy(
 							p -> p.getQualifier().getClassid(),
-							Collectors.mapping(p -> p, Collectors.toList())));
+							Collectors.mapping(p -> p, Collectors.toCollection(HashSet::new))));
 		}
 	}
 
-	private static Map<String, List<StructuredProperty>> mapPids(List<Instance> instance) {
+	private static Map<String, Set<StructuredProperty>> mapPids(List<Instance> instance) {
 		return instance
 			.stream()
-			.map(i -> pidFromInstance(i.getPid(), i.getCollectedfrom()))
+			.map(i -> pidFromInstance(i.getPid(), i.getCollectedfrom(), false))
 			.flatMap(Function.identity())
 			.collect(
 				Collectors
 					.groupingBy(
 						p -> p.getQualifier().getClassid(),
-						Collectors.mapping(p -> p, Collectors.toList())));
+						Collectors.mapping(p -> p, Collectors.toCollection(HashSet::new))));
 	}
 
-	private static Stream<StructuredProperty> pidFromInstance(List<StructuredProperty> pid, KeyValue collectedFrom) {
+	private static Stream<StructuredProperty> pidFromInstance(List<StructuredProperty> pid, KeyValue collectedFrom,
+		boolean mapHandles) {
 		return Optional
 			.ofNullable(pid)
 			.map(
@@ -167,16 +168,16 @@
 					// filter away PIDs provided by a DS that is not considered an authority for the
 					// given PID Type
 					.filter(p -> {
-						return shouldFilterPid(collectedFrom, p);
+						return shouldFilterPid(collectedFrom, p, mapHandles);
 					})
 					.map(CleaningFunctions::normalizePidValue)
 					.filter(CleaningFunctions::pidFilter))
 			.orElse(Stream.empty());
 	}
 
-	private static boolean shouldFilterPid(KeyValue collectedFrom, StructuredProperty p) {
+	private static boolean shouldFilterPid(KeyValue collectedFrom, StructuredProperty p, boolean mapHandles) {
 		final PidType pType = PidType.tryValueOf(p.getQualifier().getClassid());
-		return pType.equals(PidType.handle) || Optional.ofNullable(collectedFrom).isPresent() &&
+		return (mapHandles && pType.equals(PidType.handle)) || Optional.ofNullable(collectedFrom).isPresent() &&
 			Optional
 				.ofNullable(PID_AUTHORITY.get(pType))
 				.map(authorities -> {
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactoryTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactoryTest.java
index 31ef91a7a..935b74b08 100644
--- a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactoryTest.java
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/IdentifierFactoryTest.java
@@ -31,6 +31,9 @@ public class IdentifierFactoryTest {
 		verifyIdentifier(
 			"publication_doi3.json", "50|pmc_________::94e4cb08c93f8733b48e2445d04002ac", true);
 
+		verifyIdentifier(
+			"publication_doi4.json", "50|od______2852::38861c44e6052a8d49f59a4c39ba5e66", true);
+
 		verifyIdentifier(
 			"publication_pmc1.json", "50|DansKnawCris::0829b5191605bdbea36d6502b8c1ce1f", true);
 
diff --git a/dhp-common/src/test/resources/eu/dnetlib/dhp/schema/oaf/utils/publication_doi4.json b/dhp-common/src/test/resources/eu/dnetlib/dhp/schema/oaf/utils/publication_doi4.json
new file mode 100644
index 000000000..ac99ca93a
--- /dev/null
+++ b/dhp-common/src/test/resources/eu/dnetlib/dhp/schema/oaf/utils/publication_doi4.json
@@ -0,0 +1,37 @@
+{
+  "id": "50|od______2852::38861c44e6052a8d49f59a4c39ba5e66",
+  "instance": [
+    {
+      "collectedfrom": {
+        "key": "10|openaire____::1234",
+        "value": "Zenodo"
+      },
+      "pid": [
+        {
+          "qualifier": {"classid": "doi"},
+          "value": "10.1016/j.cmet.2010.03.013"
+        },
+        {
+          "qualifier": {"classid": "handle"},
+          "value": "11012/83840"
+        }
+      ]
+    },
+    {
+      "collectedfrom": {
+        "key": "10|opendoar____::2852",
+        "value": "Digital library of Brno University of Technology"
+      },
+      "pid": [
+        {
+          "qualifier": {"classid": "pmc"},
+          "value": "21459329"
+        },
+        {
+          "qualifier": {"classid": "handle"},
+          "value": "11012/83840"
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
index 1f9a09c3a..9b59c90db 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
@@ -36,7 +36,6 @@ public class ModelConstants {
 	public static final String DNET_COUNTRY_TYPE = "dnet:countries";
 	public static final String DNET_REVIEW_LEVELS = "dnet:review_levels";
 	public static final String DNET_PROGRAMMING_LANGUAGES = "dnet:programming_languages";
-	public static final String DNET_PROVENANCEACTIONS = "dnet:provenanceActions";
 	public static final String DNET_EXTERNAL_REF_TYPES = "dnet:externalReference_typologies";
 
 	public static final String SYSIMPORT_CROSSWALK_REPOSITORY = "sysimport:crosswalk:repository";
@@ -50,7 +49,7 @@ public class ModelConstants {
 	public static final String PROVENANCE_DEDUP = "sysimport:dedup";
 
 	public static final Qualifier PROVENANCE_ACTION_SET_QUALIFIER = qualifier(
-		SYSIMPORT_ACTIONSET, SYSIMPORT_ACTIONSET, DNET_PROVENANCEACTIONS, DNET_PROVENANCEACTIONS);
+		SYSIMPORT_ACTIONSET, SYSIMPORT_ACTIONSET, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS);
 
 	public static final String DATASET_RESULTTYPE_CLASSID = "dataset";
 	public static final String PUBLICATION_RESULTTYPE_CLASSID = "publication";
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index fe9bd74ce..5ba6f6e6d 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -82,16 +82,12 @@ public class DedupRecordFactory {
 
 		final Collection<String> dates = Lists.newArrayList();
 		final List<List<Author>> authors = Lists.newArrayList();
-		final List<Identifier<T>> bestPids = Lists.newArrayList(); // best pids list
 
 		entities
 			.forEachRemaining(
 				t -> {
 					T duplicate = t._2();
-					// prepare the list of pids to be used for the id generation
-					bestPids.add(Identifier.newInstance(duplicate));
-
 					entity.mergeFrom(duplicate);
 					if (ModelSupport.isSubClass(duplicate, Result.class)) {
 						Result r1 = (Result) duplicate;
@@ -109,7 +105,7 @@
 			((Result) entity).setAuthor(AuthorMerger.merge(authors));
 		}
 
-		entity.setId(IdGenerator.generate(bestPids, id));
+		entity.setId(id);
 
 		entity.setLastupdatetimestamp(ts);
 		entity.setDataInfo(dataInfo);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
index d870f6256..b41507e95 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
@@ -1,6 +1,9 @@
 
 package eu.dnetlib.dhp.oa.dedup;
 
+import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
+
 import java.io.IOException;
 
 import org.apache.commons.io.IOUtils;
@@ -27,8 +30,6 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
 	private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
 
 	public static final String ROOT_TRUST = "0.8";
-	public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
-	public static final String PROVENANCE_ACTIONS = "dnet:provenanceActions";
 
 	public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
 		super(parser, spark);
@@ -94,10 +95,10 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
 		info.setTrust(ROOT_TRUST);
 		info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
 		Qualifier provenance = new Qualifier();
-		provenance.setClassid(PROVENANCE_ACTION_CLASS);
-		provenance.setClassname(PROVENANCE_ACTION_CLASS);
-		provenance.setSchemeid(PROVENANCE_ACTIONS);
-		provenance.setSchemename(PROVENANCE_ACTIONS);
+		provenance.setClassid(PROVENANCE_DEDUP);
+		provenance.setClassname(PROVENANCE_DEDUP);
+		provenance.setSchemeid(DNET_PROVENANCE_ACTIONS);
+		provenance.setSchemename(DNET_PROVENANCE_ACTIONS);
 		info.setProvenanceaction(provenance);
 		return info;
 	}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index c27464f90..229214229 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -1,18 +1,20 @@
 
 package eu.dnetlib.dhp.oa.dedup;
 
+import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
+
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
 import org.apache.spark.graphx.Edge;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
@@ -23,14 +25,18 @@ import org.dom4j.DocumentException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
 import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
+import eu.dnetlib.dhp.oa.dedup.model.Identifier;
+import eu.dnetlib.dhp.schema.common.EntityType;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.schema.oaf.Relation;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@@ -42,9 +48,7 @@ import scala.Tuple2;
 
 public class SparkCreateMergeRels extends AbstractSparkAction {
 
-	public static final String PROVENANCE_ACTION_CLASS = "sysimport:dedup";
 	private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);
-	public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
 
 	public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) {
 		super(parser, spark);
@@ -92,6 +96,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 
 		for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
 			final String subEntity = dedupConf.getWf().getSubEntityValue();
+			final Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
 
 			log.info("Creating mergerels for: '{}'", subEntity);
 
@@ -100,10 +105,8 @@
 
 			final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
 
-			final JavaPairRDD<Object, String> vertexes = sc
-				.textFile(graphBasePath + "/" + subEntity)
-				.map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
-				.mapToPair((PairFunction<String, Object, String>) s -> new Tuple2<>(hash(s), s));
+			// <hash(id), id>
+			JavaPairRDD<Object, String> vertexes = createVertexes(sc, graphBasePath, subEntity, dedupConf);
 
 			final RDD<Edge<String>> edgeRdd = spark
 				.read()
@@ -113,14 +116,42 @@
 				.map(it -> new Edge<>(hash(it.getSource()), hash(it.getTarget()), it.getRelClass()))
 				.rdd();
 
-			final Dataset<Relation> mergeRels = spark
+			Dataset<Tuple2<String, String>> rawMergeRels = spark
 				.createDataset(
 					GraphProcessor
 						.findCCs(vertexes.rdd(), edgeRdd, maxIterations, cut)
 						.toJavaRDD()
-						.filter(k -> k.getDocIds().size() > 1)
-						.flatMap(cc -> ccToMergeRel(cc, dedupConf))
+						.filter(k -> k.getIds().size() > 1)
+						.flatMap(this::ccToRels)
 						.rdd(),
+					Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
+
+			Dataset<Tuple2<String, OafEntity>> entities = spark
+				.read()
+				.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+				.map(
+					(MapFunction<String, Tuple2<String, OafEntity>>) it -> {
+						OafEntity entity = OBJECT_MAPPER.readValue(it, clazz);
+						return new Tuple2<>(entity.getId(), entity);
+					},
+					Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
+
+			Dataset<Relation> mergeRels = rawMergeRels
+				.joinWith(entities, rawMergeRels.col("_2").equalTo(entities.col("_1")), "inner")
+				// <tmp_dedupid>, <id, entity>
+				.map(
+					(MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, OafEntity>>, Tuple2<String, OafEntity>>) value -> new Tuple2<>(
+						value._1()._1(), value._2()._2()),
+					Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)))
+				// <tmp_dedupid, entity>
+				.groupByKey(
+					(MapFunction<Tuple2<String, OafEntity>, String>) Tuple2::_1, Encoders.STRING())
+				.mapGroups(
+					(MapGroupsFunction<String, Tuple2<String, OafEntity>, ConnectedComponent>) this::generateID,
+					Encoders.bean(ConnectedComponent.class))
+				// <rootID, list(id)>
+				.flatMap(
+					(FlatMapFunction<ConnectedComponent, Relation>) cc -> ccToMergeRel(cc, dedupConf),
 					Encoders.bean(Relation.class));
 
 			mergeRels.write().mode(SaveMode.Append).parquet(mergeRelPath);
@@ -128,9 +159,45 @@
 		}
 	}
 
-	public Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) {
+	private <T extends OafEntity> ConnectedComponent generateID(String key, Iterator<Tuple2<String, T>> values) {
+
+		List<Identifier<T>> identifiers = Lists.newArrayList(values).stream().map(v -> {
+			T entity = v._2();
+			Identifier<T> identifier = Identifier.newInstance(entity);
+			return identifier;
+		}).collect(Collectors.toList());
+
+		String rootID = IdGenerator.generate(identifiers, key);
+
+		if (Objects.equals(rootID, key))
+			throw new IllegalStateException("generated default ID: " + rootID);
+
+		return new ConnectedComponent(rootID,
+			identifiers.stream().map(i -> i.getEntity().getId()).collect(Collectors.toSet()));
+	}
+
+	private JavaPairRDD<Object, String> createVertexes(JavaSparkContext sc, String graphBasePath, String subEntity,
+		DedupConfig dedupConf) {
+
+		return sc
+			.textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
+			.mapToPair(json -> {
+				String id = MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), json);
+				return new Tuple2<>(hash(id), id);
+			});
+	}
+
+	private Iterator<Tuple2<String, String>> ccToRels(ConnectedComponent cc) {
 		return cc
-			.getDocIds()
+			.getIds()
+			.stream()
+			.map(id -> new Tuple2<>(cc.getCcId(), id))
+			.iterator();
+	}
+
+	private Iterator<Relation> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) {
+		return cc
+			.getIds()
 			.stream()
 			.flatMap(
 				id -> {
@@ -161,8 +228,8 @@
 		info.setInvisible(false);
 		info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
 		Qualifier provenanceAction = new Qualifier();
-		provenanceAction.setClassid(PROVENANCE_ACTION_CLASS);
-		provenanceAction.setClassname(PROVENANCE_ACTION_CLASS);
+		provenanceAction.setClassid(PROVENANCE_DEDUP);
+		provenanceAction.setClassname(PROVENANCE_DEDUP);
 		provenanceAction.setSchemeid(DNET_PROVENANCE_ACTIONS);
 		provenanceAction.setSchemename(DNET_PROVENANCE_ACTIONS);
 		info.setProvenanceaction(provenanceAction);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
index 3f24adb93..3a986a9dd 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent.java
@@ -3,44 +3,66 @@ package eu.dnetlib.dhp.oa.dedup.graph;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.spark.api.java.function.MapFunction;
 import org.codehaus.jackson.annotate.JsonIgnore;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 
 import eu.dnetlib.dhp.oa.dedup.DedupUtility;
+import eu.dnetlib.dhp.oa.dedup.IdGenerator;
+import eu.dnetlib.dhp.oa.dedup.model.Identifier;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.utils.DHPUtils;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.util.MapDocumentUtil;
 import eu.dnetlib.pace.util.PaceException;
 
 public class ConnectedComponent implements Serializable {
 
-	private Set<String> docIds;
 	private String ccId;
+	private Set<String> ids;
 
-	public ConnectedComponent(Set<String> docIds, final int cut) {
-		this.docIds = docIds;
-		createID();
-		if (cut > 0 && docIds.size() > cut) {
-			this.docIds = docIds
+	private static final String CONNECTED_COMPONENT_ID_PREFIX = "connect_comp";
+
+	public ConnectedComponent(Set<String> ids, final int cut) {
+		this.ids = ids;
+
+		this.ccId = createDefaultID();
+
+		if (cut > 0 && ids.size() > cut) {
+			this.ids = ids
 				.stream()
-				.filter(s -> !ccId.equalsIgnoreCase(s))
+				.filter(id -> !ccId.equalsIgnoreCase(id))
 				.limit(cut - 1)
 				.collect(Collectors.toSet());
-			this.docIds.add(ccId);
+//			this.ids.add(ccId); ??
 		}
 	}
 
-	public String createID() {
-		if (docIds.size() > 1) {
+	public ConnectedComponent(String ccId, Set<String> ids) {
+		this.ccId = ccId;
+		this.ids = ids;
+	}
+
+	public String createDefaultID() {
+		if (ids.size() > 1) {
 			final String s = getMin();
 			String prefix = s.split("\\|")[0];
-			ccId = prefix + "|dedup_wf_001::" + DHPUtils.md5(s);
+			ccId = prefix + "|" + CONNECTED_COMPONENT_ID_PREFIX + "::" + DHPUtils.md5(s);
 			return ccId;
 		} else {
-			return docIds.iterator().next();
+			return ids.iterator().next();
 		}
 	}
 
@@ -49,15 +71,15 @@
 
 		final StringBuilder min = new StringBuilder();
 
-		docIds
+		ids
 			.forEach(
-				i -> {
+				id -> {
 					if (StringUtils.isBlank(min.toString())) {
-						min.append(i);
+						min.append(id);
 					} else {
-						if (min.toString().compareTo(i) > 0) {
+						if (min.toString().compareTo(id) > 0) {
 							min.setLength(0);
-							min.append(i);
+							min.append(id);
 						}
 					}
 				});
@@ -74,12 +96,12 @@
 		}
 	}
 
-	public Set<String> getDocIds() {
-		return docIds;
+	public Set<String> getIds() {
+		return ids;
 	}
 
-	public void setDocIds(Set<String> docIds) {
-		this.docIds = docIds;
+	public void setIds(Set<String> ids) {
+		this.ids = ids;
 	}
 
 	public String getCcId() {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
index 94a3bf613..d7f0644fd 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/model/Identifier.java
@@ -2,7 +2,6 @@
 package eu.dnetlib.dhp.oa.dedup.model;
 
 import java.io.Serializable;
-import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.stream.Collectors;
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
index 851e72dee..93caadf93 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java
@@ -55,7 +55,6 @@ public class SparkDedupTest implements Serializable {
 	private static String testOutputBasePath;
 	private static String testDedupGraphBasePath;
 	private static final String testActionSetId = "test-orchestrator";
-	private static String testDedupAssertionsBasePath;
 
 	@BeforeAll
 	public static void cleanUp() throws IOException, URISyntaxException {
@@ -70,10 +69,6 @@ public class SparkDedupTest implements Serializable {
 		testDedupGraphBasePath = createTempDirectory(SparkDedupTest.class.getSimpleName() + "-")
 			.toAbsolutePath()
 			.toString();
-		testDedupAssertionsBasePath = Paths
-			.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/assertions").toURI())
-			.toFile()
-			.getAbsolutePath();
 
 		FileUtils.deleteDirectory(new File(testOutputBasePath));
 		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
@@ -353,6 +348,7 @@
 		assertEquals(288, sw_mergerel);
 		assertEquals(472, ds_mergerel);
 		assertEquals(718, orp_mergerel);
+
 	}
 
 	@Test
@@ -538,7 +534,7 @@
 
 		long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
 
-		assertEquals(4858, relations);
+		assertEquals(4862, relations);
 
 		// check deletedbyinference
 		final Dataset<Row> mergeRels = spark
diff --git a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh
index 57acb2ee7..d04c5ccfd 100644
--- a/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh
+++ b/dhp-workflows/dhp-stats-update/src/main/resources/eu/dnetlib/dhp/oa/graph/stats/oozie_app/finalizedb.sh
@@ -10,6 +10,7 @@ export SOURCE=$1
 export SHADOW=$2
 
 echo "Updating shadow database"
+impala-shell -q "invalidate metadata"
 impala-shell -d ${SOURCE} -q "invalidate metadata"
 impala-shell -d ${SOURCE} -q "show tables" --delimited | sed "s/^\(.*\)/compute stats ${SOURCE}.\1;/" | impala-shell -c -f -
 impala-shell -q "create database if not exists ${SHADOW}"
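
Note for reviewers: the snippet below is an illustrative sketch, not part of the patch. It shows the effect of the new mapHandles flag in IdentifierFactory: getPids(...), the DOIBoost entry point, now calls pidFromInstance(pid, collectedFrom, true), so handle PIDs always survive the authority filter there, while mapPids(...) passes mapHandles = false, so a handle only contributes to record identifier creation when its datasource is an authority for that PID type — the scenario exercised by the new publication_doi4.json fixture. The KeyValue/Qualifier/StructuredProperty setters are the standard Oaf model beans; the wrapper class and main method are hypothetical scaffolding.

import java.util.Collections;
import java.util.List;

import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;

// hypothetical scaffolding class, for illustration only
public class HandleMappingSketch {

	public static void main(String[] args) {
		// datasource mirroring the second instance of publication_doi4.json;
		// it is not a registered authority for the handle PID type
		KeyValue collectedFrom = new KeyValue();
		collectedFrom.setKey("10|opendoar____::2852");
		collectedFrom.setValue("Digital library of Brno University of Technology");

		Qualifier handleType = new Qualifier();
		handleType.setClassid("handle");
		handleType.setClassname("handle");

		StructuredProperty handle = new StructuredProperty();
		handle.setQualifier(handleType);
		handle.setValue("11012/83840");

		// getPids(...) delegates to pidFromInstance(pid, collectedFrom, true):
		// with mapHandles == true the handle is kept even without an authority match
		List<StructuredProperty> kept = IdentifierFactory.getPids(Collections.singletonList(handle), collectedFrom);
		System.out.println(kept.size()); // expected: 1

		// by contrast, the mapPids(...) path used for record id creation passes
		// mapHandles == false, so the same handle is filtered out and the record
		// keeps its original identifier (50|od______2852::...), as the new test asserts
	}
}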