diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000000..0b5dbe0b49 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,21 @@ +style = defaultWithAlign + +align.openParenCallSite = false +align.openParenDefnSite = false +align.tokens = [{code = "->"}, {code = "<-"}, {code = "=>", owner = "Case"}] +continuationIndent.callSite = 2 +continuationIndent.defnSite = 2 +danglingParentheses = true +indentOperator = spray +maxColumn = 120 +newlines.alwaysBeforeTopLevelStatements = true +project.excludeFilters = [".*\\.sbt"] +rewrite.rules = [AvoidInfix] +rewrite.rules = [ExpandImportSelectors] +rewrite.rules = [RedundantBraces] +rewrite.rules = [RedundantParens] +rewrite.rules = [SortImports] +rewrite.rules = [SortModifiers] +rewrite.rules = [PreferCurlyFors] +spaces.inImportCurlyBraces = false +unindentTopLevelOperators = true \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/.scalafmt.conf b/dhp-workflows/dhp-aggregation/.scalafmt.conf new file mode 100644 index 0000000000..0b5dbe0b49 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/.scalafmt.conf @@ -0,0 +1,21 @@ +style = defaultWithAlign + +align.openParenCallSite = false +align.openParenDefnSite = false +align.tokens = [{code = "->"}, {code = "<-"}, {code = "=>", owner = "Case"}] +continuationIndent.callSite = 2 +continuationIndent.defnSite = 2 +danglingParentheses = true +indentOperator = spray +maxColumn = 120 +newlines.alwaysBeforeTopLevelStatements = true +project.excludeFilters = [".*\\.sbt"] +rewrite.rules = [AvoidInfix] +rewrite.rules = [ExpandImportSelectors] +rewrite.rules = [RedundantBraces] +rewrite.rules = [RedundantParens] +rewrite.rules = [SortImports] +rewrite.rules = [SortModifiers] +rewrite.rules = [PreferCurlyFors] +spaces.inImportCurlyBraces = false +unindentTopLevelOperators = true \ No newline at end of file diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateNotificationsJob.java new file mode 100644 index 0000000000..6b8d60f404 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/GenerateNotificationsJob.java @@ -0,0 +1,192 @@ + +package eu.dnetlib.dhp.broker.oa; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoder; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.util.LongAccumulator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.broker.model.ConditionParams; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.model.MappedFields; +import eu.dnetlib.dhp.broker.model.Notification; +import eu.dnetlib.dhp.broker.model.Subscription; +import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; +import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; +import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils; + +public class GenerateNotificationsJob { + + private static final Logger log = LoggerFactory.getLogger(GenerateNotificationsJob.class); + + public static void main(final String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + GenerateNotificationsJob.class + .getResourceAsStream("/eu/dnetlib/dhp/broker/oa/generate_notifications.json"))); + parser.parseArgument(args); + + final SparkConf conf = new SparkConf(); + + final String eventsPath = parser.get("outputDir") + "/events"; + log.info("eventsPath: {}", eventsPath); + + final String notificationsPath = parser.get("outputDir") + "/notifications"; + log.info("notificationsPath: {}", notificationsPath); + + final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl"); + log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl); + + final SparkSession spark = SparkSession.builder().config(conf).getOrCreate(); + + final LongAccumulator total = spark.sparkContext().longAccumulator("total_notifications"); + + final long startTime = new Date().getTime(); + + final List subscriptions = listSubscriptions(brokerApiBaseUrl); + + log.info("Number of subscriptions: " + subscriptions.size()); + + if (subscriptions.size() > 0) { + final Map>> conditionsMap = prepareConditionsMap(subscriptions); + + log.info("ConditionsMap: " + new ObjectMapper().writeValueAsString(conditionsMap)); + + final Encoder ngEncoder = Encoders.bean(NotificationGroup.class); + final Encoder nEncoder = Encoders.bean(Notification.class); + final Dataset notifications = ClusterUtils + .readPath(spark, eventsPath, Event.class) + .map( + (MapFunction) e -> generateNotifications( + e, subscriptions, conditionsMap, startTime), + ngEncoder) + .flatMap((FlatMapFunction) g -> g.getData().iterator(), nEncoder); + + ClusterUtils.save(notifications, notificationsPath, Notification.class, total); + } + } + + protected static Map>> prepareConditionsMap( + final List subscriptions) { + final Map>> map = new HashMap<>(); + subscriptions.forEach(s -> map.put(s.getSubscriptionId(), s.conditionsAsMap())); + return map; + } + + protected static NotificationGroup generateNotifications(final Event e, + final List subscriptions, + final Map>> conditionsMap, + final long date) { + final List list = subscriptions + .stream() + .filter( + s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) + .filter(s -> verifyConditions(e.getMap(), conditionsMap.get(s.getSubscriptionId()))) + .map(s -> generateNotification(s, e, date)) + .collect(Collectors.toList()); + + return new NotificationGroup(list); + } + + private static Notification generateNotification(final Subscription s, final Event e, final long date) { + final Notification n = new Notification(); + n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId())); + n.setSubscriptionId(s.getSubscriptionId()); + n.setEventId(e.getEventId()); + n.setProducerId(e.getProducerId()); + n.setTopic(e.getTopic()); + n.setPayload(e.getPayload()); + n.setMap(e.getMap()); + n.setDate(date); + return n; + } + + private static boolean verifyConditions(final MappedFields map, + final Map> conditions) { + if (conditions.containsKey("targetDatasourceName") + && !SubscriptionUtils + .verifyExact(map.getTargetDatasourceName(), conditions.get("targetDatasourceName").get(0).getValue())) { + return false; + } + + if (conditions.containsKey("trust") + && !SubscriptionUtils + .verifyFloatRange( + map.getTrust(), conditions.get("trust").get(0).getValue(), + conditions.get("trust").get(0).getOtherValue())) { + return false; + } + + if (conditions.containsKey("targetDateofacceptance") && !conditions + .get("targetDateofacceptance") + .stream() + .anyMatch( + c -> SubscriptionUtils + .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { + return false; + } + + if (conditions.containsKey("targetResultTitle") + && !conditions + .get("targetResultTitle") + .stream() + .anyMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) { + return false; + } + + if (conditions.containsKey("targetAuthors") + && !conditions + .get("targetAuthors") + .stream() + .allMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) { + return false; + } + + return !conditions.containsKey("targetSubjects") + || conditions + .get("targetSubjects") + .stream() + .allMatch(c -> SubscriptionUtils.verifyListExact(map.getTargetSubjects(), c.getValue())); + + } + + private static List listSubscriptions(final String brokerApiBaseUrl) throws Exception { + final String url = brokerApiBaseUrl + "/api/subscriptions"; + final HttpGet req = new HttpGet(url); + + final ObjectMapper mapper = new ObjectMapper(); + + try (final CloseableHttpClient client = HttpClients.createDefault()) { + try (final CloseableHttpResponse response = client.execute(req)) { + final String s = IOUtils.toString(response.getEntity().getContent()); + return mapper + .readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, Subscription.class)); + } + } + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java index e8ef5dd3e9..a2aa300921 100644 --- a/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java +++ b/dhp-workflows/dhp-broker-events/src/main/java/eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.java @@ -2,15 +2,10 @@ package eu.dnetlib.dhp.broker.oa; import java.io.IOException; -import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; @@ -18,10 +13,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.apache.spark.util.LongAccumulator; @@ -33,10 +25,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.broker.model.*; +import eu.dnetlib.dhp.broker.model.Notification; import eu.dnetlib.dhp.broker.oa.util.ClusterUtils; -import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; -import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils; public class IndexNotificationsJob { @@ -53,8 +43,8 @@ public class IndexNotificationsJob { final SparkConf conf = new SparkConf(); - final String eventsPath = parser.get("outputDir") + "/events"; - log.info("eventsPath: {}", eventsPath); + final String notificationsPath = parser.get("outputDir") + "/notifications"; + log.info("notificationsPath: {}", notificationsPath); final String index = parser.get("index"); log.info("index: {}", index); @@ -81,143 +71,41 @@ public class IndexNotificationsJob { final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed"); - final long startTime = new Date().getTime(); + final Long date = ClusterUtils + .readPath(spark, notificationsPath, Notification.class) + .first() + .getDate(); - final List subscriptions = listSubscriptions(brokerApiBaseUrl); + final JavaRDD toIndexRdd = ClusterUtils + .readPath(spark, notificationsPath, Notification.class) + .map((MapFunction) n -> prepareForIndexing(n, total), Encoders.STRING()) + .javaRDD(); - log.info("Number of subscriptions: {}", subscriptions.size()); + final Map esCfg = new HashMap<>(); - if (!subscriptions.isEmpty()) { - final Encoder ngEncoder = Encoders.bean(NotificationGroup.class); - final Encoder nEncoder = Encoders.bean(Notification.class); - final Dataset notifications = ClusterUtils - .readPath(spark, eventsPath, Event.class) - .map( - (MapFunction) e -> generateNotifications(e, subscriptions, startTime), - ngEncoder) - .flatMap((FlatMapFunction) g -> g.getData().iterator(), nEncoder); + esCfg.put("es.index.auto.create", "false"); + esCfg.put("es.nodes", indexHost); + esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY + esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount); + esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait); + esCfg.put("es.batch.size.entries", esBatchSizeEntries); + esCfg.put("es.nodes.wan.only", esNodesWanOnly); - final JavaRDD inputRdd = notifications - .map((MapFunction) n -> prepareForIndexing(n, total), Encoders.STRING()) - .javaRDD(); + log.info("*** Start indexing"); + JavaEsSpark.saveJsonToEs(toIndexRdd, index, esCfg); + log.info("*** End indexing"); - final Map esCfg = new HashMap<>(); + log.info("*** Deleting old notifications"); + final String message = deleteOldNotifications(brokerApiBaseUrl, date - 1000); + log.info("*** Deleted notifications: {}", message); - esCfg.put("es.index.auto.create", "false"); - esCfg.put("es.nodes", indexHost); - esCfg.put("es.mapping.id", "notificationId"); // THE PRIMARY KEY - esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount); - esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait); - esCfg.put("es.batch.size.entries", esBatchSizeEntries); - esCfg.put("es.nodes.wan.only", esNodesWanOnly); - - log.info("*** Start indexing"); - JavaEsSpark.saveJsonToEs(inputRdd, index, esCfg); - log.info("*** End indexing"); - - log.info("*** Deleting old notifications"); - final String message = deleteOldNotifications(brokerApiBaseUrl, startTime - 1000); - log.info("*** Deleted notifications: {}", message); - - log.info("*** sendNotifications (emails, ...)"); - sendNotifications(brokerApiBaseUrl, startTime - 1000); - log.info("*** ALL done."); - - } - } - - private static NotificationGroup generateNotifications(final Event e, - final List subscriptions, - final long date) { - final List list = subscriptions - .stream() - .filter( - s -> StringUtils.isBlank(s.getTopic()) || s.getTopic().equals("*") || s.getTopic().equals(e.getTopic())) - .filter(s -> verifyConditions(e.getMap(), s.conditionsAsMap())) - .map(s -> generateNotification(s, e, date)) - .collect(Collectors.toList()); - - return new NotificationGroup(list); - } - - private static Notification generateNotification(final Subscription s, final Event e, final long date) { - final Notification n = new Notification(); - n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + e.getEventId())); - n.setSubscriptionId(s.getSubscriptionId()); - n.setEventId(e.getEventId()); - n.setProducerId(e.getProducerId()); - n.setTopic(e.getTopic()); - n.setPayload(e.getPayload()); - n.setMap(e.getMap()); - n.setDate(date); - return n; - } - - private static boolean verifyConditions(final MappedFields map, - final Map> conditions) { - if (conditions.containsKey("targetDatasourceName") - && !SubscriptionUtils - .verifyExact(map.getTargetDatasourceName(), conditions.get("targetDatasourceName").get(0).getValue())) { - return false; - } - - if (conditions.containsKey("trust") - && !SubscriptionUtils - .verifyFloatRange( - map.getTrust(), conditions.get("trust").get(0).getValue(), - conditions.get("trust").get(0).getOtherValue())) { - return false; - } - - if (conditions.containsKey("targetDateofacceptance") && conditions - .get("targetDateofacceptance") - .stream() - .noneMatch( - c -> SubscriptionUtils - .verifyDateRange(map.getTargetDateofacceptance(), c.getValue(), c.getOtherValue()))) { - return false; - } - - if (conditions.containsKey("targetResultTitle") - && conditions - .get("targetResultTitle") - .stream() - .noneMatch(c -> SubscriptionUtils.verifySimilar(map.getTargetResultTitle(), c.getValue()))) { - return false; - } - - if (conditions.containsKey("targetAuthors") - && conditions - .get("targetAuthors") - .stream() - .noneMatch(c -> SubscriptionUtils.verifyListSimilar(map.getTargetAuthors(), c.getValue()))) { - return false; - } - - return !conditions.containsKey("targetSubjects") - || conditions - .get("targetSubjects") - .stream() - .allMatch(c -> SubscriptionUtils.verifyListExact(map.getTargetSubjects(), c.getValue())); + log.info("*** sendNotifications (emails, ...)"); + sendNotifications(brokerApiBaseUrl, date - 1000); + log.info("*** ALL done."); } - private static List listSubscriptions(final String brokerApiBaseUrl) throws IOException { - final String url = brokerApiBaseUrl + "/api/subscriptions"; - final HttpGet req = new HttpGet(url); - - final ObjectMapper mapper = new ObjectMapper(); - - try (final CloseableHttpClient client = HttpClients.createDefault()) { - try (final CloseableHttpResponse response = client.execute(req)) { - final String s = IOUtils.toString(response.getEntity().getContent()); - return mapper - .readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, Subscription.class)); - } - } - } - - private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws IOException { + private static String deleteOldNotifications(final String brokerApiBaseUrl, final long l) throws Exception { final String url = brokerApiBaseUrl + "/api/notifications/byDate/0/" + l; final HttpDelete req = new HttpDelete(url); diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml index ea80c3acf6..bc6778f52c 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_all/oozie_app/workflow.xml @@ -115,6 +115,11 @@ spark2EventLogDir spark 2.* event log dir location + + sparkMaxExecutorsForIndexing + 8 + Max number of workers for ElasticSearch indexing + @@ -498,7 +503,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.dynamicAllocation.maxExecutors=${sparkMaxExecutorsForIndexing} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} @@ -542,6 +547,30 @@ --dbPassword${brokerDbPassword} --brokerApiBaseUrl${brokerApiBaseUrl} + + + + + + + yarn + cluster + GenerateNotificationsJob + eu.dnetlib.dhp.broker.oa.GenerateNotificationsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --outputDir${outputDir} + --brokerApiBaseUrl${brokerApiBaseUrl} + @@ -556,7 +585,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.dynamicAllocation.maxExecutors=${sparkMaxExecutorsForIndexing} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_notifications.json b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_notifications.json new file mode 100644 index 0000000000..6e12783b91 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/generate_notifications.json @@ -0,0 +1,14 @@ +[ + { + "paramName": "o", + "paramLongName": "outputDir", + "paramDescription": "the dir that contains the events folder", + "paramRequired": true + }, + { + "paramName": "broker", + "paramLongName": "brokerApiBaseUrl", + "paramDescription": "the url of the broker service api", + "paramRequired": true + } +] diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml index 248326d57e..0d226d78e1 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/notifications_only/oozie_app/workflow.xml @@ -98,6 +98,11 @@ spark2EventLogDir spark 2.* event log dir location + + sparkMaxExecutorsForIndexing + 8 + Max number of workers for ElasticSearch indexing + @@ -119,12 +124,36 @@ - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + + + yarn + cluster + GenerateNotificationsJob + eu.dnetlib.dhp.broker.oa.GenerateNotificationsJob + dhp-broker-events-${projectVersion}.jar + + --executor-cores=${sparkExecutorCores} + --executor-memory=${sparkExecutorMemory} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=3840 + + --outputDir${outputDir} + --brokerApiBaseUrl${brokerApiBaseUrl} + + + + + yarn @@ -135,7 +164,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.dynamicAllocation.maxExecutors=${sparkMaxExecutorsForIndexing} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml index 9095004ad5..87adfffaa5 100644 --- a/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-broker-events/src/main/resources/eu/dnetlib/dhp/broker/oa/reindex/oozie_app/workflow.xml @@ -75,6 +75,11 @@ spark2EventLogDir spark 2.* event log dir location + + sparkMaxExecutorsForIndexing + 8 + Max number of workers for ElasticSearch indexing + @@ -112,7 +117,7 @@ --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} - --conf spark.dynamicAllocation.maxExecutors="8" + --conf spark.dynamicAllocation.maxExecutors=${sparkMaxExecutorsForIndexing} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/GenerateNotificationsJobTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/GenerateNotificationsJobTest.java new file mode 100644 index 0000000000..233963e2fa --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/GenerateNotificationsJobTest.java @@ -0,0 +1,133 @@ + +package eu.dnetlib.dhp.broker.oa; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import eu.dnetlib.dhp.broker.model.ConditionParams; +import eu.dnetlib.dhp.broker.model.Event; +import eu.dnetlib.dhp.broker.model.MappedFields; +import eu.dnetlib.dhp.broker.model.Subscription; +import eu.dnetlib.dhp.broker.oa.util.NotificationGroup; + +class GenerateNotificationsJobTest { + + private List subscriptions; + + private Map>> conditionsMap; + + private static final int N_TIMES = 1_000_000; + + @BeforeEach + void setUp() throws Exception { + final Subscription s = new Subscription(); + s.setTopic("ENRICH/MISSING/PID"); + s + .setConditions( + "[{\"field\":\"targetDatasourceName\",\"fieldType\":\"STRING\",\"operator\":\"EXACT\",\"listParams\":[{\"value\":\"reposiTUm\"}]},{\"field\":\"trust\",\"fieldType\":\"FLOAT\",\"operator\":\"RANGE\",\"listParams\":[{\"value\":\"0\",\"otherValue\":\"1\"}]}]"); + subscriptions = Arrays.asList(s); + conditionsMap = GenerateNotificationsJob.prepareConditionsMap(subscriptions); + } + + @Test + void testGenerateNotifications_invalid_topic() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PROJECT"); + + final NotificationGroup res = GenerateNotificationsJob + .generateNotifications(event, subscriptions, conditionsMap, 0); + assertEquals(0, res.getData().size()); + } + + @Test + void testGenerateNotifications_topic_match() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("reposiTUm"); + event.getMap().setTrust(0.8f); + + final NotificationGroup res = GenerateNotificationsJob + .generateNotifications(event, subscriptions, conditionsMap, 0); + assertEquals(1, res.getData().size()); + } + + @Test + void testGenerateNotifications_topic_no_match() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("Puma"); + event.getMap().setTrust(0.8f); + + final NotificationGroup res = GenerateNotificationsJob + .generateNotifications(event, subscriptions, conditionsMap, 0); + assertEquals(0, res.getData().size()); + } + + @Test + void testGenerateNotifications_invalid_topic_repeated() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PROJECT"); + + // warm up + GenerateNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); + + final long start = System.currentTimeMillis(); + for (int i = 0; i < N_TIMES; i++) { + GenerateNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); + } + final long end = System.currentTimeMillis(); + System.out + .println(String.format("no topic - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + + } + + @Test + void testGenerateNotifications_topic_match_repeated() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("reposiTUm"); + event.getMap().setTrust(0.8f); + + // warm up + GenerateNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); + + final long start = System.currentTimeMillis(); + for (int i = 0; i < N_TIMES; i++) { + GenerateNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); + } + final long end = System.currentTimeMillis(); + System.out + .println(String.format("topic match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + } + + @Test + void testGenerateNotifications_topic_no_match_repeated() { + final Event event = new Event(); + event.setTopic("ENRICH/MISSING/PID"); + event.setMap(new MappedFields()); + event.getMap().setTargetDatasourceName("Puma"); + event.getMap().setTrust(0.8f); + + // warm up + GenerateNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); + + final long start = System.currentTimeMillis(); + for (int i = 0; i < N_TIMES; i++) { + GenerateNotificationsJob.generateNotifications(event, subscriptions, conditionsMap, 0); + } + final long end = System.currentTimeMillis(); + System.out + .println( + String.format("topic no match - repeated %s times - execution time: %s ms ", N_TIMES, end - start)); + } + +} diff --git a/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/samples/SimpleVariableJobTest.java b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/samples/SimpleVariableJobTest.java new file mode 100644 index 0000000000..a6d1c89d35 --- /dev/null +++ b/dhp-workflows/dhp-broker-events/src/test/java/eu/dnetlib/dhp/broker/oa/samples/SimpleVariableJobTest.java @@ -0,0 +1,132 @@ + +package eu.dnetlib.dhp.broker.oa.samples; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.broker.model.ConditionParams; +import eu.dnetlib.dhp.broker.model.MapCondition; +import eu.dnetlib.dhp.broker.oa.util.SubscriptionUtils; + +@Disabled +public class SimpleVariableJobTest { + + private static final Logger log = LoggerFactory.getLogger(SimpleVariableJobTest.class); + + private static Path workingDir; + + private static SparkSession spark; + + private final static List inputList = new ArrayList<>(); + + private static final Map>> staticMap = new HashMap<>(); + + @BeforeAll + public static void beforeAll() throws IOException { + + workingDir = Files.createTempDirectory(SimpleVariableJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + final SparkConf conf = new SparkConf(); + conf.setAppName(SimpleVariableJobTest.class.getSimpleName()); + + conf.setMaster("local[*]"); + conf.set("spark.driver.host", "localhost"); + conf.set("hive.metastore.local", "true"); + conf.set("spark.ui.enabled", "false"); + // conf.set("spark.sql.warehouse.dir", workingDir.toString()); + // conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString()); + + spark = SparkSession + .builder() + .appName(SimpleVariableJobTest.class.getSimpleName()) + .config(conf) + .getOrCreate(); + + for (int i = 0; i < 1_000_000; i++) { + inputList.add("record " + i); + } + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testSimpleVariableJob() throws Exception { + final Map>> map = fillMap(); + + final long n = spark + .createDataset(inputList, Encoders.STRING()) + .filter(s -> filter(map.get(s))) + .map((MapFunction) s -> s.toLowerCase(), Encoders.STRING()) + .count(); + + System.out.println(n); + } + + @Test + public void testSimpleVariableJob_static() throws Exception { + + staticMap.putAll(fillMap()); + + final long n = spark + .createDataset(inputList, Encoders.STRING()) + .filter(s -> filter(staticMap.get(s))) + .map((MapFunction) s -> s.toLowerCase(), Encoders.STRING()) + .count(); + + System.out.println(n); + } + + private static Map>> fillMap() + throws JsonParseException, JsonMappingException, IOException { + final String s = "[{\"field\":\"targetDatasourceName\",\"fieldType\":\"STRING\",\"operator\":\"EXACT\",\"listParams\":[{\"value\":\"reposiTUm\"}]},{\"field\":\"trust\",\"fieldType\":\"FLOAT\",\"operator\":\"RANGE\",\"listParams\":[{\"value\":\"0\",\"otherValue\":\"1\"}]}]"; + + final ObjectMapper mapper = new ObjectMapper(); + final List list = mapper + .readValue(s, mapper.getTypeFactory().constructCollectionType(List.class, MapCondition.class)); + final Map> conditions = list + .stream() + .filter(mc -> !mc.getListParams().isEmpty()) + .collect(Collectors.toMap(MapCondition::getField, MapCondition::getListParams)); + + final Map>> map = new HashMap<>(); + inputList.forEach(i -> map.put(i, conditions)); + return map; + } + + private static boolean filter(final Map> conditions) { + if (conditions.containsKey("targetDatasourceName") + && !SubscriptionUtils + .verifyExact("reposiTUm", conditions.get("targetDatasourceName").get(0).getValue())) { + return false; + } + return true; + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/.scalafmt.conf b/dhp-workflows/dhp-graph-mapper/.scalafmt.conf new file mode 100644 index 0000000000..0b5dbe0b49 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/.scalafmt.conf @@ -0,0 +1,21 @@ +style = defaultWithAlign + +align.openParenCallSite = false +align.openParenDefnSite = false +align.tokens = [{code = "->"}, {code = "<-"}, {code = "=>", owner = "Case"}] +continuationIndent.callSite = 2 +continuationIndent.defnSite = 2 +danglingParentheses = true +indentOperator = spray +maxColumn = 120 +newlines.alwaysBeforeTopLevelStatements = true +project.excludeFilters = [".*\\.sbt"] +rewrite.rules = [AvoidInfix] +rewrite.rules = [ExpandImportSelectors] +rewrite.rules = [RedundantBraces] +rewrite.rules = [RedundantParens] +rewrite.rules = [SortImports] +rewrite.rules = [SortModifiers] +rewrite.rules = [PreferCurlyFors] +spaces.inImportCurlyBraces = false +unindentTopLevelOperators = true \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index a33a45517a..48e5945c00 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -52,8 +52,11 @@ public class CreateRelatedEntitiesJob_phase1 { final String jsonConfiguration = IOUtils .toString( - PrepareRelationsJob.class - .getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json")); + Objects + .requireNonNull( + CreateRelatedEntitiesJob_phase1.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase1.json"))); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -75,6 +78,7 @@ public class CreateRelatedEntitiesJob_phase1 { final String graphTableClassName = parser.get("graphTableClassName"); log.info("graphTableClassName: {}", graphTableClassName); + @SuppressWarnings("unchecked") final Class entityClazz = (Class) Class.forName(graphTableClassName); final SparkConf conf = new SparkConf(); @@ -101,22 +105,12 @@ public class CreateRelatedEntitiesJob_phase1 { Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))) .cache(); - readPathEntity(spark, inputEntityPath, clazz) + final Dataset> entities = readPathEntity(spark, inputEntityPath, clazz) .filter("dataInfo.invisible == false") .map( (MapFunction>) e -> new Tuple2<>(e.getId(), asRelatedEntity(e, clazz)), - Encoders - .tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) - .write() - .mode(SaveMode.Overwrite) - .save("/tmp/beta_provision/working_dir/update_solr/join_partial/relatedEntities/" + clazz.getSimpleName()); - - final Dataset> entities = spark - .read() - .load("/tmp/beta_provision/working_dir/update_solr/join_partial/relatedEntities/" + clazz.getSimpleName()) - .as( - Encoders - .tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))); + Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class))) + .cache(); relsByTarget .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") @@ -149,8 +143,10 @@ public class CreateRelatedEntitiesJob_phase1 { re.setId(entity.getId()); re.setType(EntityType.fromClass(clazz).name()); - if (entity.getPid() != null) + // TODO move the max number of PIDs to eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits + if (Objects.nonNull(entity.getPid())) { re.setPid(entity.getPid().stream().limit(400).collect(Collectors.toList())); + } re.setCollectedfrom(entity.getCollectedfrom()); switch (EntityType.fromClass(clazz)) { @@ -212,7 +208,7 @@ public class CreateRelatedEntitiesJob_phase1 { final List> f = p.getFundingtree(); if (!f.isEmpty()) { - re.setFundingtree(f.stream().map(s -> s.getValue()).collect(Collectors.toList())); + re.setFundingtree(f.stream().map(Field::getValue).collect(Collectors.toList())); } break; } @@ -227,15 +223,16 @@ public class CreateRelatedEntitiesJob_phase1 { return Optional .ofNullable(f) .filter(Objects::nonNull) - .map(x -> x.getValue()) + .map(Field::getValue) .orElse(defaultValue); } /** - * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text file, + * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text + * file * - * @param spark - * @param relationPath + * @param spark the SparkSession + * @param relationPath the path storing the relation objects * @return the Dataset containing all the relationships */ private static Dataset readPathRelation( diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java index 1df980643e..2142737f03 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/XmlRecordFactoryTest.java @@ -8,6 +8,7 @@ import java.io.IOException; import java.io.StringReader; import java.util.List; +import eu.dnetlib.dhp.schema.oaf.Datasource; import org.apache.commons.io.IOUtils; import org.dom4j.Document; import org.dom4j.DocumentException; @@ -139,4 +140,33 @@ public class XmlRecordFactoryTest { System.out.println(doc.asXML()); assertEquals("", doc.valueOf("//rel/validated")); } + + @Test + public void testDatasource() throws IOException, DocumentException { + final ContextMapper contextMapper = new ContextMapper(); + + final XmlRecordFactory xmlRecordFactory = new XmlRecordFactory(contextMapper, false, + XmlConverterJob.schemaLocation); + + final Datasource d = OBJECT_MAPPER + .readValue(IOUtils.toString(getClass().getResourceAsStream("datasource.json")), Datasource.class); + + final String xml = xmlRecordFactory.build(new JoinedEntity<>(d)); + + assertNotNull(xml); + + final Document doc = new SAXReader().read(new StringReader(xml)); + + assertNotNull(doc); + + System.out.println(doc.asXML()); + + // TODO add assertions based of values extracted from the XML record + + assertEquals("National", doc.valueOf("//jurisdiction/@classname")); + assertEquals("true", doc.valueOf("//thematic")); + assertEquals("Journal article", doc.valueOf("//contentpolicy/@classname")); + assertEquals("Journal archive", doc.valueOf("//datasourcetypeui/@classname")); + + } } diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/datasource.json b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/datasource.json new file mode 100644 index 0000000000..ae069b8b5f --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/datasource.json @@ -0,0 +1 @@ +{"collectedfrom":[{"key":"10|openaire____::13068d7823ea0bd86516ac2cb66e96ba","value":"Jurnal Fakultas Sastra Universitas Ekasakti","dataInfo":null}],"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}},"lastupdatetimestamp":1645012035118,"id":"10|274269ac6f3b::2a2e2793b500f3f7b47ef24b1a9277b7","originalId":["274269ac6f3b::2579-5449","piwik:13"],"pid":[],"dateofcollection":"2020-01-21","dateoftransformation":null,"extraInfo":[],"oaiprovenance":null,"datasourcetype":{"classid":"pubsrepository::journal","classname":"pubsrepository::journal","schemeid":"dnet:datasource_typologies","schemename":"dnet:datasource_typologies"},"datasourcetypeui":{"classid":"pubsrepository::journal","classname":"Journal archive","schemeid":"dnet:datasource_typologies_ui","schemename":"dnet:datasource_typologies_ui"},"openairecompatibility":{"classid":"hostedBy","classname":"hostedBy","schemeid":"dnet:datasourceCompatibilityLevel","schemename":"dnet:datasourceCompatibilityLevel"},"officialname":{"value":"Jurnal Ilmiah Pendidikan Scholastic","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"englishname":{"value":"Jurnal Ilmiah Pendidikan Scholastic","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"websiteurl":{"value":"http://e-journal.sastra-unes.com/index.php/JIPS/index","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"logourl":null,"contactemail":{"value":"test@test.it","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"namespaceprefix":{"value":"ojs_25795449","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"latitude":{"value":"0.0","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"longitude":{"value":"0.0","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"dateofvalidation":null,"description":null,"subjects":[],"odnumberofitems":{"value":"0.0","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"odnumberofitemsdate":null,"odpolicies":null,"odlanguages":[],"odcontenttypes":[{"value":"Journal articles","dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}}],"accessinfopackage":[],"releasestartdate":null,"releaseenddate":null,"missionstatementurl":null,"dataprovider":{"value":false,"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"serviceprovider":{"value":false,"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"databaseaccesstype":null,"datauploadtype":null,"databaseaccessrestriction":null,"datauploadrestriction":null,"versioning":{"value":false,"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"citationguidelineurl":null,"qualitymanagementkind":null,"pidsystems":null,"certificates":null,"policies":[],"journal":{"name":"Jurnal Ilmiah Pendidikan Scholastic","issnPrinted":"2579-5449","issnOnline":"2579-5448","issnLinking":"2579-5447","ep":null,"iss":null,"sp":null,"vol":null,"edition":null,"conferenceplace":null,"conferencedate":null,"dataInfo":{"invisible":false,"inferred":false,"deletedbyinference":false,"trust":"0.900","inferenceprovenance":null,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"sysimport:crosswalk:entityregistry","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"}}},"providedentitytypes":null,"providedproducttypes":null,"jurisdiction":{"classid":"National","classname":"National","schemeid":"eosc:jurisdictions","schemename":"eosc:jurisdictions"},"thematic":true,"knowledgegraph":true,"contentpolicies":[{"classid":"Journal article","classname":"Journal article","schemeid":"eosc:contentpolicies","schemename":"eosc:contentpolicies"}]}