From 1c6a1716332e02c49f1b05639803e9c3f8c83d00 Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 9 Jul 2020 11:02:09 +0200 Subject: [PATCH 1/8] updated pom --- dhp-workflows/dhp-broker-events/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dhp-workflows/dhp-broker-events/pom.xml b/dhp-workflows/dhp-broker-events/pom.xml index 119031b06..1a219c5c9 100644 --- a/dhp-workflows/dhp-broker-events/pom.xml +++ b/dhp-workflows/dhp-broker-events/pom.xml @@ -57,9 +57,9 @@ - eu.dnetlib + eu.dnetlib.dhp dnet-openaire-broker-common - [3.0.4,4.0.0) + [3.0.0-SNAPSHOT,) From 2d742a84aeb699aa67c889f7ed7ffe99c137fdae Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Thu, 9 Jul 2020 12:53:46 +0200 Subject: [PATCH 2/8] DedupConfig as json file --- .../dhp/broker/oa/GenerateEventsJob.java | 36 +----- .../dhp/broker/oa/matchers/UpdateMatcher.java | 4 +- .../dhp/broker/oa/util/EventFinder.java | 4 +- .../dhp/broker/oa/util/TrustUtils.java | 54 ++++++++ .../dhp/broker/oa/util/UpdateInfo.java | 41 +----- .../broker/oa/dedupConfig/dedupConfig.json | 122 ++++++++++++++++++ .../oa/generate_all/oozie_app/workflow.xml | 10 -- .../dhp/broker/oa/generate_events.json | 12 -- .../broker/oa/partial/oozie_app/workflow.xml | 9 -- .../broker/oa/matchers/UpdateMatcherTest.java | 12 +- .../dhp/broker/oa/util/TrustUtilsTest.java | 21 +++ 11 files changed, 208 insertions(+), 117 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java index a2d92e149..cfee360c5 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateEventsJob.java @@ -18,8 +18,6 @@ import org.apache.spark.util.LongAccumulator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.broker.model.Event; import eu.dnetlib.dhp.broker.oa.matchers.UpdateMatcher; @@ -27,9 +25,6 @@ import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; import eu.dnetlib.dhp.broker.oa.util.EventFinder; import eu.dnetlib.dhp.broker.oa.util.EventGroup; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.config.DedupConfig; public class GenerateEventsJob { @@ -52,12 +47,6 @@ public class GenerateEventsJob { final String workingPath = parser.get("workingPath"); log.info("workingPath: {}", workingPath); - final String isLookupUrl = parser.get("isLookupUrl"); - log.info("isLookupUrl: {}", isLookupUrl); - - final String dedupConfigProfileId = parser.get("dedupConfProfile"); - log.info("dedupConfigProfileId: {}", dedupConfigProfileId); - final String eventsPath = workingPath + "/events"; log.info("eventsPath: {}", eventsPath); @@ -72,10 +61,6 @@ public class GenerateEventsJob { final SparkConf conf = new SparkConf(); - // TODO UNCOMMENT - // final DedupConfig dedupConfig = loadDedupConfig(isLookupUrl, dedupConfigProfileId); - final DedupConfig dedupConfig = null; - runWithSparkSession(conf, isSparkSessionManaged, spark -> { ClusterUtils.removeDir(spark, eventsPath); @@ -90,7 +75,7 @@ public class GenerateEventsJob { final Dataset dataset = groups .map( g -> EventFinder - .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, dedupConfig, accumulators), + .generateEvents(g, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist, accumulators), Encoders .bean(EventGroup.class)) .flatMap(g -> g.getData().iterator(), Encoders.bean(Event.class)); @@ -112,23 +97,4 @@ public class GenerateEventsJob { } - private static DedupConfig loadDedupConfig(final String isLookupUrl, final String profId) throws Exception { - - final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl); - - final String conf = isLookUpService - .getResourceProfileByQuery( - String - .format( - "for $x in /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '%s'] return $x//DEDUPLICATION/text()", - profId)); - - final DedupConfig dedupConfig = new ObjectMapper().readValue(conf, DedupConfig.class); - dedupConfig.getPace().initModel(); - dedupConfig.getPace().initTranslationMap(); - // dedupConfig.getWf().setConfigurationId("???"); - - return dedupConfig; - } - } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java index af6ab30a1..3d688fa1d 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcher.java @@ -17,7 +17,6 @@ import org.apache.spark.util.LongAccumulator; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.dhp.broker.model.Topic; import eu.dnetlib.dhp.broker.oa.util.UpdateInfo; -import eu.dnetlib.pace.config.DedupConfig; public abstract class UpdateMatcher { @@ -37,7 +36,6 @@ public abstract class UpdateMatcher { public Collection> searchUpdatesForRecord(final OaBrokerMainEntity res, final Collection others, - final DedupConfig dedupConfig, final Map accumulators) { final Map> infoMap = new HashMap<>(); @@ -49,7 +47,7 @@ public abstract class UpdateMatcher { if (topic != null) { final UpdateInfo info = new UpdateInfo<>(topic, hl, source, res, getCompileHighlightFunction(), - getHighlightToStringFunction(), dedupConfig); + getHighlightToStringFunction()); final String s = DigestUtils.md5Hex(info.getHighlightValueAsString()); if (!infoMap.containsKey(s) || infoMap.get(s).getTrust() < info.getTrust()) { diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java index 593e66d43..b6328eb95 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/EventFinder.java @@ -37,7 +37,6 @@ import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreOpenAccess; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMorePid; import eu.dnetlib.dhp.broker.oa.matchers.simple.EnrichMoreSubject; import eu.dnetlib.dhp.broker.oa.util.aggregators.simple.ResultGroup; -import eu.dnetlib.pace.config.DedupConfig; public class EventFinder { @@ -76,7 +75,6 @@ public class EventFinder { final Set dsIdWhitelist, final Set dsIdBlacklist, final Set dsTypeWhitelist, - final DedupConfig dedupConfig, final Map accumulators) { final List> list = new ArrayList<>(); @@ -84,7 +82,7 @@ public class EventFinder { for (final OaBrokerMainEntity target : results.getData()) { if (verifyTarget(target, dsIdWhitelist, dsIdBlacklist, dsTypeWhitelist)) { for (final UpdateMatcher matcher : matchers) { - list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), dedupConfig, accumulators)); + list.addAll(matcher.searchUpdatesForRecord(target, results.getData(), accumulators)); } } } diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java index 5338d4f3d..72fe1b204 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/TrustUtils.java @@ -1,8 +1,62 @@ package eu.dnetlib.dhp.broker.oa.util; +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.pace.config.DedupConfig; +import eu.dnetlib.pace.model.MapDocument; +import eu.dnetlib.pace.tree.support.TreeProcessor; +import eu.dnetlib.pace.util.MapDocumentUtil; + public class TrustUtils { + private static final Logger log = LoggerFactory.getLogger(TrustUtils.class); + + private static DedupConfig dedupConfig; + + static { + final ObjectMapper mapper = new ObjectMapper(); + try { + dedupConfig = mapper + .readValue( + DedupConfig.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json"), + DedupConfig.class); + } catch (final IOException e) { + log.error("Error loading dedupConfig, e"); + } + + } + + protected static float calculateTrust(final OaBrokerMainEntity r1, final OaBrokerMainEntity r2) { + + if (dedupConfig == null) { + return BrokerConstants.MIN_TRUST; + } + + try { + final ObjectMapper objectMapper = new ObjectMapper(); + final MapDocument doc1 = MapDocumentUtil + .asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r1)); + final MapDocument doc2 = MapDocumentUtil + .asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r2)); + + final double score = new TreeProcessor(dedupConfig).computeScore(doc1, doc2); + + final double threshold = dedupConfig.getWf().getThreshold(); + + return TrustUtils.rescale(score, threshold); + } catch (final Exception e) { + log.error("Error computing score between results", e); + return BrokerConstants.MIN_TRUST; + } + } + public static float rescale(final double score, final double threshold) { if (score >= BrokerConstants.MAX_TRUST) { return BrokerConstants.MAX_TRUST; diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java index 0586b681e..ef8fb240c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/UpdateInfo.java @@ -4,20 +4,11 @@ package eu.dnetlib.dhp.broker.oa.util; import java.util.function.BiConsumer; import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.broker.objects.OaBrokerEventPayload; import eu.dnetlib.broker.objects.OaBrokerInstance; import eu.dnetlib.broker.objects.OaBrokerMainEntity; import eu.dnetlib.broker.objects.OaBrokerProvenance; import eu.dnetlib.dhp.broker.model.Topic; -import eu.dnetlib.pace.config.DedupConfig; -import eu.dnetlib.pace.model.MapDocument; -import eu.dnetlib.pace.tree.support.TreeProcessor; -import eu.dnetlib.pace.util.MapDocumentUtil; public final class UpdateInfo { @@ -35,20 +26,17 @@ public final class UpdateInfo { private final float trust; - private static final Logger log = LoggerFactory.getLogger(UpdateInfo.class); - public UpdateInfo(final Topic topic, final T highlightValue, final OaBrokerMainEntity source, final OaBrokerMainEntity target, final BiConsumer compileHighlight, - final Function highlightToString, - final DedupConfig dedupConfig) { + final Function highlightToString) { this.topic = topic; this.highlightValue = highlightValue; this.source = source; this.target = target; this.compileHighlight = compileHighlight; this.highlightToString = highlightToString; - this.trust = calculateTrust(dedupConfig, source, target); + this.trust = TrustUtils.calculateTrust(source, target); } public T getHighlightValue() { @@ -63,31 +51,6 @@ public final class UpdateInfo { return target; } - private float calculateTrust(final DedupConfig dedupConfig, - final OaBrokerMainEntity r1, - final OaBrokerMainEntity r2) { - - if (dedupConfig == null) { - return BrokerConstants.MIN_TRUST; - } - - try { - final ObjectMapper objectMapper = new ObjectMapper(); - final MapDocument doc1 = MapDocumentUtil - .asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r1)); - final MapDocument doc2 = MapDocumentUtil - .asMapDocumentWithJPath(dedupConfig, objectMapper.writeValueAsString(r2)); - - final double score = new TreeProcessor(dedupConfig).computeScore(doc1, doc2); - final double threshold = dedupConfig.getWf().getThreshold(); - - return TrustUtils.rescale(score, threshold); - } catch (final Exception e) { - log.error("Error computing score between results", e); - return BrokerConstants.MIN_TRUST; - } - } - protected Topic getTopic() { return topic; } diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json new file mode 100644 index 000000000..d0319b441 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/dedupConfig/dedupConfig.json @@ -0,0 +1,122 @@ +{ + "wf": { + + }, + "pace": { + "clustering": [ + { + "name": "wordssuffixprefix", + "fields": [ + "title" + ], + "params": { + "max": "2", + "len": "3" + } + }, + { + "name": "lowercase", + "fields": [ + "doi" + ], + "params": { + + } + } + ], + "decisionTree": { + "start": { + "fields": [ + { + "field": "doi", + "comparator": "exactMatch", + "weight": 1.0, + "countIfUndefined": "false", + "params": { + + } + } + ], + "threshold": 0.5, + "aggregation": "AVG", + "positive": "MATCH", + "negative": "layer1", + "undefined": "layer1", + "ignoreUndefined": "true" + }, + "layer1": { + "fields": [ + { + "field": "title", + "comparator": "titleVersionMatch", + "weight": 0.9, + "countIfUndefined": "false", + "params": { + + } + }, + { + "field": "authors", + "comparator": "sizeMatch", + "weight": 0.9, + "countIfUndefined": "false", + "params": { + + } + } + ], + "threshold": 0.5, + "aggregation": "AVG", + "positive": "MATCH", + "negative": "layer2", + "undefined": "layer2", + "ignoreUndefined": "true" + }, + "layer2": { + "fields": [ + { + "field": "title", + "comparator": "levensteinTitle", + "weight": 1.0, + "countIfUndefined": "true", + "params": { + + } + } + ], + "threshold": 0.99, + "aggregation": "AVG", + "positive": "MATCH", + "negative": "NO_MATCH", + "undefined": "NO_MATCH", + "ignoreUndefined": "true" + } + }, + "model": [ + { + "name": "doi", + "type": "String", + "path": "$.pids[?(@.type == 'doi')].value" + }, + { + "name": "title", + "type": "String", + "path": "$.titles", + "length": 250, + "size": 5 + }, + { + "name": "authors", + "type": "List", + "path": "$.creators[*].fullname", + "size": 200 + } + ], + "blacklists": { + + }, + "synonyms": { + + } + } +} \ No newline at end of file diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index b85c60fdf..2c728cd98 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -8,14 +8,6 @@ workingPath the path where the the generated data will be stored - - - isLookupUrl - the address of the lookUp service - - - dedupConfProfId - the id of a valid Dedup Configuration Profile datasourceIdWhitelist @@ -427,8 +419,6 @@ --conf spark.sql.shuffle.partitions=3840 --workingPath${workingPath} - --isLookupUrl${isLookupUrl} - --dedupConfProfile${dedupConfProfId} --datasourceIdWhitelist${datasourceIdWhitelist} --datasourceTypeWhitelist${datasourceTypeWhitelist} --datasourceIdBlacklist${datasourceIdBlacklist} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json index c545884f9..bab808193 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_events.json @@ -5,18 +5,6 @@ "paramDescription": "the path where the generated events will be stored", "paramRequired": true }, - { - "paramName": "lu", - "paramLongName": "isLookupUrl", - "paramDescription": "the address of the ISLookUpService", - "paramRequired": true - }, - { - "paramName": "d", - "paramLongName": "dedupConfProfile", - "paramDescription": "the id of a valid Dedup Configuration Profile", - "paramRequired": true - }, { "paramName": "datasourceIdWhitelist", "paramLongName": "datasourceIdWhitelist", diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index 392271260..d19ad6c5a 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -9,15 +9,6 @@ workingPath the path where the the generated data will be stored - - isLookupUrl - the address of the lookUp service - - - dedupConfProfId - the id of a valid Dedup Configuration Profile - - sparkDriverMemory memory for driver process diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java index 93bc5617f..82374b335 100644 --- a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/matchers/UpdateMatcherTest.java @@ -30,7 +30,7 @@ class UpdateMatcherTest { final OaBrokerMainEntity p4 = new OaBrokerMainEntity(); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.isEmpty()); } @@ -46,7 +46,7 @@ class UpdateMatcherTest { res.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.isEmpty()); } @@ -62,7 +62,7 @@ class UpdateMatcherTest { p2.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.size() == 1); } @@ -79,7 +79,7 @@ class UpdateMatcherTest { p2.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.isEmpty()); } @@ -98,7 +98,7 @@ class UpdateMatcherTest { p4.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.isEmpty()); } @@ -117,7 +117,7 @@ class UpdateMatcherTest { p4.setPublicationdate("2018"); final Collection> list = matcher - .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null, null); + .searchUpdatesForRecord(res, Arrays.asList(p1, p2, p3, p4), null); assertTrue(list.size() == 1); } diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/TrustUtilsTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/TrustUtilsTest.java index bb23d6085..974baa28b 100644 --- a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/TrustUtilsTest.java +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/util/TrustUtilsTest.java @@ -5,6 +5,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.Test; +import eu.dnetlib.broker.objects.OaBrokerAuthor; +import eu.dnetlib.broker.objects.OaBrokerMainEntity; +import eu.dnetlib.broker.objects.OaBrokerTypedValue; + public class TrustUtilsTest { private static final double THRESHOLD = 0.95; @@ -64,6 +68,23 @@ public class TrustUtilsTest { verifyValue(2.00, BrokerConstants.MAX_TRUST); } + @Test + public void test() throws Exception { + final OaBrokerMainEntity r1 = new OaBrokerMainEntity(); + r1.getTitles().add("D-NET Service Package: Data Import"); + r1.getPids().add(new OaBrokerTypedValue("doi", "123")); + r1.getCreators().add(new OaBrokerAuthor("Michele Artini", null)); + r1.getCreators().add(new OaBrokerAuthor("Claudio Atzori", null)); + + final OaBrokerMainEntity r2 = new OaBrokerMainEntity(); + r2.getTitles().add("D-NET Service Package: Data Import"); + // r2.getPids().add(new OaBrokerTypedValue("doi", "123")); + r2.getCreators().add(new OaBrokerAuthor("Michele Artini", null)); + // r2.getCreators().add(new OaBrokerAuthor("Claudio Atzori", null)); + + System.out.println("TRUST: " + TrustUtils.calculateTrust(r1, r2)); + } + private void verifyValue(final double originalScore, final float expectedTrust) { final float trust = TrustUtils.rescale(originalScore, THRESHOLD); System.out.println(trust); From ff4d6214f19292d90076d8ff6ec86a885861c1af Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 10 Jul 2020 10:06:41 +0200 Subject: [PATCH 3/8] experimenting with pruning of relations --- .../dhp/oa/provision/PrepareRelationsJob.java | 344 +++++++++--------- 1 file changed, 174 insertions(+), 170 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index eb63d4423..e1f7386e9 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -59,200 +59,204 @@ import scala.Tuple2; */ public class PrepareRelationsJob { - private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJob.class); + private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static final int MAX_RELS = 100; + public static final int MAX_RELS = 100; - public static final int DEFAULT_NUM_PARTITIONS = 3000; + public static final int DEFAULT_NUM_PARTITIONS = 3000; - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - PrepareRelationsJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + PrepareRelationsJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - String inputRelationsPath = parser.get("inputRelationsPath"); - log.info("inputRelationsPath: {}", inputRelationsPath); + String inputRelationsPath = parser.get("inputRelationsPath"); + log.info("inputRelationsPath: {}", inputRelationsPath); - String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - int relPartitions = Optional - .ofNullable(parser.get("relPartitions")) - .map(Integer::valueOf) - .orElse(DEFAULT_NUM_PARTITIONS); - log.info("relPartitions: {}", relPartitions); + int relPartitions = Optional + .ofNullable(parser.get("relPartitions")) + .map(Integer::valueOf) + .orElse(DEFAULT_NUM_PARTITIONS); + log.info("relPartitions: {}", relPartitions); - Set relationFilter = Optional - .ofNullable(parser.get("relationFilter")) - .map(s -> Sets.newHashSet(Splitter.on(",").split(s))) - .orElse(new HashSet<>()); - log.info("relationFilter: {}", relationFilter); + Set relationFilter = Optional + .ofNullable(parser.get("relationFilter")) + .map(s -> Sets.newHashSet(Splitter.on(",").split(s))) + .orElse(new HashSet<>()); + log.info("relationFilter: {}", relationFilter); - int maxRelations = Optional - .ofNullable(parser.get("maxRelations")) - .map(Integer::valueOf) - .orElse(MAX_RELS); - log.info("maxRelations: {}", maxRelations); + int maxRelations = Optional + .ofNullable(parser.get("maxRelations")) + .map(Integer::valueOf) + .orElse(MAX_RELS); + log.info("maxRelations: {}", maxRelations); - SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - removeOutputDir(spark, outputPath); - prepareRelationsRDD( - spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions); - }); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + prepareRelationsRDD( + spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions); + }); + } + + /** + * RDD based implementation that prepares the graph relations by limiting the number of outgoing links and filtering + * the relation types according to the given criteria. Moreover, outgoing links kept within the given limit are + * prioritized according to the weights indicated in eu.dnetlib.dhp.oa.provision.model.SortableRelation. + * + * @param spark the spark session + * @param inputRelationsPath source path for the graph relations + * @param outputPath output path for the processed relations + * @param relationFilter set of relation filters applied to the `relClass` field + * @param maxRelations maximum number of allowed outgoing edges + * @param relPartitions number of partitions for the output RDD + */ + private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath, + Set relationFilter, int maxRelations, int relPartitions) { + + JavaRDD rels = readPathRelationRDD(spark, inputRelationsPath); + + JavaRDD pruned = pruneRels( + pruneRels(rels, relationFilter, maxRelations, relPartitions, (Function) r -> r.getSource()), + relationFilter, maxRelations, relPartitions, (Function) r -> r.getTarget()); + spark + .createDataset(pruned.rdd(), Encoders.bean(Relation.class)) + .repartition(relPartitions) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } + + private static JavaRDD pruneRels(JavaRDD rels, Set relationFilter, int maxRelations, int relPartitions, Function idFn) { + return rels + .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) + .filter(rel -> relationFilter.contains(rel.getRelClass()) == false) + .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r)) + .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) + .groupBy(Tuple2::_1) + .map(Tuple2::_2) + .map(t -> Iterables.limit(t, maxRelations)) + .flatMap(Iterable::iterator).map(Tuple2::_2); } - /** - * RDD based implementation that prepares the graph relations by limiting the number of outgoing links and filtering - * the relation types according to the given criteria. Moreover, outgoing links kept within the given limit are - * prioritized according to the weights indicated in eu.dnetlib.dhp.oa.provision.model.SortableRelation. - * - * @param spark the spark session - * @param inputRelationsPath source path for the graph relations - * @param outputPath output path for the processed relations - * @param relationFilter set of relation filters applied to the `relClass` field - * @param maxRelations maximum number of allowed outgoing edges - * @param relPartitions number of partitions for the output RDD - */ - private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath, - Set relationFilter, int maxRelations, int relPartitions) { + // experimental + private static void prepareRelationsDataset( + SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, + int relPartitions) { + spark + .read() + .textFile(inputRelationsPath) + .repartition(relPartitions) + .map( + (MapFunction) s -> OBJECT_MAPPER.readValue(s, Relation.class), + Encoders.kryo(Relation.class)) + .filter((FilterFunction) rel -> rel.getDataInfo().getDeletedbyinference() == false) + .filter((FilterFunction) rel -> relationFilter.contains(rel.getRelClass()) == false) + .groupByKey( + (MapFunction) Relation::getSource, + Encoders.STRING()) + .agg(new RelationAggregator(maxRelations).toColumn()) + .flatMap( + (FlatMapFunction, Relation>) t -> Iterables + .limit(t._2().getRelations(), maxRelations) + .iterator(), + Encoders.bean(Relation.class)) + .repartition(relPartitions) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } - // group by SOURCE and apply limit - RDD bySource = readPathRelationRDD(spark, inputRelationsPath) - .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) - .filter(rel -> relationFilter.contains(rel.getRelClass()) == false) - .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r)) - .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) - .groupBy(Tuple2::_1) - .map(Tuple2::_2) - .map(t -> Iterables.limit(t, maxRelations)) - .flatMap(Iterable::iterator) - .map(Tuple2::_2) - .rdd(); + public static class RelationAggregator + extends Aggregator { - spark - .createDataset(bySource, Encoders.bean(Relation.class)) - .repartition(relPartitions) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } + private int maxRelations; - // experimental - private static void prepareRelationsDataset( - SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, - int relPartitions) { - spark - .read() - .textFile(inputRelationsPath) - .repartition(relPartitions) - .map( - (MapFunction) s -> OBJECT_MAPPER.readValue(s, Relation.class), - Encoders.kryo(Relation.class)) - .filter((FilterFunction) rel -> rel.getDataInfo().getDeletedbyinference() == false) - .filter((FilterFunction) rel -> relationFilter.contains(rel.getRelClass()) == false) - .groupByKey( - (MapFunction) Relation::getSource, - Encoders.STRING()) - .agg(new RelationAggregator(maxRelations).toColumn()) - .flatMap( - (FlatMapFunction, Relation>) t -> Iterables - .limit(t._2().getRelations(), maxRelations) - .iterator(), - Encoders.bean(Relation.class)) - .repartition(relPartitions) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } + public RelationAggregator(int maxRelations) { + this.maxRelations = maxRelations; + } - public static class RelationAggregator - extends Aggregator { + @Override + public RelationList zero() { + return new RelationList(); + } - private int maxRelations; + @Override + public RelationList reduce(RelationList b, Relation a) { + b.getRelations().add(a); + return getSortableRelationList(b); + } - public RelationAggregator(int maxRelations) { - this.maxRelations = maxRelations; - } + @Override + public RelationList merge(RelationList b1, RelationList b2) { + b1.getRelations().addAll(b2.getRelations()); + return getSortableRelationList(b1); + } - @Override - public RelationList zero() { - return new RelationList(); - } + @Override + public RelationList finish(RelationList r) { + return getSortableRelationList(r); + } - @Override - public RelationList reduce(RelationList b, Relation a) { - b.getRelations().add(a); - return getSortableRelationList(b); - } + private RelationList getSortableRelationList(RelationList b1) { + RelationList sr = new RelationList(); + sr + .setRelations( + b1 + .getRelations() + .stream() + .limit(maxRelations) + .collect(Collectors.toCollection(() -> new PriorityQueue<>(new RelationComparator())))); + return sr; + } - @Override - public RelationList merge(RelationList b1, RelationList b2) { - b1.getRelations().addAll(b2.getRelations()); - return getSortableRelationList(b1); - } + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(RelationList.class); + } - @Override - public RelationList finish(RelationList r) { - return getSortableRelationList(r); - } + @Override + public Encoder outputEncoder() { + return Encoders.kryo(RelationList.class); + } + } - private RelationList getSortableRelationList(RelationList b1) { - RelationList sr = new RelationList(); - sr - .setRelations( - b1 - .getRelations() - .stream() - .limit(maxRelations) - .collect(Collectors.toCollection(() -> new PriorityQueue<>(new RelationComparator())))); - return sr; - } + /** + * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text + * file, + * + * @param spark + * @param inputPath + * @return the JavaRDD containing all the relationships + */ + private static JavaRDD readPathRelationRDD( + SparkSession spark, final String inputPath) { + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class)); + } - @Override - public Encoder bufferEncoder() { - return Encoders.kryo(RelationList.class); - } - - @Override - public Encoder outputEncoder() { - return Encoders.kryo(RelationList.class); - } - } - - /** - * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text - * file, - * - * @param spark - * @param inputPath - * @return the JavaRDD containing all the relationships - */ - private static JavaRDD readPathRelationRDD( - SparkSession spark, final String inputPath) { - JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class)); - } - - private static void removeOutputDir(SparkSession spark, String path) { - HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); - } + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } } From b21866a2da7be44106713c3f3982e3ee646d22d4 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 10 Jul 2020 13:59:48 +0200 Subject: [PATCH 4/8] allow to set different to relations cut points by source and by target; adjusted weight assigned to relationship types --- .../dhp/oa/provision/PrepareRelationsJob.java | 358 +++++++++--------- .../provision/model/SortableRelationKey.java | 22 +- .../input_params_prepare_relations.json | 13 +- 3 files changed, 206 insertions(+), 187 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index e1f7386e9..da0a81021 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -59,204 +59,216 @@ import scala.Tuple2; */ public class PrepareRelationsJob { - private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJob.class); + private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static final int MAX_RELS = 100; + public static final int MAX_RELS = 100; - public static final int DEFAULT_NUM_PARTITIONS = 3000; + public static final int DEFAULT_NUM_PARTITIONS = 3000; - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - PrepareRelationsJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + PrepareRelationsJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - String inputRelationsPath = parser.get("inputRelationsPath"); - log.info("inputRelationsPath: {}", inputRelationsPath); + String inputRelationsPath = parser.get("inputRelationsPath"); + log.info("inputRelationsPath: {}", inputRelationsPath); - String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - int relPartitions = Optional - .ofNullable(parser.get("relPartitions")) - .map(Integer::valueOf) - .orElse(DEFAULT_NUM_PARTITIONS); - log.info("relPartitions: {}", relPartitions); + int relPartitions = Optional + .ofNullable(parser.get("relPartitions")) + .map(Integer::valueOf) + .orElse(DEFAULT_NUM_PARTITIONS); + log.info("relPartitions: {}", relPartitions); - Set relationFilter = Optional - .ofNullable(parser.get("relationFilter")) - .map(s -> Sets.newHashSet(Splitter.on(",").split(s))) - .orElse(new HashSet<>()); - log.info("relationFilter: {}", relationFilter); + Set relationFilter = Optional + .ofNullable(parser.get("relationFilter")) + .map(s -> Sets.newHashSet(Splitter.on(",").split(s))) + .orElse(new HashSet<>()); + log.info("relationFilter: {}", relationFilter); - int maxRelations = Optional - .ofNullable(parser.get("maxRelations")) - .map(Integer::valueOf) - .orElse(MAX_RELS); - log.info("maxRelations: {}", maxRelations); + int sourceMaxRelations = Optional + .ofNullable(parser.get("sourceMaxRelations")) + .map(Integer::valueOf) + .orElse(MAX_RELS); + log.info("sourceMaxRelations: {}", sourceMaxRelations); - SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); + int targetMaxRelations = Optional + .ofNullable(parser.get("targetMaxRelations")) + .map(Integer::valueOf) + .orElse(MAX_RELS); + log.info("targetMaxRelations: {}", targetMaxRelations); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - removeOutputDir(spark, outputPath); - prepareRelationsRDD( - spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions); - }); - } + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); - /** - * RDD based implementation that prepares the graph relations by limiting the number of outgoing links and filtering - * the relation types according to the given criteria. Moreover, outgoing links kept within the given limit are - * prioritized according to the weights indicated in eu.dnetlib.dhp.oa.provision.model.SortableRelation. - * - * @param spark the spark session - * @param inputRelationsPath source path for the graph relations - * @param outputPath output path for the processed relations - * @param relationFilter set of relation filters applied to the `relClass` field - * @param maxRelations maximum number of allowed outgoing edges - * @param relPartitions number of partitions for the output RDD - */ - private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath, - Set relationFilter, int maxRelations, int relPartitions) { - - JavaRDD rels = readPathRelationRDD(spark, inputRelationsPath); - - JavaRDD pruned = pruneRels( - pruneRels(rels, relationFilter, maxRelations, relPartitions, (Function) r -> r.getSource()), - relationFilter, maxRelations, relPartitions, (Function) r -> r.getTarget()); - spark - .createDataset(pruned.rdd(), Encoders.bean(Relation.class)) - .repartition(relPartitions) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } - - private static JavaRDD pruneRels(JavaRDD rels, Set relationFilter, int maxRelations, int relPartitions, Function idFn) { - return rels - .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) - .filter(rel -> relationFilter.contains(rel.getRelClass()) == false) - .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r)) - .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) - .groupBy(Tuple2::_1) - .map(Tuple2::_2) - .map(t -> Iterables.limit(t, maxRelations)) - .flatMap(Iterable::iterator).map(Tuple2::_2); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + prepareRelationsRDD( + spark, inputRelationsPath, outputPath, relationFilter, sourceMaxRelations, targetMaxRelations, + relPartitions); + }); } - // experimental - private static void prepareRelationsDataset( - SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, - int relPartitions) { - spark - .read() - .textFile(inputRelationsPath) - .repartition(relPartitions) - .map( - (MapFunction) s -> OBJECT_MAPPER.readValue(s, Relation.class), - Encoders.kryo(Relation.class)) - .filter((FilterFunction) rel -> rel.getDataInfo().getDeletedbyinference() == false) - .filter((FilterFunction) rel -> relationFilter.contains(rel.getRelClass()) == false) - .groupByKey( - (MapFunction) Relation::getSource, - Encoders.STRING()) - .agg(new RelationAggregator(maxRelations).toColumn()) - .flatMap( - (FlatMapFunction, Relation>) t -> Iterables - .limit(t._2().getRelations(), maxRelations) - .iterator(), - Encoders.bean(Relation.class)) - .repartition(relPartitions) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } + /** + * RDD based implementation that prepares the graph relations by limiting the number of outgoing links and filtering + * the relation types according to the given criteria. Moreover, outgoing links kept within the given limit are + * prioritized according to the weights indicated in eu.dnetlib.dhp.oa.provision.model.SortableRelation. + * + * @param spark the spark session + * @param inputRelationsPath source path for the graph relations + * @param outputPath output path for the processed relations + * @param relationFilter set of relation filters applied to the `relClass` field + * @param sourceMaxRelations maximum number of allowed outgoing edges grouping by relation.source + * @param targetMaxRelations maximum number of allowed outgoing edges grouping by relation.target + * @param relPartitions number of partitions for the output RDD + */ + private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath, + Set relationFilter, int sourceMaxRelations, int targetMaxRelations, int relPartitions) { - public static class RelationAggregator - extends Aggregator { + JavaRDD rels = readPathRelationRDD(spark, inputRelationsPath) + .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) + .filter(rel -> relationFilter.contains(rel.getRelClass()) == false); - private int maxRelations; + JavaRDD pruned = pruneRels( + pruneRels( + rels, + sourceMaxRelations, relPartitions, (Function) r -> r.getSource()), + targetMaxRelations, relPartitions, (Function) r -> r.getTarget()); + spark + .createDataset(pruned.rdd(), Encoders.bean(Relation.class)) + .repartition(relPartitions) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } - public RelationAggregator(int maxRelations) { - this.maxRelations = maxRelations; - } + private static JavaRDD pruneRels(JavaRDD rels, int maxRelations, + int relPartitions, Function idFn) { + return rels + .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r)) + .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) + .groupBy(Tuple2::_1) + .map(Tuple2::_2) + .map(t -> Iterables.limit(t, maxRelations)) + .flatMap(Iterable::iterator) + .map(Tuple2::_2); + } - @Override - public RelationList zero() { - return new RelationList(); - } + // experimental + private static void prepareRelationsDataset( + SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, + int relPartitions) { + spark + .read() + .textFile(inputRelationsPath) + .repartition(relPartitions) + .map( + (MapFunction) s -> OBJECT_MAPPER.readValue(s, Relation.class), + Encoders.kryo(Relation.class)) + .filter((FilterFunction) rel -> rel.getDataInfo().getDeletedbyinference() == false) + .filter((FilterFunction) rel -> relationFilter.contains(rel.getRelClass()) == false) + .groupByKey( + (MapFunction) Relation::getSource, + Encoders.STRING()) + .agg(new RelationAggregator(maxRelations).toColumn()) + .flatMap( + (FlatMapFunction, Relation>) t -> Iterables + .limit(t._2().getRelations(), maxRelations) + .iterator(), + Encoders.bean(Relation.class)) + .repartition(relPartitions) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } - @Override - public RelationList reduce(RelationList b, Relation a) { - b.getRelations().add(a); - return getSortableRelationList(b); - } + public static class RelationAggregator + extends Aggregator { - @Override - public RelationList merge(RelationList b1, RelationList b2) { - b1.getRelations().addAll(b2.getRelations()); - return getSortableRelationList(b1); - } + private int maxRelations; - @Override - public RelationList finish(RelationList r) { - return getSortableRelationList(r); - } + public RelationAggregator(int maxRelations) { + this.maxRelations = maxRelations; + } - private RelationList getSortableRelationList(RelationList b1) { - RelationList sr = new RelationList(); - sr - .setRelations( - b1 - .getRelations() - .stream() - .limit(maxRelations) - .collect(Collectors.toCollection(() -> new PriorityQueue<>(new RelationComparator())))); - return sr; - } + @Override + public RelationList zero() { + return new RelationList(); + } - @Override - public Encoder bufferEncoder() { - return Encoders.kryo(RelationList.class); - } + @Override + public RelationList reduce(RelationList b, Relation a) { + b.getRelations().add(a); + return getSortableRelationList(b); + } - @Override - public Encoder outputEncoder() { - return Encoders.kryo(RelationList.class); - } - } + @Override + public RelationList merge(RelationList b1, RelationList b2) { + b1.getRelations().addAll(b2.getRelations()); + return getSortableRelationList(b1); + } - /** - * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text - * file, - * - * @param spark - * @param inputPath - * @return the JavaRDD containing all the relationships - */ - private static JavaRDD readPathRelationRDD( - SparkSession spark, final String inputPath) { - JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class)); - } + @Override + public RelationList finish(RelationList r) { + return getSortableRelationList(r); + } - private static void removeOutputDir(SparkSession spark, String path) { - HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); - } + private RelationList getSortableRelationList(RelationList b1) { + RelationList sr = new RelationList(); + sr + .setRelations( + b1 + .getRelations() + .stream() + .limit(maxRelations) + .collect(Collectors.toCollection(() -> new PriorityQueue<>(new RelationComparator())))); + return sr; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(RelationList.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(RelationList.class); + } + } + + /** + * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text + * file, + * + * @param spark + * @param inputPath + * @return the JavaRDD containing all the relationships + */ + private static JavaRDD readPathRelationRDD( + SparkSession spark, final String inputPath) { + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class)); + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java index bf7f9330d..bd7b4d78e 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java @@ -16,18 +16,18 @@ public class SortableRelationKey implements Comparable, Ser private static final Map weights = Maps.newHashMap(); static { - weights.put("outcome", 0); - weights.put("supplement", 1); - weights.put("review", 2); - weights.put("citation", 3); - weights.put("affiliation", 4); - weights.put("relationship", 5); - weights.put("publicationDataset", 6); - weights.put("similarity", 7); + weights.put("participation", 0); - weights.put("provision", 8); - weights.put("participation", 9); - weights.put("dedup", 10); + weights.put("outcome", 1); + weights.put("affiliation", 2); + weights.put("dedup", 3); + weights.put("publicationDataset", 4); + weights.put("citation", 5); + weights.put("supplement", 6); + weights.put("review", 7); + weights.put("relationship", 8); + weights.put("provision", 9); + weights.put("similarity", 10); } private static final long serialVersionUID = 3232323; diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json index 71b2becc4..33fa1dc8d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json @@ -30,9 +30,16 @@ "paramRequired": false }, { - "paramName": "mr", - "paramLongName": "maxRelations", - "paramDescription": "maximum number of relations allowed for a each entity", + "paramName": "smr", + "paramLongName": "sourceMaxRelations", + "paramDescription": "maximum number of relations allowed for a each entity grouping by source", + "paramRequired": false + }, + { + "paramName": "tmr", + "paramLongName": "targetMaxRelations", + "paramDescription": "maximum number of relations allowed for a each entity grouping by target", "paramRequired": false } + ] From e1ae964bc462e532cd6e61369899677941dc2a4a Mon Sep 17 00:00:00 2001 From: "michele.artini" Date: Fri, 10 Jul 2020 16:12:08 +0200 Subject: [PATCH 5/8] stats --- .../dhp/broker/oa/GenerateStatsJob.java | 63 +++++++++++++++++++ .../dnetlib/dhp/broker/oa/JoinStep2Job.java | 5 +- .../aggregators/stats/DatasourceStats.java | 61 ++++++++++++++++++ .../aggregators/stats/StatsAggregator.java | 59 +++++++++++++++++ .../broker/oa/partial/oozie_app/workflow.xml | 8 +-- .../dhp/oa/dedup/EntityMergerTest.java | 5 +- .../oa/graph/clean/CleanGraphSparkJob.java | 5 +- 7 files changed, 194 insertions(+), 12 deletions(-) create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java create mode 100644 dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java new file mode 100644 index 000000000..a51601cd7 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateStatsJob.java @@ -0,0 +1,63 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.TypedColumn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.DatasourceStats; +import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.StatsAggregator; + +public class GenerateStatsJob { + + private static final Logger log = LoggerFactory.getLogger(GenerateStatsJob.class); + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + IndexOnESJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/common_params.json"))); + parser.parseArgument(args); + + final Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final SparkConf conf = new SparkConf(); + + final String eventsPath = parser.get("workingPath") + "/events"; + log.info("eventsPath: {}", eventsPath); + + final String statsPath = parser.get("workingPath") + "/stats"; + log.info("stats: {}", statsPath); + + final TypedColumn aggr = new StatsAggregator().toColumn(); + + runWithSparkSession(conf, isSparkSessionManaged, spark -> { + + final Dataset stats = ClusterUtils + .readPath(spark, eventsPath, Event.class) + .groupByKey(e -> e.getMap().getTargetDatasourceId(), Encoders.STRING()) + .agg(aggr) + .map(t -> t._2, Encoders.bean(DatasourceStats.class)); + + ClusterUtils.save(stats, statsPath, DatasourceStats.class, null); + }); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java index cdcf0add4..55ab497f0 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/JoinStep2Job.java @@ -7,7 +7,6 @@ import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.TypedColumn; @@ -65,9 +64,7 @@ public class JoinStep2Job { final Dataset dataset = sources .joinWith(typedRels, sources.col("openaireId").equalTo(typedRels.col("source")), "left_outer") - .groupByKey( - (MapFunction, String>) t -> t._1.getOpenaireId(), - Encoders.STRING()) + .groupByKey(t -> t._1.getOpenaireId(), Encoders.STRING()) .agg(aggr) .map(t -> t._2, Encoders.bean(OaBrokerMainEntity.class)); diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java new file mode 100644 index 000000000..8b628809d --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/DatasourceStats.java @@ -0,0 +1,61 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.stats; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +public class DatasourceStats implements Serializable { + + /** + * + */ + private static final long serialVersionUID = -282112564184047677L; + + private String id; + private String name; + private String type; + private Map topics = new HashMap<>(); + + public String getId() { + return id; + } + + public void setId(final String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(final String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(final String type) { + this.type = type; + } + + public Map getTopics() { + return topics; + } + + public void setTopics(final Map topics) { + this.topics = topics; + } + + public void incrementTopic(final String topic, final long inc) { + if (topics.containsKey(topic)) { + topics.put(topic, topics.get(topic) + inc); + } else { + topics.put(topic, inc); + } + + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java new file mode 100644 index 000000000..5aa6698e3 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/util/aggregators/stats/StatsAggregator.java @@ -0,0 +1,59 @@ + +package eu.dnetlib.dhp.broker.oa.util.aggregators.stats; + +import org.apache.commons.lang.StringUtils; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.expressions.Aggregator; + +import eu.dnetlib.dhp.broker.model.Event; + +public class StatsAggregator extends Aggregator { + + /** + * + */ + private static final long serialVersionUID = 6652105853037330529L; + + @Override + public DatasourceStats zero() { + return new DatasourceStats(); + } + + @Override + public DatasourceStats reduce(final DatasourceStats stats, final Event e) { + stats.setId(e.getMap().getTargetDatasourceId()); + stats.setName(e.getMap().getTargetDatasourceName()); + stats.setType(e.getMap().getTargetDatasourceType()); + stats.incrementTopic(e.getTopic(), 1l); + return stats; + } + + @Override + public DatasourceStats merge(final DatasourceStats stats0, final DatasourceStats stats1) { + if (StringUtils.isBlank(stats0.getId())) { + stats0.setId(stats1.getId()); + stats0.setName(stats1.getName()); + stats0.setType(stats1.getType()); + } + stats1.getTopics().entrySet().forEach(e -> stats0.incrementTopic(e.getKey(), e.getValue())); + return stats0; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.bean(DatasourceStats.class); + + } + + @Override + public DatasourceStats finish(final DatasourceStats stats) { + return stats; + } + + @Override + public Encoder outputEncoder() { + return Encoders.bean(DatasourceStats.class); + + } +} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml index d19ad6c5a..b4155f93f 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/partial/oozie_app/workflow.xml @@ -64,19 +64,19 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - + yarn cluster - Count - eu.dnetlib.dhp.broker.oa.CheckDuplictedIdsJob + GenerateStatsJob + eu.dnetlib.dhp.broker.oa.GenerateStatsJob dhp-broker-events-${projectVersion}.jar --executor-cores=${sparkExecutorCores} diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java index 513e14f07..3d45f666b 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/EntityMergerTest.java @@ -47,10 +47,11 @@ public class EntityMergerTest implements Serializable { @Test public void softwareMergerTest() throws InstantiationException, IllegalAccessException { - List> softwares = readSample(testEntityBasePath + "/software_merge.json", Software.class); + List> softwares = readSample( + testEntityBasePath + "/software_merge.json", Software.class); Software merged = DedupRecordFactory - .entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class); + .entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class); System.out.println(merged.getBestaccessright().getClassid()); } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index fd707e949..7091d9740 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -8,7 +8,6 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; @@ -24,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.graph.raw.AbstractMdRecordToOafMapper; import eu.dnetlib.dhp.oa.graph.raw.common.OafMapperUtils; import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; import eu.dnetlib.dhp.schema.common.ModelConstants; @@ -151,7 +151,8 @@ public class CleanGraphSparkJob { if (Objects.isNull(r.getBestaccessright()) || StringUtils.isBlank(r.getBestaccessright().getClassid())) { Qualifier bestaccessrights = AbstractMdRecordToOafMapper.createBestAccessRights(r.getInstance()); if (Objects.isNull(bestaccessrights)) { - r.setBestaccessright( + r + .setBestaccessright( qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES)); } else { r.setBestaccessright(bestaccessrights); From 4c3836f62e3358d173163aef8de11ead52fcc707 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 10 Jul 2020 19:00:44 +0200 Subject: [PATCH 6/8] materialize the related entities before joining them --- .../CreateRelatedEntitiesJob_phase1.java | 25 ++++++++++++++++--- .../CreateRelatedEntitiesJob_phase2.java | 17 +++++-------- .../dhp/oa/provision/ProvisionConstants.java | 14 +++++++++++ 3 files changed, 42 insertions(+), 14 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index 80b800017..57dca7bb1 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -9,6 +9,7 @@ import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -115,11 +116,21 @@ public class CreateRelatedEntitiesJob_phase1 { Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))) .cache(); - Dataset> entities = readPathEntity(spark, inputEntityPath, clazz) + final String relatedEntityPath = outputPath + "_relatedEntity"; + readPathEntity(spark, inputEntityPath, clazz) .filter("dataInfo.invisible == false") .map( (MapFunction) value -> asRelatedEntity(value, clazz), Encoders.kryo(RelatedEntity.class)) + .repartition(5000) + .write() + .mode(SaveMode.Overwrite) + .parquet(relatedEntityPath); + + Dataset> entities = spark + .read() + .load(relatedEntityPath) + .as(Encoders.kryo(RelatedEntity.class)) .map( (MapFunction>) e -> new Tuple2<>(e.getId(), e), Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) @@ -165,13 +176,21 @@ public class CreateRelatedEntitiesJob_phase1 { Result result = (Result) entity; if (result.getTitle() != null && !result.getTitle().isEmpty()) { - re.setTitle(result.getTitle().stream().findFirst().get()); + final StructuredProperty title = result.getTitle().stream().findFirst().get(); + title.setValue(StringUtils.left(title.getValue(), ProvisionConstants.MAX_TITLE_LENGTH)); + re.setTitle(title); } re.setDateofacceptance(getValue(result.getDateofacceptance())); re.setPublisher(getValue(result.getPublisher())); re.setResulttype(result.getResulttype()); - re.setInstances(result.getInstance()); + re + .setInstances( + result + .getInstance() + .stream() + .limit(ProvisionConstants.MAX_INSTANCES) + .collect(Collectors.toList())); // TODO still to be mapped // re.setCodeRepositoryUrl(j.read("$.coderepositoryurl")); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index bfcc648a3..7e175121e 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -61,12 +61,6 @@ public class CreateRelatedEntitiesJob_phase2 { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final int MAX_EXTERNAL_ENTITIES = 50; - private static final int MAX_AUTHORS = 200; - private static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000; - private static final int MAX_TITLE_LENGTH = 5000; - private static final int MAX_ABSTRACT_LENGTH = 100000; - public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils @@ -246,15 +240,15 @@ public class CreateRelatedEntitiesJob_phase2 { List refs = r .getExternalReference() .stream() - .limit(MAX_EXTERNAL_ENTITIES) + .limit(ProvisionConstants.MAX_EXTERNAL_ENTITIES) .collect(Collectors.toList()); r.setExternalReference(refs); } if (r.getAuthor() != null) { List authors = Lists.newArrayList(); for (Author a : r.getAuthor()) { - a.setFullname(StringUtils.left(a.getFullname(), MAX_AUTHOR_FULLNAME_LENGTH)); - if (authors.size() < MAX_AUTHORS || hasORCID(a)) { + a.setFullname(StringUtils.left(a.getFullname(), ProvisionConstants.MAX_AUTHOR_FULLNAME_LENGTH)); + if (authors.size() < ProvisionConstants.MAX_AUTHORS || hasORCID(a)) { authors.add(a); } } @@ -266,7 +260,7 @@ public class CreateRelatedEntitiesJob_phase2 { .stream() .filter(Objects::nonNull) .map(d -> { - d.setValue(StringUtils.left(d.getValue(), MAX_ABSTRACT_LENGTH)); + d.setValue(StringUtils.left(d.getValue(), ProvisionConstants.MAX_ABSTRACT_LENGTH)); return d; }) .collect(Collectors.toList()); @@ -278,9 +272,10 @@ public class CreateRelatedEntitiesJob_phase2 { .stream() .filter(Objects::nonNull) .map(t -> { - t.setValue(StringUtils.left(t.getValue(), MAX_TITLE_LENGTH)); + t.setValue(StringUtils.left(t.getValue(), ProvisionConstants.MAX_TITLE_LENGTH)); return t; }) + .limit(ProvisionConstants.MAX_TITLES) .collect(Collectors.toList()); r.setTitle(titles); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java new file mode 100644 index 000000000..9bc3706cd --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java @@ -0,0 +1,14 @@ + +package eu.dnetlib.dhp.oa.provision; + +public class ProvisionConstants { + + public static final int MAX_EXTERNAL_ENTITIES = 50; + public static final int MAX_AUTHORS = 200; + public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000; + public static final int MAX_TITLE_LENGTH = 5000; + public static final int MAX_TITLES = 10; + public static final int MAX_ABSTRACT_LENGTH = 100000; + public static final int MAX_INSTANCES = 10; + +} From cc77446dc4d092503bff388994a005e17e61dcfc Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 10 Jul 2020 19:01:50 +0200 Subject: [PATCH 7/8] added dbSchema parameter to the raw_db workflow --- .../eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml index 575f9229e..eea8d0a5a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml @@ -16,6 +16,11 @@ postgresPassword the password postgres + + dbSchema + beta + the database schema according to the D-Net infrastructure (beta or production) + isLookupUrl the address of the lookUp service @@ -93,6 +98,7 @@ --postgresUser${postgresUser} --postgresPassword${postgresPassword} --isLookupUrl${isLookupUrl} + --dbschema${dbSchema} @@ -109,6 +115,7 @@ --postgresUser${postgresUser} --postgresPassword${postgresPassword} --isLookupUrl${isLookupUrl} + --dbschema${dbSchema} --actionclaims From 06c1913062541062b4b5ed49044870acc1d3bd56 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 10 Jul 2020 19:03:33 +0200 Subject: [PATCH 8/8] added different limits for grouping by source and by target, incremented spark.sql.shuffle.partitions for the join operations --- .../dhp/oa/provision/oozie_app/workflow.xml | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index faa81ad64..32bf7ce83 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -18,8 +18,12 @@ filter applied reading relations (by relClass) - maxRelations - maximum number of relations allowed for a each entity + sourceMaxRelations + maximum number of relations allowed for a each entity grouping by source + + + targetMaxRelations + maximum number of relations allowed for a each entity grouping by target otherDsTypeId @@ -133,7 +137,8 @@ --inputRelationsPath${inputGraphRootPath}/relation --outputPath${workingDir}/relation - --maxRelations${maxRelations} + --sourceMaxRelations${sourceMaxRelations} + --targetMaxRelations${targetMaxRelations} --relationFilter${relationFilter} --relPartitions5000 @@ -166,7 +171,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 + --conf spark.sql.shuffle.partitions=15000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputRelationsPath${workingDir}/relation @@ -193,7 +198,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=15000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputRelationsPath${workingDir}/relation @@ -220,7 +225,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=10000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputRelationsPath${workingDir}/relation @@ -247,7 +252,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=5000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputRelationsPath${workingDir}/relation @@ -274,7 +279,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=5000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputRelationsPath${workingDir}/relation @@ -301,7 +306,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=5000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputRelationsPath${workingDir}/relation @@ -328,7 +333,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=5000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputRelationsPath${workingDir}/relation @@ -367,7 +372,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 + --conf spark.sql.shuffle.partitions=15000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/publication @@ -395,7 +400,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 + --conf spark.sql.shuffle.partitions=10000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/dataset @@ -423,7 +428,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 + --conf spark.sql.shuffle.partitions=10000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/otherresearchproduct @@ -451,7 +456,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=5000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/software @@ -479,7 +484,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 + --conf spark.sql.shuffle.partitions=8000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/datasource @@ -507,7 +512,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 + --conf spark.sql.shuffle.partitions=10000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/organization @@ -535,7 +540,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 + --conf spark.sql.shuffle.partitions=5000 --conf spark.network.timeout=${sparkNetworkTimeout} --inputEntityPath${inputGraphRootPath}/project @@ -607,5 +612,4 @@ - \ No newline at end of file